-
Sequence 생성 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 3. 23:00반응형
justOrEmpty
justOrEmpty()는 just()의 확장 Operator로서, just() Operator와 달리, emit할 데이터가 null일 경우 NullPointException이 발생하지 않고 onComplete Signal을 전송합니다. emit할 데이터가 null이 아닐 경우 해당 데이터를 emit하는 Mono를 생성합니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; @Slf4j public class Example14_1 { public static void main(String[] args) { Mono .justOrEmpty(null) .subscribe(data -> {}, error -> {}, () -> log.info("# onComplete")); } }
fromIterable
fromIterable() Operator는 Iterable에 포함된 데이터를 emit하는 Flux를 생성합니다. 즉, Java에서 제공하는 Iterable을 구현한 구현체를 fromIterable()의 파라미터로 전달할 수 있습니다.
import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * fromIterable 예제 */ @Slf4j public class Example14_2 { public static void main(String[] args) { Flux .fromIterable(SampleData.coins) .subscribe(coin -> log.info("coin 명: {}, 현재가: {}", coin.getT1(), coin.getT2()) ); } } /** * SampleData.coins public class SampleData { ... public static final List<Tuple2<String, Integer>> coins = Arrays.asList( Tuples.of("BTC", 52_000_000), Tuples.of("ETH", 1_720_000), Tuples.of("XRP", 533), Tuples.of("ICX", 2_080), Tuples.of("EOS", 4_020), Tuples.of("BCH", 558_000)); ... } */
fromStream
fromStream() Operator는 Stream에 포함된 데이터를 emit하는 Flux를 생성합니다. Java의 Stream 특성상 Stream은 재사용할 수 없으며 cancel, error, complete 시 자동으로 닫히게 됩니다.
import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * fromStream 예제 */ @Slf4j public class Example14_3 { public static void main(String[] args) { Flux .fromStream(() -> SampleData.coinNames.stream()) .filter(coin -> coin.equals("BTC") || coin.equals("ETH")) .subscribe(data -> log.info("{}", data)); } } /** * SampleData.coinNames public class SampleData { ... public static final List<String> coinNames = Arrays.asList("BTC", "ETH", "XRP", "ICX", "EOS", "BCH"); ... } */
range
range() Operator는 n부터 1씩 증가한 연속된 수 m개를 emit하는 Flux를 생성합니다. range() Operator는 for문처럼 특정 횟수만큼 어떤 작업을 처리하고자 할 경우에 주로 사용됩니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * range 예제 */ @Slf4j public class Example14_4 { public static void main(String[] args) { Flux .range(5, 10) // 5부터 1씩 증가한 숫자 10개를 emit 합니다. .subscribe(data -> log.info("{}", data)); } }
defer
defer() Operator는 Operator를 선언한 시점에 데이터를 emit하는 것이 아니라 구독하는 시점에 데이터를 emit하는 Flux 또는 Mono를 생성합니다. defer는 데이터 emit을 지연시키기 때문에 꼭 필요한 시점에 데이터를 emit하여 불필요한 프로세스를 줄일 수 있습니다.
다음 코드의 실행 결과를 보면 just() Operator는 구독 시점에 데이터를 emit하기 때문에 결과가 동일하고, defer() Operator는 구독이 발생하기 전까지 데이터의 emit을 지연시키기에 2초씩 차이가 나는 것을 확인할 수 있습니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; import java.time.LocalDateTime; /** * defer 예제 */ @Slf4j public class Example14_6 { public static void main(String[] args) throws InterruptedException { log.info("# start: {}", LocalDateTime.now()); Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now()); Mono<LocalDateTime> deferMono = Mono.defer(() -> Mono.just(LocalDateTime.now())); Thread.sleep(2000); justMono.subscribe(data -> log.info("# onNext just1: {}", data)); deferMono.subscribe(data -> log.info("# onNext defer1: {}", data)); Thread.sleep(2000); justMono.subscribe(data -> log.info("# onNext just2: {}", data)); deferMono.subscribe(data -> log.info("# onNext defer2: {}", data)); } } /** * [결과] * # start: 2023-08-04 03:00:00 * # onNext just1: 2023-08-04 03:00:00 * # onNext defer1: 2023-08-04 03:00:02 * # onNext just2: 2023-08-04 03:00:00 * # onNext defer2: 2023-08-04 03:00:04 */
using
using() Operator는 파라미터로 전달받은 resource를 emit하는 Flux를 생성합니다.
- 첫 번째 파라미터: 읽어 올 resource 입니다.
- 두 번째 파라미터: 읽어 온 resource를 emit하는 Flux 입니다.
- 세 번째 파라미터: 종료 Signal이 발생할 경우, resource를 해제하는 등의 후처리를 할 수 있습니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.stream.Stream; /** * using 예제 */ @Slf4j public class Example14_8 { public static void main(String[] args) { Path path = Paths.get("D:\\resources\\using_example.txt"); Flux .using(() -> Files.lines(path), Flux::fromStream, Stream::close) .subscribe(log::info); } }
generate
generate() Operator는 프로그래밍 방식으로 Signal 이벤트를 발생시키며, 특히 동기적으로 데이터를 하나씩 순차적으로 emit하고자 할 경우 사용됩니다.
- 첫 번째 파라미터: emit할 숫자의 초기값 (음수/양수 모두 가능) 입니다.
- 두 번째 파라미터: 초기값으로 지정한 숫자부터 emit하고 숫자를 1씩 증가시켜서 다시 emit 작업을 반복하는데, 이렇게 1씩 증가하는 숫자를 상태(State) 값으로 정의합니다.
다음 코드는 초기값을 0으로 지정하고 상태(State) 값이 10일 경우 onComplete Signal을 발생시켜 Sequence를 종료하는 예제입니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * generate 예제 */ @Slf4j public class Example14_9 { public static void main(String[] args) { Flux .generate(() -> 0, (state, sink) -> { sink.next(state); if (state == 10) sink.complete(); return ++state; }) .subscribe(data -> log.info("# onNext: {}", data)); } }
create
create() Operator는 generate() Operator처럼 프로그래밍 방식으로 Signal 이벤트를 발생시키지만, generate() Operator는 데이터를 동기적으로 한 번에 한 건씩 emit할 수 있는 반면에 create() Operator는 한 번에 여러 건의 데이터를 비동기적으로 emit할 수 있습니다.
다음은 Subscriber가 request() 메서드를 통해 요청을 보내면 Publisher가 해당 요청 개수만큼의 데이터를 emit하는 일종의 pull 방식으로 데이터를 처리하는 예제 코드입니다. 주석으로 표기된 넘버링은 실행 순서입니다.
import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import java.util.Arrays; import java.util.List; /** * create 예제 * - pull 방식 */ @Slf4j public class Example14_12 { static int SIZE = 0; static int COUNT = -1; final static List<Integer> DATA_SOURCE = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); public static void main(String[] args) { log.info("# start"); Flux.create((FluxSink<Integer> sink) -> { sink.onRequest(n -> { // (2) try { Thread.sleep(1000L); for (int i = 0; i < n; i++) { if (COUNT >= 9) { sink.complete(); // (5): (2) ~ (4) 과정 반복 이후 실행됩니다. } else { COUNT++; sink.next(DATA_SOURCE.get(COUNT)); } } } catch (InterruptedException e) {} }); sink.onDispose(() -> log.info("# clean up")); }).subscribe(new BaseSubscriber<>() { @Override protected void hookOnSubscribe(Subscription subscription) { // (1) request(2); } @Override protected void hookOnNext(Integer value) { // (3) SIZE++; log.info("# onNext: {}", value); if (SIZE == 2) { request(2); // (4) SIZE = 0; } } @Override protected void hookOnComplete() { // (6) log.info("# onComplete"); } }); } }
다음은 Subscriber가 요청을 보내는 것과 상관없이 Listener를 통해 들어오는 데이터를 리스닝(Listening)하고 있다가 실제로 들어오는 데이터가 있을 경우에만 데이터를 emit하는 일종의 push 방식으로 데이터를 처리하는 예제 코드입니다.
import chapter14.CryptoCurrencyPriceEmitter; import chapter14.CryptoCurrencyPriceListener; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.scheduler.Schedulers; import java.util.List; /** * create 예제 * - push 방식 */ @Slf4j public class Example14_13 { public static void main(String[] args) throws InterruptedException { CryptoCurrencyPriceEmitter priceEmitter = new CryptoCurrencyPriceEmitter(); Flux.create((FluxSink<Integer> sink) -> priceEmitter.setListener(new CryptoCurrencyPriceListener() { @Override public void onPrice(List<Integer> priceList) { priceList.stream().forEach(price -> { sink.next(price); }); } @Override public void onComplete() { sink.complete(); } })) .publishOn(Schedulers.parallel()) .subscribe( data -> log.info("# onNext: {}", data), error -> {}, () -> log.info("# onComplete")); Thread.sleep(3000L); priceEmitter.flowInto(); Thread.sleep(2000L); priceEmitter.complete(); } } /** * SampleData.coinNames public class SampleData { ... public static final List<Integer> btcPrices = Arrays.asList(50_000_000, 50_100_000, 50_700_000, 51_500_000, 52_000_000); ... } */
import java.util.List; public interface CryptoCurrencyPriceListener { void onPrice(List<Integer> priceList); void onComplete(); }
public class CryptoCurrencyPriceEmitter { private CryptoCurrencyPriceListener listener; public void setListener(CryptoCurrencyPriceListener listener) { this.listener = listener; } public void flowInto() { listener.onPrice(SampleData.btcPrices); } public void complete() { listener.onComplete(); } }
create() Operator는 한 번에 여러 건의 데이터를 비동기적으로 emit할 수 있기 때문에 Backpressure 전략이 필요합니다. 다음은 Backpressure 전략 중 DROP 전략을 지정한 예제 코드입니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.scheduler.Schedulers; /** * create 예제 * - Backpressure 전략 적용 */ @Slf4j public class Example14_14 { static int start = 1; static int end = 4; public static void main(String[] args) throws InterruptedException { Flux.create((FluxSink<Integer> emitter) -> { emitter.onRequest(n -> { log.info("# requested: " + n); try { Thread.sleep(500L); for (int i = start; i <= end; i++) { // 4개의 데이터를 emit 합니다. emitter.next(i); } start += 4; end += 4; } catch (InterruptedException e) {} }); emitter.onDispose(() -> { log.info("# clean up"); }); }, FluxSink.OverflowStrategy.DROP) // Backpressure DROP 전략 .subscribeOn(Schedulers.boundedElastic()) .publishOn(Schedulers.parallel(), 2) // 2개씩 데이터를 요청합니다. .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(3000L); } }
실행 결과를 보면 Downstream에서 2개의 데이터를 요청하는데, create() Operator 내부에서는 4개의 데이터를 emit하기 때문에 2개의 데이터가 DROP 되는 것을 알 수 있습니다.
[참고 정보]
반응형'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글
Sequence 변환 Operator (0) 2023.08.05 Sequence 필터링 Operator (0) 2023.08.04 Testing (0) 2023.07.29 Debugging (0) 2023.07.28 Context (0) 2023.07.26