ABOUT ME

Developer를 위한 Reference 블로그 입니다.

Today
Yesterday
Total
  • Sequence의 내부 동작 확인을 위한 Operator
    Spring Reactive Web Application/Project Reactor 2023. 8. 5. 09:00
    반응형

      Reactor에서는 Upstream Publisher에서 emit되는 데이터의 변경 없이 부수 효과(Side-effect)만을 수행하기 위한 doOnXXX()으로 시작하는 Operator들이 존재합니다. doOnXXX()으로 시작하는 Operator는 Consumer 또는 Runnable 타입의 함수형 인터페이스를 파라미터로 가지기 때문에 별도의 리턴 값이 없습니다. Upstream Publisher의 내부 동작을 로그로 출력하는 등의 디버깅 용도 또는 에러 발생 시 알림을 전송하는 등 부수 효과를 위한 다양한 로직을 적용할 수 있습니다.

     

    Operator 설명
    doOnSubscribe() Publisher가 구독 중일 때 트리거되는 동작을 추가할 수 있습니다.
    doOnRequest() Publisher가 요청을 수신할 때 트리거되는 동작을 추가할 수 있습니다.
    doOnNext() Publisher가 데이터를 emit할 때 트리거되는 동작을 추가할 수 있습니다.
    doOnComplete() Publisher가 성공적으로 완료되었을 때 트리거되는 동작을 추가할 수 있습니다.
    doOnError() Publisher가 에러가 발생한 상태로 종료되었을 때 트리거되는 동작을 추가할 수 있습니다.
    doOnCancel() Publisher가 취소되었을 때 트리거되는 동작을 추가할 수 있습니다.
    doOnTerminate() Publisher가 성공적으로 완료되었을 때 또는 에러가 발생한 상태로 종료되었을 때 트리거되는 동작을 추가할 수 있습니다.
    doOnEach() Publisher가 데이터를 emit할 때, 성공적으로 완료되었을 때, 에러가 발생한 상태로 종료되었을 때 트리거되는 동작을 추가할 수 있습니다.
    doOnDiscard() Upstream에 있는 전체 Operator 체인의 동작 중에서 Operator에 의해 폐기되는 요소를 조건부로 정리할 수 있습니다.
    doAfterTerminate() Downstream을 성공적으로 완료한 직후 또는 에러가 발생하여 Publisher가 종료된 직후에 트리거되는 동작을 추가할 수 있습니다.
    doFirst() Publisher가 구독되기 전에 트리거되는 동작을 추가할 수 있습니다.
    doFinally() 에러를 포함해서 어떤 이유이든 간에 Publisher가 종료된 후 트리거되는 동작을 추가할 수 있습니다.

     

      다음은 doOnXXX() Operator 중에서 일부를 Operator 체인상에 추가해서 doOnXXX() Operator의 동작 시점을 확인해보는 예제 코드입니다.

    import lombok.extern.slf4j.Slf4j;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;
    import reactor.core.publisher.Flux;
    
    /**
     * doOnXXXX 예제
     *  - doOnXXXX() Operator의 호출 시점을 알 수 있다.
     */
    @Slf4j
    public class Example14_42 {
        public static void main(String[] args) {
            Flux.range(1, 5)
                .doFinally(signalType -> log.info("# doFinally 1: {}", signalType))
                .doFinally(signalType -> log.info("# doFinally 2: {}", signalType))
                .doOnNext(data -> log.info("# range > doOnNext(): {}", data))
                .doOnRequest(data -> log.info("# doOnRequest: {}", data))
                .doOnSubscribe(subscription -> log.info("# doOnSubscribe 1"))
                .doFirst(() -> log.info("# doFirst()"))
                .filter(num -> num % 2 == 1)
                .doOnNext(data -> log.info("# filter > doOnNext(): {}", data))
                .doOnComplete(() -> log.info("# doOnComplete()"))
                .subscribe(new BaseSubscriber<>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(1);
                    }
    
                    @Override
                    protected void hookOnNext(Integer value) {
                        log.info("# hookOnNext: {}", value);
                        request(1);
                    }
                });
        }
    }

      다음은 실행 결과입니다.

    # doFirst()
    # doOnSubscribe 1
    # doOnRequest: 1
    # range > doOnNext(): 1
    # filter > doOnNext(): 1
    # hookOnNext: 1
    # doOnRequest: 1
    # range > doOnNext(): 2
    # range > doOnNext(): 3
    # filter > doOnNext(): 3
    # hookOnNext: 3
    # doOnRequest: 1
    # range > doOnNext(): 4
    # range > doOnNext(): 5
    # filter > doOnNext(): 5
    # hookOnNext: 5
    # doOnRequest: 1
    # doOnComplete()
    # doFinally 2: onComplete
    # doFinally 1: onComplete
    
    Process finished with exit code 0
    • doFirst() Operator는 위치와 상관없이 제일 먼저 동작합니다.
    • doFinally() Operator 역시 위치와 무관하게 제일 마지막에 동작합니다(선언 시점 역순으로 동작합니다).
    • 구독이 발생하면 doOnSubscriber() Operator가 동작합니다.
    • Subscribe의 요청이 있을 때 doOnRequest() Operator가 동작합니다.
    • Upstream에서 데이터가 emit될 때마다 doOnNext() Operator가 동작합니다.

     

     

     

    [참고 정보]

    반응형

    댓글

Designed by Tistory.