hanseom
2023. 7. 22. 09:00
반응형
Mono
- 0개 또는 1개의 데이터를 emit하는 Publisher 입니다. (Compare with RxJava Maybe)
- 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit 합니다.
Note. 마블 다이어그램(Marble Diagram)이란 비동기적인 데이터 흐름을 시간의 흐름에 따라 시각적으로 표시한 다이어그램입니다.
import reactor.core.publisher.Mono;
/**
* Mono 기본 개념 예제
* - 1개의 데이터를 생성해서 emit한다.
*/
public class Example6_1 {
public static void main(String[] args) {
Mono.just("Hello Reactor")
.subscribe(System.out::println);
}
}
import reactor.core.publisher.Mono;
/**
* Mono 기본 개념 예제
* - 원본 데이터의 emit 없이 onComplete signal 만 emit 한다.
*/
public class Example6_2 {
public static void main(String[] args) {
Mono
.empty()
.subscribe(
none -> System.out.println("# emitted onNext signal"),
error -> {},
() -> System.out.println("# emitted onComplete signal")
);
}
}
Note. subcribe()의 람다 표현식 파라미터
- 첫 번째 파라미터: Publisher가 onNext Signal 전송 시 실행됩니다. 즉, 데이터를 전달받기 위해 사용됩니다.
- 두 번째 파라미터: Publisher가 onError Signal 전송 시 실행됩니다. 즉, 에러가 발생했을 때 사용됩니다.
- 세 번째 파라미터: Publisher가 onComplete Singal 전송 시 실행됩니다. 즉, 데이터 emit이 종료되었을 때 사용됩니다. 작업 완료 이후 후처리를 진행하는데 사용할 수 있습니다.
Flux
- 0개 ~ N개의 데이터를 emit하는 Publisher 입니다.
- 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit 합니다.
import reactor.core.publisher.Flux;
/**
* Flux 기본 예제
*/
public class Example6_4 {
public static void main(String[] args) {
Flux.just(6, 9, 13)
.map(num -> num % 2)
.subscribe(System.out::println);
}
}
import reactor.core.publisher.Flux;
/**
* Flux 에서의 Operator 체인 사용 예제
*/
public class Example6_5 {
public static void main(String[] args) {
Flux.fromArray(new Integer[]{3, 6, 7, 9})
.filter(num -> num > 6)
.map(num -> num * 2)
.subscribe(System.out::println);
}
}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* 2개의 Mono를 연결해서 Flux로 변환하는 예제
*/
public class Example6_6 {
public static void main(String[] args) {
Flux<String> flux =
Mono.justOrEmpty("Steve")
.concatWith(Mono.justOrEmpty("Jobs"));
flux.subscribe(System.out::println);
}
}
- justOrEmpty(): just() Operator의 경우 파라미터의 값으로 null을 허용하지 않지만, justOrEmpty()는 null을 허용합니다.
- concatWith(): concatWith()를 호출하는 Publisher와 concatWith()의 파라미터로 전달되는 Publisher가 각각 emit하는 데이터들을 하나로 연결해서 새로운 Publisher의 데이터 소스로 만들어 줍니다.
import reactor.core.publisher.Flux;
/**
* 여러개의 Flux를 연결해서 하나의 Flux로 결합하는 예제
*/
public class Example6_7 {
public static void main(String[] args) {
Flux.concat(
Flux.just("Mercury", "Venus", "Earth"),
Flux.just("Mars", "Jupiter", "Saturn"),
Flux.just("Uranus", "Neptune", "Pluto"))
.collectList()
.subscribe(planets -> System.out.println(planets));
}
}
- concat(): concatWith()의 경우 두 개의 데이터 소스만 연결할 수 있지만, concat()은 여러 개의 데이터 소스를 연결할 수 있습니다.
- collectList(): Upstream Publisher에서 emit하는 데이터를 모아서 List의 원소로 포함시킨 새로운 데이터 소스로 만들어 주는 Operator 입니다.
[참고 정보]
반응형