  • 29. Listener
    BackEnd/Spring Batch 2022. 1. 13. 22:20

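      A Listener is an interceptor-style class that receives the events raised before and after the Job, Step, and Chunk phases of a batch run, so that you can act on them as needed. Typical uses are writing a log entry for each phase, measuring elapsed time, and looking up execution state. To receive events a Listener must be registered, and registration is done per phase through the builder API.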

     

    Listeners

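    • JobExecutionListener: before and after Job execution
    • StepExecutionListener: before and after Step execution
    • ChunkListener: before and after Chunk execution (before and after Tasklet execution), and on error
    • ItemReadListener: before and after ItemReader execution, and on error; not called when the item is null
    • ItemProcessListener: before and after ItemProcessor execution, and on error; not called when the item is null
    • ItemWriteListener: before and after ItemWriter execution, and on error; not called when the item is null
    • SkipListener: when a Skip occurs during read, write, or process; lets you track the item that was skipped
    • RetryListener: at the start, end, and error points of a Retry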

     

    Implementation

     

    JobExecutionListener

      Called whether or not the Job succeeds; the outcome (success or failure) is available from the JobExecution.

    • void beforeJob(JobExecution jobExecution) // called before the Job runs
    • void afterJob(JobExecution jobExecution)  // called after the Job finishes
      public Job job() {
        return jobBuilderFactory.get("job")
            .start(step())
            .next(flow())
            .listener(JobExecutionListener)
            .listener(Object) // annotation-based (@BeforeJob, @AfterJob)
            .build();
      }
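The "called whether or not the Job succeeds" contract, together with the elapsed-time use case mentioned at the top of this post, can be sketched in plain Java. This is only an illustration under assumptions: `SimpleJobListener`, `TimingJobListener`, and `JobListenerDemo` are hypothetical stand-ins written for this example so that it runs without Spring Batch on the classpath. In a real project the listener would implement `org.springframework.batch.core.JobExecutionListener` and read the status from the `JobExecution`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the before/after contract of a job listener (not the Spring Batch type).
interface SimpleJobListener {
    void beforeJob(String jobName);
    void afterJob(String jobName, String status);
}

// Records each callback and the elapsed time between beforeJob and afterJob.
class TimingJobListener implements SimpleJobListener {
    final List<String> events = new ArrayList<>();
    long elapsedMillis = -1;
    private long startedAt;

    @Override
    public void beforeJob(String jobName) {
        startedAt = System.nanoTime();
        events.add("before:" + jobName);
    }

    @Override
    public void afterJob(String jobName, String status) {
        elapsedMillis = (System.nanoTime() - startedAt) / 1_000_000;
        events.add("after:" + jobName + ":" + status);
    }
}

class JobListenerDemo {
    // Runs the body between the two callbacks; afterJob fires even when the body throws.
    static void run(String jobName, Runnable body, SimpleJobListener listener) {
        listener.beforeJob(jobName);
        String status = "COMPLETED";
        try {
            body.run();
        } catch (RuntimeException e) {
            status = "FAILED";
        } finally {
            listener.afterJob(jobName, status);
        }
    }

    public static void main(String[] args) {
        TimingJobListener listener = new TimingJobListener();
        run("batchJob", () -> { }, listener);
        run("batchJob", () -> { throw new RuntimeException("failed"); }, listener);
        System.out.println(listener.events);
        System.out.println("last run took " + listener.elapsedMillis + " ms");
    }
}
```

The point of the `finally` block is that the "after" callback is the one place where both outcomes can be observed, which is why the status has to be read there rather than assumed.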

     

    StepExecutionListener

      Called whether or not the Step succeeds; the outcome (success or failure) is available from the StepExecution.

    • void beforeStep(StepExecution stepExecution)      // called before the Step runs
    • ExitStatus afterStep(StepExecution stepExecution) // called after the Step finishes; a changed ExitStatus is reflected in the final exit code
      public Step step() {
        return stepBuilderFactory.get("step")
            .tasklet(tasklet())
            .listener(StepExecutionListener)
            .listener(Object) // annotation-based (@BeforeStep, @AfterStep)
            .build();
      }

    JobAndStepListenerConfiguration.java

    package io.springbatch.springbatchlecture;
    
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @RequiredArgsConstructor
    @Configuration
    @Slf4j
    public class JobAndStepListenerConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final CustomStepListener customStepListener;
    
        @Bean
        public Job job() throws Exception {
            return jobBuilderFactory.get("batchJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step1())
                    .next(step2())
                    .listener(new CustomJobListener())
                    .build();
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .tasklet(new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    //                        throw new RuntimeException("failed");
                            return RepeatStatus.FINISHED;
                        }
                    })
                    .listener(customStepListener)
                    .build();
        }
    
        @Bean
        public Step step2() {
            return stepBuilderFactory.get("step2")
                    .tasklet(new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                            return RepeatStatus.FINISHED;
                        }
                    })
                    .listener(customStepListener)
                    .build();
        }
    }

    CustomJobListener.java

    package io.springbatch.springbatchlecture;
    
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;
    
    public class CustomJobListener implements JobExecutionListener {
    
        @Override
        public void beforeJob(JobExecution jobExecution) {
            System.out.println("jobExecution.getJobName() : " + jobExecution.getJobInstance().getJobName());
        }
    
        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("jobExecution.getStatus() : " + jobExecution.getStatus());
        }
    }

    CustomStepListener.java

    package io.springbatch.springbatchlecture;
    
    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.StepExecutionListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class CustomStepListener implements StepExecutionListener {
    
        @Override
        public void beforeStep(StepExecution stepExecution) {
            System.out.println("stepExecution.getStepName() : " + stepExecution.getStepName());
        }
    
        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            System.out.println("stepExecution.getStatus() : " + stepExecution.getStatus());
            return ExitStatus.COMPLETED;
        }
    }

    [Full source code]

     

    ChunkListener

    • void beforeChunk(ChunkContext context)     // called before the chunk is processed, inside the transaction (before ItemReader.read() is called)
    • void afterChunk(ChunkContext context)      // called after the chunk is committed (after ItemWriter.write() has been called); not called on rollback
    • void afterChunkError(ChunkContext context) // called when an error occurs and the chunk is rolled back
      public Step step() {
        return stepBuilderFactory.get("step")
            .chunk(chunkSize)
            .listener(ChunkListener)
            .listener(Object) // annotation-based (@BeforeChunk, @AfterChunk, @AfterChunkError)
            .build();
      }

     

    ItemReadListener

    • void beforeRead()              // called before every read() call
    • void afterRead(T item)         // called after every successful read() call
    • void onReadError(Exception ex) // called when an error occurs while reading
      public Step step() {
        return stepBuilderFactory.get("step")
            .chunk(chunkSize)
            .reader(ItemReader)
            .listener(ItemReadListener)
            .listener(Object) // annotation-based (@BeforeRead, @AfterRead, @OnReadError)
            .build();
      }

     

    ItemProcessListener

    • void beforeProcess(T item)                    // called before process() is called
    • void afterProcess(T item, @Nullable S result) // called after process() returns successfully
    • void onProcessError(T item, Exception e)      // called when an error occurs during processing
      public Step step() {
        return stepBuilderFactory.get("step")
            .chunk(chunkSize)
            .reader(ItemReader)
            .processor(ItemProcessor)
            .listener(ItemProcessListener)
            .listener(Object) // annotation-based (@BeforeProcess, @AfterProcess, @OnProcessError)
            .build();
      }

     

    ItemWriteListener

    • void beforeWrite(List<? extends S> items)                       // called before write() is called
    • void afterWrite(List<? extends S> items)                        // called after write() returns successfully
    • void onWriteError(Exception exception, List<? extends S> items) // called when an error occurs while writing
      public Step step() {
        return stepBuilderFactory.get("step")
            .chunk(chunkSize)
            .reader(ItemReader)
            .writer(ItemWriter)
            .listener(ItemWriteListener)
            .listener(Object) // annotation-based (@BeforeWrite, @AfterWrite, @OnWriteError)
            .build();
      }

    ChunkListenerConfiguration.java

    package io.springbatch.springbatchlecture;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Arrays;
    import java.util.List;
    
    @RequiredArgsConstructor
    @Configuration
    public class ChunkListenerConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final CustomChunkListener customChunkListener;
    
        @Bean
        public Job job() throws Exception {
            return jobBuilderFactory.get("batchJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step1())
                    .build();
        }
    
        @Bean
        public Step step1() throws Exception {
            return stepBuilderFactory.get("step1")
                    .<Integer, String>chunk(10)
                    .listener(customChunkListener)
                    .listener(new CustomItemReadListener())
                    .listener(new CustomItemProcessListener())
                    .listener(new CustomItemWriteListener())
                    .reader(listItemReader())
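                    .processor((ItemProcessor<Integer, String>) item -> {
                        // Throws on purpose so the error callbacks (onProcessError, afterChunkError) can be observed.
                        throw new RuntimeException("failed");
    //                    return "item" + item;
                    })
                    .writer((ItemWriter<String>) items -> {
                        throw new RuntimeException("failed");
                    })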
                    .build();
        }
    
        @Bean
        public ItemReader<Integer> listItemReader() {
            List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
            return new ListItemReader<>(list);
        }
    }

    [Full source code]
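To make the callback order of the chunk, read, process, and write listeners easier to follow, it can be modelled in plain Java. The sketch below is a simplified simulation, not Spring Batch code: `ChunkCallbackOrderDemo` and `runChunk` are hypothetical names invented for this example, the input is assumed to be exactly one chunk, and end-of-input handling, transactions, and fault tolerance are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class ChunkCallbackOrderDemo {

    // Simulates one chunk and returns the listener callbacks in the order they fire.
    static List<String> runChunk(List<Integer> input, Function<Integer, String> processor) {
        List<String> calls = new ArrayList<>();
        calls.add("beforeChunk");
        try {
            // Read phase: one beforeRead/afterRead pair per item.
            List<Integer> items = new ArrayList<>();
            for (Integer item : input) {
                calls.add("beforeRead");
                items.add(item);
                calls.add("afterRead(" + item + ")");
            }
            // Process phase: afterProcess on success, onProcessError on failure.
            List<String> outputs = new ArrayList<>();
            for (Integer item : items) {
                calls.add("beforeProcess(" + item + ")");
                try {
                    outputs.add(processor.apply(item));
                    calls.add("afterProcess(" + item + ")");
                } catch (RuntimeException e) {
                    calls.add("onProcessError(" + item + ")");
                    throw e;
                }
            }
            // Write phase: the whole chunk is handed over in a single call.
            calls.add("beforeWrite" + outputs);
            calls.add("afterWrite" + outputs);
            calls.add("afterChunk");
        } catch (RuntimeException e) {
            // The chunk counts as rolled back, so afterChunk is never recorded.
            calls.add("afterChunkError");
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(runChunk(List.of(1, 2), item -> "item" + item));
        System.out.println(runChunk(List.of(1, 2), item -> {
            throw new RuntimeException("failed");
        }));
    }
}
```

With a processor that always throws, the recorded sequence stops at `onProcessError` for the first item and then ends with `afterChunkError`; the write callbacks and `afterChunk` never appear.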

     

    SkipListener

    • void onSkipInRead(Throwable t)            // called when a Skip occurs during read
    • void onSkipInWrite(S item, Throwable t)   // called when a Skip occurs during write
    • void onSkipInProcess(T item, Throwable t) // called when a Skip occurs during process
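How these three callbacks let you track skipped items can be sketched without the framework. Everything in the block below is an assumption made for illustration: `SimpleSkipListener` is a hypothetical stand-in that only mirrors the three method signatures listed above, `IllegalStateException` plays the role of the skippable exception, and the loop handles items one at a time, which is much simpler than Spring Batch's actual chunk rollback and re-scan behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the three skip callbacks (not the Spring Batch type).
interface SimpleSkipListener<T, S> {
    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);
}

// Keeps a record of every skipped item and the reason it was skipped.
class RecordingSkipListener implements SimpleSkipListener<Integer, String> {
    final List<String> skipped = new ArrayList<>();

    @Override
    public void onSkipInRead(Throwable t) {
        skipped.add("read: " + t.getMessage());
    }

    @Override
    public void onSkipInProcess(Integer item, Throwable t) {
        skipped.add("process " + item + ": " + t.getMessage());
    }

    @Override
    public void onSkipInWrite(String item, Throwable t) {
        skipped.add("write " + item + ": " + t.getMessage());
    }
}

class SkipListenerDemo {
    // Fails on item 4, like the processor in the configuration in this section.
    static String process(Integer item) {
        if (item == 4) throw new IllegalStateException("process skipped");
        return "item" + item;
    }

    // Fails on "item5", like the writer in the configuration in this section.
    static void write(String item, List<String> sink) {
        if (item.equals("item5")) throw new IllegalStateException("write skipped");
        sink.add(item);
    }

    // Skips failing items until skipLimit is exceeded, notifying the listener for each skip.
    static List<String> run(List<Integer> input, int skipLimit,
                            SimpleSkipListener<Integer, String> listener) {
        List<String> written = new ArrayList<>();
        int skipCount = 0;
        for (Integer item : input) {
            String output;
            try {
                output = process(item);
            } catch (IllegalStateException e) {
                if (++skipCount > skipLimit) throw e;
                listener.onSkipInProcess(item, e);
                continue;
            }
            try {
                write(output, written);
            } catch (IllegalStateException e) {
                if (++skipCount > skipLimit) throw e;
                listener.onSkipInWrite(output, e);
            }
        }
        return written;
    }

    public static void main(String[] args) {
        RecordingSkipListener listener = new RecordingSkipListener();
        System.out.println(run(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3, listener));
        System.out.println(listener.skipped);
    }
}
```

Note that the process callback receives the input item while the write callback receives the already processed output, which matches the `T` and `S` type parameters in the signatures.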

    package io.springbatch.springbatchlecture;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.*;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Arrays;
    import java.util.List;
    
    @RequiredArgsConstructor
    @Configuration
    public class SkipListenerConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final CustomSkipListener customSkipListener;
    
        @Bean
        public Job job() throws Exception {
            return jobBuilderFactory.get("batchJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step1())
                    .build();
        }
    
        @Bean
        public Step step1() throws Exception {
            return stepBuilderFactory.get("step1")
                    .<Integer, String>chunk(10)
                    .reader(listItemReader())
                    .processor(new ItemProcessor<Integer, String>() {
                        @Override
                        public String process(Integer item) throws Exception {
                            if (item == 4) {
                                throw new CustomSkipException("process skipped");
                            }
                            System.out.println("process : " + item);
                            return "item" + item;
                        }
                    })
                    .writer(new ItemWriter<String>() {
                        @Override
                        public void write(List<? extends String> items) throws Exception {
                            for (String item : items) {
                                if (item.equals("item5")) {
                                    throw new CustomSkipException("write skipped");
                                }
                                System.out.println("write : " + item);
                            }
                        }
                    })
                    .faultTolerant()
                    .skip(CustomSkipException.class)
                    .skipLimit(3)
                    .listener(customSkipListener)
                    .build();
        }
    
        @Bean
        public ItemReader<Integer> listItemReader() {
            List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
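            // LinkedListItemReader is not a Spring Batch class; it is assumed to be a custom reader defined elsewhere in this package.
            return new LinkedListItemReader<>(list);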
        }
    }

    CustomSkipListener.java

     

    RetryListener

    • boolean open(RetryContext context, RetryCallback<T, E> callback)                      // called each time a retry operation starts, before the first attempt; returning false means no attempt is made
    • void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable)   // called each time a retry operation ends, after the final attempt
    • void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) // called after every failed attempt
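The relationship between the three callbacks (one `open`, one `onError` per failed attempt, one `close`) can be sketched with a small retry loop. This is a plain-Java illustration under assumptions, not Spring Retry's implementation: `SimpleRetryListener`, `RecordingRetryListener`, and `RetryListenerDemo.execute` are hypothetical names, the callback signatures are reduced to what the example needs, and `maxAttempts` counts the first attempt as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in with simplified signatures (not the Spring Retry type).
interface SimpleRetryListener {
    boolean open();
    void onError(int attempt, RuntimeException e);
    void close(RuntimeException lastError);
}

// Records the callbacks so the order can be inspected afterwards.
class RecordingRetryListener implements SimpleRetryListener {
    final List<String> calls = new ArrayList<>();

    @Override
    public boolean open() {
        calls.add("open");
        return true;
    }

    @Override
    public void onError(int attempt, RuntimeException e) {
        calls.add("onError#" + attempt);
    }

    @Override
    public void close(RuntimeException lastError) {
        calls.add(lastError == null ? "close(ok)" : "close(failed)");
    }
}

class RetryListenerDemo {
    // Runs the callback up to maxAttempts times, notifying the listener along the way.
    static <T> T execute(Supplier<T> callback, int maxAttempts, SimpleRetryListener listener) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        if (!listener.open()) {
            throw new IllegalStateException("retry vetoed by listener");
        }
        RuntimeException lastError = null;
        try {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    T result = callback.get();
                    lastError = null; // a later success clears earlier failures
                    return result;
                } catch (RuntimeException e) {
                    lastError = e;
                    listener.onError(attempt, e);
                }
            }
            throw lastError; // every attempt failed
        } finally {
            listener.close(lastError);
        }
    }

    public static void main(String[] args) {
        RecordingRetryListener listener = new RecordingRetryListener();
        int[] attempts = {0};
        String value = execute(() -> {
            if (++attempts[0] < 3) throw new IllegalStateException("retry");
            return "ok";
        }, 3, listener);
        System.out.println(value + " " + listener.calls);
    }
}
```

Because `close` runs in a `finally` block, it is reached both when an attempt eventually succeeds and when the attempts are exhausted, which is the reason it receives the last error (or `null`) as an argument.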

    package io.springbatch.springbatchlecture;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Arrays;
    import java.util.List;
    
    @RequiredArgsConstructor
    @Configuration
    public class RetryListenerConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final CustomRetryListener customRetryListener;
    
        @Bean
        public Job job(){
            return jobBuilderFactory.get("batchJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step1())
                    .build();
        }
    
        @Bean
        public Step step1(){
            return stepBuilderFactory.get("step1")
                    .<Integer, String>chunk(10)
                    .reader(listItemReader())
                    .processor(new CustomItemProcessor())
                    .writer(new CustomItemWriter())
                    .faultTolerant()
                    .retry(CustomRetryException.class)
                    .retryLimit(2)
                    .listener(customRetryListener)
    
                    .build();
        }
    
        @Bean
        public ItemReader<Integer> listItemReader() {
            List<Integer> list = Arrays.asList(1,2,3,4);
    //        List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
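            // LinkedListItemReader is not a Spring Batch class; it is assumed to be a custom reader defined elsewhere in this package.
            return new LinkedListItemReader<>(list);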
        }
    }

    CustomRetryListener.java

     

    [References]

    Inflearn course "스프링 배치 - Spring Boot 기반으로 개발하는 Spring Batch" (Spring Batch: developing Spring Batch on Spring Boot)
