ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 유틸리티 연산자
    BackEnd/RxJava 2023. 7. 9. 07:00
    반응형

    delay (1)

    • 생산자가 데이터를 생성 및 통지를 하지만 설정한 시간만큼 소비자쪽으로의 데이터 전달을 지연시킵니다.

    package com.itvillage.chapter05.chapter0507;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 통지된 데이터를 소비자 쪽에서 전달 받는 시간을 일정 시간동안 지연 시키는 예제
     */
    public class ObservableDelayExample01 {
        public static void main(String[] args) {
            Logger.log(LogType.PRINT, "# 실행 시작 시간: " + TimeUtil.getCurrentTimeFormatted());
    
            Observable.just(1, 3, 4, 6)
                    .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
                    .delay(2000L, TimeUnit.MILLISECONDS)
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            TimeUtil.sleep(2500L);
        }
    }

     

    delay (2)

    • 파라미터로 생성되는 Observable이 데이터를 통지할 때까지 각각의 원본 데이터의 통지를 지연시킵니다.

    package com.itvillage.chapter05.chapter0507;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    /**
     * 통지되는 데이터 각각에 지연 시간을 적용하는 예제
     */
    public class ObservableDelayExample02 {
        public static void main(String[] args) {
            Observable.just(1,3,5,7)
                    .delay(item -> {
                        TimeUtil.sleep(1000L);
                        return Observable.just(item); // 새로운 Observable의 통지 시점에, 원본 데이터를 통지한다.
                    }).subscribe(data -> Logger.log(LogType.ON_NEXT, data));
        }
    }

     

    delaySubscription

    • 생산자가 데이터의 생성 및 통지 자체를 설정한 시간만큼 지연시킵니다.
    • 즉, 소비자가 구독을 해도 구독 시점 자체가 지연됩니다.

    package com.itvillage.chapter05.chapter0507;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 소비자가 구독시, 데이터 생성 및 통지 자체를 지연시키는 예제.
     * 즉, delay()는 소비자가 구독 시, 생성 및 통지는 즉시 하지만 소비자에게 전달하는 시간을 지연시키고,
     * delaySubscription()은 데이터 생성 및 통지 자체를 지연시킨다.
     */
    public class ObservableDelaySubscriptionExample {
        public static void main(String[] args) {
            Logger.log(LogType.PRINT, "# 실행 시작 시간: " + TimeUtil.getCurrentTimeFormatted());
    
            Observable.just(1, 3, 4, 6)
                    .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, data))
                    .delaySubscription(2000L, TimeUnit.MILLISECONDS)
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            TimeUtil.sleep(2500L);
        }
    }

     

    timeout

    • 각각의 데이터 통지 시 지정된 시간 안에 통지가 되지 않으면 에러를 통지합니다.
    • 에러 통지 시 전달되는 에러 객체는 TimeoutException 입니다.

    package com.itvillage.chapter05.chapter0507;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 각 데이터 통지 시, 지정한 시간안에 데이터가 통지 되지 않으면 에러를 발생시키는 예제
     * - 네트워크 연결 지연 등으로 인한 처리를 위해 사용하기 좋은 연산자임.
     */
    public class ObservableTimeOutExample {
        public static void main(String[] args) {
            Observable.range(1, 5)
                    .map(num -> {
                        long time = 1000L;
                        if(num == 4){
                            time = 1500L;
                        }
                        TimeUtil.sleep(time);
                        return num;
                    })
                    .timeout(1200L, TimeUnit.MILLISECONDS)
                    .subscribe(
                            data -> Logger.log(LogType.ON_NEXT, data),
                            error -> Logger.log(LogType.ON_ERROR, error)
                    );
    
            TimeUtil.sleep(4000L);
        }
    }

     

    timeInterval

    • 각각의 데이터가 통지되는데 걸린 시간을 통지합니다.
    • 통지된 데이터와 데이터가 통지되는데 걸린 시간을 소비자쪽에서 모두 처리할 수 있습니다.

    package com.itvillage.chapter05.chapter0507;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.NumberUtil;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    /**
     * timeInterval을 이용해서 데이터가 통지되는데 걸린 시간을 통지하는 예제
     */
    public class ObservableTimeIntervalExample {
        public static void main(String[] args) {
            Observable.just(1, 3, 5, 7, 9)
                    .delay(item -> {
                        TimeUtil.sleep(NumberUtil.randomRange(100, 1000));
                        return Observable.just(item);
                    })
                    .timeInterval()
                    .subscribe(
                            timed -> Logger.log(LogType.ON_NEXT, "# 통지하는데 걸린 시간: " + timed.time() + "\t# 통지된 데이터: " + timed.value())
                    );
        }
    }

     

    materialize / dematerialize

    • materialize: 통지된 데이터와 통지된 데이터의 통지 타입 자체를 Notification 객체에 담고 이 Notification 객체를 통지합니다. 즉, 통지 데이터의 메타 데이터를 포함해서 통지한다고 볼 수 있습니다.

    • dematerialize: 통지된 Notification 객체를 원래의 통지 데이터로 변환해서 통지합니다.

    package com.itvillage.chapter05.chapter0507;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    
    import java.util.Arrays;
    
    /**
     * Material/Dematerial 연산자의 실제 활용 예제
     * - 특정 Observable 에서 에러가 발생 할 경우 해당 에러에 대해서 구체적으로 처리할 수 있다.
     */
    public class ObservableMaterialExample02 {
        public static void main(String[] args) {
            Observable.concatEager(
                    Observable.just(
                            getDBUser().subscribeOn(Schedulers.io()),
                            getAPIUser()
                                    .subscribeOn(Schedulers.io())
                                    .materialize()
                                    .map(notification -> {
                                        if (notification.isOnError()) {
                                            // 관리자에게 에러 발생을 알림
                                            Logger.log(LogType.PRINT, "# API user 에러 발생!");
                                        }
                                        return notification;
                                    })
                                    .filter(notification -> !notification.isOnError())
                                    .dematerialize(notification -> notification)
                    )
            ).subscribe(
                    data -> Logger.log(LogType.ON_NEXT, data),
                    error -> Logger.log(LogType.ON_ERROR, error),
                    () -> Logger.log(LogType.ON_COMPLETE)
            );
    
            TimeUtil.sleep(1000L);
        }
    
        private static Observable<String> getDBUser() {
            return Observable.fromIterable(Arrays.asList("DB user1", "DB user2", "DB user3", "DB user4", "DB user5"));
        }
    
        private static Observable<String> getAPIUser() {
            return Observable
                    .just("API user1", "API user2", "Not User", "API user4", "API user5")
                    .map(user -> {
                        if(user.equals("Not User"))
                            throw new RuntimeException();
                        return user;
                    });
        }
    }

     

     

     

    [참고 자료]

    반응형

    'BackEnd > RxJava' 카테고리의 다른 글

    집계 연산자  (0) 2023.07.13
    조건과 불린 연산자  (0) 2023.07.13
    에러 처리 연산자  (0) 2023.07.08
    결합 연산자  (1) 2023.07.08
    변환 연산자  (0) 2023.07.07

    댓글

Designed by Tistory.