ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Cold Sequence와 Hot Sequence
    Spring Reactive Web Application/Project Reactor 2023. 7. 23. 09:00
    반응형

    Cold Sequence

      Subscriber의 구독 시점이 달라도 구독을 할 때마다 Publisher가 데이터를 emit하는 과정을 처음부터 다시 시작하는 데이터 흐름을 Cold Sequence라고 부릅니다. 그리고 이 Cold Sequence 흐름으로 동작하는 Publisher를 Cold Publisher라고 합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.util.Arrays;
    
    /**
     * Cold Sequence 예제
     */
    @Slf4j
    public class Example7_1 {
        public static void main(String[] args) throws InterruptedException {
    
            Flux<String> coldFlux =
                    Flux
                        .fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE"))
                        .map(String::toLowerCase);
    
            coldFlux.subscribe(country -> log.info("# Subscriber1: {}", country)); // korea, japan, chinese
            System.out.println("----------------------------------------------------------------------");
            Thread.sleep(2000L);
            coldFlux.subscribe(country -> log.info("# Subscriber2: {}", country)); // korea, japan, chinese
        }
    }

     

    Hot Sequence

      Publisher가 데이터를 emit하는 과정이 한 번만 일어나고, Subscriber가 각각의 구독 시점 이후에 emit된 데이터만 전달받는 데이터의 흐름을 Hot Sequence라고 합니다. Hot Sequence의 경우 Cold Sequence와 반대로 구독 횟수와 상관없이 타임라인이 하나만 생성됩니다. 그리고 이 Hot Sequence 흐름으로 동작하는 Publisher를 Hot Publisher라고 합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.time.Duration;
    
    /**
     * Hot Sequence 예제
     */
    @Slf4j
    public class Example7_2 {
        public static void main(String[] args) throws InterruptedException {
            String[] singers = {"Singer A", "Singer B", "Singer C", "Singer D", "Singer E"};
    
            log.info("# Begin concert:");
            Flux<String> concertFlux =
                    Flux
                        .fromArray(singers)
                        .delayElements(Duration.ofSeconds(1))
                        .share();
    
            concertFlux.subscribe(
                    singer -> log.info("# Subscriber1 is watching {}'s song", singer)
            ); // A, B, C, D, E
    
            Thread.sleep(2500);
    
            concertFlux.subscribe(
                    singer -> log.info("# Subscriber2 is watching {}'s song", singer)
            ); // C, D, E
    
            Thread.sleep(3000);
        }
    }
    • delayElements(): 데이터 소스로 입력된 각 데이터의 emit을 일정시간 동안 지연시키는 Operator 입니다.
    • share(): Cold Sequence를 Hot Sequence로 동작하게 해주는 Operator 입니다. 즉, 여러 Subscriber가 하나의 원본 Flux를 공유한다는 의미입니다.

     

     

     

    [참고 정보]

    반응형

    'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글

    Scheduler  (0) 2023.07.25
    Sinks  (0) 2023.07.24
    Backpressure  (0) 2023.07.23
    Mono와 Flux  (0) 2023.07.22
    Reactor 개요  (0) 2023.07.22

    댓글

Designed by Tistory.