-
29. 리스너 (Listener) | BackEnd/Spring Batch | 2022. 1. 13. 22:20
Listener는 배치 흐름 중에 Job, Step, Chunk 단계의 실행 전후에 발생하는 이벤트를 받아 용도에 맞게 활용할 수 있도록 제공하는 인터셉터 개념의 클래스입니다. 각 단계별로 로그를 기록하거나 소요된 시간을 계산하거나 실행상태 정보들을 참조 및 조회할 수 있습니다. 이벤트를 받기 위해서는 Listener를 등록해야 하며 등록은 API 설정에서 각 단계별로 지정할 수 있습니다.
Listeners
- JobExecutionListener: Job 실행 전후
- StepExecutionListener: Step 실행 전후
- ChunkListener: Chunk 실행 전후 (Tasklet 실행 전후), 오류 시점
- ItemReadListener: ItemReader 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
- ItemProcessListener: ItemProcessor 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
- ItemWriteListener: ItemWriter 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
- SkipListener: 읽기, 쓰기, 처리 Skip 실행 시점, Item 처리가 Skip 될 경우 Skip 된 item을 추적함
- RetryListener: Retry 시작, 종료, 에러 시점
구현 방법
JobExecutionListener
Job의 성공여부와 상관없이 호출되며, 성공/실패 여부는 JobExecution을 통해 알 수 있습니다.
- void beforeJob(JobExecution jobExecution) // Job 실행 전 호출
- void afterJob(JobExecution jobExecution) // Job 실행 후 호출
public Job job() { return jobBuilderFactory.get("job") .start(step()) .next(flow()) .listener(JobExecutionListener) .listener(Object) // 어노테이션방식 .build(); }
StepExecutionListener
Step의 성공여부와 상관없이 호출되며, 성공/실패 여부는 StepExecution을 통해 알 수 있습니다.
- void beforeStep(StepExecution stepExecution) // Step 실행 전 호출
- ExitStatus afterStep(StepExecution stepExecution) // Step 실행 후 호출, ExitStatus를 변경하면 최종 종료코드로 반영
public Job job() { return stepBuilderFactory.get("step") .tasklet(tasklet()) .listener(StepExecutionListener) .listener(Object) // 어노테이션방식 .build(); }
package io.springbatch.springbatchlecture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.ItemReadListener; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.JdbcCursorItemReader; import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; import org.springframework.batch.item.support.SynchronizedItemStreamReader; import org.springframework.batch.item.support.builder.SynchronizedItemStreamReaderBuilder; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; import org.springframework.jdbc.core.BeanPropertyRowMapper; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import javax.sql.DataSource; @RequiredArgsConstructor @Configuration @Slf4j public class JobAndStepListenerConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final CustomStepListener customStepListener; @Bean public Job job() throws Exception { return jobBuilderFactory.get("batchJob") 
.incrementer(new RunIdIncrementer()) .start(step1()) .next(step2()) .listener(new CustomJobListener()) .build(); } @Bean public Step step1() { return stepBuilderFactory.get("step1") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { // throw new RuntimeException("failed"); return RepeatStatus.FINISHED; } }) .listener(customStepListener) .build(); } @Bean public Step step2() { return stepBuilderFactory.get("step2") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { return RepeatStatus.FINISHED; } }) .listener(customStepListener) .build(); } }
CustomJobListener.java
package io.springbatch.springbatchlecture;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * Job listener that prints the job name before the job runs and the
 * job's batch status after it finishes.
 */
public class CustomJobListener implements JobExecutionListener {

    /**
     * Prints the name of the job about to run.
     *
     * @param jobExecution the execution that is starting
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("JobExecution.getJobName() : " + jobExecution.getJobInstance().getJobName());
    }

    /**
     * Prints the status the job ended with.
     *
     * @param jobExecution the execution that has finished
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("JobExecution.getStatus() : " + jobExecution.getStatus());
    }
}
CustomStepListener.java
package io.springbatch.springbatchlecture; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; import org.springframework.stereotype.Component; @Component public class CustomStepListener implements StepExecutionListener { @Override public void beforeStep(StepExecution stepExecution) { System.out.println("stepExecution.getStepName() : " + stepExecution.getStepName()); } @Override public ExitStatus afterStep(StepExecution stepExecution) { System.out.println("stepExecution.getStatus() : " + stepExecution.getStatus()); return ExitStatus.COMPLETED; } }
ChunkListener
- void beforeChunk(ChunkContext context) // 트랜잭션이 시작된 후, ItemReader의 read() 메소드를 호출하기 전에 호출
- void afterChunk(ChunkContext context) // Chunk 커밋 후 호출 (ItemWriter의 write() 메소드 호출한 후), 롤백 시 호출되지 않음
- void afterChunkError(ChunkContext context) // 오류 발생 및 롤백 시 호출
public Job job() { return stepBuilderFactory.get("step") .chunk(chunkSize) .listener(ChunkListener) .listener(Object) // 어노테이션방식 .build(); }
ItemReadListener
- void beforeRead() // read() 메소드 호출 전 매번 호출
- void afterRead(T item) // read() 메소드 호출이 성공할 때마다 호출
- void onReadError(Exception ex) // 읽는 도중 오류가 발생하면 호출
public Job job() { return stepBuilderFactory.get("step") .chunk(chunkSize) .reader(ItemReader) .listener(ItemReadListener) .listener(Object) // 어노테이션방식 .build(); }
ItemProcessListener
- void beforeProcess(T item) // process() 메소드 호출 전 호출
- void afterProcess(T item, @Nullable S result) // process() 메소드 호출 성공 시 호출
- void onProcessError(T item, Exception e) // 처리 도중 오류 발생 시 호출
public Job job() { return stepBuilderFactory.get("step") .chunk(chunkSize) .reader(ItemReader) .processor(ItemProcessor) .listener(ItemProcessListener) .listener(Object) // 어노테이션방식 .build(); }
ItemWriteListener
- void beforeWrite(List<? extends S> items) // write() 메소드 호출 전 호출
- void afterWrite(List<? extends S> items) // write() 메소드 호출 성공 시 호출
- void onWriteError(Exception exception, List<? extends S> items) // 쓰기 도중 오류 발생 시 호출
public Job job() { return stepBuilderFactory.get("step") .chunk(chunkSize) .reader(ItemReader) .writer(ItemWriter) .listener(ItemWriteListener) .listener(Object) // 어노테이션방식 .build(); }
package io.springbatch.springbatchlecture; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.ListItemReader; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; import java.util.List; @RequiredArgsConstructor @Configuration public class ChunkListenerConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final CustomChunkListener customChunkListener; @Bean public Job job() throws Exception { return jobBuilderFactory.get("batchJob") .incrementer(new RunIdIncrementer()) .start(step1()) .build(); } @Bean public Step step1() throws Exception { return stepBuilderFactory.get("step1") .<Integer, String>chunk(10) .listener(customChunkListener) .listener(new CustomItemReadListener()) .listener(new CustomItemProcessListener()) .listener(new CustomItemWriteListener()) .reader(listItemReader()) .processor((ItemProcessor) item -> { throw new RuntimeException("failed"); // return "item" + item; }) .writer((ItemWriter<String>) items -> { throw new RuntimeException("failed"); }) .build(); } @Bean public ItemReader<Integer> listItemReader() { List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10); return new ListItemReader<>(list); } }
SkipListener
- void onSkipInRead(Throwable t) // read 수행 중 Skip이 발생할 경우 호출
- void onSkipInWrite(S item, Throwable t) // write 수행 중 Skip이 발생할 경우 호출
- void onSkipInProcess(T item, Throwable t) // process 수행 중 Skip이 발생할 경우 호출
package io.springbatch.springbatchlecture; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.*; import org.springframework.batch.item.support.ListItemReader; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; import java.util.List; @RequiredArgsConstructor @Configuration public class SkipListenerConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final CustomSkipListener customSkipListener; @Bean public Job job() throws Exception { return jobBuilderFactory.get("batchJob") .incrementer(new RunIdIncrementer()) .start(step1()) .build(); } @Bean public Step step1() throws Exception { return stepBuilderFactory.get("step1") .<Integer, String>chunk(10) .reader(listItemReader()) .processor(new ItemProcessor<Integer, String>() { @Override public String process(Integer item) throws Exception { if (item == 4) { throw new CustomSkipException("process skipped"); } System.out.println("process : " + item); return "item" + item; } }) .writer(new ItemWriter<String>() { @Override public void write(List<? extends String> items) throws Exception { for (String item : items) { if (item.equals("item5")) { throw new CustomSkipException("write skipped"); } System.out.println("write : " + item); } } }) .faultTolerant() .skip(CustomSkipException.class) .skipLimit(3) .listener(customSkipListener) .build(); } @Bean public ItemReader<Integer> listItemReader() { List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10); return new LinkedListItemReader<>(list); } }
RetryListener
- boolean open(RetryContext context, RetryCallback<T, E> callback) // 첫 번째 시도 전에 한 번 호출, false를 반환할 경우 retry 시도하지 않음
- void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) // 마지막 시도가 끝난 후(성공/실패와 무관하게) 한 번 호출
- void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) // 재시도 실패 시마다 호출
package io.springbatch.springbatchlecture; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.ItemReader; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; import java.util.List; @RequiredArgsConstructor @Configuration public class RetryListenerConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final CustomRetryListener customRetryListener; @Bean public Job job(){ return jobBuilderFactory.get("batchJob") .incrementer(new RunIdIncrementer()) .start(step1()) .build(); } @Bean public Step step1(){ return stepBuilderFactory.get("step1") .<Integer, String>chunk(10) .reader(listItemReader()) .processor(new CustomItemProcessor()) .writer(new CustomItemWriter()) .faultTolerant() .retry(CustomRetryException.class) .retryLimit(2) .listener(customRetryListener) .build(); } @Bean public ItemReader<Integer> listItemReader() { List<Integer> list = Arrays.asList(1,2,3,4); // List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10); return new LinkedListItemReader<>(list); } }
[참고자료]
반응형'BackEnd > Spring Batch' 카테고리의 다른 글
30. Spring Batch Test (1) 2022.01.14 28. 멀티 스레드 프로세싱 (Multi Thread Processing) (0) 2022.01.08 27. Skip & Retry Architect (0) 2022.01.07 26. Retry (0) 2022.01.07 25. Skip (0) 2022.01.06