hanseom 2023. 7. 22. 09:00
반응형

Mono

  • 0개 또는 1개의 데이터를 emit하는 Publisher 입니다. (Compare with RxJava Maybe)
  • 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit 합니다.

Mono 마블 다이어그램

Note. 마블 다이어그램(Marble Diagram)이란 비동기적인 데이터 흐름을 시간의 흐름에 따라 시각적으로 표시한 다이어그램입니다.

import reactor.core.publisher.Mono;

/**
 * Mono 기본 개념 예제
 *  - 1개의 데이터를 생성해서 emit한다.
 */
public class Example6_1 {
    public static void main(String[] args) {
        Mono.just("Hello Reactor")
                .subscribe(System.out::println);
    }
}
import reactor.core.publisher.Mono;

/**
 * Mono 기본 개념 예제
 *  - 원본 데이터의 emit 없이 onComplete signal 만 emit 한다.
 */
public class Example6_2 {
    public static void main(String[] args) {
        Mono
            .empty()
            .subscribe(
                    none -> System.out.println("# emitted onNext signal"),
                    error -> {},
                    () -> System.out.println("# emitted onComplete signal")
            );
    }
}

Note. subcribe()의 람다 표현식 파라미터

  • 첫 번째 파라미터: Publisher가 onNext Signal 전송 시 실행됩니다. 즉, 데이터를 전달받기 위해 사용됩니다.
  • 두 번째 파라미터: Publisher가 onError Signal 전송 시 실행됩니다. 즉, 에러가 발생했을 때 사용됩니다.
  • 세 번째 파라미터: Publisher가 onComplete Singal 전송 시 실행됩니다. 즉, 데이터 emit이 종료되었을 때 사용됩니다. 작업 완료 이후 후처리를 진행하는데 사용할 수 있습니다.

 

Flux

  • 0개 ~ N개의 데이터를 emit하는 Publisher 입니다.
  • 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit 합니다.

Flux 마블 다이어그램

import reactor.core.publisher.Flux;

/**
 * Flux 기본 예제
 */
public class Example6_4 {
    public static void main(String[] args) {
        Flux.just(6, 9, 13)
                .map(num -> num % 2)
                .subscribe(System.out::println);
    }
}
import reactor.core.publisher.Flux;

/**
 * Flux 에서의 Operator 체인 사용 예제
 */
public class Example6_5 {
    public static void main(String[] args) {
        Flux.fromArray(new Integer[]{3, 6, 7, 9})
                .filter(num -> num > 6)
                .map(num -> num * 2)
                .subscribe(System.out::println);
    }
}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * 2개의 Mono를 연결해서 Flux로 변환하는 예제
 */
public class Example6_6 {
    public static void main(String[] args) {
        Flux<String> flux =
                Mono.justOrEmpty("Steve")
                        .concatWith(Mono.justOrEmpty("Jobs"));
        flux.subscribe(System.out::println);
    }
}
  • justOrEmpty(): just() Operator의 경우 파라미터의 값으로 null을 허용하지 않지만, justOrEmpty()는 null을 허용합니다.
  • concatWith(): concatWith()를 호출하는 Publisher와 concatWith()의 파라미터로 전달되는 Publisher가 각각 emit하는 데이터들을 하나로 연결해서 새로운 Publisher의 데이터 소스로 만들어 줍니다.
import reactor.core.publisher.Flux;

/**
 * 여러개의 Flux를 연결해서 하나의 Flux로 결합하는 예제
 */
public class Example6_7 {
    public static void main(String[] args) {
        Flux.concat(
                        Flux.just("Mercury", "Venus", "Earth"),
                        Flux.just("Mars", "Jupiter", "Saturn"),
                        Flux.just("Uranus", "Neptune", "Pluto"))
                .collectList()
                .subscribe(planets -> System.out.println(planets));
    }
}
  • concat(): concatWith()의 경우 두 개의 데이터 소스만 연결할 수 있지만, concat()은 여러 개의 데이터 소스를 연결할 수 있습니다.
  • collectList(): Upstream Publisher에서 emit하는 데이터를 모아서 List의 원소로 포함시킨 새로운 데이터 소스로 만들어 주는 Operator 입니다.

 

 

 

[참고 정보]

반응형