-
11. 스프링 배치 청크 프로세스 (Chunk)BackEnd/Spring Batch 2021. 12. 24. 17:30반응형
Architecture
Architecture in transaction
Chunk란 여러 개의 아이템을 묶은 하나의 덩어리, 블록을 의미합니다. 한번에 하나씩 아이템을 입력 받아 Chunk 단위의 덩어리로 만든 후 Chunk 단위로 트랜잭션을 처리합니다. 즉, Chunk 단위의 Commit과 Rollback이 이루어집니다. 일반적으로 대용량 데이터를 한번에 처리하는 것이 아닌 청크 단위로 쪼개어서 더 이상 처리할 데이터가 없을 때까지 반복해서 입출력하는데 사용됩니다.
Chunk<I> vs Chunk<O>
- Chunk<I>는 ItemReader로 읽은 하나의 아이템을 Chunk에서 정한 개수만큼 반복해서 저장하는 타입
- Chunk<O>는 ItemReader로부터 전달받은 Chunk<I>를 참조해서 ItemProcessor에서 적절하게 가공, 필터링한 다음 ItemWriter에 전달하는 타입
ChunkOrientedTasklet
스프링 배치에서 제공하는 Tasklet의 구현체로서 Chunk 지향 프로세싱을 담당하는 도메인 객체입니다. ItemReader, ItemWriter, ItemProcessor를 사용해 Chunk 기반의 데이터 입출력 처리를 담당합니다. TaskletStep에 의해 반복적으로 실행되며 ChunkOrientedTasklet이 실행될 때마다 매번 새로운 트랜잭션이 생성되어 처리가 이루어집니다. exception이 발생할 경우, 해당 Chunk는 롤백 되며 이전에 커밋한 Chunk는 완료된 상태가 유지됩니다. 내부적으로 ItemReader를 핸들링하는 ChunkProvider와 ItemProcessor, ItemWriter를 핸들링하는 ChunkProcessor 타입의 구현체를 가집니다.
API
// StepBuilderFactory > StepBuilder > SimpleStepBuilder > TaskletStep public Step chunkStep() { return stepBuilderFactory.get("chunkStep") // .<I, O>chunk(10) or .<I, O>chunk(CompletionPolicy)로 설정합니다. // chunk size(commit interval) 설정 // input, ouput 제네릭타입 설정 .<I, O>chunk(10) // Chunk 프로세스를 완료하기 위한 정책 설정 클래스 지정 .<I, O>chunk(CompletionPolicy) // 소스로부터 item을 읽거나 가져오는 ItemReader 구현체 설정 .reader(itemReader()) // item을 변경, 가공, 필터링하기 위한 ItemProcessor 구현체 설정 .processor(itemProcessor()) // item을 목적지에 쓰거나 보내기 위한 ItemWriter 구현체 설정 .writer(itemWriter()) // 재시작 데이터를 관리하는 콜백에 대한 스트림 등록 .stream(itemStream()) // Item이 JMS, Message Queue Server와 같은 트랜잭션 외부에서 읽혀지고 캐시할 것인지 여부, 기본값: false .readerIsTransactionalQueue() // Chunk 프로세스가 진행되는 특정 시점에 콜백 제공받도록 ChunkListener 설정 .listener(ChunkListener) .build(); }
ChunkProvider
ItemReader를 사용해서 소스로부터 아이템을 Chunk size만큼 읽어서 Chunk 단위로 만들어 제공하는 도메인 객체입니다. Chunk<I>를 만들고 내부적으로 반복문을 사용해 ItemReader.read()를 계속 호출하며 아이템을 Chunk에 넣습니다. 외부로부터 ChunkProvider가 호출될 때마다 항상 새로운 Chunk가 생성됩니다. 반복문 종료 시점은 Chunk size만큼 아이템을 읽으면 반복문이 종료되고 ChunkProcessor로 넘어갑니다. 또한 ItemReader가 읽은 아이템이 null일 경우 반복문이 종료되고 해당 Step 반복문까지 종료됩니다. 기본 구현체로 SimpleChunkProvider와 FaultTolerantChunkProvider가 있습니다.
ChunkProcessor
ItemProcessor를 사용해서 아이템을 변형, 가공, 필터링하고 ItemWriter를 사용해서 Chunk 데이터를 저장 및 출력합니다. Chunk<O>를 만들고 앞에서 넘어온 Chunk<I>의 아이템을 한 건씩 처리한 후 Chunk<O>에 저장합니다. 외부로부터 ChunkProcessor가 호출될 때마다 항상 새로운 Chunk가 생성됩니다. ItemProcessor는 설정 시 선택사항으로서 만약 객체가 존재하지 않을 경우 ItemReader에서 읽은 아이템 그대로 Chunk<O>에 저장됩니다. ItemProcessor처리가 완료되면 Chunk<O>에 있는 List<Item>을 ItemWriter에게 전달합니다. ItemWriter 처리가 완료되면 Chunk 트랜잭션이 종료하게 되고 Step 반복문에서 ChunkOrientedTasklet이 새롭게 실행됩니다. 기본 구현체로서 SimpleChunkProcessor와 FaultTolerantChunkProcessor가 있습니다.
ItemReader<T>
다양한 입력으로부터 데이터를 읽어서 제공하는 인터페이스로 ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 합니다.
- 플랫(Flat) 파일: csv, txt (고정 위치로 정의된 데이터 필드나 특수문자로 구별된 데이터의 행)
- XML, Json
- Database
- JMS, RabbitMQ와 같은 Message Queuing 서비스
- Custom Reader: 구현 시 멀티 스레드 환경에서 스레드에 안전하게 구현할 필요가 있음
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException
- 입력 데이터를 읽고 다음 데이터로 이동
- 아이템 하나를 리턴하며 더이상 아이템이 없을 경우 null 리턴
- 아이템 하나는 파일의 한줄, DB의 한 row 혹은 XML 파일에서 하나의 엘리먼트
- 더이상 처리해야 할 아이템이 없어도 예외가 발생하지 않고 ItemProcessor와 같은 다음 단계로 넘어감
구현체
FILE
- FlatFileItemReader: csv, txt
- StaxEventItemReader: XML
- JsonItemReader: Json
- MultiResourceItemReader: 여러 개의 파일 조합
DB
- JdbcCursorItemReader: JDBC Cursor Type
- JpaCursorItemReader: JPA Cursor Type
- JdbcPagingItemReader: JDBC Paging Type
- JpaPagingItemReader: JPA Paging Type
- ItemReaderAdapter: Reuse Exist Service
Support / Custom
- SynchronizedItemStreamReader: Convert to Thread-safe ItemReader
- CustomItemReader: Customizing ItemReader
다수의 구현체들이 ItemReader와 ItemStream 인터페이스를 동시에 구현하고 있습니다.
- 파일의 스트림을 열거나 종료, DB 커넥션을 열거나 종료, 입력 장치 초기화 등의 작업
- ExecutionContext이 read와 관련된 여러가지 상태 정보를 저장해서 재시작 시 다시 참조 하도록 지원
ItemProcessor<I, O>
데이터를 출력하기 전에 데이터를 가공, 변형, 필터링하는 역할로 ItemReader 및 ItemWriter와 분리되어 비즈니스 로직을 구현할 수 있습니다. ItemReader로부터 받은 아이템을 특정 타입으로 변환하거나 필터과정을 거쳐 원하는 아이템들만 ItemWriter에게 넘겨줄 수 있습니다. ItemProcessor에서 process() 실행결과 null을 반환하면 Chunk<O>에 저장되지 않기 때문에 결국 ItemWriter에 전달되지 않습니다. ChunkOrientedTasklet 실행 시 선택적 요소로 청크 기반 프로세싱에서 ItemProcessor 단계가 반드시 필요한 것은 아닙니다. ItemProcessor는 ItemStream을 구현하지 않으며, 거의 대부분 Customizing해서 사용하기 때문에 기본적으로 제공되는 구현체가 적습니다.
O process(@NonNull | item) throws Exception
- <I> 제네릭은 ItemReader에서 받을 데이터 타입 지정
- <O> 제네릭은 ItemWriter에게 보낼 데이터 타입 지정
- 아이템 하나씩 가공 처리하며 null 리턴할 경우 해당 아이템은 Chunk<O>에 저장되지 않음
구현체
- ClassifierCompositeItemProcessor: 분류에 따른 선택적 처리
- CompositeItemProcessor: ItemProcessor들을 연결해서 처리
- CustomItemProcessor: Customizing ItemProcessor
ItemWriter<T>
Chunk 단위로 데이터를 받아 일괄 출력 작업을 위한 인터페이스입니다. 아이템 하나가 아닌 아이템 리스트를 전달 받으며, ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 합니다.
- 플랫(Flat) 파일: csv, txt
- XML, Json
- Database
- JMS, RabbitMQ와 같은 Message Queuing 서비스
- Mail Service
- Custom Writer
void write(List<? extends T> items) throws Exception
- 출력 데이터를 아이템 리스트로 받아 처리
- 출력이 완료되고 트랜잭션이 종료되면 새로운 Chunk 단위 프로세스로 이동
구현체
FILE
- FlatFileItemWriter: csv, txt
- StaxEventItemWriter: XML
- JsonFileItemWriter: Json
- MultiResourceItemWriter: 여러 개의 파일 조합
DB
- JdbcBatchItemWriter: JDBC Type
- JpaItemWriter: JPA Type
- ItemWriterAdapter: Reuse Exist Service
Custom
- CustomWriter: Customizing ItemWriter
다수의 구현체들이 ItemWriter와 ItemStream을 동시에 구현하고 있으며, 보통 ItemReader 구현체와 1:1 대응 관계인 구현체들로 구성되어 있습니다.
- 파일의 스트림을 열거나 종료, DB 커넥션을 열거나 종료, 출력 장치 초기화 등의 작업
ItemStream
ItemReader와 ItemWriter 처리 과정 중 상태를 저장하고 오류가 발생하면 해당 상태를 참조하여 실패한 곳에서 재시작 하도록 지원합니다. 리소스를 열고(open) 닫아야(close)하며 입출력 장치 초기화 등의 작업을 해야 하는 경우 사용합니다. ExecutionContext를 매개변수로 받아서 상태 정보를 업데이트 합니다. ItemReader 및 ItemWriter는 ItemStream을 구현해야 합니다.
ItemStream 구조
// read, write 메서드 호출전에 파일이나 커넥션이 필요한 리소스에 접근하도록 초기화 작업 void open(ExecutionContext executionContext) throws ItemStreamException // 현재까지 진행된 모든 상태를 저장 void update(ExecutionContext executionContext) throws ItemStreamException // 열려 있는 모든 리소스를 안전하게 해제하고 닫음 void close() throws ItemStreamException
실행순서
- ItemStream open(ExecutionContext) > ItemReader > ItemStream update(ExecutionContext)
- > ItemProcessor
- > ItemStream open(ExecutionContext) > ItemWriter > ItemStream update(ExecutionContext)
- > ItemStream close()
아래는 Chunk size를 2로해서 "item1", "item2", "item3","item4", "item5", "item6"를 입력받고 "my_" 문자열을 prefix로 붙여 출력하는 예제코드입니다. 추가 샘플코드는 링크된 Github 소스(branches: Part5)를 참고하시면 됩니다.
package io.springbatch.springbatchlecture; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.JobScope; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.job.flow.FlowExecutionStatus; import org.springframework.batch.core.job.flow.JobExecutionDecider; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.ListItemReader; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; import java.util.List; @RequiredArgsConstructor @Configuration public class ChunkConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Bean public Job job() { return jobBuilderFactory.get("batchJob") .incrementer(new RunIdIncrementer()) .start(step1()) .next(step2()) .build(); } @Bean public Step step1() { return stepBuilderFactory.get("step1") .<String, String>chunk(2) .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3","item4", "item5", "item6"))) .processor(new ItemProcessor<String, String>() { @Override public String process(String item) throws Exception { Thread.sleep(300); System.out.println(item); return "my_" + item; } }) .writer(new ItemWriter<String>() { @Override public void write(List<? extends String> items) throws Exception { Thread.sleep(1000); System.out.println(items); } }) .build(); } @Bean public Step step2() { return stepBuilderFactory.get("step2") .tasklet((contribution, chunkContext) -> { System.out.println("step2 has executed"); return RepeatStatus.FINISHED; }) .build(); } }
[참고자료]
반응형'BackEnd > Spring Batch' 카테고리의 다른 글
13. ItemReader (XML) (0) 2021.12.30 12. ItemReader (File) (0) 2021.12.30 10. Scope (0) 2021.12.24 09. Flow (0) 2021.12.23 08. 배치 상태 유형 (0) 2021.12.22