-
결합 연산자BackEnd/RxJava 2023. 7. 8. 07:00반응형
merge
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
- 통지 시점이 빠른 Observable의 데이터부터 순차적으로 통지되고, 통지 시점이 같을 경우에는 merge() 함수의 파라미터로 먼저 지정된 Observable의 데이터부터 통지됩니다.
package com.itvillage.chapter05.chapter0505; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import java.util.concurrent.TimeUnit; /** * 두개 이상의 Observable을 merge하여 통지된 시간 순대로 출력하는 예제 */ public class ObservableMergeExample01 { public static void main(String[] args) { Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS) .take(5); Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS) .take(5) .map(num -> num + 1000); Observable.merge(observable1, observable2) .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(4000); } }
package com.itvillage.chapter05.chapter0505; import com.itvillage.common.SampleData; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import java.util.concurrent.TimeUnit; /** * 각 구간의 차량 속도 데이터를 3개의 Observable에서 통지된 순서대로 merge하여 출력하는 예제 */ public class ObservableMergeExample02 { public static void main(String[] args) { Observable<String> observable1 = SampleData.getSpeedPerSection("A", 55L, SampleData.speedOfSectionA); Observable<String> observable2 = SampleData.getSpeedPerSection("B", 100L, SampleData.speedOfSectionB); Observable<String> observable3 = SampleData.getSpeedPerSection("C", 77L, SampleData.speedOfSectionC); Observable.merge(observable1, observable2, observable3) .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(1000L); } }
concat
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
- 하나의 Observable에서 통지가 끝나면 다음 Observable에서 연이어서 통지됩니다.
- 각 Observable의 통지 시점과는 상관없이 concat() 함수의 파라미터로 먼저 입력된 Observable의 데이터부터 모두 통지된 후, 다음 Observable의 데이터가 통지됩니다.
package com.itvillage.chapter05.chapter0505; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import java.util.concurrent.TimeUnit; /** * concat 을 이용하여 2개 이상의 Observable을 하나의 Observable로 이어 붙여서 통지하는 예제 */ public class ObservableConcatExample01 { public static void main(String[] args) { Observable<Long> observable1 = Observable.interval(500L, TimeUnit.MILLISECONDS) .take(4); Observable<Long> observable2 = Observable.interval(300L, TimeUnit.MILLISECONDS) .take(5) .map(num -> num + 1000); Observable.concat(observable2, observable1) .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(3500L); } }
package com.itvillage.chapter05.chapter0505; import com.itvillage.common.SampleData; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import java.util.Arrays; import java.util.List; /** * 3개의 Observable에서 통지된 순서와 상관없이 Observable이 concat( )에 * 입력된 순서대로 각 구간의 차량 속도 데이터를 이어 붙여 출력하는 예제 */ public class ObservableConcatExample02 { public static void main(String[] args) { List<Observable<String>> speedPerSectionList = Arrays.asList( SampleData.getSpeedPerSection("A", 55L, SampleData.speedOfSectionA), SampleData.getSpeedPerSection("B", 100L, SampleData.speedOfSectionB), SampleData.getSpeedPerSection("C", 77L, SampleData.speedOfSectionC) ); Observable.concat(speedPerSectionList) .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(2000L); } }
zip
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
- 각 Observable에서 통지된 데이터가 모두 모이면 각 Observable에서 동일한 index의 데이터로 새로운 데이터를 생성한 후 통지합니다.
- 통지하는 데이터 개수가 가장 적은 Observable의 통지 시점에 완료 통지 시점을 맞춥니다.
package com.itvillage.chapter05.chapter0505; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import java.util.concurrent.TimeUnit; /** * zip을 이용해 2개의 Observable이 통지하는 데이터 중에서 통지되는 순서가 일치하는 데이터들을 조합하는 예제 */ public class ObservableZipExample01 { public static void main(String[] args) { Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS) .take(4); Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS) .take(6); Observable.zip(observable1, observable2, (data1, data2) -> data1 + data2) .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(3000L); } }
combineLatest
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
- 각 Observable에서 데이터를 통지할 때마다 모든 Observable에서 마지막으로 통지한 각 데이터를 함수형 인터페이스에 전달하고, 새로운 데이터를 생성하여 통지합니다.
package com.itvillage.chapter05.chapter0505; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import java.util.concurrent.TimeUnit; /** * - 각 Observable에서 통지할 때 마다 모든 Observable에서 마지막으로 통지한 데이터들을 함수형 인터페이스에 반환하고, * 이를 가공해서 통지하는 예제. * - 각 Observable 중 하나의 Observable에서만 통지가 발생하더라도 이미 통지한 Observable의 마지막 데이터와 * 함께 전달된다. * */ public class ObservableCombineLatestExample01 { public static void main(String[] args) { Observable<Long> observable1 = Observable.interval(500L, TimeUnit.MILLISECONDS) // .doOnNext(data -> Logger.don("# observable 1 : " + data)) .take(4); Observable<Long> observable2 = Observable.interval(700L, TimeUnit.MILLISECONDS) // .doOnNext(data -> Logger.don("# observable 2 : " + data)) .take(4); Observable.combineLatest( observable1, observable2, (data1, data2) -> "data1: " + data1 + "\tdata2: " + data2) .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(3000L); } }
[참고 자료]
반응형