hanseom 2021. 12. 23. 23:45
반응형

Architecture

 

FlowJob

  Step을 순차적으로만 구성하는 것이 아닌 특정한 상태에 따라 흐름을 전환하도록 구성할 수 있으며 FlowJobBuilder에 의해 생성됩니다. Flow와 Job의 흐름을 구성하는데만 관여하고 실제 비즈니스 로직은 Step에서 이루어집니다. 내부적으로 SimpleFlow 객체를 포함하고 있으며 Job 실행 시 호출됩니다. 아래와 같은 케이스에 사용합니다.

  • Step이 실패하더라도 Job은 실패로 끝나지 않도록 해야 하는 경우
  • Step이 성공했을 때 다음에 실행해야 할 Step을 구분해서 실행 해야 하는 경우
  • 특정 Step은 전혀 실행되지 않게 구성 해야 하는 경우

SimpleJob vs FlowJob

 

API

  // JobBuilderFactory > JobBuilder > JobFlowBuilder > FlowBuilder > FlowJob
  public Job batchJob() {
    return jobBuilderFactory.get("batchJob")
        // 처음 실행 할 Step or Flow 설정
        // Step: SimpleJobBuilder 반환
        // Flow: JobFlowBuilder 반환
        .start(Step or Flow or JobExecutionDecider)
        // Step의 실행 결과로 돌려 받는 종료상태(ExitStatus)를 매칭하는 패턴, TransitionBuilder 반환
        // 예) .on("COMPLETED").to(step())
        .on(String pattern)
        // 다음으로 실행할 단계 지정
        .to(Step or Flow or JobExecutionDecider)
        // Flow를 중지 / 실패/ 종료하도록 Flow 종료
        .stop() / fail() / end() / stopAndRestart(Step or Flow or JobExecutionDecider)
        // 이전 단계에서 정의한 Transition을 새롭게 추가 정의함
        .from(Step or Flow or JobExecutionDecider)
        // 다음으로 이동할 단계 지정
        .next(Step or Flow or JobExecutionDecider)
        // build() 앞에 위치하면 FlowBuilder를 종료하고 SimpleFlow 객체 생성
        .end()
        // FlowJob 생성하고 flow 필드에 SimpleFlow 저장
        .build();
  }
  •   .start() / .from() / .next(): Flow(흐름을 정의하는 역할)
  •   .on() / .to() / .stop() / .fail() / .end() / .stopAndRestart(): Transition(조건에 따라 흐름을 전환시키는 역할)

 

Transition

  Flow 내 Step의 조건부 전환(전이)를 정의합니다. Job의 API 설정에서 on(String pattern) 메소드를 호출하면 TransitionBuilder가 반환되어 Transition Flow를 구성할 수 있습니다. Step의 종료상태(ExitStatus)가 어떤 pattern과도 매칭되지 않으면 스프링 배치에서 예외를 발생하고 Job은 실패합니다. transition은 구체적인 것부터 그렇지 않은 순서로 적용됩니다. (구체적인 것: FAILED, 구체적이지 않은 것: *)

 

.on(String pattern)

  Step의 실행 결과로 돌려받는 종료상태(ExitStatus)와 매칭하는 패턴 스키마로 BatchStatus와 매칭하는 것이 아닙니다. pattern과 ExitStatus와 매칭이 되면 다음으로 실행할 Step을 지정할 수 있습니다.

  특수문자는 "*"와 "?" 두 가지만 허용합니다.

  •   "*": 0개 이상의 문자와 매칭, 모든 ExitStatus와 매칭됩니다.
  •   "?": 정확히 1개의 문자와 매칭됩니다.
  •   예) "c*t"는 "cat"과 "count"에 매칭되고, "c?t"는 "cat"에는 매칭되지만 "count"에는 매칭되지 않습니다.

 

  .stop()

  •   FlowExecutionStatus가 STOPPED 상태로 종료되는 transition
  •   Job의 BatchStatus와 ExitStatus가 STOPPED으로 종료됨

 

  .fail()

  •   FlowExecutionStatus가 FAILED 상태로 종료되는 transition
  •   Job의 BatchStatus와 ExitStatus가 FAILED으로 종료됨

 

  .end()

  •   FlowExecutionStatus가 COMPLETED 상태로 종료되는 transition
  •   Job의 BatchStatus와 ExitStatus가 COMPLETED으로 종료됨
  •   Step의 ExitStatus가 FAILED 이더라도 Job의 BatchStatus가 COMPLETED로 종료하도록 가능하며 이 때 Job의 재시작은 불가능함

 

  .stopAndRestart(Step or Flow or JobExecutionDecider)

  •   stop() transition과 기본 흐름은 동일
  •   특정 step에서 작업을 중단하도록 설정하면 중단 이전의 step만 COMPLETED 저장되고 이후의 step은 실행되지 않고 STOPPED 상태로 Job 종료
  •   Job이 다시 실행됐을 때 실행해야 할 step을 restart 인자로 넘기면 이전에 COMPLETED로 저장된 step은 건너뛰고 중단 이후 step부터 시작

  아래는 step1()이 실패 시 step2()를 실행하고, step1()이 실패가 아닌 경우 step5()를 실행하는 예제 코드입니다.

  •   step1() 실패 > step2() 실행 > stop() ("*": step2()의 모든 ExitStatus와 매칭)
  •   step1() 실패 외(성공 등) > step5() 실행 > step6() 실행 > step6() 성공인 경우 종료
package io.springbatch.springbatchlecture;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.DefaultJobParametersValidator;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.job.DefaultJobParametersExtractor;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.*;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

@RequiredArgsConstructor
@Configuration
public class TransitionConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job batchJob() {
        return this.jobBuilderFactory.get("batchJob")
                .start(step1())
                  .on("FAILED")
                  .to(step2())
                  .on("*")
                  .stop()
                .from(step1())
                  .on("*")
                  .to(step5())
                  .next(step6())
                  .on("COMPLETED")
                  .end()
                .end()
                .build();
    }

    @Bean
    public Flow flow() {
        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("flow");
        flowBuilder
                .start(step3())
                .next(step4())
                .end();
        return flowBuilder.build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println(">> step1 has executed");
//                    contribution.setExitStatus(ExitStatus.FAILED);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .flow(flow())
                .build();
    }
    @Bean
    public Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println(">> step3 has executed");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step step4() {
        return stepBuilderFactory.get("step4")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println(">> step4 has executed");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step step5() {
        return stepBuilderFactory.get("step5")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println(">> step5 has executed");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step step6() {
        return stepBuilderFactory.get("step6")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println(">> step6 has executed");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}

 

JobExecutionDecider

  ExitStatus를 조작하거나 StepExecutionListener를 등록할 필요 없이 Transition 처리를 위한 전용 클래스로 Step과 Transition 역할을 명확히 분리해서 설정할 수 있습니다. Step의 ExitStatus가 아닌 JobExecutionDecider의 FlowExecutionStatus 상태 값을 새롭게 설정해서 반환합니다.

  아래는 decider() 호출 후 반환되는 FlowExecutionStatus 값에 따라 oddStep() or evenStep()을 수행하는 예제 코드입니다.

package io.springbatch.springbatchlecture;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@RequiredArgsConstructor
@Configuration
public class JobExecutionDeciderConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("batchJob")
                .start(startStep())
                .next(decider())
                .from(decider()).on("ODD").to(oddStep())
                .from(decider()).on("EVEN").to(evenStep())
                .end()
                .build();
    }
    @Bean
    public Step startStep() {
        return stepBuilderFactory.get("startStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("This is the start tasklet");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Bean
    public Step evenStep() {
        return stepBuilderFactory.get("evenStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println(">>EvenStep has executed");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Bean
    public Step oddStep() {
        return stepBuilderFactory.get("oddStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println(">>OddStep has executed");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Bean
    public JobExecutionDecider decider() {
        return new CustomDecider();
    }

    public static class CustomDecider implements JobExecutionDecider {

        private int count = 0;

        @Override
        public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
            count++;

            if(count % 2 == 0) {
                return new FlowExecutionStatus("EVEN");
            }
            else {
                return new FlowExecutionStatus("ODD");
            }
        }
    }
}

 

SimpleFlow

  스프링 배치에서 제공하는 Flow의 구현체로서 각 요소(Step, Flow, JobExecutionDecider)들을 담고 있는 State를 실행시키는 도메인 객체입니다. FlowBuilder를 사용해서 생성하며 Transition과 조합하여 여러 개의 Flow 및 중첩 Flow를 만들어 Job을 구성할 수 있습니다.



 

API

  // JobBuilderFactory > FlowJobBuilder > FlowBuilder > SimpleFlow
  public Job batchJob() {
    return jobBuilderFactory.get("flowJob")
        .start(flow()) // Flow 정의 (SimpleFlow)
        .on("COMPLETED").to(nextFlow()) // Transition과 함께 Flow 정의 (SimpleFlow)
        .end() // SimpleFlow 객체 생성 (flow, nextFlow를 포함하는 Flow
        .build(); // FlowJob 객체 생성
  }
  
  @Bean
  public Flow flow() { // SimpleFlow 반환
    FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("flow");
    
    // Flow 안에 Step을 구성하거나 Flow를 중첩되게 구성할 수 있다.
    // Flow를 외부에서 구성하면 재사용이 가능하다.
    return flowBuilder.start(step())
        .next(nextStep())
        .end();
  }

 

FlowStep

  Step 내에 Flow를 할당하여 실행시키는 도메인 객체입니다. FlowStep의 BatchStatus와 ExitStatus는 Flow의 최종 상태 값에 따라 결정됩니다.

 

API

  // StepBuilderFactory > StepBuilder > FlowStepBuilder > FlowStep
  public Step flowStep() {
    return stepBuilderFactory.get("flowStep")
        // Step 내에서 실행 될 flow 설정, FlowStepBuilder 반환
        .flow(flow())
        // FlowStep 객체 생성
        .build();
  }

 

[참고자료]

인프런-스프링 배치 - Spring Boot 기반으로 개발하는 Spring Batch

반응형