BackEnd/RxJava
결합 연산자
hanseom
2023. 7. 8. 07:00
반응형
merge
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
- 통지 시점이 빠른 Observable의 데이터부터 순차적으로 통지되고, 통지 시점이 같을 경우에는 merge() 함수의 파라미터로 먼저 지정된 Observable의 데이터부터 통지됩니다.
package com.itvillage.chapter05.chapter0505;
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
/**
* 두개 이상의 Observable을 merge하여 통지된 시간 순대로 출력하는 예제
*/
public class ObservableMergeExample01 {
public static void main(String[] args) {
Observable<Long> observable1 = Observable.interval(200L, TimeUnit.MILLISECONDS)
.take(5);
Observable<Long> observable2 = Observable.interval(400L, TimeUnit.MILLISECONDS)
.take(5)
.map(num -> num + 1000);
Observable.merge(observable1, observable2)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(4000);
}
}
package com.itvillage.chapter05.chapter0505;
import com.itvillage.common.SampleData;
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
/**
* 각 구간의 차량 속도 데이터를 3개의 Observable에서 통지된 순서대로 merge하여 출력하는 예제
*/
public class ObservableMergeExample02 {
public static void main(String[] args) {
Observable<String> observable1 =
SampleData.getSpeedPerSection("A", 55L, SampleData.speedOfSectionA);
Observable<String> observable2 =
SampleData.getSpeedPerSection("B", 100L, SampleData.speedOfSectionB);
Observable<String> observable3 =
SampleData.getSpeedPerSection("C", 77L, SampleData.speedOfSectionC);
Observable.merge(observable1, observable2, observable3)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(1000L);
}
}
concat
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
- 하나의 Observable에서 통지가 끝나면 다음 Observable에서 연이어서 통지됩니다.
- 각 Observable의 통지 시점과는 상관없이 concat() 함수의 파라미터로 먼저 입력된 Observable의 데이터부터 모두 통지된 후, 다음 Observable의 데이터가 통지됩니다.
package com.itvillage.chapter05.chapter0505;
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
/**
* concat 을 이용하여 2개 이상의 Observable을 하나의 Observable로 이어 붙여서 통지하는 예제
*/
public class ObservableConcatExample01 {
public static void main(String[] args) {
Observable<Long> observable1 =
Observable.interval(500L, TimeUnit.MILLISECONDS)
.take(4);
Observable<Long> observable2 =
Observable.interval(300L, TimeUnit.MILLISECONDS)
.take(5)
.map(num -> num + 1000);
Observable.concat(observable2, observable1)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(3500L);
}
}
package com.itvillage.chapter05.chapter0505;
import com.itvillage.common.SampleData;
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;
/**
* 3개의 Observable에서 통지된 순서와 상관없이 Observable이 concat( )에
* 입력된 순서대로 각 구간의 차량 속도 데이터를 이어 붙여 출력하는 예제
*/
public class ObservableConcatExample02 {
public static void main(String[] args) {
List<Observable<String>> speedPerSectionList = Arrays.asList(
SampleData.getSpeedPerSection("A", 55L, SampleData.speedOfSectionA),
SampleData.getSpeedPerSection("B", 100L, SampleData.speedOfSectionB),
SampleData.getSpeedPerSection("C", 77L, SampleData.speedOfSectionC)
);
Observable.concat(speedPerSectionList)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(2000L);
}
}
zip
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
- 각 Observable에서 통지된 데이터가 모두 모이면 각 Observable에서 동일한 index의 데이터로 새로운 데이터를 생성한 후 통지합니다.
- 통지하는 데이터 개수가 가장 적은 Observable의 통지 시점에 완료 통지 시점을 맞춥니다.
package com.itvillage.chapter05.chapter0505;
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
/**
* zip을 이용해 2개의 Observable이 통지하는 데이터 중에서 통지되는 순서가 일치하는 데이터들을 조합하는 예제
*/
public class ObservableZipExample01 {
public static void main(String[] args) {
Observable<Long> observable1 =
Observable.interval(200L, TimeUnit.MILLISECONDS)
.take(4);
Observable<Long> observable2 =
Observable.interval(400L, TimeUnit.MILLISECONDS)
.take(6);
Observable.zip(observable1, observable2, (data1, data2) -> data1 + data2)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(3000L);
}
}
combineLatest
- 다수의 Observable에서 통지된 데이터를 받아서 다시 하나의 Observable로 통지합니다.
- 각 Observable에서 데이터를 통지할 때마다 모든 Observable에서 마지막으로 통지한 각 데이터를 함수형 인터페이스에 전달하고, 새로운 데이터를 생성하여 통지합니다.
package com.itvillage.chapter05.chapter0505;
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import com.itvillage.utils.TimeUtil;
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
/**
* - 각 Observable에서 통지할 때 마다 모든 Observable에서 마지막으로 통지한 데이터들을 함수형 인터페이스에 반환하고,
* 이를 가공해서 통지하는 예제.
* - 각 Observable 중 하나의 Observable에서만 통지가 발생하더라도 이미 통지한 Observable의 마지막 데이터와
* 함께 전달된다.
*
*/
public class ObservableCombineLatestExample01 {
public static void main(String[] args) {
Observable<Long> observable1 =
Observable.interval(500L, TimeUnit.MILLISECONDS)
// .doOnNext(data -> Logger.don("# observable 1 : " + data))
.take(4);
Observable<Long> observable2 =
Observable.interval(700L, TimeUnit.MILLISECONDS)
// .doOnNext(data -> Logger.don("# observable 2 : " + data))
.take(4);
Observable.combineLatest(
observable1,
observable2,
(data1, data2) -> "data1: " + data1 + "\tdata2: " + data2)
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(3000L);
}
}
[참고 자료]
반응형