ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Reactive Streams
    BackEnd/RxJava 2023. 7. 1. 09:00
    반응형

    Reactive Streams

    • Github
    • 리액티브 프로그래밍 라이브러리의 표준 사양입니다.
    • RxJava는 Reactive Streams의 인터페이스들을 구현한 구현체입니다.
    • Reactive Streams는 Publisher, Subscriber, Subscription, Processor 4개의 인터페이스를 제공합니다.

     

    Publisher

      데이터를 생성하고 통지합니다.

    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }

     

    Subscriber

      통지된 데이터를 전달받아서 처리합니다.

    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }

     

    Subscription

      전달 받을 데이터의 개수를 요청하고 구독을 해지합니다.

    public interface Subscription {
        public void request(long n);
        public void cancel();
    }

     

    Processor

      Publisher와 Subscriber의 기능이 모두 있습니다.

    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

     

    Publisher와 Subscriber간의 프로세스 흐름

     

    Cold Publisher vs Hot Publisher

    • Cold Publisher(차가운 생산자):  생산자는 소비자가 구독할 때마다 데이터를 처음부터 새로 통지합니다. 소비자는 구독 시점과 상관 없이 통지된 데이터를 처음부터 전달 받을 수 있습니다.
    package com.itvillage.chapter03.chapter0301;
    
    import io.reactivex.Flowable;
    
    public class ColdPublisherExample {
        public static void main(String[] args){
            Flowable<Integer> flowable = Flowable.just(1, 3, 5, 7);
    
            flowable.subscribe(data -> System.out.println("구독자1: " + data)); // 1, 3, 5, 7
            flowable.subscribe(data -> System.out.println("구독자2: " + data)); // 1, 3, 5, 7
        }
    }

     

    • Hot Publisher(뜨거운 생산자): 생산자는 데이터를 한 번만 통지합니다. 소비자는 발행된 데이터를 처음부터 전달 받지 않고, 구독한 시점에 통지된 데이터들만 전달 받을 수 있습니다.
    package com.itvillage.chapter03.chapter0301;
    
    import io.reactivex.processors.PublishProcessor;
    
    public class HotPublisherExample {
        public static void main(String[] args){
            PublishProcessor<Integer> processor = PublishProcessor.create();
            processor.subscribe(data -> System.out.println("구독자1: " + data)); // 1, 3, 5, 7
            processor.onNext(1);
            processor.onNext(3);
    
            processor.subscribe(data -> System.out.println("구독자2: " + data)); // 5, 7
            processor.onNext(5);
            processor.onNext(7);
    
            processor.onComplete();
        }
    }

     

    [참고 자료]

    반응형

    'BackEnd > RxJava' 카테고리의 다른 글

    Flowable/Observable 생성 연산자  (0) 2023.07.02
    Single, Maybe, Completable  (0) 2023.07.02
    Flowable과 Observable  (0) 2023.07.02
    RxJava 프로젝트 환경 구축  (0) 2023.07.01
    리액티브(Reactive) 프로그래밍 개요  (0) 2023.07.01

    댓글

Designed by Tistory.