ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Sequence 변환 Operator
    Spring Reactive Web Application/Project Reactor 2023. 8. 5. 08:00
    반응형

    map

      map() Operator는 Upstream에서 emit된 데이터를 mapper Function을 사용하여 변환한 후, Downstream으로 emit 합니다.

     

      다음은 Upstream에서 emit된 문자열의 일부인 'Circle'을 map() Operator 내부에서 replace() 메서드를 이용해 'Rectangle'로 변환 후 Downstream으로 emit 하는 예제 코드입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * map 예제
     */
    @Slf4j
    public class Example14_27 {
        public static void main(String[] args) {
            Flux
                .just("1-Circle", "3-Circle", "5-Circle")
                .map(circle -> circle.replace("Circle", "Rectangle"))
                .subscribe(data -> log.info("# onNext: {}", data));
        }
    }

     

    flatMap

      flatMap() Operator는 Upstream에서 emit된 데이터를 Inner Sequence에서 평탄화(Flatten) 하고, 하나의 Sequence로 병합(Merge)해서 Downstream으로 emit 합니다.

     

      다음 코드의 실행 결과는 Upstream에서 emit되는 데이터 수 (2) X Inner Sequence에서 emit되는 데이터 수 (3) = 6개의 데이터가 최종적으로 Subscriber에게 전달됩니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * flatMap 예제
     */
    @Slf4j
    public class Example14_29 {
        public static void main(String[] args) {
            Flux
                .just("Good", "Bad")
                .flatMap(feeling -> Flux
                                        .just("Morning", "Afternoon", "Evening")
                                        .map(time -> feeling + " " + time))
                .subscribe(log::info);
        }
    }
    
    /**
     * [실행결과]
     * Good Morning
     * Good Afternon
     * Good Evening
     * Bad Morning
     * Bad Afternon
     * Bad Evening
     */

     

      다음은 flatMap() Operator를 사용하여 구구단을 출력하는 예제 코드입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    
    /**
     * flatMap 예제
     */
    @Slf4j
    public class Example14_30 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .range(2, 8)
                .flatMap(dan -> Flux
                                    .range(1, 9)
                                    .map(n -> dan + " * " + n + " = " + dan * n))
                .subscribe(log::info);
        }
    }

     

    concat

      concat() Operator는 파라미터로 입력되는 Publisher의 Sequence를 연결해서 데이터를 순차적으로 emit 합니다. 먼저 입력된 Publisher의 Sequence가 종료될 때까지 나머지 Publisher의 Sequence는 Subscribe되지 않고 대기하는 특성을 가집니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * concat 예제
     */
    @Slf4j
    public class Example14_31 {
        public static void main(String[] args) {
            Flux
                .concat(Flux.just(1, 2, 3), Flux.just(4, 5))
                .subscribe(data -> log.info("# onNext: {}", data));
        }
    }
    
    /**
     * [실행 결과]
     * 1, 2, 3, 4, 5 
     */

     

    merge

      merge() Operator는 파라미터로 입력되는 Publisher의 Sequence에서 emit된 데이터를 시간 순서대로 병합(merge)합니다. 다음 예제 코드에서 첫 번째 파라미터의 Flux는 0.3초에 한 번씩 데이터를 emit 하고, 두 번째 파라미터의 Flux는 0.5초에 한 번씩 데이터를 emit 합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.time.Duration;
    
    /**
     * merge 예제
     */
    @Slf4j
    public class Example14_33 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .merge(
                        Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(300L)),
                        Flux.just(5, 6, 7).delayElements(Duration.ofMillis(500L))
                )
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(2000L);
        }
    }
    
    /**
     * [실행 결과]
     * 1, 5, 2, 3, 6, 4, 7 
     */
      300ms 500ms 600ms 900ms 1000ms 1200ms 1500ms
    Flux 1 1   2 3   4  
    Flux 2   5     6   7

     

    zip

      zip() Operator는 파라미터로 입력되는 Publisher Sequence에서 emit된 데이터를 결합하는데, 각 Publisher가 데이터를 하나씩 emit 할 때까지 기다렸다가 결합합니다. 다음은 두 개의 Flux가 emit 하는 시간이 다르지만 각 Flux에서 하나씩 emit 할 때까지 기다렸다가 emit된 데이터를 Tuple2 객체로 묶어서 Subscriber에게 전달하는 예제 코드입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.time.Duration;
    
    /**
     * zip 예제
     */
    @Slf4j
    public class Example14_35 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .zip(
                        Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)),
                        Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L))
                )
                .subscribe(tuple2 -> log.info("# onNext: {}", tuple2));
    
            Thread.sleep(2500L);
        }
    }
    
    /**
     * [실행 결과]
     * [1,4], [2,5], [3,6] 
     */

     

      다음은 두 개의 Flux가 emit 하는 데이터를 묶어서 Subscriber에게 전달하는 것이 아니라 zip() Operator의 세 번째 파라미터로 combinator(BiFunction 함수형 인터페이스)를 추가해서 두 개의 Flux가 emit 하는 한 쌍의 데이터를 combinator에서 전달받아 변환 작업을 거친 후 최종 변환된 데이터를 Subscriber에게 전달하는 예제 코드입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.time.Duration;
    
    /**
     * zip 예제
     */
    @Slf4j
    public class Example14_36 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .zip(
                        Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)),
                        Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L)),
                        (n1, n2) -> n1 * n2
                )
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(2500L);
        }
    }
    
    /**
     * [실행 결과]
     * 4, 10, 18 
     */

     

    and

      and() Operator는 Mono의 Complete Signal과 파라미터로 입력된 Publisher의 Complete Signal을 결합하여 새로운 Mono<Void>를 반환합니다. 즉, Mono와 파라미터로 입력된 Publisher의 Sequence가 모두 종료되었음을 Subscriber에게 알릴 수 있습니다. 결과적으로 Subscriber에게 onComplete Signal만 전달되고, Upstream에서 emit된 데이터는 전달되지 않습니다.

     

      and() Operator는 모든 작업이 끝난 시점에 최종적으로 후처리 작업을 수행하기 적합한 Operator 입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import java.time.Duration;
    
    /**
     * and 예제
     */
    @Slf4j
    public class Example14_38 {
        public static void main(String[] args) throws InterruptedException {
            Mono
                    .just("Task 1")
                    .delayElement(Duration.ofSeconds(1))
                    .doOnNext(data -> log.info("# Mono doOnNext: {}", data))
                    .and(
                            Flux
                                    .just("Task 2", "Task 3")
                                    .delayElements(Duration.ofMillis(600))
                                    .doOnNext(data -> log.info("# Flux doOnNext: {}", data))
                    )
                    .subscribe(
                            data -> log.info("# onNext: {}", data),
                            error -> log.error("# onError:", error),
                            () -> log.info("# onComplete")
                    );
    
            Thread.sleep(5000);
        }
    }
    
    /**
     * [실행 결과]
     * Flux: Task 2
     * Mono: Task 1
     * Flux: Task 3
     * onComplete
     */

     

    collectList

      collectList() Operator는 Flux에서 emit된 데이터를 모아서 List로 변환한 후, 변환된 List를 emit하는 Mono를 반환합니다. 다음은 Upstream에서 emit되는 모스 부호를 해석한 문자를 List로 변환한 후 Subscriber에게 전달하는 예제 코드입니다.

    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.util.stream.Collectors;
    
    /**
     * collectList 예제
     */
    @Slf4j
    public class Example14_40 {
        public static void main(String[] args) {
            Flux
                .just("...", "---", "...")
                .map(code -> transformMorseCode(code))
                .collectList()
                .subscribe(list -> log.info(list.stream().collect(Collectors.joining())));
        }
    
        public static String transformMorseCode(String morseCode) {
            return SampleData.morseCodeMap.get(morseCode);
        }
    }
    
    /**
     * [실행 결과]
     * sos
     */

     

    collectMap

      collectMap() Operator는 Flux에서 emit된 데이터를 기반으로 key와 value를 생성하여 Map의 Element로 추가한 후, 최종적으로 Map을 emit하는 Mono를 반환합니다. 다음은 모스 부호를 키로, 모스 부호에 해당하는 알파벳을 값으로 가지는 Map을 출력하는 예제 코드입니다.

    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * collectMap 예제
     */
    @Slf4j
    public class Example14_41 {
        public static void main(String[] args) {
            Flux
                .range(0, 26)
                .collectMap(key -> SampleData.morseCodes[key],
                        value -> transformToLetter(value))
                .subscribe(map -> log.info("# onNext: {}", map));
        }
    
        private static String transformToLetter(int value) {
            return Character.toString((char) ('a' + value));
        }
    }
    
    /**
     * [실행 결과]
     * {..=i, .---=j, --.-=q, ---=o, --.=g, .--=w, .-.=r, -.--=y, ....=h, -.-.=c, --=m, -.=n, .-..=l, ...-=v, -.-=k, -..=d, -=t, ..-=u, .=e, ...=s, -...=b, ..-.=f, .--.=p, -..-=x, --..=z, .-=a}
     */

    Note. 실행 결과가 a부터 출력되지 않는 이유는 collectMap 연산자의 특성 때문입니다. collectMap 연산자는 내부적으로 병렬로 작업을 수행할 수 있으며, 각각의 작업은 독립적으로 실행됩니다. 때문에 순서를 보장하지는 않습니다.

     

     

     

    [참고 정보]

    반응형

    'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글

    에러 처리를 위한 Operator  (0) 2023.08.06
    Sequence의 내부 동작 확인을 위한 Operator  (0) 2023.08.05
    Sequence 필터링 Operator  (0) 2023.08.04
    Sequence 생성 Operator  (0) 2023.08.03
    Testing  (0) 2023.07.29

    댓글

Designed by Tistory.