BackEnd/Spring Batch
29. 리스너 (Listener)
hanseom
2022. 1. 13. 22:20
반응형
Listener는 배치 흐름 중에 Job, Step, Chunk 단계의 실행 전후에 발생하는 이벤트를 받아 용도에 맞게 활용할 수 있도록 제공하는 인터셉터 개념의 클래스입니다. 각 단계별로 로그를 기록하거나 소요된 시간을 계산하거나 실행상태 정보들을 참조 및 조회할 수 있습니다. 이벤트를 받기 위해서는 Listener를 등록해야 하며 등록은 API 설정에서 각 단계별로 지정할 수 있습니다.
Listeners
- JobExecutionListener: Job 실행 전후
- StepExecutionListener: Step 실행 전후
- ChunkListener: Chunk 실행 전후 (Tasklet 실행 전후), 오류 시점
- ItemReadListener: ItemReader 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
- ItemProcessListener: ItemProcessor 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
- ItemWriteListener: ItemWriter 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
- SkipListener: 읽기, 쓰기, 처리 Skip 실행 시점, Item 처리가 Skip 될 경우 Skip 된 item을 추적함
- RetryListener: Retry 시작, 종료, 에러 시점

구현 방법

JobExecutionListener
Job의 성공여부와 상관없이 호출되며, 성공/실패 여부는 JobExecution을 통해 알 수 있습니다.
- void beforeJob(JobExecution jobExecution) // Job 실행 전 호출
- void afterJob(JobExecution jobExecution) // Job 실행 후 호출
// API sketch (placeholders, not compilable as-is): registering a job-level listener on the job builder.
public Job job() {
return jobBuilderFactory.get("job")
.start(step())
.next(flow())
.listener(JobExecutionListener) // placeholder: a JobExecutionListener implementation goes here
.listener(Object) // annotation-based listener
.build();
}
StepExecutionListener
Step의 성공여부와 상관없이 호출되며, 성공/실패 여부는 StepExecution을 통해 알 수 있습니다.
- void beforeStep(StepExecution stepExecution) // Step 실행 전 호출
- ExitStatus afterStep(StepExecution stepExecution) // Step 실행 후 호출, ExitStatus를 변경하면 최종 종료코드로 반영
// API sketch (placeholders, not compilable as-is): registering a step-level listener on the step builder.
// Declared as a Step because the chain starts from stepBuilderFactory, which builds a Step rather than a Job.
public Step step() {
    return stepBuilderFactory.get("step")
            .tasklet(tasklet())
            .listener(StepExecutionListener) // placeholder: a StepExecutionListener implementation goes here
            .listener(Object) // annotation-based listener
            .build();
}
package io.springbatch.springbatchlecture;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.builder.SynchronizedItemStreamReaderBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sql.DataSource;
/**
 * Batch configuration for a two-step job ("batchJob") that demonstrates listener wiring:
 * a {@link CustomJobListener} is attached to the job and the injected
 * {@link CustomStepListener} is attached to both steps.
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class JobAndStepListenerConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final CustomStepListener customStepListener;

    /**
     * Builds the job: step1 followed by step2, with a fresh job listener instance.
     */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .next(step2())
                .listener(new CustomJobListener())
                .build();
    }

    /**
     * First step: a tasklet that finishes immediately.
     */
    @Bean
    public Step step1() {
        // Throw a RuntimeException("failed") from this tasklet instead to see the listeners on a failing step.
        Tasklet finishImmediately = (contribution, chunkContext) -> RepeatStatus.FINISHED;

        return stepBuilderFactory.get("step1")
                .tasklet(finishImmediately)
                .listener(customStepListener)
                .build();
    }

    /**
     * Second step: a tasklet that finishes immediately.
     */
    @Bean
    public Step step2() {
        Tasklet finishImmediately = (contribution, chunkContext) -> RepeatStatus.FINISHED;

        return stepBuilderFactory.get("step2")
                .tasklet(finishImmediately)
                .listener(customStepListener)
                .build();
    }
}
CustomJobListener.java
package io.springbatch.springbatchlecture;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
/**
 * Job-level listener that prints the job name before the job runs and the
 * job's status after it has finished.
 */
public class CustomJobListener implements JobExecutionListener {

    /**
     * Prints the name of the job that is about to run.
     *
     * @param jobExecution the execution that is about to start
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        // The label text is kept exactly as in the original output.
        System.out.println("JobExecution.getJobName() : " + jobExecution.getJobInstance().getJobName());
    }

    /**
     * Prints the status of the job execution once it has ended.
     *
     * @param jobExecution the execution that has just finished
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("JobExecution.getStatus() : " + jobExecution.getStatus());
    }
}
CustomStepListener.java
package io.springbatch.springbatchlecture;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;
/**
 * Step-level listener registered as a Spring component. It prints the step
 * name before the step runs and the step's status afterwards.
 */
@Component
public class CustomStepListener implements StepExecutionListener {

    /**
     * Prints the name of the step that is about to run.
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        final String stepName = stepExecution.getStepName();
        System.out.println("stepExecution.getStepName() : " + stepName);
    }

    /**
     * Prints the step's status and always returns {@link ExitStatus#COMPLETED}.
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        final String status = String.valueOf(stepExecution.getStatus());
        System.out.println("stepExecution.getStatus() : " + status);
        return ExitStatus.COMPLETED;
    }
}
ChunkListener
- void beforeChunk(ChunkContext context) // 트랜잭션이 시작된 후, Chunk 처리 직전에 호출 (ItemReader의 read() 메소드 호출 전)
- void afterChunk(ChunkContext context) // Chunk 커밋 후 호출 (ItemWriter의 write() 메소드 호출한 후), 롤백 시 호출되지 않음
- void afterChunkError(ChunkContext context) // 오류 발생 및 롤백 시 호출
// API sketch (placeholders, not compilable as-is): registering a ChunkListener on a chunk-oriented step.
// Declared as a Step because the chain starts from stepBuilderFactory, which builds a Step rather than a Job.
public Step step() {
    return stepBuilderFactory.get("step")
            .chunk(chunkSize)
            .listener(ChunkListener) // placeholder: a ChunkListener implementation goes here
            .listener(Object) // annotation-based listener
            .build();
}
ItemReadListener
- void beforeRead() // read() 메소드 호출 전 매번 호출
- void afterRead(T item) // read() 메소드 호출이 성공할 때마다 호출
- void onReadError(Exception ex) // 읽는 도중 오류가 발생하면 호출
// API sketch (placeholders, not compilable as-is): registering an ItemReadListener on a chunk-oriented step.
// Declared as a Step because the chain starts from stepBuilderFactory, which builds a Step rather than a Job.
public Step step() {
    return stepBuilderFactory.get("step")
            .chunk(chunkSize)
            .reader(ItemReader)
            .listener(ItemReadListener) // placeholder: an ItemReadListener implementation goes here
            .listener(Object) // annotation-based listener
            .build();
}
ItemProcessListener
- void beforeProcess(T item) // process() 메소드 호출 전 호출
- void afterProcess(T item, @Nullable S result) // process() 메소드 호출 성공 시 호출
- void onProcessError(T item, Exception e) // 처리 도중 오류 발생 시 호출
// API sketch (placeholders, not compilable as-is): registering an ItemProcessListener on a chunk-oriented step.
// Declared as a Step because the chain starts from stepBuilderFactory, which builds a Step rather than a Job.
public Step step() {
    return stepBuilderFactory.get("step")
            .chunk(chunkSize)
            .reader(ItemReader)
            .processor(ItemProcessor)
            .listener(ItemProcessListener) // placeholder: an ItemProcessListener implementation goes here
            .listener(Object) // annotation-based listener
            .build();
}
ItemWriteListener
- void beforeWrite(List<? extends S> items) // write() 메소드 호출 전 호출
- void afterWrite(List<? extends S> items) // write() 메소드 호출 성공 시 호출
- void onWriteError(Exception exception, List<? extends S> items) // 쓰기 도중 오류 발생 시 호출
// API sketch (placeholders, not compilable as-is): registering an ItemWriteListener on a chunk-oriented step.
// Declared as a Step because the chain starts from stepBuilderFactory, which builds a Step rather than a Job.
public Step step() {
    return stepBuilderFactory.get("step")
            .chunk(chunkSize)
            .reader(ItemReader)
            .writer(ItemWriter)
            .listener(ItemWriteListener) // placeholder: an ItemWriteListener implementation goes here
            .listener(Object) // annotation-based listener
            .build();
}
package io.springbatch.springbatchlecture;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
 * Batch configuration demonstrating chunk-, read-, process- and write-level
 * listeners on a single chunk-oriented step.
 *
 * <p>The processor and the writer in this demo throw unconditionally, so the
 * configuration is meant for observing the listeners' error callbacks rather
 * than for producing output.
 */
@RequiredArgsConstructor
@Configuration
public class ChunkListenerConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final CustomChunkListener customChunkListener;

    /**
     * Builds the single-step job "batchJob".
     */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    /**
     * Builds a chunk-oriented step (chunk size 10) with all four listener kinds attached.
     */
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Integer, String>chunk(10)
                .listener(customChunkListener)
                .listener(new CustomItemReadListener())
                .listener(new CustomItemProcessListener())
                .listener(new CustomItemWriteListener())
                .reader(listItemReader())
                // The explicit, fully parameterized cast picks the ItemProcessor overload for the lambda
                // without falling back to a raw type, so the builder keeps its <Integer, String> typing.
                .processor((ItemProcessor<Integer, String>) item -> {
                    // Always fails on purpose; replace with `return "item" + item;` for the success path.
                    throw new RuntimeException("failed");
                })
                .writer((ItemWriter<String>) items -> {
                    // Always fails on purpose.
                    throw new RuntimeException("failed");
                })
                .build();
    }

    /**
     * Supplies the integers 1 through 10 from an in-memory list.
     */
    @Bean
    public ItemReader<Integer> listItemReader() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        return new ListItemReader<>(list);
    }
}
SkipListener
- void onSkipInRead(Throwable t) // read 수행 중 Skip이 발생할 경우 호출
- void onSkipInWrite(S item, Throwable t) // write 수행 중 Skip이 발생할 경우 호출
- void onSkipInProcess(T item, Throwable t) // process 수행 중 Skip이 발생할 경우 호출

package io.springbatch.springbatchlecture;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.*;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
 * Batch configuration demonstrating a skip listener on a fault-tolerant,
 * chunk-oriented step. The processor throws {@code CustomSkipException} for
 * item 4 and the writer throws it for "item5"; the step is configured to skip
 * that exception type with a skip limit of 3.
 */
@Configuration
@RequiredArgsConstructor
public class SkipListenerConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final CustomSkipListener customSkipListener;

    /**
     * Builds the single-step job "batchJob".
     */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    /**
     * Builds the fault-tolerant step (chunk size 10) and registers the skip listener.
     */
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Integer, String>chunk(10)
                .reader(listItemReader())
                .processor(skippingProcessor())
                .writer(skippingWriter())
                .faultTolerant()
                .skip(CustomSkipException.class)
                .skipLimit(3)
                .listener(customSkipListener)
                .build();
    }

    /**
     * Supplies the integers 1 through 10 through a {@code LinkedListItemReader}.
     */
    @Bean
    public ItemReader<Integer> listItemReader() {
        List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        return new LinkedListItemReader<>(numbers);
    }

    /** Processor that fails on item 4 and otherwise prefixes the number with "item". */
    private ItemProcessor<Integer, String> skippingProcessor() {
        return item -> {
            if (item == 4) {
                throw new CustomSkipException("process skipped");
            }
            System.out.println("process : " + item);
            return "item" + item;
        };
    }

    /** Writer that prints each item in order and fails when it reaches "item5". */
    private ItemWriter<String> skippingWriter() {
        return items -> {
            for (String current : items) {
                if (current.equals("item5")) {
                    throw new CustomSkipException("write skipped");
                }
                System.out.println("write : " + current);
            }
        };
    }
}
RetryListener
- boolean open(RetryContext context, RetryCallback<T, E> callback) // 재시도 작업(RetryTemplate 실행)마다 첫 번째 시도 전에 한 번 호출, false를 반환할 경우 retry 시도하지 않음
- void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) // 재시도 작업마다 마지막 시도가 끝난 후(성공/실패 무관) 한 번 호출
- void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) // 재시도 실패 시마다 호출

package io.springbatch.springbatchlecture;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
 * Batch configuration demonstrating a retry listener on a fault-tolerant,
 * chunk-oriented step that retries {@code CustomRetryException} with a retry
 * limit of 2.
 */
@Configuration
@RequiredArgsConstructor
public class RetryListenerConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final CustomRetryListener customRetryListener;

    /**
     * Builds the single-step job "batchJob".
     */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    /**
     * Builds the fault-tolerant step (chunk size 10) and registers the retry listener.
     */
    @Bean
    public Step step1() {
        final CustomItemProcessor itemProcessor = new CustomItemProcessor();
        final CustomItemWriter itemWriter = new CustomItemWriter();

        return stepBuilderFactory.get("step1")
                .<Integer, String>chunk(10)
                .reader(listItemReader())
                .processor(itemProcessor)
                .writer(itemWriter)
                .faultTolerant()
                .retry(CustomRetryException.class)
                .retryLimit(2)
                .listener(customRetryListener)
                .build();
    }

    /**
     * Supplies the integers 1 through 4 through a {@code LinkedListItemReader}.
     */
    @Bean
    public ItemReader<Integer> listItemReader() {
        // A longer input (1 through 10) can be substituted here for experimentation.
        List<Integer> numbers = Arrays.asList(1,2,3,4);
        return new LinkedListItemReader<>(numbers);
    }
}
[참고자료]
반응형