Spring Reactive Web Application/Project Reactor
-
다수의 Subscriber에게 Flux를 멀티캐스팅(Multicasting) 하기 위한 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 10. 22:00
Subscriber가 구독을 하면 Upstream에서 emit된 데이터가 구독 중인 모든 Subscriber에게 멀티캐스팅(Multicasting)되는데, 이를 가능하게 해주는 Operator들은 Cold Sequence를 Hot Sequence로 동작하게 하는 특징이 있습니다. publish publish() Operator는 구독을 하더라도 구독 시점에 즉시 데이터를 emit 하지 않고, connect()를 호출하는 시점에 데이터를 emit 합니다. 그리고 Hot Sequence로 변환되기 때문에 구독 시점 이후에 emit된 데이터만 전달받을 수 있습니다. import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.ConnectableFlux; ..
-
Flux Sequence 분할을 위한 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 9. 05:00
window window(int maxSize) Operator는 Upstream에서 emit 되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할합니다. Reactor에서는 이렇게 분할된 Flux를 윈도우(Window)라고 합니다. 마지막 윈도우에 포함된 데이터의 개수는 maxSize보다 더 적거나 같습니다. 다음은 window() Operator를 이용해서 2021년도 분기별 도서 매출액을 구하는 예제 코드입니다. 원본 데이터 소스는 2021년도 월별 도서 매출액 데이터이며, 이 데이터를 window() Operator로 3개씩 분할한 후 MathFlux.sumInt() Operator를 이용해 3개씩 분할된 데이터의 합계를 구합니다. import chapter1..
-
Sequence의 동작 시간 측정을 위한 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 8. 21:00
Reactor Sequence의 동작 시간을 제어하는 Operator에는 delayElements(), interval(), timeout() 등이 있습니다. 이외 동작 시간 자체를 측정하는 elapsed Operator가 존재합니다. elapsed elapsed() Operator는 emit된 데이터 사이의 경과 시간을 측정해서 Tuple 형태로 Downstream에 emit 합니다. emit 되는 첫 번째 데이터는 onSubscribe Signal과 첫 번째 데이터 사이를 기준으로 시간을 측정합니다. 측정된 시간의 단위는 milliseconds 입니다. 다음은 delayElements() Operator를 사용하여 1초에 한 번씩 데이터가 emit 되고, Subscriber는 emit 되는 데이터 사..
-
에러 처리를 위한 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 6. 09:00
error error() Operator는 파라미터로 지정된 에러로 종료하는 Flux를 생성합니다. error() Operator는 Java의 throw 키워드를 사용해서 예외를 의도적으로 던지는 것 같은 역할을 하는데 주로 체크 예외(Checked Exception)를 캐치해서 다시 던져야 하는 경우 사용할 수 있습니다. Note. 체크 예외(Checked Exception) 체크 예외는 Java에서 Exception 클래스를 상속한 클래스들을 의미하며 try ~ catch 문으로 반드시 처리해야 되는 예외입니다. 다음은 Upstream에서 emit되는 숫자에 2를 곱한 값이 3의 배수가 되는 경우 Downstream으로 emit을 허용하지 않기 위해 error() Operator를 사용한 예제 코드입..
-
Sequence의 내부 동작 확인을 위한 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 5. 09:00
Reactor에서는 Upstream Publisher에서 emit되는 데이터의 변경 없이 부수 효과(Side-effect)만을 수행하기 위한 doOnXXX()으로 시작하는 Operator들이 존재합니다. doOnXXX()으로 시작하는 Operator는 Consumer 또는 Runnable 타입의 함수형 인터페이스를 파라미터로 가지기 때문에 별도의 리턴 값이 없습니다. Upstream Publisher의 내부 동작을 로그로 출력하는 등의 디버깅 용도 또는 에러 발생 시 알림을 전송하는 등 부수 효과를 위한 다양한 로직을 적용할 수 있습니다. Operator 설명 doOnSubscribe() Publisher가 구독 중일 때 트리거되는 동작을 추가할 수 있습니다. doOnRequest() Publisher가..
-
Sequence 변환 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 5. 08:00
map map() Operator는 Upstream에서 emit된 데이터를 mapper Function을 사용하여 변환한 후, Downstream으로 emit 합니다. 다음은 Upstream에서 emit된 문자열의 일부인 'Circle'을 map() Operator 내부에서 replace() 메서드를 이용해 'Rectangle'로 변환 후 Downstream으로 emit 하는 예제 코드입니다. import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * map 예제 */ @Slf4j public class Example14_27 { public static void main(String[] args) { Flux .just("1-..
-
Sequence 필터링 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 4. 05:00
filter filter() Operator는 Upstream에서 emit된 데이터 중 조건에 일치하는 데이터만 Downstream으로 emit 합니다. 즉, 파라미터로 입력받은 Predicate의 리턴 값이 true인 데이터만 Downstream으로 emit 합니다. import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * filter 예제 */ @Slf4j public class Example14_15 { public static void main(String[] args) { Flux .range(1, 20) .filter(num -> num % 2 != 0) .subscribe(data -> log.info("# onN..
-
Sequence 생성 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 3. 23:00
justOrEmpty justOrEmpty()는 just()의 확장 Operator로서, just() Operator와 달리, emit할 데이터가 null일 경우 NullPointException이 발생하지 않고 onComplete Signal을 전송합니다. emit할 데이터가 null이 아닐 경우 해당 데이터를 emit하는 Mono를 생성합니다. import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; @Slf4j public class Example14_1 { public static void main(String[] args) { Mono .justOrEmpty(null) .subscribe(data -> {}, error -> {..