ABOUT ME

Developer를 위한 Reference 블로그 입니다.

Today
Yesterday
Total
  • 결합 연산자
    BackEnd/RxJava 2023. 7. 8. 07:00
    반응형

    merge

    • 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
    • 통지 시점이 빠른 Observable의 데이터부터 순차적으로 통지되고, 통지 시점이 같을 경우에는 merge() 함수의 파라미터로 먼저 지정된 Observable의 데이터부터 통지됩니다.

    package com.itvillage.chapter05.chapter0505;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 두개 이상의 Observable을 merge하여 통지된 시간 순대로 출력하는 예제
     */
    public class ObservableMergeExample01 {
        public static void main(String[] args) {
            Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS)
                    .take(5);
    
            Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS)
                    .take(5)
                    .map(num -> num + 1000);
    
            Observable.merge(observable1, observable2)
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            TimeUtil.sleep(4000);
        }
    }
    package com.itvillage.chapter05.chapter0505;
    
    import com.itvillage.common.SampleData;
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 각 구간의 차량 속도 데이터를 3개의 Observable에서 통지된 순서대로 merge하여 출력하는 예제
     */
    public class ObservableMergeExample02 {
        public static void main(String[] args) {
            Observable<String> observable1 =
                    SampleData.getSpeedPerSection("A", 55L, SampleData.speedOfSectionA);
            Observable<String> observable2 =
                    SampleData.getSpeedPerSection("B", 100L, SampleData.speedOfSectionB);
            Observable<String> observable3 =
                    SampleData.getSpeedPerSection("C", 77L, SampleData.speedOfSectionC);
    
            Observable.merge(observable1, observable2, observable3)
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
    
            TimeUtil.sleep(1000L);
        }
    }

     

    concat

    • 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
    • 하나의 Observable에서 통지가 끝나면 다음 Observable에서 연이어서 통지됩니다.
    • 각 Observable의 통지 시점과는 상관없이 concat() 함수의 파라미터로 먼저 입력된 Observable의 데이터부터 모두 통지된 후, 다음 Observable의 데이터가 통지됩니다.

    package com.itvillage.chapter05.chapter0505;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * concat 을 이용하여 2개 이상의 Observable을 하나의 Observable로 이어 붙여서 통지하는 예제
     */
    public class ObservableConcatExample01 {
        public static void main(String[] args) {
            Observable<Long> observable1 =
                    Observable.interval(500L, TimeUnit.MILLISECONDS)
                            .take(4);
    
            Observable<Long> observable2 =
                    Observable.interval(300L, TimeUnit.MILLISECONDS)
                            .take(5)
                            .map(num -> num + 1000);
    
            Observable.concat(observable2, observable1)
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
    
            TimeUtil.sleep(3500L);
    
        }
    }
    package com.itvillage.chapter05.chapter0505;
    
    import com.itvillage.common.SampleData;
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * 3개의 Observable에서 통지된 순서와 상관없이 Observable이 concat( )에
     * 입력된 순서대로 각 구간의 차량 속도 데이터를 이어 붙여 출력하는 예제
     */
    public class ObservableConcatExample02 {
        public static void main(String[] args) {
            List<Observable<String>> speedPerSectionList = Arrays.asList(
                    SampleData.getSpeedPerSection("A", 55L, SampleData.speedOfSectionA),
                    SampleData.getSpeedPerSection("B", 100L, SampleData.speedOfSectionB),
                    SampleData.getSpeedPerSection("C", 77L, SampleData.speedOfSectionC)
            );
    
            Observable.concat(speedPerSectionList)
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            TimeUtil.sleep(2000L);
        }
    }

     

    zip

    • 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
    • 각 Observable에서 통지된 데이터가 모두 모이면 각 Observable에서 동일한 index의 데이터로 새로운 데이터를 생성한 후 통지합니다.
    • 통지하는 데이터 개수가 가장 적은 Observable의 통지 시점에 완료 통지 시점을 맞춥니다.

    package com.itvillage.chapter05.chapter0505;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * zip을 이용해 2개의 Observable이 통지하는 데이터 중에서 통지되는 순서가 일치하는 데이터들을 조합하는 예제
     */
    public class ObservableZipExample01 {
        public static void main(String[] args) {
            Observable<Long> observable1 =
                    Observable.interval(200L, TimeUnit.MILLISECONDS)
                            .take(4);
    
            Observable<Long> observable2 =
                    Observable.interval(400L, TimeUnit.MILLISECONDS)
                            .take(6);
    
            Observable.zip(observable1, observable2, (data1, data2) -> data1 + data2)
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            TimeUtil.sleep(3000L);
        }
    }

     

    combineLatest

    • 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
    • 각 Observable에서 데이터를 통지할 때마다 모든 Observable에서 마지막으로 통지한 각 데이터를 함수형 인터페이스에 전달하고, 새로운 데이터를 생성하여 통지합니다.

    package com.itvillage.chapter05.chapter0505;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * - 각 Observable에서 통지할 때 마다 모든 Observable에서 마지막으로 통지한 데이터들을 함수형 인터페이스에 반환하고,
     * 이를 가공해서 통지하는 예제.
     * - 각 Observable 중 하나의 Observable에서만 통지가 발생하더라도 이미 통지한 Observable의 마지막 데이터와
     * 함께 전달된다.
     *
     */
    public class ObservableCombineLatestExample01 {
        public static void main(String[] args) {
            Observable<Long> observable1 =
                    Observable.interval(500L, TimeUnit.MILLISECONDS)
    //                        .doOnNext(data -> Logger.don("# observable 1 : " + data))
                            .take(4);
    
            Observable<Long> observable2 =
                    Observable.interval(700L, TimeUnit.MILLISECONDS)
    //                        .doOnNext(data -> Logger.don("# observable 2 : " + data))
                            .take(4);
    
            Observable.combineLatest(
                    observable1,
                    observable2,
                    (data1, data2) -> "data1: " + data1 + "\tdata2: " + data2)
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            TimeUtil.sleep(3000L);
        }
    }

     

     

     

    [참고 자료]

    반응형

    'BackEnd > RxJava' 카테고리의 다른 글

    유틸리티 연산자  (0) 2023.07.09
    에러 처리 연산자  (0) 2023.07.08
    변환 연산자  (0) 2023.07.07
    필터링 연산자  (0) 2023.07.06
    Flowable/Observable 생성 연산자  (0) 2023.07.02

    댓글

Designed by Tistory.