ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Scheduler
    Spring Reactive Web Application/Project Reactor 2023. 7. 25. 03:00
    반응형

      Reactor에서 사용되는 Scheduler는 Reactor Sequence에서 사용되는 스레드를 관리해 주는 관리자 역할을 합니다.

     

      일반적으로 Java 프로그래밍에서 멀티스레드를 완벽하게 제어하는 것은 쉬운 일이 아닙니다. 스레드 간의 경쟁 조건(Rece Condition) 등을 신중하게 고려해서 코드를 작성해야 하는데, 이로 인해 코드의 복잡도가 높아지고 결과적으로 예상치 못한 오류가 발생할 가능성이 높습니다. Reactor에서는 Scheduler가 스레드의 제어를 대신해 주기 때문에 이러한 문제를 최소화 할 수 있습니다.

     

    Note. 스레드의 개념

    • CPU의 코어는 물리적인 스레드를 의미하며, 이 물리적인 스레드는 논리적인 코어라고도 부릅니다.
    • 논리적인 스레드는 소프트웨어적으로 생성되는 스레드를 의미합니다.
    • 물리적인 스레드는 병렬성과 관련이 있으며, 논리적인 스레드는 동시성과 관련이 있습니다.

     

    Scheduler를 위한 전용 Operator

    • subscribeOn(): 최상위 Upstream Publisher의 실행을 위한 스레드를 지정합니다. 즉, 원본 데이터 소스를 emit 하기 위한 Scheduler를 지정합니다.
    • publishOn():  Operator 체인에서 Downstream Operator의 실행을 위한 스레드를 지정합니다.
    • parallel(): Downstream에 대한 데이터 처리를 병렬로 분할 처리하기 위한 스레드를 지정합니다.

     

    publishOn()과 subscribeOn()의 동작

      publishOn()과 subscribeOn()을 사용하지 않을 경우, 별도의 Scheduler를 추가하지 않기에 모두 main 스레드에서 실행됩니다.

     

      다음은 두 개의 publishOn() Operator를 사용할 경우, Operator 체인에서 실행되는 스레드의 동작 과정입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    
    /**
     * subscribeOn()과 publishOn()의 동작 과정 예
     *  - 두 개의 publishOn()을 사용한 경우
     *      - 다음 publishOn()을 만나기 전까지 publishOn() 아래 쪽 Operator들의 실행 쓰레드를 변경한다.
     *
     */
    @Slf4j
    public class Example10_7 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .fromArray(new Integer[] {1, 3, 5, 7})
                .doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
                .publishOn(Schedulers.parallel())
                .filter(data -> data > 3)
                .doOnNext(data -> log.info("# doOnNext filter: {}", data))
                .publishOn(Schedulers.parallel())
                .map(data -> data * 10)
                .doOnNext(data -> log.info("# doOnNext map: {}", data))
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(500L);
        }
    }

     

      다음은 subscribeOn()과 publishOn()을 함께 사용할 경우, Operator 체인에서 실행되는 스레드의 동작 과정입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    
    /**
     * subscribeOn()과 publishOn()의 동작 과정 예
     *  - subscribeOn()과 publishOn()을 함께 사용한 경우
     *      - subscribeOn()은 구독 직후에 실행될 쓰레드를 지정하고, publishOn()을 만나기 전까지 쓰레드를 변경하지 않는다.
     *
     */
    @Slf4j
    public class Example10_8 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .fromArray(new Integer[] {1, 3, 5, 7})
                .subscribeOn(Schedulers.boundedElastic())
                .doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
                .filter(data -> data > 3)
                .doOnNext(data -> log.info("# doOnNext filter: {}", data))
                .publishOn(Schedulers.parallel())
                .map(data -> data * 10)
                .doOnNext(data -> log.info("# doOnNext map: {}", data))
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(500L);
        }
    }

    Note. subscribeOn() 이 publishOn() 앞/뒤에 위치하든 상관없이 publishOn()을 만나기 전까지의 Upstream Operator 체인은 subscribeOn()에서 지정한 스레드에서 실행됩니다. 즉, subscribeOn()은 Operator 체인상에서 어떤 위치에 있든 간에 구독 시점 직후, Publisher가 데이터를 emit하기 전에 실행 스레드를 변경합니다.

     

      이처럼 subscribeOn() Operator와 publishOn() Operator를 함께 사용하면 원본 Publisher에서 데이터를 emit하는 스레드와 emit된 데이터를 가공 처리하는 스레드를 적절하게 분리할 수 있습니다.

     

    parallel()

      subscribeOn() Operator와 publishOn() Operator의 경우, 동시성을 가지는 논리적인 스레드에 해당되지만 parallel() Operator는 병렬성을 가지는 물리적인 스레드에 해당됩니다. parallel()의 경우, 라운드 로빈(Round Robin) 방식으로 CPU 코어 개수(물리적인 스레드)만큼의 스레드를 병렬로 실행합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    /**
     * parallel() 기본 사용 예제
     * - parallel()만 사용할 경우에는 병렬로 작업을 수행하지 않는다.
     * - runOn()을 사용해서 Scheduler를 할당해주어야 병렬로 작업을 수행한다.
     * - **** CPU 코어 갯수내에서 worker thread를 할당한다. ****
     */
    @Slf4j
    public class Example10_3 {
        public static void main(String[] args) throws InterruptedException {
            Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
                    .parallel(4) // 파라미터 미지정 시 컴퓨터 CPU 사양을 따릅니다. 예) 4 코어 8 스레드의 경우, 8개의 스레드 병렬 실행
                    .runOn(Schedulers.parallel())
                    .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(100L);
        }
    }

     

    Scheduler의 종류

    1. Schedulers.immediate()

      별도의 스레드를 추가적으로 생성하지 않고, 현재 스레드에서 작업을 처리하고자 할 때 사용할 수 있습니다. Scheduler가 필요한 API에서 별도의 스레드를 추가 할당하고 싶지 않을 경우에 사용할 수 있습니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    
    /**
     * Schedulers.immediate() 예
     *  - 별도의 쓰레드를 할당하지 않고, 현재 쓰레드에서 실행된다.
     *
     */
    @Slf4j
    public class Example10_9 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .fromArray(new Integer[] {1, 3, 5, 7})
                .publishOn(Schedulers.parallel())
                .filter(data -> data > 3)
                .doOnNext(data -> log.info("# doOnNext filter: {}", data))
                .publishOn(Schedulers.immediate())
                .map(data -> data * 10)
                .doOnNext(data -> log.info("# doOnNext map: {}", data))
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(200L);
        }
    }

     

    2. Schedulers.single()

      스레드 하나만 생성해서 Scheduler가 제거되기 전까지 재사용하는 방식입니다. 저지연(low latency) 일회성 실행에 최적화 되어 있습니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    
    /**
     * Schedulers.single() 예
     *  - Scheduler가 제거될 때까지 동일한 쓰레드를 재사용한다.
     *
     */
    @Slf4j
    public class Example10_10 {
        public static void main(String[] args) throws InterruptedException {
            doTask("task1")
                    .subscribe(data -> log.info("# onNext: {}", data));
    
            doTask("task2")
                    .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(200L);
        }
    
        private static Flux<Integer> doTask(String taskName) {
            return Flux.fromArray(new Integer[] {1, 3, 5, 7})
                    .publishOn(Schedulers.single())
                    .filter(data -> data > 3)
                    .doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data))
                    .map(data -> data * 10)
                    .doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data));
        }
    }

      실행 결과를 보면 doTask()가 두 번 호출되었지만 single-1이라는 하나의 스레드에서 처리됩니다. 즉, 첫 번째 호출에서 생성된 스레드를 두 번째 호출에서 재사용하게 됩니다.

     

    Schedulers.newSingle()

      Schedulers.single()이 하나의 스레드를 재사용하는 반면에, Schedulers.newSingle()은 호출할 때마다 매번 새로운 스레드 하나를 생성합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    
    /**
     * Schedulers.newSingle() 예
     *  - 호출할 때 마다 매번 하나의 쓰레드를 새로 생성한다.
     *
     */
    @Slf4j
    public class Example10_11 {
        public static void main(String[] args) throws InterruptedException {
            doTask("task1")
                    .subscribe(data -> log.info("# onNext: {}", data));
    
            doTask("task2")
                    .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(200L);
        }
    
        private static Flux<Integer> doTask(String taskName) {
            return Flux.fromArray(new Integer[] {1, 3, 5, 7})
                    .publishOn(Schedulers.newSingle("new-single", true))
                    .filter(data -> data > 3)
                    .doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data))
                    .map(data -> data * 10)
                    .doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data));
        }
    }

      실행 결과를 보면 doTask() 메서드를 호출할 때마다 새로운 스레드 하나를 생성해서 각각의 작업을 처리하는 것을 볼 수 있습니다.

    • Schedulers.newSingle(): 첫 번째 파라미터에는 생성할 스레드의 이름을 지정합니다. 두 번째 파라미터는 이 스레드를 데몬(Daemon) 스레드로 동작하게 할지 여부를 설정합니다.

    Note. 데몬(Daemon) 스레드

    데몬 스레드는 보조 스레드라고도 불립니다. 주 스레드가 종료되면 자동으로 종료되는 특성이 있습니다.

     

    3. Schedulers.boundedElastic()

      ExecutorService 기반의 스레드 풀(Thread Pool)을 생성한 후, 그 안에서 정해진 수만큼의 스레드를 사용하여 작업을 처리하고 작업이 종료된 스레드는 반납하여 재사용하는 방식입니다.

      기본적으로 CPU 코어 수 x 10만큼의 스레드를 생성하며, 풀에 있는 모든 스레드가 작업을 처리하고 있다면 이용 가능한 스레드가 생길 때까지 최대 100,000개의 작업이 큐에서 대기할 수 있습니다.

      Schedulers.boundedElastic()은 긴 실행 시간을 가질 수 있는 Blocking I/O 작업에 최적화 되어 있습니다. 다른 Non-Blocking 처리에 영향을 주지 않도록 전용 스레드를 할당하기 때문입니다.

     

    4. Schedulers.parallel()

      여러 개의 스레드를 할당해서 동시에 작업을 수행할 수 있습니다. Non-Blocking I/O에 최적화되어 있는 Scheduler로서 CPU 코어 수만큼의 스레드를 생성합니다.

     

    5. Schedulers.fromExecutorService()

      기존에 이미 사용하고 있는 ExecutorService가 있다면 해당 ExecutorService로부터 Scheduler를 생성하는 방식입니다. ExecutorService로부터 직접 생성할 수도 있지만 Reactor에서는 이 방식을 권장하지 않습니다.

     

    6. Schedulers.newXXX()

      Schedulers.newSingle(), Schedulers.newBoundedElastic(), Schedulers.newParallel() 메서드를 사용해서 새로운 Scheduler 인스턴스를 생성할 수 있습니다. 즉, 스레드 이름, 생성 가능한 디폴트 스레드의 개수, 스레드의 유휴 시간, 데몬 스레드로의 동작 여부 등을 직접 지정해서 커스텀 스레드 풀을 새로 생성할 수 있습니다.

     

     

     

     
    반응형

    'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글

    Debugging  (0) 2023.07.28
    Context  (0) 2023.07.26
    Sinks  (0) 2023.07.24
    Backpressure  (0) 2023.07.23
    Cold Sequence와 Hot Sequence  (0) 2023.07.23

    댓글

Designed by Tistory.