Spring Reactive Web Application/Project Reactor

Flux Sequence 분할을 위한 Operator

hanseom 2023. 8. 9. 05:00
반응형

window

  window(int maxSize) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할합니다. Reactor에서는 이렇게 분할된 Flux를 윈도우(Window)라고 합니다. 마지막 윈도우에 포함된 데이터의 개수는 maxSize보다 더 적거나 같습니다.

 

  다음은 window() Operator를 이용해서 2021년도 분기별 도서 매출액을 구하는 예제 코드입니다. 원본 데이터 소스는 2021년도 월별 도서 매출액 데이터이며, 이 데이터를 window() Operator로 3개씩 분할한 후 MathFlux.sumInt() Operator를 이용해 3개씩 분할된 데이터의 합계를 구합니다.

import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.math.MathFlux;

/**
 * split 예제
 *  - window(maxSize) Operator
 *      - Upstream에서 emit되는 첫 번째 데이터부터 maxSize의 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할한다.
 *      - 새롭게 생성되는 Flux를 윈도우(Window)라고 한다.
 *      - 마지막 윈도우가 포함하는 데이터는 maxSize보다 작거나 같다.
 */
@Slf4j
public class Example14_54 {
    public static void main(String[] args) {
        Flux.fromIterable(SampleData.monthlyBookSales2021)
                .window(3)
                .flatMap(flux -> MathFlux.sumInt(flux))
                .subscribe(new BaseSubscriber<>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        subscription.request(2);
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        log.info("# onNext: {}", value);
                        request(2);
                    }
                });
    }
}


/** SampleData
    public static final List<Integer> monthlyBookSales2021 =
            Arrays.asList(2_500_000, 3_200_000, 2_300_000, 4_500_000,
                    6_500_000, 5_500_000, 3_100_000, 2_000_000,
                    2_800_000, 4_100_000, 6_200_000, 4_200_000);
*/

/**
 * [실행 결과]
 * # onNext: 8000000
 * # onNext: 16500000
 * # onNext: 7900000
 * # onNext: 14500000
 */

Note. MathFlux

  MathFlux는 수학 계산을 위한 전용 Flux 입니다. MathFlux를 사용하려면 아래와 같이 의존 라이브러리를 추가해 주어야 합니다.

implementation 'io.projectreactor.addons:reactor-extra:3.4.8'

 

buffer

  buffer(int maxSize) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한 번에 emit 합니다. 마지막 버퍼에 포함된 데이터의 개수는 maxSize보다 더 적거나 같습니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * split 예제
 *  - buffer(maxSize) Operator
 *      - Upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한번에 emit한다.
 *      - 마지막 버퍼가 포함하는 데이터는 maxSize보다 작거나 같다.
 */
@Slf4j
public class Example14_55 {
    public static void main(String[] args) {
        Flux.range(1, 95)
                .buffer(10)
                .subscribe(buffer -> log.info("# onNext: {}", buffer));
    }
}

/**
 * [실행 결과]
 * # onNext: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 * ...
 * # onNext: [91, 92, 93, 94, 95]
 */

 

  높은 처리량을 요구하는 애플리케이션이 있다면, 들어오는 데이터를 순차적으로 처리하기보다는 batch insert 같은 일괄 작업에 buffer() Operator를 이용해서 성능 향상을 기대할 수 있습니다.

 

bufferTimeout

  bufferTimeout(maxSize, maxTime) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터 또는 maxTime 내에 emit된 데이터를 List 버퍼로 한 번에 emit 합니다. maxSize나 maxTime 중 먼저 조건에 부합할 때까지 emit된 데이터를 List 버퍼로 emit 합니다.

 

  다음은 bufferTimeout() Operator의 동작 과정을 이해하기 위해 map() Operator 내부에서 의도적으로 조건에 따라 지연 시간을 다르게 지정한 예제 코드입니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.time.Duration;

/**
 * split 예제
 *  - bufferTimeout(maxSize, maxTime) Operator
 *      - Upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자 만큼의 데이터 또는 maxTime 내에 emit된 데이터를 List 버퍼로 한번에 emit한다.
 *      - maxSize나 maxTime에서 먼저 조건에 부합할때까지 emit된 데이터를 List 버퍼로 emit한다.
 *      - 마지막 버퍼가 포함하는 데이터는 maxSize보다 작거나 같다.
 */
@Slf4j
public class Example14_56 {
    public static void main(String[] args) {
        Flux
            .range(1, 20)
            .map(num -> {
                try {
                    if (num < 10) {
                        Thread.sleep(100L);
                    } else {
                        Thread.sleep(300L);
                    }
                } catch (InterruptedException e) {}
                return num;
            })
            .bufferTimeout(3, Duration.ofMillis(400L))
            .subscribe(buffer -> log.info("# onNext: {}", buffer));
    }
}

/**
 * [실행 결과]
 * # onNext: [1, 2, 3]
 * # onNext: [4, 5, 6]
 * # onNext: [7, 8, 9]
 * # onNext: [10, 11]
 * # onNext: [12, 13]
 * # onNext: [14, 15]
 * # onNext: [16, 17]
 * # onNext: [18, 19]
 * # onNext: [20]
 */

 

  buffer(maxSize) Operator의 경우, 입력으로 들어오는 데이터가 maxSize가 되기 전에 어떤 오류로 인해 들어오지 못하는 상황이 발생할 경우, 애플리케이션은 maxSize가 될 때까지 무한정 기다리게 될 것입니다. 따라서 bufferTimeout(maxSize, maxTime) Operator를 사용함으로써, maxTime에 도달했을 때 버퍼를 비우게 해서 애플리케이션이 무한정 기다려야 되는 상황을 방지할 수 있습니다.

 

groupBy

  groupBy(keyMapper) Operator는 emit 되는 데이터를 keyMapper로 생성한 key를 기준으로 그룹화한 GroupedFlux를 리턴하며, 이 GroupedFlux를 통해서 그룹별로 작업을 수행할 수 있습니다.

 

  다음은 emit 되는 도서를 저자명으로 구룹화한 후 List로 변환하여 Subscriber에게 전달하는 예제코드입니다.

import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * split 예제
 *  - groupBy(keyMapper) Operator
 *      - emit되는 데이터를 key를 기준으로 그룹화 한 GroupedFlux를 리턴한다.
 *      - 그룹화 된 GroupedFlux로 그룹별 작업을 할 수 있다.
 */
@Slf4j
public class Example14_57 {
    public static void main(String[] args) {
        Flux.fromIterable(SampleData.books)
                .groupBy(book -> book.getAuthorName())
                .flatMap(groupedFlux ->
                        groupedFlux
                                .map(book -> book.getBookName() +
                                        "(" + book.getAuthorName() + ")")
                                .collectList()
                )
                .subscribe(bookByAuthor ->
                        log.info("# book by author: {}", bookByAuthor));
    }
}

 

  groupBy(keyMapper) Operator의 경우 keyMapper를 통해 생성되는 key를 기준으로 emit 되는 데이터를 그룹화하지만 groupBy(keyMapper, valueMapper) Operator는 그룹화하면서 valueMapper를 통해 그룹화되는 데이터를 다른 형태로 가공 처리할 수 있습니다.

 

  다음은 groupBy(keyMapper, valueMapper) Operator의 valueMapper를 이용해 Downstream으로 emit할 데이터에 대한 가공 처리를 미리 하는 예제 코드입니다. 실행 결과는 코드 14_57과 동일합니다.

import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * split 예제
 *  - groupBy(keyMapper, valueMapper) Operator
 *      - emit되는 데이터를 key를 기준으로 그룹화 한 GroupedFlux를 리턴한다.
 *      - 그룹화 된 GroupedFlux로 그룹별 작업을 할 수 있다.
 *      - valueMapper를 추가로 전달해서 그룹화 되어 emit되는 데이터의 값을 미리 다른 값으로 변경할 수 있다.
 */
@Slf4j
public class Example14_58 {
    public static void main(String[] args) {
        Flux.fromIterable(SampleData.books)
                .groupBy(book ->
                        book.getAuthorName(),
                        book -> book.getBookName() + "(" + book.getAuthorName() + ")")
                .flatMap(groupedFlux -> groupedFlux.collectList())
                .subscribe(bookByAuthor ->
                        log.info("# book by author: {}", bookByAuthor));
    }
}

 

 

 

[참고 정보]

반응형