Flux Sequence 분할을 위한 Operator
window
window(int maxSize) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할합니다. Reactor에서는 이렇게 분할된 Flux를 윈도우(Window)라고 합니다. 마지막 윈도우에 포함된 데이터의 개수는 maxSize보다 더 적거나 같습니다.
다음은 window() Operator를 이용해서 2021년도 분기별 도서 매출액을 구하는 예제 코드입니다. 원본 데이터 소스는 2021년도 월별 도서 매출액 데이터이며, 이 데이터를 window() Operator로 3개씩 분할한 후 MathFlux.sumInt() Operator를 이용해 3개씩 분할된 데이터의 합계를 구합니다.
import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.math.MathFlux;
/**
* split 예제
* - window(maxSize) Operator
* - Upstream에서 emit되는 첫 번째 데이터부터 maxSize의 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할한다.
* - 새롭게 생성되는 Flux를 윈도우(Window)라고 한다.
* - 마지막 윈도우가 포함하는 데이터는 maxSize보다 작거나 같다.
*/
@Slf4j
public class Example14_54 {
public static void main(String[] args) {
Flux.fromIterable(SampleData.monthlyBookSales2021)
.window(3)
.flatMap(flux -> MathFlux.sumInt(flux))
.subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(2);
}
@Override
protected void hookOnNext(Integer value) {
log.info("# onNext: {}", value);
request(2);
}
});
}
}
/** SampleData
public static final List<Integer> monthlyBookSales2021 =
Arrays.asList(2_500_000, 3_200_000, 2_300_000, 4_500_000,
6_500_000, 5_500_000, 3_100_000, 2_000_000,
2_800_000, 4_100_000, 6_200_000, 4_200_000);
*/
/**
* [실행 결과]
* # onNext: 8000000
* # onNext: 16500000
* # onNext: 7900000
* # onNext: 14500000
*/
Note. MathFlux
MathFlux는 수학 계산을 위한 전용 Flux 입니다. MathFlux를 사용하려면 아래와 같이 의존 라이브러리를 추가해 주어야 합니다.
implementation 'io.projectreactor.addons:reactor-extra:3.4.8'
buffer
buffer(int maxSize) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한 번에 emit 합니다. 마지막 버퍼에 포함된 데이터의 개수는 maxSize보다 더 적거나 같습니다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
/**
* split 예제
* - buffer(maxSize) Operator
* - Upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한번에 emit한다.
* - 마지막 버퍼가 포함하는 데이터는 maxSize보다 작거나 같다.
*/
@Slf4j
public class Example14_55 {
public static void main(String[] args) {
Flux.range(1, 95)
.buffer(10)
.subscribe(buffer -> log.info("# onNext: {}", buffer));
}
}
/**
* [실행 결과]
* # onNext: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
* ...
* # onNext: [91, 92, 93, 94, 95]
*/
높은 처리량을 요구하는 애플리케이션이 있다면, 들어오는 데이터를 순차적으로 처리하기보다는 batch insert 같은 일괄 작업에 buffer() Operator를 이용해서 성능 향상을 기대할 수 있습니다.
bufferTimeout
bufferTimeout(maxSize, maxTime) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터 또는 maxTime 내에 emit된 데이터를 List 버퍼로 한 번에 emit 합니다. maxSize나 maxTime 중 먼저 조건에 부합할 때까지 emit된 데이터를 List 버퍼로 emit 합니다.
다음은 bufferTimeout() Operator의 동작 과정을 이해하기 위해 map() Operator 내부에서 의도적으로 조건에 따라 지연 시간을 다르게 지정한 예제 코드입니다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Duration;
/**
* split 예제
* - bufferTimeout(maxSize, maxTime) Operator
* - Upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자 만큼의 데이터 또는 maxTime 내에 emit된 데이터를 List 버퍼로 한번에 emit한다.
* - maxSize나 maxTime에서 먼저 조건에 부합할때까지 emit된 데이터를 List 버퍼로 emit한다.
* - 마지막 버퍼가 포함하는 데이터는 maxSize보다 작거나 같다.
*/
@Slf4j
public class Example14_56 {
public static void main(String[] args) {
Flux
.range(1, 20)
.map(num -> {
try {
if (num < 10) {
Thread.sleep(100L);
} else {
Thread.sleep(300L);
}
} catch (InterruptedException e) {}
return num;
})
.bufferTimeout(3, Duration.ofMillis(400L))
.subscribe(buffer -> log.info("# onNext: {}", buffer));
}
}
/**
* [실행 결과]
* # onNext: [1, 2, 3]
* # onNext: [4, 5, 6]
* # onNext: [7, 8, 9]
* # onNext: [10, 11]
* # onNext: [12, 13]
* # onNext: [14, 15]
* # onNext: [16, 17]
* # onNext: [18, 19]
* # onNext: [20]
*/
buffer(maxSize) Operator의 경우, 입력으로 들어오는 데이터가 maxSize가 되기 전에 어떤 오류로 인해 들어오지 못하는 상황이 발생할 경우, 애플리케이션은 maxSize가 될 때까지 무한정 기다리게 될 것입니다. 따라서 bufferTimeout(maxSize, maxTime) Operator를 사용함으로써, maxTime에 도달했을 때 버퍼를 비우게 해서 애플리케이션이 무한정 기다려야 되는 상황을 방지할 수 있습니다.
groupBy
groupBy(keyMapper) Operator는 emit 되는 데이터를 keyMapper로 생성한 key를 기준으로 그룹화한 GroupedFlux를 리턴하며, 이 GroupedFlux를 통해서 그룹별로 작업을 수행할 수 있습니다.
다음은 emit 되는 도서를 저자명으로 구룹화한 후 List로 변환하여 Subscriber에게 전달하는 예제코드입니다.
import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
/**
* split 예제
* - groupBy(keyMapper) Operator
* - emit되는 데이터를 key를 기준으로 그룹화 한 GroupedFlux를 리턴한다.
* - 그룹화 된 GroupedFlux로 그룹별 작업을 할 수 있다.
*/
@Slf4j
public class Example14_57 {
public static void main(String[] args) {
Flux.fromIterable(SampleData.books)
.groupBy(book -> book.getAuthorName())
.flatMap(groupedFlux ->
groupedFlux
.map(book -> book.getBookName() +
"(" + book.getAuthorName() + ")")
.collectList()
)
.subscribe(bookByAuthor ->
log.info("# book by author: {}", bookByAuthor));
}
}
groupBy(keyMapper) Operator의 경우 keyMapper를 통해 생성되는 key를 기준으로 emit 되는 데이터를 그룹화하지만 groupBy(keyMapper, valueMapper) Operator는 그룹화하면서 valueMapper를 통해 그룹화되는 데이터를 다른 형태로 가공 처리할 수 있습니다.
다음은 groupBy(keyMapper, valueMapper) Operator의 valueMapper를 이용해 Downstream으로 emit할 데이터에 대한 가공 처리를 미리 하는 예제 코드입니다. 실행 결과는 코드 14_57과 동일합니다.
import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
/**
* split 예제
* - groupBy(keyMapper, valueMapper) Operator
* - emit되는 데이터를 key를 기준으로 그룹화 한 GroupedFlux를 리턴한다.
* - 그룹화 된 GroupedFlux로 그룹별 작업을 할 수 있다.
* - valueMapper를 추가로 전달해서 그룹화 되어 emit되는 데이터의 값을 미리 다른 값으로 변경할 수 있다.
*/
@Slf4j
public class Example14_58 {
public static void main(String[] args) {
Flux.fromIterable(SampleData.books)
.groupBy(book ->
book.getAuthorName(),
book -> book.getBookName() + "(" + book.getAuthorName() + ")")
.flatMap(groupedFlux -> groupedFlux.collectList())
.subscribe(bookByAuthor ->
log.info("# book by author: {}", bookByAuthor));
}
}
[참고 정보]