ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Mono와 Flux
    Spring Reactive Web Application/Project Reactor 2023. 7. 22. 09:00
    반응형

    Mono

    • 0개 또는 1개의 데이터를 emit하는 Publisher 입니다. (Compare with RxJava Maybe)
    • 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit 합니다.

    Mono 마블 다이어그램

    Note. 마블 다이어그램(Marble Diagram)이란 비동기적인 데이터 흐름을 시간의 흐름에 따라 시각적으로 표시한 다이어그램입니다.

    import reactor.core.publisher.Mono;
    
    /**
     * Mono 기본 개념 예제
     *  - 1개의 데이터를 생성해서 emit한다.
     */
    public class Example6_1 {
        public static void main(String[] args) {
            Mono.just("Hello Reactor")
                    .subscribe(System.out::println);
        }
    }
    import reactor.core.publisher.Mono;
    
    /**
     * Mono 기본 개념 예제
     *  - 원본 데이터의 emit 없이 onComplete signal 만 emit 한다.
     */
    public class Example6_2 {
        public static void main(String[] args) {
            Mono
                .empty()
                .subscribe(
                        none -> System.out.println("# emitted onNext signal"),
                        error -> {},
                        () -> System.out.println("# emitted onComplete signal")
                );
        }
    }

    Note. subcribe()의 람다 표현식 파라미터

    • 첫 번째 파라미터: Publisher가 onNext Signal 전송 시 실행됩니다. 즉, 데이터를 전달받기 위해 사용됩니다.
    • 두 번째 파라미터: Publisher가 onError Signal 전송 시 실행됩니다. 즉, 에러가 발생했을 때 사용됩니다.
    • 세 번째 파라미터: Publisher가 onComplete Singal 전송 시 실행됩니다. 즉, 데이터 emit이 종료되었을 때 사용됩니다. 작업 완료 이후 후처리를 진행하는데 사용할 수 있습니다.

     

    Flux

    • 0개 ~ N개의 데이터를 emit하는 Publisher 입니다.
    • 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit 합니다.

    Flux 마블 다이어그램

    import reactor.core.publisher.Flux;
    
    /**
     * Flux 기본 예제
     */
    public class Example6_4 {
        public static void main(String[] args) {
            Flux.just(6, 9, 13)
                    .map(num -> num % 2)
                    .subscribe(System.out::println);
        }
    }
    import reactor.core.publisher.Flux;
    
    /**
     * Flux 에서의 Operator 체인 사용 예제
     */
    public class Example6_5 {
        public static void main(String[] args) {
            Flux.fromArray(new Integer[]{3, 6, 7, 9})
                    .filter(num -> num > 6)
                    .map(num -> num * 2)
                    .subscribe(System.out::println);
        }
    }
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    /**
     * 2개의 Mono를 연결해서 Flux로 변환하는 예제
     */
    public class Example6_6 {
        public static void main(String[] args) {
            Flux<String> flux =
                    Mono.justOrEmpty("Steve")
                            .concatWith(Mono.justOrEmpty("Jobs"));
            flux.subscribe(System.out::println);
        }
    }
    • justOrEmpty(): just() Operator의 경우 파라미터의 값으로 null을 허용하지 않지만, justOrEmpty()는 null을 허용합니다.
    • concatWith(): concatWith()를 호출하는 Publisher와 concatWith()의 파라미터로 전달되는 Publisher가 각각 emit하는 데이터들을 하나로 연결해서 새로운 Publisher의 데이터 소스로 만들어 줍니다.
    import reactor.core.publisher.Flux;
    
    /**
     * 여러개의 Flux를 연결해서 하나의 Flux로 결합하는 예제
     */
    public class Example6_7 {
        public static void main(String[] args) {
            Flux.concat(
                            Flux.just("Mercury", "Venus", "Earth"),
                            Flux.just("Mars", "Jupiter", "Saturn"),
                            Flux.just("Uranus", "Neptune", "Pluto"))
                    .collectList()
                    .subscribe(planets -> System.out.println(planets));
        }
    }
    • concat(): concatWith()의 경우 두 개의 데이터 소스만 연결할 수 있지만, concat()은 여러 개의 데이터 소스를 연결할 수 있습니다.
    • collectList(): Upstream Publisher에서 emit하는 데이터를 모아서 List의 원소로 포함시킨 새로운 데이터 소스로 만들어 주는 Operator 입니다.

     

     

     

    [참고 정보]

    반응형

    'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글

    Scheduler  (0) 2023.07.25
    Sinks  (0) 2023.07.24
    Backpressure  (0) 2023.07.23
    Cold Sequence와 Hot Sequence  (0) 2023.07.23
    Reactor 개요  (0) 2023.07.22

    댓글

Designed by Tistory.