09. Flow
FlowJob
Step을 순차적으로만 구성하는 것이 아닌 특정한 상태에 따라 흐름을 전환하도록 구성할 수 있으며 FlowJobBuilder에 의해 생성됩니다. Flow와 Job의 흐름을 구성하는데만 관여하고 실제 비즈니스 로직은 Step에서 이루어집니다. 내부적으로 SimpleFlow 객체를 포함하고 있으며 Job 실행 시 호출됩니다. 아래와 같은 케이스에 사용합니다.
- Step이 실패하더라도 Job은 실패로 끝나지 않도록 해야 하는 경우
- Step이 성공했을 때 다음에 실행해야 할 Step을 구분해서 실행 해야 하는 경우
- 특정 Step은 전혀 실행되지 않게 구성 해야 하는 경우
API
// JobBuilderFactory > JobBuilder > JobFlowBuilder > FlowBuilder > FlowJob
public Job batchJob() {
return jobBuilderFactory.get("batchJob")
// 처음 실행 할 Step or Flow 설정
// Step: SimpleJobBuilder 반환
// Flow: JobFlowBuilder 반환
.start(Step or Flow or JobExecutionDecider)
// Step의 실행 결과로 돌려 받는 종료상태(ExitStatus)를 매칭하는 패턴, TransitionBuilder 반환
// 예) .on("COMPLETED").to(step())
.on(String pattern)
// 다음으로 실행할 단계 지정
.to(Step or Flow or JobExecutionDecider)
// Flow를 중지 / 실패/ 종료하도록 Flow 종료
.stop() / fail() / end() / stopAndRestart(Step or Flow or JobExecutionDecider)
// 이전 단계에서 정의한 Transition을 새롭게 추가 정의함
.from(Step or Flow or JobExecutionDecider)
// 다음으로 이동할 단계 지정
.next(Step or Flow or JobExecutionDecider)
// build() 앞에 위치하면 FlowBuilder를 종료하고 SimpleFlow 객체 생성
.end()
// FlowJob 생성하고 flow 필드에 SimpleFlow 저장
.build();
}
- .start() / .from() / .next(): Flow(흐름을 정의하는 역할)
- .on() / .to() / .stop() / .fail() / .end() / .stopAndRestart(): Transition(조건에 따라 흐름을 전환시키는 역할)
Transition
Flow 내 Step의 조건부 전환(전이)를 정의합니다. Job의 API 설정에서 on(String pattern) 메소드를 호출하면 TransitionBuilder가 반환되어 Transition Flow를 구성할 수 있습니다. Step의 종료상태(ExitStatus)가 어떤 pattern과도 매칭되지 않으면 스프링 배치에서 예외를 발생하고 Job은 실패합니다. transition은 구체적인 것부터 그렇지 않은 순서로 적용됩니다. (구체적인 것: FAILED, 구체적이지 않은 것: *)
.on(String pattern)
Step의 실행 결과로 돌려받는 종료상태(ExitStatus)와 매칭하는 패턴 스키마로 BatchStatus와 매칭하는 것이 아닙니다. pattern과 ExitStatus와 매칭이 되면 다음으로 실행할 Step을 지정할 수 있습니다.
특수문자는 "*"와 "?" 두 가지만 허용합니다.
- "*": 0개 이상의 문자와 매칭, 모든 ExitStatus와 매칭됩니다.
- "?": 정확히 1개의 문자와 매칭됩니다.
- 예) "c*t"는 "cat"과 "count"에 매칭되고, "c?t"는 "cat"에는 매칭되지만 "count"에는 매칭되지 않습니다.
.stop()
- FlowExecutionStatus가 STOPPED 상태로 종료되는 transition
- Job의 BatchStatus와 ExitStatus가 STOPPED으로 종료됨
.fail()
- FlowExecutionStatus가 FAILED 상태로 종료되는 transition
- Job의 BatchStatus와 ExitStatus가 FAILED으로 종료됨
.end()
- FlowExecutionStatus가 COMPLETED 상태로 종료되는 transition
- Job의 BatchStatus와 ExitStatus가 COMPLETED으로 종료됨
- Step의 ExitStatus가 FAILED 이더라도 Job의 BatchStatus가 COMPLETED로 종료하도록 가능하며 이 때 Job의 재시작은 불가능함
.stopAndRestart(Step or Flow or JobExecutionDecider)
- stop() transition과 기본 흐름은 동일
- 특정 step에서 작업을 중단하도록 설정하면 중단 이전의 step만 COMPLETED 저장되고 이후의 step은 실행되지 않고 STOPPED 상태로 Job 종료
- Job이 다시 실행됐을 때 실행해야 할 step을 restart 인자로 넘기면 이전에 COMPLETED로 저장된 step은 건너뛰고 중단 이후 step부터 시작
아래는 step1()이 실패 시 step2()를 실행하고, step1()이 실패가 아닌 경우 step5()를 실행하는 예제 코드입니다.
- step1() 실패 > step2() 실행 > stop() ("*": step2()의 모든 ExitStatus와 매칭)
- step1() 실패 외(성공 등) > step5() 실행 > step6() 실행 > step6() 성공인 경우 종료
package io.springbatch.springbatchlecture;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.DefaultJobParametersValidator;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.job.DefaultJobParametersExtractor;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.*;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
@RequiredArgsConstructor
@Configuration
public class TransitionConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job batchJob() {
return this.jobBuilderFactory.get("batchJob")
.start(step1())
.on("FAILED")
.to(step2())
.on("*")
.stop()
.from(step1())
.on("*")
.to(step5())
.next(step6())
.on("COMPLETED")
.end()
.end()
.build();
}
@Bean
public Flow flow() {
FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("flow");
flowBuilder
.start(step3())
.next(step4())
.end();
return flowBuilder.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
System.out.println(">> step1 has executed");
// contribution.setExitStatus(ExitStatus.FAILED);
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.flow(flow())
.build();
}
@Bean
public Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((contribution, chunkContext) -> {
System.out.println(">> step3 has executed");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step step4() {
return stepBuilderFactory.get("step4")
.tasklet((contribution, chunkContext) -> {
System.out.println(">> step4 has executed");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step step5() {
return stepBuilderFactory.get("step5")
.tasklet((contribution, chunkContext) -> {
System.out.println(">> step5 has executed");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step step6() {
return stepBuilderFactory.get("step6")
.tasklet((contribution, chunkContext) -> {
System.out.println(">> step6 has executed");
return RepeatStatus.FINISHED;
})
.build();
}
}
JobExecutionDecider
ExitStatus를 조작하거나 StepExecutionListener를 등록할 필요 없이 Transition 처리를 위한 전용 클래스로 Step과 Transition 역할을 명확히 분리해서 설정할 수 있습니다. Step의 ExitStatus가 아닌 JobExecutionDecider의 FlowExecutionStatus 상태 값을 새롭게 설정해서 반환합니다.
아래는 decider() 호출 후 반환되는 FlowExecutionStatus 값에 따라 oddStep() or evenStep()을 수행하는 예제 코드입니다.
package io.springbatch.springbatchlecture;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@RequiredArgsConstructor
@Configuration
public class JobExecutionDeciderConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("batchJob")
.start(startStep())
.next(decider())
.from(decider()).on("ODD").to(oddStep())
.from(decider()).on("EVEN").to(evenStep())
.end()
.build();
}
@Bean
public Step startStep() {
return stepBuilderFactory.get("startStep")
.tasklet((contribution, chunkContext) -> {
System.out.println("This is the start tasklet");
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Step evenStep() {
return stepBuilderFactory.get("evenStep")
.tasklet((contribution, chunkContext) -> {
System.out.println(">>EvenStep has executed");
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Step oddStep() {
return stepBuilderFactory.get("oddStep")
.tasklet((contribution, chunkContext) -> {
System.out.println(">>OddStep has executed");
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public JobExecutionDecider decider() {
return new CustomDecider();
}
public static class CustomDecider implements JobExecutionDecider {
private int count = 0;
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
count++;
if(count % 2 == 0) {
return new FlowExecutionStatus("EVEN");
}
else {
return new FlowExecutionStatus("ODD");
}
}
}
}
SimpleFlow
스프링 배치에서 제공하는 Flow의 구현체로서 각 요소(Step, Flow, JobExecutionDecider)들을 담고 있는 State를 실행시키는 도메인 객체입니다. FlowBuilder를 사용해서 생성하며 Transition과 조합하여 여러 개의 Flow 및 중첩 Flow를 만들어 Job을 구성할 수 있습니다.
API
// JobBuilderFactory > FlowJobBuilder > FlowBuilder > SimpleFlow
public Job batchJob() {
return jobBuilderFactory.get("flowJob")
.start(flow()) // Flow 정의 (SimpleFlow)
.on("COMPLETED").to(nextFlow()) // Transition과 함께 Flow 정의 (SimpleFlow)
.end() // SimpleFlow 객체 생성 (flow, nextFlow를 포함하는 Flow
.build(); // FlowJob 객체 생성
}
@Bean
public Flow flow() { // SimpleFlow 반환
FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("flow");
// Flow 안에 Step을 구성하거나 Flow를 중첩되게 구성할 수 있다.
// Flow를 외부에서 구성하면 재사용이 가능하다.
return flowBuilder.start(step())
.next(nextStep())
.end();
}
FlowStep
Step 내에 Flow를 할당하여 실행시키는 도메인 객체입니다. FlowStep의 BatchStatus와 ExitStatus는 Flow의 최종 상태 값에 따라 결정됩니다.
API
// StepBuilderFactory > StepBuilder > FlowStepBuilder > FlowStep
public Step flowStep() {
return stepBuilderFactory.get("flowStep")
// Step 내에서 실행 될 flow 설정, FlowStepBuilder 반환
.flow(flow())
// FlowStep 객체 생성
.build();
}
[참고자료]