-
Flux Sequence 분할을 위한 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 9. 05:00반응형
window
window(int maxSize) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할합니다. Reactor에서는 이렇게 분할된 Flux를 윈도우(Window)라고 합니다. 마지막 윈도우에 포함된 데이터의 개수는 maxSize보다 더 적거나 같습니다.
다음은 window() Operator를 이용해서 2021년도 분기별 도서 매출액을 구하는 예제 코드입니다. 원본 데이터 소스는 2021년도 월별 도서 매출액 데이터이며, 이 데이터를 window() Operator로 3개씩 분할한 후 MathFlux.sumInt() Operator를 이용해 3개씩 분할된 데이터의 합계를 구합니다.
import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.math.MathFlux; /** * split 예제 * - window(maxSize) Operator * - Upstream에서 emit되는 첫 번째 데이터부터 maxSize의 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할한다. * - 새롭게 생성되는 Flux를 윈도우(Window)라고 한다. * - 마지막 윈도우가 포함하는 데이터는 maxSize보다 작거나 같다. */ @Slf4j public class Example14_54 { public static void main(String[] args) { Flux.fromIterable(SampleData.monthlyBookSales2021) .window(3) .flatMap(flux -> MathFlux.sumInt(flux)) .subscribe(new BaseSubscriber<>() { @Override protected void hookOnSubscribe(Subscription subscription) { subscription.request(2); } @Override protected void hookOnNext(Integer value) { log.info("# onNext: {}", value); request(2); } }); } } /** SampleData public static final List<Integer> monthlyBookSales2021 = Arrays.asList(2_500_000, 3_200_000, 2_300_000, 4_500_000, 6_500_000, 5_500_000, 3_100_000, 2_000_000, 2_800_000, 4_100_000, 6_200_000, 4_200_000); */ /** * [실행 결과] * # onNext: 8000000 * # onNext: 16500000 * # onNext: 7900000 * # onNext: 14500000 */
Note. MathFlux
MathFlux는 수학 계산을 위한 전용 Flux 입니다. MathFlux를 사용하려면 아래와 같이 의존 라이브러리를 추가해 주어야 합니다.
implementation 'io.projectreactor.addons:reactor-extra:3.4.8'
buffer
buffer(int maxSize) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한 번에 emit 합니다. 마지막 버퍼에 포함된 데이터의 개수는 maxSize보다 더 적거나 같습니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * split 예제 * - buffer(maxSize) Operator * - Upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한번에 emit한다. * - 마지막 버퍼가 포함하는 데이터는 maxSize보다 작거나 같다. */ @Slf4j public class Example14_55 { public static void main(String[] args) { Flux.range(1, 95) .buffer(10) .subscribe(buffer -> log.info("# onNext: {}", buffer)); } } /** * [실행 결과] * # onNext: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] * ... * # onNext: [91, 92, 93, 94, 95] */
높은 처리량을 요구하는 애플리케이션이 있다면, 들어오는 데이터를 순차적으로 처리하기보다는 batch insert 같은 일괄 작업에 buffer() Operator를 이용해서 성능 향상을 기대할 수 있습니다.
bufferTimeout
bufferTimeout(maxSize, maxTime) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터 또는 maxTime 내에 emit된 데이터를 List 버퍼로 한 번에 emit 합니다. maxSize나 maxTime 중 먼저 조건에 부합할 때까지 emit된 데이터를 List 버퍼로 emit 합니다.
다음은 bufferTimeout() Operator의 동작 과정을 이해하기 위해 map() Operator 내부에서 의도적으로 조건에 따라 지연 시간을 다르게 지정한 예제 코드입니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.time.Duration; /** * split 예제 * - bufferTimeout(maxSize, maxTime) Operator * - Upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자 만큼의 데이터 또는 maxTime 내에 emit된 데이터를 List 버퍼로 한번에 emit한다. * - maxSize나 maxTime에서 먼저 조건에 부합할때까지 emit된 데이터를 List 버퍼로 emit한다. * - 마지막 버퍼가 포함하는 데이터는 maxSize보다 작거나 같다. */ @Slf4j public class Example14_56 { public static void main(String[] args) { Flux .range(1, 20) .map(num -> { try { if (num < 10) { Thread.sleep(100L); } else { Thread.sleep(300L); } } catch (InterruptedException e) {} return num; }) .bufferTimeout(3, Duration.ofMillis(400L)) .subscribe(buffer -> log.info("# onNext: {}", buffer)); } } /** * [실행 결과] * # onNext: [1, 2, 3] * # onNext: [4, 5, 6] * # onNext: [7, 8, 9] * # onNext: [10, 11] * # onNext: [12, 13] * # onNext: [14, 15] * # onNext: [16, 17] * # onNext: [18, 19] * # onNext: [20] */
buffer(maxSize) Operator의 경우, 입력으로 들어오는 데이터가 maxSize가 되기 전에 어떤 오류로 인해 들어오지 못하는 상황이 발생할 경우, 애플리케이션은 maxSize가 될 때까지 무한정 기다리게 될 것입니다. 따라서 bufferTimeout(maxSize, maxTime) Operator를 사용함으로써, maxTime에 도달했을 때 버퍼를 비우게 해서 애플리케이션이 무한정 기다려야 되는 상황을 방지할 수 있습니다.
groupBy
groupBy(keyMapper) Operator는 emit 되는 데이터를 keyMapper로 생성한 key를 기준으로 그룹화한 GroupedFlux를 리턴하며, 이 GroupedFlux를 통해서 그룹별로 작업을 수행할 수 있습니다.
다음은 emit 되는 도서를 저자명으로 구룹화한 후 List로 변환하여 Subscriber에게 전달하는 예제코드입니다.
import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * split 예제 * - groupBy(keyMapper) Operator * - emit되는 데이터를 key를 기준으로 그룹화 한 GroupedFlux를 리턴한다. * - 그룹화 된 GroupedFlux로 그룹별 작업을 할 수 있다. */ @Slf4j public class Example14_57 { public static void main(String[] args) { Flux.fromIterable(SampleData.books) .groupBy(book -> book.getAuthorName()) .flatMap(groupedFlux -> groupedFlux .map(book -> book.getBookName() + "(" + book.getAuthorName() + ")") .collectList() ) .subscribe(bookByAuthor -> log.info("# book by author: {}", bookByAuthor)); } }
groupBy(keyMapper) Operator의 경우 keyMapper를 통해 생성되는 key를 기준으로 emit 되는 데이터를 그룹화하지만 groupBy(keyMapper, valueMapper) Operator는 그룹화하면서 valueMapper를 통해 그룹화되는 데이터를 다른 형태로 가공 처리할 수 있습니다.
다음은 groupBy(keyMapper, valueMapper) Operator의 valueMapper를 이용해 Downstream으로 emit할 데이터에 대한 가공 처리를 미리 하는 예제 코드입니다. 실행 결과는 코드 14_57과 동일합니다.
import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * split 예제 * - groupBy(keyMapper, valueMapper) Operator * - emit되는 데이터를 key를 기준으로 그룹화 한 GroupedFlux를 리턴한다. * - 그룹화 된 GroupedFlux로 그룹별 작업을 할 수 있다. * - valueMapper를 추가로 전달해서 그룹화 되어 emit되는 데이터의 값을 미리 다른 값으로 변경할 수 있다. */ @Slf4j public class Example14_58 { public static void main(String[] args) { Flux.fromIterable(SampleData.books) .groupBy(book -> book.getAuthorName(), book -> book.getBookName() + "(" + book.getAuthorName() + ")") .flatMap(groupedFlux -> groupedFlux.collectList()) .subscribe(bookByAuthor -> log.info("# book by author: {}", bookByAuthor)); } }
[참고 정보]
반응형'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글
다수의 Subscriber에게 Flux를 멀티캐스팅(Multicasting) 하기 위한 Operator (0) 2023.08.10 Sequence의 동작 시간 측정을 위한 Operator (0) 2023.08.08 에러 처리를 위한 Operator (0) 2023.08.06 Sequence의 내부 동작 확인을 위한 Operator (0) 2023.08.05 Sequence 변환 Operator (0) 2023.08.05