ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 스케쥴러(Scheduler)
    BackEnd/RxJava 2023. 7. 15. 05:00
    반응형

    스케쥴러(Scheduler)

    • RxJava에서의 스케쥴러는 RxJava 비동기 프로그래밍을 위한 쓰레드(Thread) 관리자입니다.
    • 즉, 스케쥴러를 이용해서 어떤 쓰레드에서 무엇을 처리할 지에 대해 제어할 수 있습니다.
    • 스케쥴러를 이용해서 데이터를 통지하는 쪽과 데이터를 처리하는 쪽 쓰레드를 별도로 지정해서 분리할 수 있습니다.
    • RxJava의 스케쥴러를 통해 쓰레드를 위한 코드의 간결성 및 쓰레드 관리의 복잡함을 줄일 수 있습니다.
    • RxJava에서 스케쥴러를 지정하기 위해서 subscribeOn(), observeOn() 유틸리티 연산자를 사용합니다.
    • 생산자쪽의 데이터 흐름을 제어하기 위해서는 subscribeOn() 연산자를 사용하고, 소비자쪽에서 전달받은 데이터 처리를 제어하기 위해서는 observeOn() 연산자를 사용합니다.
    • subscribeOn(), observeOn() 연산자는 각각 파라미터로 Scheduler를 지정해야 합니다.

     

    스케쥴러(Scheduler)의 종류

    Schedulers.io()

    • I/O 처리 작업을 할 때 사용하는 스케쥴러로 네트워크 요청 처리, 각종 입/출력 작업, 데이터베이스 쿼리 등에 사용합니다.
    • 쓰레드 풀에서 쓰레드를 가져오거나 가져올 쓰레드가 없으면 새로운 쓰레드를 생성합니다.
    package com.itvillage.section01;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    
    import java.io.File;
    
    /**
     * Scheduler.io()를 이용한 파일 입/출력 예제
     * observeOn()을 여러개 지정하면 지정한 다음의 데이터 처리를 각각 개별 쓰레드에서 진행한다.
     */
    public class SchedulerIOExample03 {
        public static void main(String[] args) {
            File[] files = new File("src/main/java/com/itvillage/").listFiles();
    
            Observable.fromArray(files)
                    .doOnNext(file -> Logger.log(LogType.DO_ON_NEXT, "# 데이터 통지"))
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.computation())
                    .filter(data -> data.isDirectory())
                    .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# filter() 거침"))
                    .observeOn(Schedulers.computation())
                    .map(dir -> dir.getName())
                    .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# map() 거침"))
                    .observeOn(Schedulers.computation())
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            TimeUtil.sleep(1000L);
        }
    }

     

    Schedulers.computation()

    • 논리적인 연산 처리 시 사용하는 스케쥴러로 CPU 코어의 물리적 쓰레드 수를 넘지 않는 범위에서 쓰레드를 생성합니다.
    • 대기 시간 없이 빠르게 계산 작업을 수행하기 위해 사용합니다.
    package com.itvillage.section01;
    
    import com.itvillage.common.SampleData;
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    
    import java.util.Arrays;
    import java.util.Collections;
    
    /**
     * Schedulers.computatioin() 을 이용해서 계산 작업을 처리하는 예제
     */
    public class SchedulerComputationExample {
        public static void main(String[] args) {
            Observable<Integer> observable1 = Observable.fromIterable(SampleData.seoulPM10List);
            Observable<Integer> observable2 = Observable.fromIterable(SampleData.busanPM10List);
            Observable<Integer> observable3 = Observable.fromIterable(SampleData.incheonPM10List);
    
            Observable<Integer> observable4 = Observable.range(1, 24);
    
            Observable source = Observable.zip(observable1, observable2, observable3, observable4,
                    (data1, data2, data3, hour) -> hour + "시: " + Collections.max(Arrays.asList(data1, data2, data3)));
    
            source.subscribeOn(Schedulers.computation())
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            source.subscribeOn(Schedulers.computation())
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            TimeUtil.sleep(500L);
        }
    }

     

    Schedulers.newThread()

    • 요청 시마다 매번 새로운 쓰레드를 생성합니다.
    • 쓰레드 비용도 많이 들고, 재사용도 되지 않습니다.
    package com.itvillage.section01;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    
    /**
     * Schedulers.newThread()를 이용하여 구독할때 마다 새로운 쓰레드를 생성하는 예제
     * - 쓰레드 생성 비용이 들고, 재사용 되지 않으므로 권장 되지 않는 방법이다.
     */
    public class SchedulerNewThreadExample {
        public static void main(String[] args) {
            Observable<String> observable = Observable.just("1", "2", "3", "4", "5");
    
            observable.subscribeOn(Schedulers.newThread())
                    .map(data -> "## " + data + " ##")
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            observable.subscribeOn(Schedulers.newThread())
                    .map(data -> "$$ " + data + " $$")
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
    
            TimeUtil.sleep(300L);
        }
    }

     

    Schedulers.trampoline()

    • 현재 실행되고 있는 쓰레드에 큐(Queue)를 생성하여 처리할 작업들을 큐에 넣고 순서대로 처리합니다.
    package com.itvillage.section01;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    
    /**
     * Schedulers.trampoline()을 이용하여 현재 실행되고 있는 쓰레드의 대기큐에 처리 작업을 등록하는 예제
     * - 대기 큐에 등록되는 순서대로 작업을 처리한다.
     */
    public class SchedulerTrampolineExample {
        public static void main(String[] args) {
            Observable<String> observable = Observable.just("1", "2", "3", "4", "5");
    
            observable.subscribeOn(Schedulers.trampoline())
                    .map(data -> "## " + data + " ##")
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            observable.subscribeOn(Schedulers.trampoline())
                    .map(data -> "$$ " + data + " $$")
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
        }
    }

     

    Schedulers.single()

    • 단일 쓰레드를 생성하여 처리 작업을 진행합니다. 여러번 구독해도 공통으로 사용합니다.
    package com.itvillage.section01;
    
    import com.itvillage.utils.LogType;
    import com.itvillage.utils.Logger;
    import com.itvillage.utils.TimeUtil;
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    
    public class SchedulerSingleExample {
        public static void main(String[] args) {
            Observable<String> observable = Observable.just("1", "2", "3", "4", "5");
    
            observable.subscribeOn(Schedulers.single())
                    .map(data -> "## " + data + " ##")
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            observable.subscribeOn(Schedulers.single())
                    .map(data -> "$$ " + data + " $$")
                    .subscribe(data -> Logger.log(LogType.ON_NEXT, data));
    
            TimeUtil.sleep(300L);
        }
    }

     

    Schedulers.from(executor)

    • Executor를 사용해서 생성한 쓰레드를 사용합니다.
    • RxJava의 Scheduler와 Executor의 동작 방식이 다르므로 자주 사용되지 않습니다.

     

     

     

    [참고 자료]

    반응형

    'BackEnd > RxJava' 카테고리의 다른 글

    테스트(1)  (0) 2023.07.16
    디버깅(Debugging)  (0) 2023.07.15
    Subject  (0) 2023.07.14
    집계 연산자  (0) 2023.07.13
    조건과 불린 연산자  (0) 2023.07.13

    댓글

Designed by Tistory.