BackEnd/Spring Batch

29. 리스너 (Listener)

hanseom 2022. 1. 13. 22:20
반응형

  Listener는 배치 흐름 중에 Job, Step, Chunk 단계의 실행 전후에 발생하는 이벤트를 받아 용도에 맞게 활용할 수 있도록 제공하는 인터셉터 개념의 클래스입니다. 각 단계별로 로그를 기록하거나 소요된 시간을 계산하거나 실행상태 정보들을 참조 및 조회할 수 있습니다. 이벤트를 받기 위해서는 Listener를 등록해야 하며 등록은 API 설정에서 각 단계별로 지정할 수 있습니다.

 

Listeners

  • JobExecutionListener: Job 실행 전후
  • StepExecutionListener: Step 실행 전후
  • ChunkListener: Chunk 실행 전후 (Tasklet 실행 전후), 오류 시점
  • ItemReadListener: ItemReader 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
  • ItemProcessListener: ItemProcessor 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
  • ItemWriteListener: ItemWriter 실행 전후, 오류 시점, item이 null일 경우 호출 안됨
  • SkipListener: 읽기, 쓰기, 처리 Skip 실행 시점, Item 처리가 Skip 될 경우 Skip 된 item을 추적함
  • RetryListener: Retry 시작, 종료, 에러 시점

 

구현 방법

 

JobExecutionListener

  Job의 성공여부와 상관없이 호출되며, 성공/실패 여부는 JobExecution을 통해 알 수 있습니다.

  • void beforeJob(JobExecution jobExecution) // Job 실행 전 호출
  • void afterJob(JobExecution jobExecution)    // Job 실행 후 호출
  // Sketch of job-level listener registration on the job builder.
  // The listener(...) arguments are placeholders naming the accepted parameter types,
  // not real expressions.
  public Job job() {
    return jobBuilderFactory.get("job")
        .start(step())
        .next(flow())
        .listener(JobExecutionListener)
        .listener(Object) // annotation-based style
        .build();
  }

 

StepExecutionListener

  Step의 성공여부와 상관없이 호출되며, 성공/실패 여부는 StepExecution을 통해 알 수 있습니다.

  • void beforeStep(StepExecution stepExecution)        // Step 실행 전 호출
  • ExitStatus afterStep(StepExecution stepExecution) // Step 실행 후 호출, ExitStatus를 변경하면 최종 종료코드로 반영
  // Sketch of step-level listener registration for a tasklet step.
  // The chain starts from stepBuilderFactory, so the method is declared as a Step
  // factory named step(), matching the step() call in the job snippet.
  // The listener(...) arguments are placeholders naming the accepted parameter types.
  public Step step() {
    return stepBuilderFactory.get("step")
        .tasklet(tasklet())
        .listener(StepExecutionListener)
        .listener(Object) // annotation-based style
        .build();
  }
package io.springbatch.springbatchlecture;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.builder.SynchronizedItemStreamReaderBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.sql.DataSource;

/**
 * Batch configuration for a two-step job named "batchJob" that has a job-level
 * listener attached to the job and one shared step-level listener attached to
 * both steps.
 */
@Configuration
@Slf4j
@RequiredArgsConstructor
public class JobAndStepListenerConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final CustomStepListener customStepListener;

    /**
     * Assembles "batchJob": runs step1, then step2, and registers a new
     * {@code CustomJobListener} instance on the job.
     */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .next(step2())
                .listener(new CustomJobListener())
                .build();
    }

    /**
     * First step: a tasklet that finishes immediately, observed by the injected
     * step listener.
     */
    @Bean
    public Step step1() {
        // To watch the listeners react to a failing step, throw
        // new RuntimeException("failed") from this tasklet instead of finishing.
        return stepBuilderFactory.get("step1")
                .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED)
                .listener(customStepListener)
                .build();
    }

    /**
     * Second step: same shape as step1, sharing the same step listener instance.
     */
    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED)
                .listener(customStepListener)
                .build();
    }
}

CustomJobListener.java

package io.springbatch.springbatchlecture;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * Job-level listener that prints the job name before the job runs and the
 * execution status once it has ended.
 */
public class CustomJobListener implements JobExecutionListener {

    /**
     * Prints the name of the job taken from the execution's job instance.
     *
     * @param jobExecution the execution that is about to start
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        // The parameter is lowerCamelCase so it no longer shadows the JobExecution type;
        // the printed label is kept exactly as before.
        System.out.println("JobExecution.getJobName() : " + jobExecution.getJobInstance().getJobName());
    }

    /**
     * Prints the status recorded on the execution.
     *
     * @param jobExecution the execution that has just ended
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("JobExecution.getStatus() : " + jobExecution.getStatus());
    }
}

CustomStepListener.java

package io.springbatch.springbatchlecture;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;

/**
 * Step-level listener, registered as a Spring component, that prints the step
 * name before the step runs and the step status afterwards.
 */
@Component
public class CustomStepListener implements StepExecutionListener {

    /**
     * Prints the name of the step that is about to run.
     *
     * @param stepExecution the execution of the step being started
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        final String stepName = stepExecution.getStepName();
        System.out.println("stepExecution.getStepName() : " + stepName);
    }

    /**
     * Prints the step's status and reports {@code ExitStatus.COMPLETED}.
     *
     * @param stepExecution the execution of the step that has just run
     * @return always {@code ExitStatus.COMPLETED}
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        final String message = "stepExecution.getStatus() : " + stepExecution.getStatus();
        System.out.println(message);
        return ExitStatus.COMPLETED;
    }
}

[전체 소스코드]

 

ChunkListener

  • void beforeChunk(ChunkContext context) // 트랜잭션이 시작된 후, ItemReader의 read() 메소드 호출 전에 호출
  • void afterChunk(ChunkContext context)    // Chunk 커밋 후 호출 (ItemWriter의 write() 메소드 호출한 후), 롤백 시 호출되지 않음
  • void afterChunkError(ChunkContext context) // 오류 발생 및 롤백 시 호출
  // Sketch of ChunkListener registration on a chunk-oriented step.
  // The chain starts from stepBuilderFactory, so the method is declared as a Step
  // factory rather than a Job factory.
  // The listener(...) arguments are placeholders naming the accepted parameter types.
  public Step step() {
    return stepBuilderFactory.get("step")
        .chunk(chunkSize)
        .listener(ChunkListener)
        .listener(Object) // annotation-based style
        .build();
  }

 

ItemReadListener

  • void beforeRead()                         // read() 메소드 호출 전 매번 호출
  • void afterRead(T item)                 // read() 메소드 호출이 성공할 때마다 호출
  • void onReadError(Exception ex) // 읽는 도중 오류가 발생하면 호출
  // Sketch of ItemReadListener registration on a chunk-oriented step.
  // The chain starts from stepBuilderFactory, so the method is declared as a Step
  // factory rather than a Job factory.
  // The reader(...) and listener(...) arguments are placeholders naming the accepted
  // parameter types.
  public Step step() {
    return stepBuilderFactory.get("step")
        .chunk(chunkSize)
        .reader(ItemReader)
        .listener(ItemReadListener)
        .listener(Object) // annotation-based style
        .build();
  }

 

ItemProcessListener

  • void beforeProcess(T item)                               // process() 메소드 호출 전 호출
  • void afterProcess(T item, @Nullable S result) // process() 메소드 호출 성공 시 호출
  • void onProcessError(T item, Exception e)        // 처리 도중 오류 발생 시 호출
  // Sketch of ItemProcessListener registration on a chunk-oriented step.
  // The chain starts from stepBuilderFactory, so the method is declared as a Step
  // factory rather than a Job factory.
  // The reader/processor/listener arguments are placeholders naming the accepted
  // parameter types.
  public Step step() {
    return stepBuilderFactory.get("step")
        .chunk(chunkSize)
        .reader(ItemReader)
        .processor(ItemProcessor)
        .listener(ItemProcessListener)
        .listener(Object) // annotation-based style
        .build();
  }

 

ItemWriteListener

  • void beforeWrite(List<? extends S> items)                                        // write() 메소드 호출 전 호출
  • void afterWrite(List<? extends S> items)                                           // write() 메소드 호출 성공 시 호출
  • void onWriteError(Exception exception, List<? extends S> items) // 쓰기 도중 오류 발생 시 호출
  // Sketch of ItemWriteListener registration on a chunk-oriented step.
  // The chain starts from stepBuilderFactory, so the method is declared as a Step
  // factory rather than a Job factory.
  // The reader/writer/listener arguments are placeholders naming the accepted
  // parameter types.
  public Step step() {
    return stepBuilderFactory.get("step")
        .chunk(chunkSize)
        .reader(ItemReader)
        .writer(ItemWriter)
        .listener(ItemWriteListener)
        .listener(Object) // annotation-based style
        .build();
  }
package io.springbatch.springbatchlecture;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/**
 * Batch configuration demonstrating chunk, read, process and write listeners on
 * a single chunk-oriented step whose processor and writer deliberately fail.
 */
@RequiredArgsConstructor
@Configuration
public class ChunkListenerConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final CustomChunkListener customChunkListener;

    /**
     * Builds "batchJob", which consists of step1 only.
     */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    /**
     * Builds a chunk step (chunk size 10, Integer in / String out) with the chunk,
     * item-read, item-process and item-write listeners registered.
     */
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Integer, String>chunk(10)
                .listener(customChunkListener)
                .listener(new CustomItemReadListener())
                .listener(new CustomItemProcessListener())
                .listener(new CustomItemWriteListener())
                .reader(listItemReader())
                // Parameterized cast (instead of the raw ItemProcessor type) keeps the
                // lambda type-checked against the step's <Integer, String> signature.
                // The processor fails on purpose; replace the throw with
                // `return "item" + item;` to let items pass through.
                .processor((ItemProcessor<Integer, String>) item -> {
                    throw new RuntimeException("failed");
                })
                // The writer also fails on purpose.
                .writer((ItemWriter<String>) items -> {
                    throw new RuntimeException("failed");
                })
                .build();
    }

    /**
     * Supplies the integers 1 through 10 as the step's input.
     */
    @Bean
    public ItemReader<Integer> listItemReader() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        return new ListItemReader<>(list);
    }
}

[전체 소스코드]

 

SkipListener

  • void onSkipInRead(Throwable t)                   // read 수행 중 Skip이 발생할 경우 호출
  • void onSkipInWrite(S item, Throwable t)      // write 수행 중 Skip이 발생할 경우 호출
  • void onSkipInProcess(T item, Throwable t) // process 수행 중 Skip이 발생할 경우 호출

package io.springbatch.springbatchlecture;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.*;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/**
 * Batch configuration demonstrating a skip listener on a fault-tolerant chunk
 * step: the processor throws for item 4 and the writer throws for "item5".
 */
@Configuration
@RequiredArgsConstructor
public class SkipListenerConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final CustomSkipListener customSkipListener;

    /**
     * Builds "batchJob", which consists of step1 only.
     */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    /**
     * Builds a fault-tolerant chunk step (chunk size 10) that treats
     * {@code CustomSkipException} as skippable, allows up to 3 skips, and
     * registers the injected skip listener.
     */
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Integer, String>chunk(10)
                .reader(listItemReader())
                .processor(failingOnFourProcessor())
                .writer(failingOnItemFiveWriter())
                .faultTolerant()
                .skip(CustomSkipException.class)
                .skipLimit(3)
                .listener(customSkipListener)
                .build();
    }

    /**
     * Supplies the integers 1 through 10 as the step's input.
     */
    @Bean
    public ItemReader<Integer> listItemReader() {
        List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        return new LinkedListItemReader<>(numbers);
    }

    /** Processor that throws for the value 4 and otherwise maps n to "item" + n. */
    private ItemProcessor<Integer, String> failingOnFourProcessor() {
        return number -> {
            if (number == 4) {
                throw new CustomSkipException("process skipped");
            }
            System.out.println("process : " + number);
            return "item" + number;
        };
    }

    /** Writer that prints each element in order and throws when it reaches "item5". */
    private ItemWriter<String> failingOnItemFiveWriter() {
        return chunkItems -> {
            for (String element : chunkItems) {
                if (element.equals("item5")) {
                    throw new CustomSkipException("write skipped");
                }
                System.out.println("write : " + element);
            }
        };
    }
}

CustomSkipListener.java

 

RetryListener

  • boolean open(RetryContext context, RetryCallback<T, E> callback) // Retry 시작 시(첫 번째 시도 전) 호출, false를 반환할 경우 retry 시도하지 않음
  • void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable)    // Retry 종료 시(마지막 시도 후, 성공 여부와 무관하게) 호출
  • void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) // 시도가 실패할 때마다 호출

package io.springbatch.springbatchlecture;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/**
 * Batch configuration demonstrating a retry listener on a fault-tolerant chunk
 * step that retries on {@code CustomRetryException}.
 */
@Configuration
@RequiredArgsConstructor
public class RetryListenerConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final CustomRetryListener customRetryListener;

    /**
     * Builds "batchJob", which consists of step1 only.
     */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    /**
     * Builds a fault-tolerant chunk step (chunk size 10) with a retry limit of 2
     * for {@code CustomRetryException} and the injected retry listener registered.
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Integer, String>chunk(10)
                .reader(listItemReader())
                .processor(new CustomItemProcessor())
                .writer(new CustomItemWriter())
                .faultTolerant()
                .retry(CustomRetryException.class)
                .retryLimit(2)
                .listener(customRetryListener)
                .build();
    }

    /**
     * Supplies the integers 1 through 4 as the step's input.
     */
    @Bean
    public ItemReader<Integer> listItemReader() {
        // A longer input (1 through 10) was used as an alternative while experimenting.
        List<Integer> numbers = Arrays.asList(1,2,3,4);
        return new LinkedListItemReader<>(numbers);
    }
}

CustomRetryListener.java

 

[참고자료]

인프런-스프링 배치 - Spring Boot 기반으로 개발하는 Spring Batch

반응형