ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Flowable/Observable 생성 연산자
    BackEnd/RxJava 2023. 7. 2. 09:00
    반응형

    Flowable/Observable 생성 연산자

    • create 연산자 (이전 포스팅 참고)
    • interval 연산자
    • range 연산자
    • timer 연산자
    • defer 연산자
    • fromIterable 연산자
    • fromFuture 연산자

     

    interval

    • 지정한 시간 간격마다 0부터 시작하는 숫자(Long)를 완료 없이 계속 통지합니다.
    • initialDelay 파라미터를 이용해서 최초 통지에 대한 대기 시간을 지정할 수 있습니다.
    • 호출한 스레드와는 별도의 스레드에서 실행됩니다.
    • polling 용도의 작업을 수행할 때 활용할 수 있습니다.
    package com.itvillage.chapter05.chapter0501;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * polling 용도로 주로 사용.
     */
    public class ObservableIntervalExample {
        public static void main(String[] args){
            System.out.println("# start : " +TimeUtil.getCurrentTimeFormatted());
    
            Observable.interval(1000L, TimeUnit.MILLISECONDS)
                    .map(num -> num + " count")
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            TimeUtil.sleep(3000);
        }
    }

     

    range

    • 지정한 값(n)  부터 m 개의 숫자(Integer)를 통지합니다.
    • for, while 문 등의 반복문을 대체할 수 있습니다.
    package com.itvillage.chapter05.chapter0501;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import io.reactivex.Observable;
    
    /**
     * 반복문으로 사용 가능
     */
    public class ObservableRangeExample {
        public static void main(String[] args){
            Observable<Integer> source = Observable.range(0, 5);
            source.subscribe(num -> Logger.log(LogType.ON_NEXT, num));
        }
    }

     

    timer

    • 지정한 시간이 지나면 0(Long)을 통지하고 onComplete() 이벤트가 발생하여 종료합니다.
    • 호출한 스레드와는 별도의 스레드에서 실행됩니다.
    • 특정 시간을 대기한 후 어떤 처리를 하고자 할 때 활용할 수 있습니다.
    package com.itvillage.chapter05.chapter0501;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 설정한 시간이 지난 후에 특정 동작을 수행하고자 할때 사용
     */
    public class ObservableTimerExample {
        public static void main(String[] args){
            Logger.log(LogType.PRINT, "# Start!");
            Observable<String> observable =
                    Observable.timer(2000, TimeUnit.MILLISECONDS)
                            .map(count -> "Do work!");
    
            observable.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            TimeUtil.sleep(3000);
        }
    }

     

    defer

    • 구독이 발생할 때마다 즉, subscribe()가 호출될 때마다 새로운 Observable을 생성합니.
    • 선언 시점의 데이터를 통지하는 것이 아니라 호출 시점의 데이터를 통지합니다.
    • 데이터 생성을 미루는 효과가 있기에 최신 데이터를 얻고자 할 때 활용할 수 있습니다.

      다음 코드의 결과를 보면, just의 구독 시간은 동일하나 defer의 구독 시간은 3초의 차이가 존재합니다.

    package com.itvillage.chapter05.chapter0501;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import io.reactivex.Observable;
    
    import java.time.LocalTime;
    
    /**
     * 실제 구독이 발생할 때 Observable을 새로 반환하여 새로운 Observable을 생성한다.
     * defer()를 활용하면 데이터 흐름의 생성을 지연하는 효과를 보여준다.
     */
    public class ObservableDeferExample {
        public static void main(String[] args) throws InterruptedException {
            Observable<LocalTime> observable = Observable.defer(() -> {
                LocalTime currentTime = LocalTime.now();
                return Observable.just(currentTime);
            });
    
            Observable<LocalTime> observableJust = Observable.just(LocalTime.now());
    
            observable.subscribe(time -> Logger.log(LogType.PRINT, " # defer() 구독1의 구독 시간: " + time));
            observableJust.subscribe(time -> Logger.log(LogType.PRINT, " # just() 구독1의 구독 시간: " + time));
    
            Thread.sleep(3000);
    
            observable.subscribe(time -> Logger.log(LogType.PRINT, " # defer() 구독2의 구독 시간: " + time));
            observableJust.subscribe(time -> Logger.log(LogType.PRINT, " # just() 구독자2의 구독 시간: " + time));
        }
    }

     

    fromIterable

    • Iterable 인터페이스를 구현한 클래스(ArrayList 등)를 파라미터로 받습니다.
    • Iterable에 담긴 데이터를 순서대로 통지합니다.
    package com.itvillage.chapter05.chapter0501;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import io.reactivex.Observable;
    
    import java.util.Arrays;
    import java.util.List;
    
    public class ObservableFromIterableExample {
        public static void main(String[] args){
            List<String> countries = Arrays.asList("Korea", "Canada", "USA", "Italy");
    
            Observable.fromIterable(countries)
                    .subscribe(country -> Logger.log(LogType.ON_NEXT, country));
        }
    }

     

    fromFuture

    • Future 인터페이스는 자바 5에서 비동기 처리를 위해 추가된 동시성 API 입니다.
    • 시간이 오래 걸리는 작업은 Future를 반환하는 ExecutorService에게 맡기고 비동기로 다른 작업을 수행할 수 있습니다.
    • Java 8에서는 CompletableFuture 클래스를 통해 구현이 간결해졌습니다.
    package com.itvillage.chapter05.chapter0501;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class ObservableFromFutureExample {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Logger.log(LogType.PRINT, "# start time");
    
            // 긴 처리 시간이 걸리는 작업
            Future<Double> future = longTimeWork();
    
            // 짧은 처리 시간이 걸리는 작업
            shortTimeWork();
    
            Observable.fromFuture(future)
                    .subscribe(data -> Logger.log(LogType.PRINT, "# 긴 처리 시간 작업 결과 : " + data));
    
            Logger.log(LogType.PRINT, "# end time");
        }
    
    
    
        public static CompletableFuture<Double> longTimeWork(){
            return CompletableFuture.supplyAsync(() -> calculate());
        }
    
        private static Double calculate() {
            Logger.log(LogType.PRINT, "# 긴 처리 시간이 걸리는 작업 중.........");
            TimeUtil.sleep(6000L);
            return 100000000000000000.0;
        }
    
        private static void shortTimeWork() {
            TimeUtil.sleep(3000L);
            Logger.log(LogType.PRINT, "# 짧은 처리 시간 작업 완료!");
        }
    }

     

    [참고 자료]

    반응형

    'BackEnd > RxJava' 카테고리의 다른 글

    변환 연산자  (0) 2023.07.07
    필터링 연산자  (0) 2023.07.06
    Single, Maybe, Completable  (0) 2023.07.02
    Flowable과 Observable  (0) 2023.07.02
    Reactive Streams  (0) 2023.07.01

    댓글

Designed by Tistory.