ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Subject
    BackEnd/RxJava 2023. 7. 14. 07:00
    반응형

    Processor vs Subject

    Processor

    • Reactive Streams에서 정의한 Publisher 인터페이스와 Subscriber 인터페이스를 둘 다 상속한 확장 인터페이스입니다.
    • 즉, Publisher(생산자)의 기능과 Subscriber(소비자)의 기능을 모두 가지고 있습니다.
    • Hot Publisher(뜨거운 생산자)입니다.

     

    Note) Hot Publisher vs Cold Publisher

    • Hot Publisher: 소비자는 구독한 시점의 타임라인부터 통지된 데이터를 받을 수 있습니다.
    • Cold Publisher: 소비자는 구독할 때마다 타임라인의 처음부터 모든 데이터를 받을 수 있습니다.

     

    Subject

    • Reactive Streams의 Processor와 동일한 기능을 하나 배압 기능이 없는 추상 클래스입니다.

     

    Processor와 Subject의 구현 클래스

    • PublishProcessor / PublishSubject
    • AsyncProcessor / AsyncSubject
    • BehaviorProcessor / BehaviorSubject
    • ReplayProcessor / ReplaySubject

     

    PublishSubject

    • 구독 전에 통지된 데이터는 받을 수 없고, 구독한 이후에 통지된 데이터만 받을 수 있습니다.
    • 데이터 통지가 완료된 이후에 소비자가 구독하면 완료 또는 에러 통지를 받습니다.

    package com.itvillage.section00;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import io.reactivex.subjects.PublishSubject;
    
    /**
     * 소비자가 구독한 시점 이 후에 통지 된 데이터만 소비자에게 전달되는 PublishSubject 예제
     */
    public class PublishSubjectExample {
        public static void main(String[] args){
            PublishSubject<Integer> subject = PublishSubject.create();
    
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 1 : " + price));
            subject.onNext(3500);
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 2 : " + price));
            subject.onNext(3300);
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 3 : " + price));
            subject.onNext(3400);
    
            subject.subscribe(
                    price -> Logger.log(LogType.ON_NEXT, "# 소비자 4 : " + price),
                    error -> Logger.log(LogType.ON_ERROR, error),
                    () -> Logger.log(LogType.ON_COMPLETE)
            );
    
            subject.onComplete();
    
    
        }
    }
    
    /**
     * 소비자 1: 3500, 3300, 3400
     * 소비자 2: 3300, 3400
     * 소비자 3: 3400
     * 소비자 4: onComplete()
     */

     

    AsyncSubject

    • 완료 전까지 아무것도 통지하지 않고 있다가 완료했을 때 마지막으로 통지한 데이터와 완료만 통지합니다.
    • 모든 소비자는 구독 시점에 상관없이 마지막으로 통지된 데이터와 완료 통지만 받습니다.
    • 완료 후에 구독한 소비자라도 마지막으로 통지된 데이터와 완료 통지를 받습니다.

    package com.itvillage.section00;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import io.reactivex.subjects.AsyncSubject;
    
    /**
     * 구독 시점에 상관없이 모든 소비자들이 마지막으로 통지된 데이터만 전달 받는 AsyncSubject 예제
     */
    public class AsyncSubjectExample {
        public static void main(String[] args){
            AsyncSubject<Integer> subject = AsyncSubject.create();
            subject.onNext(1000);
    
            subject.doOnNext(price -> Logger.log(LogType.DO_ON_NEXT, "# 소비자 1 : " + price))
                    .subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 1 : " + price));
            subject.onNext(2000);
    
            subject.doOnNext(price -> Logger.log(LogType.DO_ON_NEXT, "# 소비자 2 : " + price))
                    .subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 2 : " + price));
            subject.onNext(3000);
    
            subject.doOnNext(price -> Logger.log(LogType.DO_ON_NEXT, "# 소비자 3 : " + price))
                    .subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 3 : " + price));
            subject.onNext(4000);
    
            subject.onComplete();
    
            subject.doOnNext(price -> Logger.log(LogType.DO_ON_NEXT, "# 소비자 4 : " + price))
                    .subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 4 : " + price));
        }
    }
    
    /**
     * 소비자 1: 4000
     * 소비자 2: 4000
     * 소비자 3: 4000
     * 소비자 4: 4000
     */

     

    BehaviorSubject

    • 구독 시점에 이미 통지된 데이터가 있다면 이미 통지된 데이터의 마지막 데이터를 전달 받은 후, 구독 이후에 통지된 데이터를 전달 받습니다.
    • 처리가 완료된 이후에 구독하면 완료나 에러 통지만 전달 받습니다.

    package com.itvillage.section00;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import io.reactivex.subjects.BehaviorSubject;
    
    /**
     * 구독 시점에 이미 통지된 데이터가 있다면 이미 통지된 데이터의 마지막 데이터를 전달 받은 후,
     * 구독 이후부터 통지 된 데이터를 전달 받는 예제
     */
    public class BehaviorSubjectExample {
        public static void main(String[] args){
            BehaviorSubject<Integer> subject = BehaviorSubject.createDefault(3000);
    
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 1 : " + price));
            subject.onNext(3500);
    
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 2 : " + price));
            subject.onNext(3300);
    
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 3 : " + price));
            subject.onNext(3400);
        }
    }
    
    /**
     * 소비자 1: 3000, 3500, 3300, 3400
     * 소비자 2: 3500, 3300, 3400
     * 소비자 3: 3300, 3400
     */

     

    ReplaySubject

    • 구독 시점에 이미 통지된 데이터가 있다면 이미 통지된 데이터 중에서 최근 통지된 데이터를 지정한 개수만큼 전달 받은 후, 구독 이후에 통지된 데이터를 전달 받습니다.
    • 이미 처리가 완료된 이후에 구독하더라도 지정한 개수 만큼의 최근 통지된 데이터를 전달 받습니다.
    • 파라미터로 개수를 지정하지 않으면 구독 시점 이전에 통지된 전체 데이터를 받습니다.

    package com.itvillage.section00;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import io.reactivex.subjects.ReplaySubject;
    
    public class ReplaySubjectExample01 {
        public static void main(String[] args){
            ReplaySubject<Integer> subject = ReplaySubject.create(); // 파라미터 지정하지 않음
            subject.onNext(3000);
            subject.onNext(2500);
    
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 1 : " + price));
            subject.onNext(3500);
    
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 2 : " + price));
            subject.onNext(3300);
    
            subject.onComplete();
    
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 3 : " + price));
    
        }
    }
    
    /**
      * 소비자 1: 3000, 2500, 3500, 3300
      * 소비자 2: 3000, 2500, 3500, 3300
      * 소비자 3: 3000, 2500, 3500, 3300
      */
    package com.itvillage.section00;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import io.reactivex.subjects.ReplaySubject;
    
    public class ReplaySubjectExample02 {
        public static void main(String[] args){
            ReplaySubject<Integer> subject = ReplaySubject.createWithSize(2); // 파라미터 지정
            subject.onNext(3000);
            subject.onNext(2500);
    
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 1 : " + price));
            subject.onNext(3500);
    
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 2 : " + price));
            subject.onNext(3300);
    
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 3 : " + price));
            subject.onNext(3400);
    
            subject.onComplete();
    
            subject.subscribe(price -> Logger.log(LogType.ON_NEXT, "# 소비자 4 : " + price));
        }
    }
    
    /**
      * 소비자 1: 3000, 2500, 3500, 3300, 3400
      * 소비자 2: 2500, 3500, 3300, 3400
      * 소비자 3: 3500, 3300, 3400
      * 소비자 4: 3300, 3400
      */

     

     

     

    [참고 자료]

    반응형

    'BackEnd > RxJava' 카테고리의 다른 글

    디버깅(Debugging)  (0) 2023.07.15
    스케쥴러(Scheduler)  (0) 2023.07.15
    집계 연산자  (0) 2023.07.13
    조건과 불린 연산자  (0) 2023.07.13
    유틸리티 연산자  (0) 2023.07.09

    댓글

Designed by Tistory.