-
Flowable과 ObservableBackEnd/RxJava 2023. 7. 2. 05:00반응형
Flowable vs Observable
Flowable Observable Reactive Streams 인터페이스 구현 Reactive Streams 인터페이스를 구현하지 않음 Subscriber에서 데이터 처리 Observer에서 데이터 처리 배압 기능 존재 배압 기능 미존재 Subscription으로 전달 받는 데이터 개수 제어 가능 데이터 개수 제어 불가능 Subscription으로 구독 해지 Disposable로 구독 해지 배압(Back Pressure)
Flowable에서 데이터를 통지하는 속도가 Subscriber에서 통지된 데이터를 전달받아 처리하는 속도 보다 빠를 때 밸런스를 맞추기 위해 데이터 통지량을 제어하는 기능을 말합니다.
배압 전략(Backpressure Strategy)
MISSING 전략
- 배압을 적용하지 않습니다.
- 나중에 onBackpressureXXX()로 배압을 적용할 수 있습니다.
ERROR 전략
- 통지된 데이터가 버퍼의 크기를 초과하면 MissingBackpressureException 에러를 통지합니다.
- 소비자가 생산자의 통지 속도를 따라 잡지 못할 때 발생합니다.
BUFFER 전략: DROP_LATEST
- 버퍼가 가득 찬 시점에 버퍼 내에서 가장 최근에 버퍼로 들어온 데이터를 DROP 합니다.
- DROP 된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채웁니다.
BUFFER 전략: DROP_OLEDEST
- 버퍼가 가득 찬 시점에 버퍼 내에서 가장 오래전에(먼저) 버퍼로 들어온 데이터를 DROP 합니다.
- DROP 된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채웁니다.
DROP 전략
- 버퍼에 데이터가 모두 채워진 상태가 되면 이후에 생성되는 데이터를 DROP 합니다.
- 버퍼가 비워지는 시점에 DROP 되지 않은 데이터부터 버퍼에 담습니다.
LATEST 전략
- 버퍼에 데이터가 모두 채워진 상태가 되면 버퍼가 비워질 때까지 통지된 데이터는 버퍼 밖에서 대기합니다.
- 버퍼가 비워지는 시점에 가장 나중에(최근에) 통지된 데이터부터 버퍼에 담습니다.
Flowable 예제 코드
package com.itvillage.chapter03.chapter0302; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.FlowableEmitter; import io.reactivex.FlowableOnSubscribe; import io.reactivex.schedulers.Schedulers; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; public class HelloRxJavaFlowableCreateExample { public static void main(String[] args) throws InterruptedException { Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> emitter) throws Exception { String[] datas = {"Hello", "RxJava!"}; for(String data : datas) { // 구독이 해지되면 처리 중단 if (emitter.isCancelled()) return; // 데이터 통지 emitter.onNext(data); } // 데이터 통지 완료를 알린다 emitter.onComplete(); } }, BackpressureStrategy.BUFFER); // 구독자의 처리가 늦을 경우 데이터를 버퍼에 담아두는 설정. flowable.observeOn(Schedulers.computation()) .subscribe(new Subscriber<String>() { // 데이터 개수 요청 및 구독을 취소하기 위한 Subscription 객체 private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(Long.MAX_VALUE); } @Override public void onNext(String data) { Logger.log(LogType.ON_NEXT, data); } @Override public void onError(Throwable error) { Logger.log(LogType.ON_ERROR, error); } @Override public void onComplete() { Logger.log(LogType.ON_COMPLETE); } }); Thread.sleep(500L); } }
Observable 예제 코드
package com.itvillage.chapter03.chapter0302; import com.itvillage.utils.LogType; import com.itvillage.utils.Logger; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; public class HelloRxJavaObservableCreateExample { public static void main(String[] args) throws InterruptedException { Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String[] datas = {"Hello", "RxJava!"}; for(String data : datas){ if(emitter.isDisposed()) return; emitter.onNext(data); } emitter.onComplete(); } }); observable.observeOn(Schedulers.computation()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { // 아무 처리도 하지 않음. } @Override public void onNext(String data) { Logger.log(LogType.ON_NEXT, data); } @Override public void onError(Throwable error) { Logger.log(LogType.ON_ERROR, error); } @Override public void onComplete() { Logger.log(LogType.ON_COMPLETE); } }); Thread.sleep(500L); } }
[참고 자료]
반응형'BackEnd > RxJava' 카테고리의 다른 글
Flowable/Observable 생성 연산자 (0) 2023.07.02 Single, Maybe, Completable (0) 2023.07.02 Reactive Streams (0) 2023.07.01 RxJava 프로젝트 환경 구축 (0) 2023.07.01 리액티브(Reactive) 프로그래밍 개요 (0) 2023.07.01