-
09. FlowBackEnd/Spring Batch 2021. 12. 23. 23:45반응형
FlowJob
Step을 순차적으로만 구성하는 것이 아닌 특정한 상태에 따라 흐름을 전환하도록 구성할 수 있으며 FlowJobBuilder에 의해 생성됩니다. Flow와 Job의 흐름을 구성하는데만 관여하고 실제 비즈니스 로직은 Step에서 이루어집니다. 내부적으로 SimpleFlow 객체를 포함하고 있으며 Job 실행 시 호출됩니다. 아래와 같은 케이스에 사용합니다.
- Step이 실패하더라도 Job은 실패로 끝나지 않도록 해야 하는 경우
- Step이 성공했을 때 다음에 실행해야 할 Step을 구분해서 실행 해야 하는 경우
- 특정 Step은 전혀 실행되지 않게 구성 해야 하는 경우
API
// JobBuilderFactory > JobBuilder > JobFlowBuilder > FlowBuilder > FlowJob public Job batchJob() { return jobBuilderFactory.get("batchJob") // 처음 실행 할 Step or Flow 설정 // Step: SimpleJobBuilder 반환 // Flow: JobFlowBuilder 반환 .start(Step or Flow or JobExecutionDecider) // Step의 실행 결과로 돌려 받는 종료상태(ExitStatus)를 매칭하는 패턴, TransitionBuilder 반환 // 예) .on("COMPLETED").to(step()) .on(String pattern) // 다음으로 실행할 단계 지정 .to(Step or Flow or JobExecutionDecider) // Flow를 중지 / 실패/ 종료하도록 Flow 종료 .stop() / fail() / end() / stopAndRestart(Step or Flow or JobExecutionDecider) // 이전 단계에서 정의한 Transition을 새롭게 추가 정의함 .from(Step or Flow or JobExecutionDecider) // 다음으로 이동할 단계 지정 .next(Step or Flow or JobExecutionDecider) // build() 앞에 위치하면 FlowBuilder를 종료하고 SimpleFlow 객체 생성 .end() // FlowJob 생성하고 flow 필드에 SimpleFlow 저장 .build(); }
- .start() / .from() / .next(): Flow(흐름을 정의하는 역할)
- .on() / .to() / .stop() / .fail() / .end() / .stopAndRestart(): Transition(조건에 따라 흐름을 전환시키는 역할)
Transition
Flow 내 Step의 조건부 전환(전이)를 정의합니다. Job의 API 설정에서 on(String pattern) 메소드를 호출하면 TransitionBuilder가 반환되어 Transition Flow를 구성할 수 있습니다. Step의 종료상태(ExitStatus)가 어떤 pattern과도 매칭되지 않으면 스프링 배치에서 예외를 발생하고 Job은 실패합니다. transition은 구체적인 것부터 그렇지 않은 순서로 적용됩니다. (구체적인 것: FAILED, 구체적이지 않은 것: *)
.on(String pattern)
Step의 실행 결과로 돌려받는 종료상태(ExitStatus)와 매칭하는 패턴 스키마로 BatchStatus와 매칭하는 것이 아닙니다. pattern과 ExitStatus와 매칭이 되면 다음으로 실행할 Step을 지정할 수 있습니다.
특수문자는 "*"와 "?" 두 가지만 허용합니다.
- "*": 0개 이상의 문자와 매칭, 모든 ExitStatus와 매칭됩니다.
- "?": 정확히 1개의 문자와 매칭됩니다.
- 예) "c*t"는 "cat"과 "count"에 매칭되고, "c?t"는 "cat"에는 매칭되지만 "count"에는 매칭되지 않습니다.
.stop()
- FlowExecutionStatus가 STOPPED 상태로 종료되는 transition
- Job의 BatchStatus와 ExitStatus가 STOPPED으로 종료됨
.fail()
- FlowExecutionStatus가 FAILED 상태로 종료되는 transition
- Job의 BatchStatus와 ExitStatus가 FAILED으로 종료됨
.end()
- FlowExecutionStatus가 COMPLETED 상태로 종료되는 transition
- Job의 BatchStatus와 ExitStatus가 COMPLETED으로 종료됨
- Step의 ExitStatus가 FAILED 이더라도 Job의 BatchStatus가 COMPLETED로 종료하도록 가능하며 이 때 Job의 재시작은 불가능함
.stopAndRestart(Step or Flow or JobExecutionDecider)
- stop() transition과 기본 흐름은 동일
- 특정 step에서 작업을 중단하도록 설정하면 중단 이전의 step만 COMPLETED 저장되고 이후의 step은 실행되지 않고 STOPPED 상태로 Job 종료
- Job이 다시 실행됐을 때 실행해야 할 step을 restart 인자로 넘기면 이전에 COMPLETED로 저장된 step은 건너뛰고 중단 이후 step부터 시작
아래는 step1()이 실패 시 step2()를 실행하고, step1()이 실패가 아닌 경우 step5()를 실행하는 예제 코드입니다.
- step1() 실패 > step2() 실행 > stop() ("*": step2()의 모든 ExitStatus와 매칭)
- step1() 실패 외(성공 등) > step5() 실행 > step6() 실행 > step6() 성공인 경우 종료
package io.springbatch.springbatchlecture; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.DefaultJobParametersValidator; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.listener.StepExecutionListenerSupport; import org.springframework.batch.core.partition.support.Partitioner; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.job.DefaultJobParametersExtractor; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.*; import org.springframework.batch.item.support.ListItemReader; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; @RequiredArgsConstructor @Configuration public class TransitionConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Bean public Job batchJob() { return this.jobBuilderFactory.get("batchJob") .start(step1()) .on("FAILED") .to(step2()) .on("*") .stop() .from(step1()) .on("*") .to(step5()) .next(step6()) .on("COMPLETED") .end() .end() .build(); } @Bean public Flow flow() { FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("flow"); flowBuilder .start(step3()) .next(step4()) .end(); return flowBuilder.build(); } @Bean public Step step1() { return stepBuilderFactory.get("step1") .tasklet((contribution, chunkContext) -> { System.out.println(">> step1 has executed"); // contribution.setExitStatus(ExitStatus.FAILED); return RepeatStatus.FINISHED; }) .build(); } @Bean public Step step2() { return stepBuilderFactory.get("step2") .flow(flow()) .build(); } @Bean public Step step3() { return stepBuilderFactory.get("step3") .tasklet((contribution, chunkContext) -> { System.out.println(">> step3 has executed"); return RepeatStatus.FINISHED; }) .build(); } @Bean public Step step4() { return stepBuilderFactory.get("step4") .tasklet((contribution, chunkContext) -> { System.out.println(">> step4 has executed"); return RepeatStatus.FINISHED; }) .build(); } @Bean public Step step5() { return stepBuilderFactory.get("step5") .tasklet((contribution, chunkContext) -> { System.out.println(">> step5 has executed"); return RepeatStatus.FINISHED; }) .build(); } @Bean public Step step6() { return stepBuilderFactory.get("step6") .tasklet((contribution, chunkContext) -> { System.out.println(">> step6 has executed"); return RepeatStatus.FINISHED; }) .build(); } }
JobExecutionDecider
ExitStatus를 조작하거나 StepExecutionListener를 등록할 필요 없이 Transition 처리를 위한 전용 클래스로 Step과 Transition 역할을 명확히 분리해서 설정할 수 있습니다. Step의 ExitStatus가 아닌 JobExecutionDecider의 FlowExecutionStatus 상태 값을 새롭게 설정해서 반환합니다.
아래는 decider() 호출 후 반환되는 FlowExecutionStatus 값에 따라 oddStep() or evenStep()을 수행하는 예제 코드입니다.
package io.springbatch.springbatchlecture; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.flow.FlowExecutionStatus; import org.springframework.batch.core.job.flow.JobExecutionDecider; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @RequiredArgsConstructor @Configuration public class JobExecutionDeciderConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Bean public Job job() { return jobBuilderFactory.get("batchJob") .start(startStep()) .next(decider()) .from(decider()).on("ODD").to(oddStep()) .from(decider()).on("EVEN").to(evenStep()) .end() .build(); } @Bean public Step startStep() { return stepBuilderFactory.get("startStep") .tasklet((contribution, chunkContext) -> { System.out.println("This is the start tasklet"); return RepeatStatus.FINISHED; }).build(); } @Bean public Step evenStep() { return stepBuilderFactory.get("evenStep") .tasklet((contribution, chunkContext) -> { System.out.println(">>EvenStep has executed"); return RepeatStatus.FINISHED; }).build(); } @Bean public Step oddStep() { return stepBuilderFactory.get("oddStep") .tasklet((contribution, chunkContext) -> { System.out.println(">>OddStep has executed"); return RepeatStatus.FINISHED; }).build(); } @Bean public JobExecutionDecider decider() { return new CustomDecider(); } public static class CustomDecider implements JobExecutionDecider { private int count = 0; @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { count++; if(count % 2 == 0) { return new FlowExecutionStatus("EVEN"); } else { return new FlowExecutionStatus("ODD"); } } } }
SimpleFlow
스프링 배치에서 제공하는 Flow의 구현체로서 각 요소(Step, Flow, JobExecutionDecider)들을 담고 있는 State를 실행시키는 도메인 객체입니다. FlowBuilder를 사용해서 생성하며 Transition과 조합하여 여러 개의 Flow 및 중첩 Flow를 만들어 Job을 구성할 수 있습니다.
API
// JobBuilderFactory > FlowJobBuilder > FlowBuilder > SimpleFlow public Job batchJob() { return jobBuilderFactory.get("flowJob") .start(flow()) // Flow 정의 (SimpleFlow) .on("COMPLETED").to(nextFlow()) // Transition과 함께 Flow 정의 (SimpleFlow) .end() // SimpleFlow 객체 생성 (flow, nextFlow를 포함하는 Flow .build(); // FlowJob 객체 생성 } @Bean public Flow flow() { // SimpleFlow 반환 FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("flow"); // Flow 안에 Step을 구성하거나 Flow를 중첩되게 구성할 수 있다. // Flow를 외부에서 구성하면 재사용이 가능하다. return flowBuilder.start(step()) .next(nextStep()) .end(); }
FlowStep
Step 내에 Flow를 할당하여 실행시키는 도메인 객체입니다. FlowStep의 BatchStatus와 ExitStatus는 Flow의 최종 상태 값에 따라 결정됩니다.
API
// StepBuilderFactory > StepBuilder > FlowStepBuilder > FlowStep public Step flowStep() { return stepBuilderFactory.get("flowStep") // Step 내에서 실행 될 flow 설정, FlowStepBuilder 반환 .flow(flow()) // FlowStep 객체 생성 .build(); }
[참고자료]
반응형'BackEnd > Spring Batch' 카테고리의 다른 글
11. 스프링 배치 청크 프로세스 (Chunk) (0) 2021.12.24 10. Scope (0) 2021.12.24 08. 배치 상태 유형 (0) 2021.12.22 07. Step (0) 2021.12.21 06. Job (0) 2021.12.16