ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 09. Flow
    BackEnd/Spring Batch 2021. 12. 23. 23:45
    반응형

    Architecture

     

    FlowJob

      Step을 순차적으로만 구성하는 것이 아닌 특정한 상태에 따라 흐름을 전환하도록 구성할 수 있으며 FlowJobBuilder에 의해 생성됩니다. Flow와 Job의 흐름을 구성하는데만 관여하고 실제 비즈니스 로직은 Step에서 이루어집니다. 내부적으로 SimpleFlow 객체를 포함하고 있으며 Job 실행 시 호출됩니다. 아래와 같은 케이스에 사용합니다.

    • Step이 실패하더라도 Job은 실패로 끝나지 않도록 해야 하는 경우
    • Step이 성공했을 때 다음에 실행해야 할 Step을 구분해서 실행 해야 하는 경우
    • 특정 Step은 전혀 실행되지 않게 구성 해야 하는 경우

    SimpleJob vs FlowJob

     

    API

      // JobBuilderFactory > JobBuilder > JobFlowBuilder > FlowBuilder > FlowJob
      public Job batchJob() {
        return jobBuilderFactory.get("batchJob")
            // 처음 실행 할 Step or Flow 설정
            // Step: SimpleJobBuilder 반환
            // Flow: JobFlowBuilder 반환
            .start(Step or Flow or JobExecutionDecider)
            // Step의 실행 결과로 돌려 받는 종료상태(ExitStatus)를 매칭하는 패턴, TransitionBuilder 반환
            // 예) .on("COMPLETED").to(step())
            .on(String pattern)
            // 다음으로 실행할 단계 지정
            .to(Step or Flow or JobExecutionDecider)
            // Flow를 중지 / 실패/ 종료하도록 Flow 종료
            .stop() / fail() / end() / stopAndRestart(Step or Flow or JobExecutionDecider)
            // 이전 단계에서 정의한 Transition을 새롭게 추가 정의함
            .from(Step or Flow or JobExecutionDecider)
            // 다음으로 이동할 단계 지정
            .next(Step or Flow or JobExecutionDecider)
            // build() 앞에 위치하면 FlowBuilder를 종료하고 SimpleFlow 객체 생성
            .end()
            // FlowJob 생성하고 flow 필드에 SimpleFlow 저장
            .build();
      }
    •   .start() / .from() / .next(): Flow(흐름을 정의하는 역할)
    •   .on() / .to() / .stop() / .fail() / .end() / .stopAndRestart(): Transition(조건에 따라 흐름을 전환시키는 역할)

     

    Transition

      Flow 내 Step의 조건부 전환(전이)를 정의합니다. Job의 API 설정에서 on(String pattern) 메소드를 호출하면 TransitionBuilder가 반환되어 Transition Flow를 구성할 수 있습니다. Step의 종료상태(ExitStatus)가 어떤 pattern과도 매칭되지 않으면 스프링 배치에서 예외를 발생하고 Job은 실패합니다. transition은 구체적인 것부터 그렇지 않은 순서로 적용됩니다. (구체적인 것: FAILED, 구체적이지 않은 것: *)

     

    .on(String pattern)

      Step의 실행 결과로 돌려받는 종료상태(ExitStatus)와 매칭하는 패턴 스키마로 BatchStatus와 매칭하는 것이 아닙니다. pattern과 ExitStatus와 매칭이 되면 다음으로 실행할 Step을 지정할 수 있습니다.

      특수문자는 "*"와 "?" 두 가지만 허용합니다.

    •   "*": 0개 이상의 문자와 매칭, 모든 ExitStatus와 매칭됩니다.
    •   "?": 정확히 1개의 문자와 매칭됩니다.
    •   예) "c*t"는 "cat"과 "count"에 매칭되고, "c?t"는 "cat"에는 매칭되지만 "count"에는 매칭되지 않습니다.

     

      .stop()

    •   FlowExecutionStatus가 STOPPED 상태로 종료되는 transition
    •   Job의 BatchStatus와 ExitStatus가 STOPPED으로 종료됨

     

      .fail()

    •   FlowExecutionStatus가 FAILED 상태로 종료되는 transition
    •   Job의 BatchStatus와 ExitStatus가 FAILED으로 종료됨

     

      .end()

    •   FlowExecutionStatus가 COMPLETED 상태로 종료되는 transition
    •   Job의 BatchStatus와 ExitStatus가 COMPLETED으로 종료됨
    •   Step의 ExitStatus가 FAILED 이더라도 Job의 BatchStatus가 COMPLETED로 종료하도록 가능하며 이 때 Job의 재시작은 불가능함

     

      .stopAndRestart(Step or Flow or JobExecutionDecider)

    •   stop() transition과 기본 흐름은 동일
    •   특정 step에서 작업을 중단하도록 설정하면 중단 이전의 step만 COMPLETED 저장되고 이후의 step은 실행되지 않고 STOPPED 상태로 Job 종료
    •   Job이 다시 실행됐을 때 실행해야 할 step을 restart 인자로 넘기면 이전에 COMPLETED로 저장된 step은 건너뛰고 중단 이후 step부터 시작

      아래는 step1()이 실패 시 step2()를 실행하고, step1()이 실패가 아닌 경우 step5()를 실행하는 예제 코드입니다.

    •   step1() 실패 > step2() 실행 > stop() ("*": step2()의 모든 ExitStatus와 매칭)
    •   step1() 실패 외(성공 등) > step5() 실행 > step6() 실행 > step6() 성공인 경우 종료
    package io.springbatch.springbatchlecture;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.batch.core.*;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.job.DefaultJobParametersValidator;
    import org.springframework.batch.core.job.builder.FlowBuilder;
    import org.springframework.batch.core.job.flow.Flow;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.core.listener.StepExecutionListenerSupport;
    import org.springframework.batch.core.partition.support.Partitioner;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.job.DefaultJobParametersExtractor;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.item.*;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Arrays;
    import java.util.Map;
    import java.util.stream.Collectors;
    
    @RequiredArgsConstructor
    @Configuration
    public class TransitionConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
    
        @Bean
        public Job batchJob() {
            return this.jobBuilderFactory.get("batchJob")
                    .start(step1())
                      .on("FAILED")
                      .to(step2())
                      .on("*")
                      .stop()
                    .from(step1())
                      .on("*")
                      .to(step5())
                      .next(step6())
                      .on("COMPLETED")
                      .end()
                    .end()
                    .build();
        }
    
        @Bean
        public Flow flow() {
            FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("flow");
            flowBuilder
                    .start(step3())
                    .next(step4())
                    .end();
            return flowBuilder.build();
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println(">> step1 has executed");
    //                    contribution.setExitStatus(ExitStatus.FAILED);
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }
        @Bean
        public Step step2() {
            return stepBuilderFactory.get("step2")
                    .flow(flow())
                    .build();
        }
        @Bean
        public Step step3() {
            return stepBuilderFactory.get("step3")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println(">> step3 has executed");
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }
    
        @Bean
        public Step step4() {
            return stepBuilderFactory.get("step4")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println(">> step4 has executed");
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }
    
        @Bean
        public Step step5() {
            return stepBuilderFactory.get("step5")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println(">> step5 has executed");
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }
    
        @Bean
        public Step step6() {
            return stepBuilderFactory.get("step6")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println(">> step6 has executed");
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }
    }

     

    JobExecutionDecider

      ExitStatus를 조작하거나 StepExecutionListener를 등록할 필요 없이 Transition 처리를 위한 전용 클래스로 Step과 Transition 역할을 명확히 분리해서 설정할 수 있습니다. Step의 ExitStatus가 아닌 JobExecutionDecider의 FlowExecutionStatus 상태 값을 새롭게 설정해서 반환합니다.

      아래는 decider() 호출 후 반환되는 FlowExecutionStatus 값에 따라 oddStep() or evenStep()을 수행하는 예제 코드입니다.

    package io.springbatch.springbatchlecture;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.batch.core.*;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.job.flow.FlowExecutionStatus;
    import org.springframework.batch.core.job.flow.JobExecutionDecider;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @RequiredArgsConstructor
    @Configuration
    public class JobExecutionDeciderConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
    
        @Bean
        public Job job() {
            return jobBuilderFactory.get("batchJob")
                    .start(startStep())
                    .next(decider())
                    .from(decider()).on("ODD").to(oddStep())
                    .from(decider()).on("EVEN").to(evenStep())
                    .end()
                    .build();
        }
        @Bean
        public Step startStep() {
            return stepBuilderFactory.get("startStep")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println("This is the start tasklet");
                        return RepeatStatus.FINISHED;
                    }).build();
        }
    
        @Bean
        public Step evenStep() {
            return stepBuilderFactory.get("evenStep")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println(">>EvenStep has executed");
                        return RepeatStatus.FINISHED;
                    }).build();
        }
    
        @Bean
        public Step oddStep() {
            return stepBuilderFactory.get("oddStep")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println(">>OddStep has executed");
                        return RepeatStatus.FINISHED;
                    }).build();
        }
    
        @Bean
        public JobExecutionDecider decider() {
            return new CustomDecider();
        }
    
        public static class CustomDecider implements JobExecutionDecider {
    
            private int count = 0;
    
            @Override
            public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
                count++;
    
                if(count % 2 == 0) {
                    return new FlowExecutionStatus("EVEN");
                }
                else {
                    return new FlowExecutionStatus("ODD");
                }
            }
        }
    }

     

    SimpleFlow

      스프링 배치에서 제공하는 Flow의 구현체로서 각 요소(Step, Flow, JobExecutionDecider)들을 담고 있는 State를 실행시키는 도메인 객체입니다. FlowBuilder를 사용해서 생성하며 Transition과 조합하여 여러 개의 Flow 및 중첩 Flow를 만들어 Job을 구성할 수 있습니다.



     

    API

      // JobBuilderFactory > FlowJobBuilder > FlowBuilder > SimpleFlow
      public Job batchJob() {
        return jobBuilderFactory.get("flowJob")
            .start(flow()) // Flow 정의 (SimpleFlow)
            .on("COMPLETED").to(nextFlow()) // Transition과 함께 Flow 정의 (SimpleFlow)
            .end() // SimpleFlow 객체 생성 (flow, nextFlow를 포함하는 Flow
            .build(); // FlowJob 객체 생성
      }
      
      @Bean
      public Flow flow() { // SimpleFlow 반환
        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("flow");
        
        // Flow 안에 Step을 구성하거나 Flow를 중첩되게 구성할 수 있다.
        // Flow를 외부에서 구성하면 재사용이 가능하다.
        return flowBuilder.start(step())
            .next(nextStep())
            .end();
      }

     

    FlowStep

      Step 내에 Flow를 할당하여 실행시키는 도메인 객체입니다. FlowStep의 BatchStatus와 ExitStatus는 Flow의 최종 상태 값에 따라 결정됩니다.

     

    API

      // StepBuilderFactory > StepBuilder > FlowStepBuilder > FlowStep
      public Step flowStep() {
        return stepBuilderFactory.get("flowStep")
            // Step 내에서 실행 될 flow 설정, FlowStepBuilder 반환
            .flow(flow())
            // FlowStep 객체 생성
            .build();
      }

     

    [참고자료]

    인프런-스프링 배치 - Spring Boot 기반으로 개발하는 Spring Batch

    반응형

    'BackEnd > Spring Batch' 카테고리의 다른 글

    11. 스프링 배치 청크 프로세스 (Chunk)  (0) 2021.12.24
    10. Scope  (0) 2021.12.24
    08. 배치 상태 유형  (0) 2021.12.22
    07. Step  (0) 2021.12.21
    06. Job  (0) 2021.12.16

    댓글

Designed by Tistory.