ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Backpressure
    Spring Reactive Web Application/Project Reactor 2023. 7. 23. 10:30
    반응형

    Backpressure

      Backpressure는 우리말로 배압 또는 역압이라고 합니다. 리액티브 프로그래밍에서의 배압, 즉 Backpressure는 Publisher가 끊임없이 emit하는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 과부하가 걸리지 않도록 제어하는 것입니다.

    Note. Publisher가 빠른 속도로 데이터를 끊임없이 emit하게 되면 처리되지 않고 대기 중인 데이터가 지속적으로 쌓이게 되어 오버플로가 발생하거나 최악의 경우에는 시스템이 다운되는 문제가 발생하게 됩니다.

     

    Reactor에서의 Backpressure 처리 방법

    • 요청 데이터 개수 제어
    • Backpressure 전략 사용

     

    요청 데이터 개수 제어

      Subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher에게 요청하는 것입니다.

     

      Reactor에서는 Subscriber가 데이터 요청 개수를 직접 제어하기 위해서 Subscriber 인터페이스의 구현 클래스인 BaseSubscriber를 사용할 수 있습니다. 다음 코드는 range() Operator를 사용하여 숫자 1부터 1씩 증가한 다섯 개의 데이터를 emit 하도록 정의되었으며, BaseSubscriber가 데이터를 1개씩 보내 주기를 Publisher에게 요청하는 예입니다.

    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;
    import reactor.core.publisher.Flux;
    
    /**
     * doOnXXXX 예제
     *  - doOnXXXX() Operator의 호출 시점을 알 수 있다.
     */
    @Slf4j
    public class Example8_1 {
        public static void main(String[] args) {
            Flux.range(1, 5)
                .doOnRequest(data -> log.info("# doOnRequest: {}", data))
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(1);
                    }
    
                    @SneakyThrows
                    @Override
                    protected void hookOnNext(Integer value) {
                        Thread.sleep(2000L);
                        log.info("# hookOnNext: {}", value);
                        request(1);
                    }
                });
        }
    }

     

    Backpressure 전략

    종류 설명
    IGNORE 전략 Backpressure를 적용하지 않습니다.
    ERROR 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, Exception을 발생시키는 전략입니다.
    DROP 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 먼저 emit된 데이터부터 Drop시키는 전략입니다.
    LATEST 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략입니다.
    BUFFER 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 있는 데이터부터 Drop시키는 전략입니다.

     

    IGNORE 전략

      Backpressure를 적용하지 않는 전략입니다. IGNORE 전략을 사용할 경우 Downstream에서의 Backpressure 요청이 무시되기 때문에 IllegalStateException이 발생할 수 있습니다.

     

    ERROR 전략

      Downstream의 데이터 처리 속도가 느려서 Upstream의 emit 속도를 따라가지 못할 경우 IllegalStateException을 발생시킵니다. 이 경우 Publisher는 Error Signal을 Subscriber에게 전송하고 삭제한 데이터는 폐기합니다.

     

      다음 코드는 Publisher가 0.001초에 한 번씩 데이터를 emit 하고, Subscriber가 0.005초에 한 번씩 데이터를 처리하는 예입니다. 코드를 실행해보면 255라는 숫자를 출력하고 OverflowException이 발생하면서 Sequence가 종료되는 것을 확인할 수 있습니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    import java.time.Duration;
    
    /**
     * Unbounded request 일 경우, Downstream 에 Backpressure Error 전략을 적용하는 예제
     *  - Downstream 으로 전달 할 데이터가 버퍼에 가득 찰 경우, Exception을 발생 시키는 전략
     */
    @Slf4j
    public class Example8_2 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .interval(Duration.ofMillis(1L))
                .onBackpressureError()
                .doOnNext(data -> log.info("# doOnNext: {}", data))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> {
                            try {
                                Thread.sleep(5L);
                            } catch (InterruptedException e) {}
                            log.info("# onNext: {}", data);
                        },
                        error -> log.error("# onError", error));
    
            Thread.sleep(2000L);
        }
    }

     

    DROP 전략

      Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중 먼저 emit된 데이터부터 Drop 시키는 전략입니다. Drop된 데이터는 폐기됩니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    import java.time.Duration;
    
    /**
     * Unbounded request 일 경우, Downstream 에 Backpressure Drop 전략을 적용하는 예제
     *  - Downstream 으로 전달 할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 먼저 emit 된 데이터를 Drop 시키는 전략
     */
    @Slf4j
    public class Example8_3 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .interval(Duration.ofMillis(1L))
                .onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> {
                            try {
                                Thread.sleep(5L);
                            } catch (InterruptedException e) {}
                            log.info("# onNext: {}", data);
                        },
                        error -> log.error("# onError", error));
    
            Thread.sleep(2000L);
        }
    }

     

    LATEST전략

      Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략입니다.

    Note. Drop 전략은 버퍼가 가득 찰 경우 버퍼 밖에서 대기 중인 데이터를 하나씩 차례대로 Drop 하면서 폐기합니다. 이와 달리 LATEST 전략은 새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남겨 두고 나머지 데이터를 폐기합니다.

      그림상으로는 Step 4에서 가장 최근에 emit된 숫자 17 이외의 나머지 숫자들이 한꺼번에 폐기되는 것처럼 표현했지만, 실제로는 데이터가 들어올 때마다 이전에 유지하고 있던 데이터가 폐기됩니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    import java.time.Duration;
    
    /**
     * Unbounded request 일 경우, Downstream 에 Backpressure Latest 전략을 적용하는 예제
     *  - Downstream 으로 전달 할 데이터가 버퍼에 가득 찰 경우,
     *    버퍼 밖에서 대기하는 가장 나중에(최근에) emit 된 데이터부터 버퍼에 채우는 전략
     */
    @Slf4j
    public class Example8_4 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .interval(Duration.ofMillis(1L))
                .onBackpressureLatest()
                .publishOn(Schedulers.parallel())
                .subscribe(data -> {
                            try {
                                Thread.sleep(5L);
                            } catch (InterruptedException e) {}
                            log.info("# onNext: {}", data);
                        },
                        error -> log.error("# onError", error));
    
            Thread.sleep(2000L);
        }
    }

     

    BUFFER 전략

      BUFFER 전략은 버퍼의 데이터를 폐기하지 않고 버퍼링을 하는 전략, 버퍼가 가득 차면 버퍼 내의 데이터를 폐기하는 전략, 버퍼가 가득 차면 에러를 발생시키는 전략 등으로 구분할 수 있습니다. 이번에는 그중에서 버퍼가 가득 찼을 때 버퍼 내의 데이터를 폐기하는 전략을 알아보겠습니다.

     

      DROP 전략과 LATEST 전략은 버퍼가 가득 찼을 때 버퍼가 비워질 때까지 버퍼 바깥쪽에 있는 데이터를 폐기하는 방식입니다. 이와 달리 BUFFER 전략에서의 데이터 폐기는 버퍼가 가득 찼을 때 버퍼 안에 있는 데이터를 폐기하는 것을 의미합니다. BUFFER 전략 중 데이터를 폐기하는 전략에는 DROP_LATEST전략과 DROP_OLDEST 전략이 있습니다.

     

    DROP_LATEST 전략

      버퍼가 가득 찰 경우 가장 최근에(나중에) 버퍼 안에 채워진 데이터를 Drop하여 폐기한 후, 확보된 공간에 emit된 데이터를 채우는 전략입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.BufferOverflowStrategy;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    import java.time.Duration;
    
    /**
     * Unbounded request 일 경우, Downstream 에 Backpressure Buffer DROP_LATEST 전략을 적용하는 예제
     *  - Downstream 으로 전달 할 데이터가 버퍼에 가득 찰 경우,
     *    버퍼 안에 있는 데이터 중에서 가장 최근에(나중에) 버퍼로 들어온 데이터부터 Drop 시키는 전략
     */
    @Slf4j
    public class Example8_5 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .interval(Duration.ofMillis(300L))
                .doOnNext(data -> log.info("# emitted by original Flux: {}", data))
                .onBackpressureBuffer(2,
                        dropped -> log.info("** Overflow & Dropped: {} **", dropped),
                        BufferOverflowStrategy.DROP_LATEST)
                .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
                .publishOn(Schedulers.parallel(), false, 1)
                .subscribe(data -> {
                            try {
                                Thread.sleep(1000L);
                            } catch (InterruptedException e) {}
                            log.info("# onNext: {}", data);
                        },
                        error -> log.error("# onError", error));
    
            Thread.sleep(2500L);
        }
    }

    onBackpressureBuffer()

    • 첫 번째 파라미터: 버퍼의 최대 용량입니다.
    • 두 번째 파라미터: 버퍼 오버플로가 발생했을 때 Drop되는 데이터를 전달받아 후처리를 할 수 있습니다.
    • 세 번째 파라미터: Backpressure 전략입니다.

     

    DROP_OLDEST 전략

      버퍼가 가득 찰 경우 버퍼 안에 채워진 데이터 중에서 가장 오래된 데이터를 Drop하여 폐기한 후, 확보된 공간에 emit된 데이터를 채우는 전략입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.BufferOverflowStrategy;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    import java.time.Duration;
    
    /**
     * Unbounded request 일 경우, Downstream 에 Backpressure Buffer DROP_OLDEST 전략을 적용하는 예제
     *  - Downstream 으로 전달 할 데이터가 버퍼에 가득 찰 경우,
     *    버퍼 안에 있는 데이터 중에서 가장 먼저 버퍼로 들어온 오래된 데이터부터 Drop 시키는 전략
     */
    @Slf4j
    public class Example8_6 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .interval(Duration.ofMillis(300L))
                .doOnNext(data -> log.info("# emitted by original Flux: {}", data))
                .onBackpressureBuffer(2,
                        dropped -> log.info("** Overflow & Dropped: {} **", dropped),
                        BufferOverflowStrategy.DROP_OLDEST)
                .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
                .publishOn(Schedulers.parallel(), false, 1)
                .subscribe(data -> {
                            try {
                                Thread.sleep(1000L);
                            } catch (InterruptedException e) {}
                            log.info("# onNext: {}", data);
                        },
                        error -> log.error("# onError", error));
    
            Thread.sleep(2500L);
        }
    }

     

     

     

    반응형

    'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글

    Scheduler  (0) 2023.07.25
    Sinks  (0) 2023.07.24
    Cold Sequence와 Hot Sequence  (0) 2023.07.23
    Mono와 Flux  (0) 2023.07.22
    Reactor 개요  (0) 2023.07.22

    댓글

Designed by Tistory.