ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Sinks
    Spring Reactive Web Application/Project Reactor 2023. 7. 24. 07:00
    반응형

      Processor의 기능을 개선한 Sinks가 Reactor 3.4.0 버전부터 지원되기 시작했습니다. Processor와 관련된 API는 Reactor 3.5.0부터 완전히 제거될 예정입니다.

     

    Sinks

    • Sinks는 Publisher와 Subscriber의 기능을 모두 지닌 Processor의 향상된 기능을 제공합니다.
    • Reactive Streams에서 발생하는 signal을 프로그래밍적으로 push할 수 있는 기능을 가지고 있는 Publisher의 일종입니다.
    • Sinks는 Sinks.Many 또는 Sinks.One interface를 사용해서 Thread-Safe하게 signal을 발생시킵니다.

    Note. Reactor에서 프로그래밍 방식으로 signal을 전송하는 가장 일반적인 방법은 generate() Operator나 create() Operator 등을 사용하는 것인데, 이는 싱글스레드 기반에서 signal을 전송합니다. 반면, Sinks는 멀티스레드 방식으로 signal을 전송해도 스레드 안전성을 보장하기 때문에 예기치 않은 동작으로 이어지는 것을 방지해 줍니다.

     

    Sinks 종류 및 특징

    Sinks.One

      Sinks.one() 메서드를 사용해서 한 건의 데이터를 전송하는 방법을 정의해 둔 기능 명세입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Mono;
    import reactor.core.publisher.Sinks;
    
    import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
    
    /**
     * Sinks.One 예제
     *  - emit 된 데이터 중에서 단 하나의 데이터만 Subscriber에게 전달한다. 나머지 데이터는 Drop 됨.
     */
    @Slf4j
    public class Example9_4 {
        public static void main(String[] args) throws InterruptedException {
            Sinks.One<String> sinkOne = Sinks.one();
            Mono<String> mono = sinkOne.asMono();
    
            sinkOne.emitValue("Hello Reactor", FAIL_FAST);
    //        sinkOne.emitValue("Hi Reactor", FAIL_FAST);
    //        sinkOne.emitValue(null, FAIL_FAST);
    
            mono.subscribe(data -> log.info("# Subscriber1 {}", data));
            mono.subscribe(data -> log.info("# Subscriber2 {}", data));
        }
    }
    • emitValue(): 메서드의 두 번째 파라미터는 emit 도중 에러가 발생할 경우 어떻게 처리할 것인지에 대한 핸들러를 나타냅니다.
    • FAIL_FAST: 빠르게 실패 처리한다는 의미는 에러가 발생했을 때 재시도를 하지 않고 즉시 실패 처리를 한다는 의미입니다. 주석 처리된 코드를 주석 해제하고 다시 실행하면 출력 결과는 동일하나 Drop 되었다는 디버그 로그를 확인할 수 있습니다. 즉, Sinks.One으로 아무리 많은 수의 데이터를 emit한다 하더라도 처음 emit한 데이터는 정상적으로 emit되지만 나머지 데이터들은 Drop된다는 것을 알 수 있습니다.
    • asMono(): emit한 데이터를 구독하여 전달받기 위해 Mono 객체로 변환합니다.
    Reactor API 문서에서는 asMono() 메서드를 통해 Sinks.One에서 Mono 객체로 변환할 수 있는 것을 'Mono의 의미 체계를 가진다(with Mono semantics)'라고 표현합니다.

     

    Sinks.Many

      Sinks.many() 메서드를 사용해서 여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 정의해 둔 기능 명세입니다.

     

      Sinks.One은 한 건의 데이터를 emit하는 한 가지 기능만 가지기 때문에 Default Spec(SinksSpecs.DEFAULT_ROOT_SPEC)을 사용합니다. 반면, Sinks.Many의 경우 데이터 emit을 위한 여러 가지 기능이 정의된 ManySpec을 리턴합니다.

    public final class Sinks {
      ...
      ...
        public interface ManySpec {
          UnicastSpec unicast();
          MulticastSpec multicast();
          MulticastReplaySpec replay();
        }
    }

    Note. 네트워크 통신에서 사용하는 Broadcast라는 용어는 네트워크에 연결된 모든 시스템이 정보를 전달받는(One to All) 방식입니다.

    Unicast는 하나의 특정 시스템만 정보를 전달하는(One to One) 방식이고, Multicast는 일부 시스템들만 정보를 전달받는(One to Many) 방식입니다.

     

    UnicastSpec

      UnicastSpec의 기능은 단 하나의 Subscriber에게만 데이터를 emit하는 것이기 때문에 다음 코드의 두 번째 Subscriber는 에러를 발생시킵니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    
    import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
    
    /**
     * Sinks.Many 예제
     *  - unicast()통해 단 하나의 Subscriber만 데이터를 전달 받을 수 있다
     */
    @Slf4j
    public class Example9_8 {
        public static void main(String[] args) throws InterruptedException {
            Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
            Flux<Integer> fluxView = unicastSink.asFlux();
    
            unicastSink.emitNext(1, FAIL_FAST);
            unicastSink.emitNext(2, FAIL_FAST);
    
    
            fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
    
            unicastSink.emitNext(3, FAIL_FAST);
    
            fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
            // Caused by: java.lang.IllegalStateException
            //            : UnicastProcessor allows only a single Subscriber
        }
    }
    • asFlux(): Flux 객체로 변환합니다.

     

    MulticastSpec

      MulticastSpec의 기능은 하나 이상의 Subscriber에게 데이터를 emit 합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    
    import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
    
    /**
     * Sinks.Many 예제
     *  - multicast()를 사용해서 하나 이상의 Subscriber에게 데이터를 emit하는 예제
     */
    @Slf4j
    public class Example9_9 {
        public static void main(String[] args) {
            Sinks.Many<Integer> multicastSink =
                    Sinks.many().multicast().onBackpressureBuffer();
            Flux<Integer> fluxView = multicastSink.asFlux();
    
            multicastSink.emitNext(1, FAIL_FAST);
            multicastSink.emitNext(2, FAIL_FAST);
    
            fluxView.subscribe(data -> log.info("# Subscriber1: {}", data)); // 1, 2, 3
            fluxView.subscribe(data -> log.info("# Subscriber2: {}", data)); // 3
    
            multicastSink.emitNext(3, FAIL_FAST);
        }
    }

      실행 결과를 확인해보면 Subscriber1은 emit된 세 개의 데이터 모두를 전달받지만, Subscriber2는 세 번째 데이터만 전달받습니다.

    Note. Sinks가 Publisher의 역할을 할 경우 기본적으로 Hot Publisher로 동작합니다.

     

    MulticastReplaySpec

      MulticastReplaySpec에는 emit된 데이터를 다시 Replay해서 구독 전에 이미 emit된 데이터라도 Subscriber가 전달받을 수 있게 하는 다양한 메서드들이 정의되어 있습니다.

    • all(): 구독 전에 이미 emit된 데이터가 있더라도 처음 emit된 데이터부터 모든 데이터들이 Subscriber에게 전달됩니다.
    • limit(): emit된 데이터 중 파라미터로 입력한 개수만큼 나중에 emit된 데이터부터 Subscriber에게 전달됩니다.
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    
    import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
    
    /**
     * Sinks.Many 예제
     *  - replay()를 사용하여 이미 emit된 데이터 중에서 특정 개수의 최신 데이터만 전달하는 예제
     */
    @Slf4j
    public class Example9_10 {
        public static void main(String[] args) {
            Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
            Flux<Integer> fluxView = replaySink.asFlux();
    
            replaySink.emitNext(1, FAIL_FAST);
            replaySink.emitNext(2, FAIL_FAST);
            replaySink.emitNext(3, FAIL_FAST);
    
            fluxView.subscribe(data -> log.info("# Subscriber1: {}", data)); // 2, 3, 4
    
            replaySink.emitNext(4, FAIL_FAST);
    
            fluxView.subscribe(data -> log.info("# Subscriber2: {}", data)); // 3, 4
        }
    }

     

     

     

    [참고 정보]

    반응형

    'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글

    Context  (0) 2023.07.26
    Scheduler  (0) 2023.07.25
    Backpressure  (0) 2023.07.23
    Cold Sequence와 Hot Sequence  (0) 2023.07.23
    Mono와 Flux  (0) 2023.07.22

    댓글

Designed by Tistory.