Spring Batch 멀티 프로세스
FutureTask
- FutureTask<O> task = new FutureTask<>(Callable<V>)
- Thread 가 수행하는 Task로서 Callable 을 실행시키고 결과를 Future<V>에 담아 반환
- Runnable 은 스레드를 실행하고 코드를 실행할 수 있는 인터페이스를 제공하며, 반환 값을 가질 수 없다. 반면에 Callable 은 - 레드를 실행하고 코드를 실행하면서 반환 값을 가질 수 있다.
스프링 배치 멀티 스레드 프로세싱
- Step 안에 ItemProcessor 가 비동기적으로 동작하는 구조
- AsyncItemProcessor 와 AsyncItemWriter 가 함께 구성이 되어야 함
기본 처리 방식은 아래와 같다.
Job
- Step
- ItemReader
- AsyncItemProcessor -> ItemProcessor
- AsyncItemWriter -> ItemWriter
(-> 는 delegate 의미)
AsyncItemProcessor 는 ItemProcessor 에 위임하게 되는데 내부적으로 TaskExecutor 를 가지고 있다. (new SyncTaskExecutor() 이거는 Thread 생성하고 Task를 할당해주는 역할을 하는듯) 그리고나서 Thread가 수행하는 Task로 Callable을 실행시키고 결과를 Future<V> 에 담에서 반환하게 된다.
//아래 순서대로 실행되는듯 하다.
AsyncItemProcessor
ItemProcessor<I, O> delegate;
TaskExecutor taskExecutor = new SyncTaskExecutor();
FutureTask<O> task = new FutureTask<>(Callable<V>)
요약
- taskExecutor 에 execute 가 실행이 되면 FutureTask<O> task = new FutureTask<>(Callable<V>) 에 Callable 이 실행이 되는데 Callable 은 ItemProcessor 를 실행시킨다.
@Nullable
public Future<0> process (final I item) throws Exception {
final StepExecution stepExecution = getStepExecution();
FutureTask<0> task = new FutureTask<>(new Callable<0>() {
public 0 call() throws Exception {
if (stepExecution != null) {
StepSynchronizationManager.register(stepExecution);
}
try {
return delegate.process(item); //이게 마지막에 동작하나 보다.
}
finally {
if (stepExecution != null) {
StepSynchronizationManager.close();
}
});
}
taskExecutor.execute(task);
return task;
public Step step() throws Execption {
return stepBuilderFactory.get(“step”)
.chunk(100)
.reader(pagingItemReader())
.processor(asyncItemProceesor())
.writer(asyncItemWriter())
.build()
}
스프링 배치 멀티 스레드 프로세싱
- Step 내에서 멀티 스레드로 Chunk 기반 처리가 이루어지는 구조
- TaskExecutorRepeatTemplate 이 반복자로 사용되며 설정한 개수(throttleLimit) 개수 만큼 스레드 생성
- Job > Step > TaskExecutorRepeatTemplate
- TaskExecutorRepeatTemplate (멀티스레드)
- Runnable -> RepeatCallback -> ChunkOrientedTasklet
공통 처리
- ChunkOrientedTasklet
- ItemReader -> ItemProceesor -> ItemWriter
- 위 과정이 모두 thread-safe 하다.
TaskExecutorRepeatTemplate
- 조절 제한 개수
- int throttleLimit = DEFAULT_THROTTLE_LIMIT - Thread 를 조절 제한 수 만큼 생성하고 Task를 할당
- TaskExecutor taskExecutor = new SyncTaskExecutor();
- 단 위와 같은 방식은 동기적으로 처리된다.
Multi-threaded Step
- Job -> taskletStep -> TaskExecutorRepeatTemplate -> TaskExecutor -> 스레드 생성 -> Runnable 안에 ChunkOrientedTasklet 이 있음 -> ItemReader - ItemProcessor - ItemWriter 가 동작
주의
- ItemReader 는 DB에서 데이터를 읽어올텐데 데이터 동기화가 필요하니 각 스레드 별로 thread-safe 가 필요함, 데이터 중복 등 문제 발생, 청크는 동시성 이슈 때문에 스레드 별로 하나씩 갖는다.
- ItemReader에서는 Synchronize 로 동시성 이슈가 없도록 한다.
- 각각의 스레드는 스택을 가지는데 청크를 담는다. 공유가 되지 않는다.
스레드에 안전한 페이징 제공
- jdbcPagingItemReader
- jpaPagingItemReader
public Step step() throws Exception {
return stepBuilderFactory.get(“step”)
.<Customer, Customer>chunk(100)
.reader(pagingItemReader())
.processor(customerItemProcessor())
.writer(customerItemWriter())
.taskExecutor(taskExecutor())
.build()
}
- 스레드 생성 및 실행을 위한 taskExecutor 설정
- 자바에서 스레드 풀 관리할 수 있는 ThreadPoolTaskExecutor
- 이런식으로 병렬 처리 할 수 있따.
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(4);
taskExecutor.setMaxPoolSize(8);
taskExecutor.setThreadNamePrefix(“async-thread”);
return taskExecutor;
}
- SimpleChunkProvider 클래스에서 provide 메소드는 Chunk<T> inputs = new Chunk<>() 로 Chunk 를 스레드 별로 생성하게 된다.
- Cursor
- Cursor 는 Thread 동기화를 위한 작업이 없다. 중복된 데이터를 읽어올 수 있다.
Parallel Steps
- SplitState를 여러 개의 Flow 들을 병렬적으로 실행하는 구조
- 실행이 다 완료된 후 FlowExecutionStatus 결과들을 취합해서 다음 단계를 결정한다.
- Job > SimpleFlow > SplitState (flows) > TaskExecutor -> 스레드 Worker (Future Task > SimpleFlow) > FlowExecution 실행 결과 > Collection<FlowExecution> > aggregator.aggregate(results) > FlowExecutionAggregator 반환하는거 같은데 → SimpleFlow
- (return FlowExecutionStatus : COMPLETE, STOPPED, UNKNOWN 최종 실행결과의 상태값을 반환하여 다음 Step을 결정하도록 한다.)
SplitState
- 병렬로 수행할 Flow 들을 담은 컬렉션
- Collection<Flow> flows; - Thread 생성하고 Task를 할당
- TaskExecutor taskExecutor; - 병렬로 수행 후 하나의 종료 상태로 집계하는 클래스
- FlowExecutionAggregator aggregator - FlowExecutionStatus handle(final FlowExecutor executor)
Parallel Steps
public Job job() {
return jobBuilderFactory.get(“job”)
.start(flow1())
.split(TaskExecutor).add(flow2(), flow3())
.next(flow4())
.end()
.build();
}
- taskExecutor 에서 flow 개수만큼 스레드를 생성해서 각 flow를 실행시킨다.
- next는 메인스레드가 실행시킨다.
동시성 이슈 해결
- 메인 스레드 말고 병렬 스레드 이용 시 split(flow).add(flow2) 이런식으로 처리가 된다고 하면 전역 변수가 공유되고 있으면 동시성 이슈가 발생할 수 있다.
- lock 사용 (성능은 조금 떨어지겠지만)
private Object lock = new Object();
...
synchronized(lock) {
//전역변수 작업 라인
}
'Spring' 카테고리의 다른 글
스프링 핵심원리 고급편 - threadLocal, 템플릿 메서드 패턴 & 콜백 패턴 (0) | 2024.01.14 |
---|---|
Criteria API (0) | 2023.01.28 |
의존성 주입 (1) | 2023.01.21 |
Spring Security (0) | 2021.12.14 |
빈 생명주기 & 콜백 (0) | 2021.12.14 |
댓글
이 글 공유하기
다른 글
-
스프링 핵심원리 고급편 - threadLocal, 템플릿 메서드 패턴 & 콜백 패턴
스프링 핵심원리 고급편 - threadLocal, 템플릿 메서드 패턴 & 콜백 패턴
2024.01.14 -
Criteria API
Criteria API
2023.01.28 -
의존성 주입
의존성 주입
2023.01.21 -
Spring Security
Spring Security
2021.12.14