-
다수의 Subscriber에게 Flux를 멀티캐스팅(Multicasting) 하기 위한 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 10. 22:00반응형
Subscriber가 구독을 하면 Upstream에서 emit된 데이터가 구독 중인 모든 Subscriber에게 멀티캐스팅(Multicasting)되는데, 이를 가능하게 해주는 Operator들은 Cold Sequence를 Hot Sequence로 동작하게 하는 특징이 있습니다.
publish
publish() Operator는 구독을 하더라도 구독 시점에 즉시 데이터를 emit 하지 않고, connect()를 호출하는 시점에 데이터를 emit 합니다. 그리고 Hot Sequence로 변환되기 때문에 구독 시점 이후에 emit된 데이터만 전달받을 수 있습니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.ConnectableFlux; import reactor.core.publisher.Flux; import java.time.Duration; /** * multicast 예제 * - publish() Operator * - 다수의 Subscriber와 Flux를 공유한다. * - 즉, Cold Sequence를 Hot Sequence로 변환한다. * - connect()가 호출 되기 전까지는 데이터를 emit하지 않는다. */ @Slf4j public class Example14_60 { public static void main(String[] args) throws InterruptedException { ConnectableFlux<Integer> flux = Flux .range(1, 5) .delayElements(Duration.ofMillis(300L)) .publish(); Thread.sleep(500L); flux.subscribe(data -> log.info("# subscriber1: {}", data)); Thread.sleep(200L); flux.subscribe(data -> log.info("# subscriber2: {}", data)); flux.connect(); Thread.sleep(1000L); flux.subscribe(data -> log.info("# subscriber3: {}", data)); Thread.sleep(2000L); } } /** * [실행 결과] * # subscriber1: 1 * # subscriber2: 1 * # subscriber1: 2 * # subscriber2: 2 * # subscriber1: 3 * # subscriber2: 3 * # subscriber1: 4 * # subscriber2: 4 * # subscriber3: 4 * # subscriber1: 5 * # subscriber2: 5 * # subscriber3: 5 */
실행 결과를 보면 첫 번째, 두 번째 Subscriber는 emit된 모든 데이터를 전달받지만 세 번째 구독자는 숫자 4, 5만 전달받는 것을 볼 수 있습니다.
autoConnect
publish() Operator의 경우, 구독이 발생하더라도 connect()를 직접 호출하기 전까지는 데이터를 emit하지 않습니다. 반면에, autoConnect() Operator는 파라미터로 지정하는 숫자만큼의 구독이 발생하는 시점에 Upstream 소스에 자동으로 연결되기 때문에 별도의 connect() 호출이 필요하지 않습니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.time.Duration; /** * multicast 예제 * - autoConnect() Operator * - 다수의 Subscriber와 Flux를 공유한다. * - 즉, Cold Sequence를 Hot Sequence로 변환한다. * - 파라미터로 입력한 숫자만큼의 구독이 발생하는 시점에 connect()가 자동으로 호출된다. */ @Slf4j public class Example14_62 { public static void main(String[] args) throws InterruptedException { Flux<String> publisher = Flux .just("Concert part1", "Concert part2", "Concert part3") .delayElements(Duration.ofMillis(300L)) .publish() .autoConnect(2); Thread.sleep(500L); publisher.subscribe(data -> log.info("# audience 1 is watching {}", data)); Thread.sleep(500L); publisher.subscribe(data -> log.info("# audience 2 is watching {}", data)); Thread.sleep(500L); publisher.subscribe(data -> log.info("# audience 3 is watching {}", data)); Thread.sleep(1000L); } } /** * [실행 결과] * # audience 1 is watching Concert part1 * # audience 2 is watching Concert part1 * # audience 1 is watching Concert part2 * # audience 2 is watching Concert part2 * # audience 3 is watching Concert part2 * # audience 1 is watching Concert part1 * # audience 2 is watching Concert part2 * # audience 3 is watching Concert part3 */
refCount
refCount() Operator는 파라미터로 입력된 숫자만큼의 구독이 발생하는 시점에 Upstream 소스에 연결되며, 모든 구독이 취소되거나 Upstream의 데이터 emit이 종료되면 연결이 해제됩니다. refCount() Operator는 주로 무한 스트림 상황에서 모든 구독이 취소될 경우 연결을 해제하는데 사용할 수 있습니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.Disposable; import reactor.core.publisher.Flux; import java.time.Duration; /** * multicast 예제 * - refCount() Operator * - 다수의 Subscriber와 Flux를 공유한다. * - 즉, Cold Sequence를 Hot Sequence로 변환한다. * - 파라미터로 입력한 숫자만큼의 구독이 발생하는 시점에 connect()가 자동으로 호출된다. * - 모든 구독이 취소되면 Upstream 소스와의 연결을 해제한다. */ @Slf4j public class Example14_63 { public static void main(String[] args) throws InterruptedException { Flux<Long> publisher = Flux .interval(Duration.ofMillis(500)) // .publish().autoConnect(1); .publish().refCount(1); Disposable disposable = publisher.subscribe(data -> log.info("# subscriber 1: {}", data)); Thread.sleep(2100L); disposable.dispose(); publisher.subscribe(data -> log.info("# subscriber 2: {}", data)); Thread.sleep(2500L); } } /** * [실행 결과] * # subscriber 1: 0 * # subscriber 1: 1 * # subscriber 1: 2 * # subscriber 1: 3 * # subscriber 2: 0 * # subscriber 2: 1 * # subscriber 2: 2 * # subscriber 2: 3 */
실행 결과를 보면 refCount()의 파라미터를 1로 지정했기 때문에 두 번째 구독이 발생하면 Upstream 소스에 새롭게 연결되어 interval() Operator가 0부터 다시 숫자를 emit하는 것을 볼 수 있습니다. publish().refCount(1)을 주석 처리하고, publish().autoConnect(1)을 주석 해제한 뒤 코드를 실행하면 subscriber 1은 3까지 전달받고 subscriber 2는 4부터 이어서 전달받는 것을 볼 수 있습니다.
[참고 정보]
반응형'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글
Flux Sequence 분할을 위한 Operator (0) 2023.08.09 Sequence의 동작 시간 측정을 위한 Operator (0) 2023.08.08 에러 처리를 위한 Operator (0) 2023.08.06 Sequence의 내부 동작 확인을 위한 Operator (0) 2023.08.05 Sequence 변환 Operator (0) 2023.08.05