ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Flux Sequence 분할을 위한 Operator
    Spring Reactive Web Application/Project Reactor 2023. 8. 9. 05:00
    반응형

    window

      window(int maxSize) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할합니다. Reactor에서는 이렇게 분할된 Flux를 윈도우(Window)라고 합니다. 마지막 윈도우에 포함된 데이터의 개수는 maxSize보다 더 적거나 같습니다.

     

      다음은 window() Operator를 이용해서 2021년도 분기별 도서 매출액을 구하는 예제 코드입니다. 원본 데이터 소스는 2021년도 월별 도서 매출액 데이터이며, 이 데이터를 window() Operator로 3개씩 분할한 후 MathFlux.sumInt() Operator를 이용해 3개씩 분할된 데이터의 합계를 구합니다.

    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;
    import reactor.core.publisher.Flux;
    import reactor.math.MathFlux;
    
    /**
     * split 예제
     *  - window(maxSize) Operator
     *      - Upstream에서 emit되는 첫 번째 데이터부터 maxSize의 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할한다.
     *      - 새롭게 생성되는 Flux를 윈도우(Window)라고 한다.
     *      - 마지막 윈도우가 포함하는 데이터는 maxSize보다 작거나 같다.
     */
    @Slf4j
    public class Example14_54 {
        public static void main(String[] args) {
            Flux.fromIterable(SampleData.monthlyBookSales2021)
                    .window(3)
                    .flatMap(flux -> MathFlux.sumInt(flux))
                    .subscribe(new BaseSubscriber<>() {
                        @Override
                        protected void hookOnSubscribe(Subscription subscription) {
                            subscription.request(2);
                        }
    
                        @Override
                        protected void hookOnNext(Integer value) {
                            log.info("# onNext: {}", value);
                            request(2);
                        }
                    });
        }
    }
    
    
    /** SampleData
        public static final List<Integer> monthlyBookSales2021 =
                Arrays.asList(2_500_000, 3_200_000, 2_300_000, 4_500_000,
                        6_500_000, 5_500_000, 3_100_000, 2_000_000,
                        2_800_000, 4_100_000, 6_200_000, 4_200_000);
    */
    
    /**
     * [실행 결과]
     * # onNext: 8000000
     * # onNext: 16500000
     * # onNext: 7900000
     * # onNext: 14500000
     */

    Note. MathFlux

      MathFlux는 수학 계산을 위한 전용 Flux 입니다. MathFlux를 사용하려면 아래와 같이 의존 라이브러리를 추가해 주어야 합니다.

    implementation 'io.projectreactor.addons:reactor-extra:3.4.8'

     

    buffer

      buffer(int maxSize) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한 번에 emit 합니다. 마지막 버퍼에 포함된 데이터의 개수는 maxSize보다 더 적거나 같습니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * split 예제
     *  - buffer(maxSize) Operator
     *      - Upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한번에 emit한다.
     *      - 마지막 버퍼가 포함하는 데이터는 maxSize보다 작거나 같다.
     */
    @Slf4j
    public class Example14_55 {
        public static void main(String[] args) {
            Flux.range(1, 95)
                    .buffer(10)
                    .subscribe(buffer -> log.info("# onNext: {}", buffer));
        }
    }
    
    /**
     * [실행 결과]
     * # onNext: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
     * ...
     * # onNext: [91, 92, 93, 94, 95]
     */

     

      높은 처리량을 요구하는 애플리케이션이 있다면, 들어오는 데이터를 순차적으로 처리하기보다는 batch insert 같은 일괄 작업에 buffer() Operator를 이용해서 성능 향상을 기대할 수 있습니다.

     

    bufferTimeout

      bufferTimeout(maxSize, maxTime) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터 또는 maxTime 내에 emit된 데이터를 List 버퍼로 한 번에 emit 합니다. maxSize나 maxTime 중 먼저 조건에 부합할 때까지 emit된 데이터를 List 버퍼로 emit 합니다.

     

      다음은 bufferTimeout() Operator의 동작 과정을 이해하기 위해 map() Operator 내부에서 의도적으로 조건에 따라 지연 시간을 다르게 지정한 예제 코드입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.time.Duration;
    
    /**
     * split 예제
     *  - bufferTimeout(maxSize, maxTime) Operator
     *      - Upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자 만큼의 데이터 또는 maxTime 내에 emit된 데이터를 List 버퍼로 한번에 emit한다.
     *      - maxSize나 maxTime에서 먼저 조건에 부합할때까지 emit된 데이터를 List 버퍼로 emit한다.
     *      - 마지막 버퍼가 포함하는 데이터는 maxSize보다 작거나 같다.
     */
    @Slf4j
    public class Example14_56 {
        public static void main(String[] args) {
            Flux
                .range(1, 20)
                .map(num -> {
                    try {
                        if (num < 10) {
                            Thread.sleep(100L);
                        } else {
                            Thread.sleep(300L);
                        }
                    } catch (InterruptedException e) {}
                    return num;
                })
                .bufferTimeout(3, Duration.ofMillis(400L))
                .subscribe(buffer -> log.info("# onNext: {}", buffer));
        }
    }
    
    /**
     * [실행 결과]
     * # onNext: [1, 2, 3]
     * # onNext: [4, 5, 6]
     * # onNext: [7, 8, 9]
     * # onNext: [10, 11]
     * # onNext: [12, 13]
     * # onNext: [14, 15]
     * # onNext: [16, 17]
     * # onNext: [18, 19]
     * # onNext: [20]
     */

     

      buffer(maxSize) Operator의 경우, 입력으로 들어오는 데이터가 maxSize가 되기 전에 어떤 오류로 인해 들어오지 못하는 상황이 발생할 경우, 애플리케이션은 maxSize가 될 때까지 무한정 기다리게 될 것입니다. 따라서 bufferTimeout(maxSize, maxTime) Operator를 사용함으로써, maxTime에 도달했을 때 버퍼를 비우게 해서 애플리케이션이 무한정 기다려야 되는 상황을 방지할 수 있습니다.

     

    groupBy

      groupBy(keyMapper) Operator는 emit 되는 데이터를 keyMapper로 생성한 key를 기준으로 그룹화한 GroupedFlux를 리턴하며, 이 GroupedFlux를 통해서 그룹별로 작업을 수행할 수 있습니다.

     

      다음은 emit 되는 도서를 저자명으로 구룹화한 후 List로 변환하여 Subscriber에게 전달하는 예제코드입니다.

    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * split 예제
     *  - groupBy(keyMapper) Operator
     *      - emit되는 데이터를 key를 기준으로 그룹화 한 GroupedFlux를 리턴한다.
     *      - 그룹화 된 GroupedFlux로 그룹별 작업을 할 수 있다.
     */
    @Slf4j
    public class Example14_57 {
        public static void main(String[] args) {
            Flux.fromIterable(SampleData.books)
                    .groupBy(book -> book.getAuthorName())
                    .flatMap(groupedFlux ->
                            groupedFlux
                                    .map(book -> book.getBookName() +
                                            "(" + book.getAuthorName() + ")")
                                    .collectList()
                    )
                    .subscribe(bookByAuthor ->
                            log.info("# book by author: {}", bookByAuthor));
        }
    }

     

      groupBy(keyMapper) Operator의 경우 keyMapper를 통해 생성되는 key를 기준으로 emit 되는 데이터를 그룹화하지만 groupBy(keyMapper, valueMapper) Operator는 그룹화하면서 valueMapper를 통해 그룹화되는 데이터를 다른 형태로 가공 처리할 수 있습니다.

     

      다음은 groupBy(keyMapper, valueMapper) Operator의 valueMapper를 이용해 Downstream으로 emit할 데이터에 대한 가공 처리를 미리 하는 예제 코드입니다. 실행 결과는 코드 14_57과 동일합니다.

    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * split 예제
     *  - groupBy(keyMapper, valueMapper) Operator
     *      - emit되는 데이터를 key를 기준으로 그룹화 한 GroupedFlux를 리턴한다.
     *      - 그룹화 된 GroupedFlux로 그룹별 작업을 할 수 있다.
     *      - valueMapper를 추가로 전달해서 그룹화 되어 emit되는 데이터의 값을 미리 다른 값으로 변경할 수 있다.
     */
    @Slf4j
    public class Example14_58 {
        public static void main(String[] args) {
            Flux.fromIterable(SampleData.books)
                    .groupBy(book ->
                            book.getAuthorName(),
                            book -> book.getBookName() + "(" + book.getAuthorName() + ")")
                    .flatMap(groupedFlux -> groupedFlux.collectList())
                    .subscribe(bookByAuthor ->
                            log.info("# book by author: {}", bookByAuthor));
        }
    }

     

     

     

    [참고 정보]

    반응형

    댓글

Designed by Tistory.