-
스케쥴러(Scheduler)BackEnd/RxJava 2023. 7. 15. 05:00반응형
스케쥴러(Scheduler)
- RxJava에서의 스케쥴러는 RxJava 비동기 프로그래밍을 위한 쓰레드(Thread) 관리자입니다.
- 즉, 스케쥴러를 이용해서 어떤 쓰레드에서 무엇을 처리할 지에 대해 제어할 수 있습니다.
- 스케쥴러를 이용해서 데이터를 통지하는 쪽과 데이터를 처리하는 쪽 쓰레드를 별도로 지정해서 분리할 수 있습니다.
- RxJava의 스케쥴러를 통해 쓰레드를 위한 코드의 간결성 및 쓰레드 관리의 복잡함을 줄일 수 있습니다.
- RxJava에서 스케쥴러를 지정하기 위해서 subscribeOn(), observeOn() 유틸리티 연산자를 사용합니다.
- 생산자쪽의 데이터 흐름을 제어하기 위해서는 subscribeOn() 연산자를 사용하고, 소비자쪽에서 전달받은 데이터 처리를 제어하기 위해서는 observeOn() 연산자를 사용합니다.
- subscribeOn(), observeOn() 연산자는 각각 파라미터로 Scheduler를 지정해야 합니다.
스케쥴러(Scheduler)의 종류
Schedulers.io()
- I/O 처리 작업을 할 때 사용하는 스케쥴러로 네트워크 요청 처리, 각종 입/출력 작업, 데이터베이스 쿼리 등에 사용합니다.
- 쓰레드 풀에서 쓰레드를 가져오거나 가져올 쓰레드가 없으면 새로운 쓰레드를 생성합니다.
package com.itvillage.section01; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; import java.io.File; /** * Scheduler.io()를 이용한 파일 입/출력 예제 * observeOn()을 여러개 지정하면 지정한 다음의 데이터 처리를 각각 개별 쓰레드에서 진행한다. */ public class SchedulerIOExample03 { public static void main(String[] args) { File[] files = new File("src/main/java/com/itvillage/").listFiles(); Observable.fromArray(files) .doOnNext(file -> Logger.log(LogType.DO_ON_NEXT, "# 데이터 통지")) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .filter(data -> data.isDirectory()) .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# filter() 거침")) .observeOn(Schedulers.computation()) .map(dir -> dir.getName()) .doOnNext(data -> Logger.log(LogType.DO_ON_NEXT, "# map() 거침")) .observeOn(Schedulers.computation()) .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(1000L); } }
Schedulers.computation()
- 논리적인 연산 처리 시 사용하는 스케쥴러로 CPU 코어의 물리적 쓰레드 수를 넘지 않는 범위에서 쓰레드를 생성합니다.
- 대기 시간 없이 빠르게 계산 작업을 수행하기 위해 사용합니다.
package com.itvillage.section01; import com.itvillage.common.SampleData; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; import java.util.Arrays; import java.util.Collections; /** * Schedulers.computatioin() 을 이용해서 계산 작업을 처리하는 예제 */ public class SchedulerComputationExample { public static void main(String[] args) { Observable<Integer> observable1 = Observable.fromIterable(SampleData.seoulPM10List); Observable<Integer> observable2 = Observable.fromIterable(SampleData.busanPM10List); Observable<Integer> observable3 = Observable.fromIterable(SampleData.incheonPM10List); Observable<Integer> observable4 = Observable.range(1, 24); Observable source = Observable.zip(observable1, observable2, observable3, observable4, (data1, data2, data3, hour) -> hour + "시: " + Collections.max(Arrays.asList(data1, data2, data3))); source.subscribeOn(Schedulers.computation()) .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); source.subscribeOn(Schedulers.computation()) .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(500L); } }
Schedulers.newThread()
- 요청 시마다 매번 새로운 쓰레드를 생성합니다.
- 쓰레드 비용도 많이 들고, 재사용도 되지 않습니다.
package com.itvillage.section01; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; /** * Schedulers.newThread()를 이용하여 구독할때 마다 새로운 쓰레드를 생성하는 예제 * - 쓰레드 생성 비용이 들고, 재사용 되지 않으므로 권장 되지 않는 방법이다. */ public class SchedulerNewThreadExample { public static void main(String[] args) { Observable<String> observable = Observable.just("1", "2", "3", "4", "5"); observable.subscribeOn(Schedulers.newThread()) .map(data -> "## " + data + " ##") .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); observable.subscribeOn(Schedulers.newThread()) .map(data -> "$$ " + data + " $$") .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(300L); } }
Schedulers.trampoline()
- 현재 실행되고 있는 쓰레드에 큐(Queue)를 생성하여 처리할 작업들을 큐에 넣고 순서대로 처리합니다.
package com.itvillage.section01; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; /** * Schedulers.trampoline()을 이용하여 현재 실행되고 있는 쓰레드의 대기큐에 처리 작업을 등록하는 예제 * - 대기 큐에 등록되는 순서대로 작업을 처리한다. */ public class SchedulerTrampolineExample { public static void main(String[] args) { Observable<String> observable = Observable.just("1", "2", "3", "4", "5"); observable.subscribeOn(Schedulers.trampoline()) .map(data -> "## " + data + " ##") .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); observable.subscribeOn(Schedulers.trampoline()) .map(data -> "$$ " + data + " $$") .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); } }
Schedulers.single()
- 단일 쓰레드를 생성하여 처리 작업을 진행합니다. 여러번 구독해도 공통으로 사용합니다.
package com.itvillage.section01; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import com.itvillage.utils.TimeUtil; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class SchedulerSingleExample { public static void main(String[] args) { Observable<String> observable = Observable.just("1", "2", "3", "4", "5"); observable.subscribeOn(Schedulers.single()) .map(data -> "## " + data + " ##") .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); observable.subscribeOn(Schedulers.single()) .map(data -> "$$ " + data + " $$") .subscribe(data -> Logger.log(LogType.ON_NEXT, data)); TimeUtil.sleep(300L); } }
Schedulers.from(executor)
- Executor를 사용해서 생성한 쓰레드를 사용합니다.
- RxJava의 Scheduler와 Executor의 동작 방식이 다르므로 자주 사용되지 않습니다.
[참고 자료]
반응형'BackEnd > RxJava' 카테고리의 다른 글
테스트(1) (0) 2023.07.16 디버깅(Debugging) (0) 2023.07.15 Subject (0) 2023.07.14 집계 연산자 (0) 2023.07.13 조건과 불린 연산자 (0) 2023.07.13