-
유틸리티 연산자BackEnd/RxJava 2023. 7. 9. 07:00반응형
delay (1)
- 생산자가 데이터를 생성 및 통지를 하지만 설정한 시간만큼 소비자쪽으로의 데이터 전달을 지연시킵니다.
package com.itvillage.chapter05.chapter0507; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import java.util.concurrent.TimeUnit; /** * 통지된 데이터를 소비자 쪽에서 전달 받는 시간을 일정 시간동안 지연 시키는 예제 */ public class ObservableDelayExample01 { public static void main(String[] args) { Logger.log(LogType.PRINT, "# 실행 시작 시간: " + TimeUtil.getCurrentTimeFormatted()); Observable.just(1, 3, 4, 6) .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data)) .delay(2000L, TimeUnit.MILLISECONDS) .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(2500L); } }
delay (2)
- 파라미터로 생성되는 Observable이 데이터를 통지할 때까지 각각의 원본 데이터의 통지를 지연시킵니다.
package com.itvillage.chapter05.chapter0507; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; /** * 통지되는 데이터 각각에 지연 시간을 적용하는 예제 */ public class ObservableDelayExample02 { public static void main(String[] args) { Observable.just(1,3,5,7) .delay(item -> { TimeUtil.sleep(1000L); return Observable.just(item); // 새로운 Observable의 통지 시점에, 원본 데이터를 통지한다. }).subscribe(data -> Logger.log(LogType.ON_NEXT, data)); } }
delaySubscription
- 생산자가 데이터의 생성 및 통지 자체를 설정한 시간만큼 지연시킵니다.
- 즉, 소비자가 구독을 해도 구독 시점 자체가 지연됩니다.
package com.itvillage.chapter05.chapter0507; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import java.util.concurrent.TimeUnit; /** * 소비자가 구독시, 데이터 생성 및 통지 자체를 지연시키는 예제. * 즉, delay()는 소비자가 구독 시, 생성 및 통지는 즉시 하지만 소비자에게 전달하는 시간을 지연시키고, * delaySubscription()은 데이터 생성 및 통지 자체를 지연시킨다. */ public class ObservableDelaySubscriptionExample { public static void main(String[] args) { Logger.log(LogType.PRINT, "# 실행 시작 시간: " + TimeUtil.getCurrentTimeFormatted()); Observable.just(1, 3, 4, 6) .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data)) .delaySubscription(2000L, TimeUnit.MILLISECONDS) .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(2500L); } }
timeout
- 각각의 데이터 통지 시 지정된 시간 안에 통지가 되지 않으면 에러를 통지합니다.
- 에러 통지 시 전달되는 에러 객체는 TimeoutException 입니다.
package com.itvillage.chapter05.chapter0507; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import java.util.concurrent.TimeUnit; /** * 각 데이터 통지 시, 지정한 시간안에 데이터가 통지 되지 않으면 에러를 발생시키는 예제 * - 네트워크 연결 지연 등으로 인한 처리를 위해 사용하기 좋은 연산자임. */ public class ObservableTimeOutExample { public static void main(String[] args) { Observable.range(1, 5) .map(num -> { long time = 1000L; if(num == 4){ time = 1500L; } TimeUtil.sleep(time); return num; }) .timeout(1200L, TimeUnit.MILLISECONDS) .subscribe( data -> Logger.log(LogType.ON_NEXT, data), error -> Logger.log(LogType.ON_ERROR, error) ); TimeUtil.sleep(4000L); } }
timeInterval
- 각각의 데이터가 통지되는데 걸린 시간을 통지합니다.
- 통지된 데이터와 데이터가 통지되는데 걸린 시간을 소비자쪽에서 모두 처리할 수 있습니다.
package com.itvillage.chapter05.chapter0507; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.NumberUtil; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; /** * timeInterval을 이용해서 데이터가 통지되는데 걸린 시간을 통지하는 예제 */ public class ObservableTimeIntervalExample { public static void main(String[] args) { Observable.just(1, 3, 5, 7, 9) .delay(item -> { TimeUtil.sleep(NumberUtil.randomRange(100, 1000)); return Observable.just(item); }) .timeInterval() .subscribe( timed -> Logger.log(LogType.ON_NEXT, "# 통지하는데 걸린 시간: " + timed.time() + "\t# 통지된 데이터: " + timed.value()) ); } }
materialize / dematerialize
- materialize: 통지된 데이터와 통지된 데이터의 통지 타입 자체를 Notification 객체에 담고 이 Notification 객체를 통지합니다. 즉, 통지 데이터의 메타 데이터를 포함해서 통지한다고 볼 수 있습니다.
- dematerialize: 통지된 Notification 객체를 원래의 통지 데이터로 변환해서 통지합니다.
package com.itvillage.chapter05.chapter0507; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; import java.util.Arrays; /** * Material/Dematerial 연산자의 실제 활용 예제 * - 특정 Observable 에서 에러가 발생 할 경우 해당 에러에 대해서 구체적으로 처리할 수 있다. */ public class ObservableMaterialExample02 { public static void main(String[] args) { Observable.concatEager( Observable.just( getDBUser().subscribeOn(Schedulers.io()), getAPIUser() .subscribeOn(Schedulers.io()) .materialize() .map(notification -> { if (notification.isOnError()) { // 관리자에게 에러 발생을 알림 Logger.log(LogType.PRINT, "# API user 에러 발생!"); } return notification; }) .filter(notification -> !notification.isOnError()) .dematerialize(notification -> notification) ) ).subscribe( data -> Logger.log(LogType.ON_NEXT, data), error -> Logger.log(LogType.ON_ERROR, error), () -> Logger.log(LogType.ON_COMPLETE) ); TimeUtil.sleep(1000L); } private static Observable<String> getDBUser() { return Observable.fromIterable(Arrays.asList("DB user1", "DB user2", "DB user3", "DB user4", "DB user5")); } private static Observable<String> getAPIUser() { return Observable .just("API user1", "API user2", "Not User", "API user4", "API user5") .map(user -> { if(user.equals("Not User")) throw new RuntimeException(); return user; }); } }
[참고 자료]
반응형