BackEnd/RxJava
Flowable과 Observable
hanseom
2023. 7. 2. 05:00
반응형
Flowable vs Observable
Flowable | Observable |
Reactive Streams 인터페이스 구현 | Reactive Streams 인터페이스를 구현하지 않음 |
Subscriber에서 데이터 처리 | Observer에서 데이터 처리 |
배압 기능 존재 | 배압 기능 미존재 |
Subscription으로 전달 받는 데이터 개수 제어 가능 | 데이터 개수 제어 불가능 |
Subscription으로 구독 해지 | Disposable로 구독 해지 |
배압(Back Pressure)
Flowable에서 데이터를 통지하는 속도가 Subscriber에서 통지된 데이터를 전달받아 처리하는 속도 보다 빠를 때 밸런스를 맞추기 위해 데이터 통지량을 제어하는 기능을 말합니다.
배압 전략(Backpressure Strategy)
MISSING 전략
- 배압을 적용하지 않습니다.
- 나중에 onBackpressureXXX()로 배압을 적용할 수 있습니다.
ERROR 전략
- 통지된 데이터가 버퍼의 크기를 초과하면 MissingBackpressureException 에러를 통지합니다.
- 소비자가 생산자의 통지 속도를 따라 잡지 못할 때 발생합니다.
BUFFER 전략: DROP_LATEST
- 버퍼가 가득 찬 시점에 버퍼 내에서 가장 최근에 버퍼로 들어온 데이터를 DROP 합니다.
- DROP 된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채웁니다.
BUFFER 전략: DROP_OLEDEST
- 버퍼가 가득 찬 시점에 버퍼 내에서 가장 오래전에(먼저) 버퍼로 들어온 데이터를 DROP 합니다.
- DROP 된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채웁니다.
DROP 전략
- 버퍼에 데이터가 모두 채워진 상태가 되면 이후에 생성되는 데이터를 DROP 합니다.
- 버퍼가 비워지는 시점에 DROP 되지 않은 데이터부터 버퍼에 담습니다.
LATEST 전략
- 버퍼에 데이터가 모두 채워진 상태가 되면 버퍼가 비워질 때까지 통지된 데이터는 버퍼 밖에서 대기합니다.
- 버퍼가 비워지는 시점에 가장 나중에(최근에) 통지된 데이터부터 버퍼에 담습니다.
Flowable 예제 코드
package com.itvillage.chapter03.chapter0302;
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class HelloRxJavaFlowableCreateExample {
public static void main(String[] args) throws InterruptedException {
Flowable<String> flowable =
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
String[] datas = {"Hello", "RxJava!"};
for(String data : datas) {
// 구독이 해지되면 처리 중단
if (emitter.isCancelled())
return;
// 데이터 통지
emitter.onNext(data);
}
// 데이터 통지 완료를 알린다
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER); // 구독자의 처리가 늦을 경우 데이터를 버퍼에 담아두는 설정.
flowable.observeOn(Schedulers.computation())
.subscribe(new Subscriber<String>() {
// 데이터 개수 요청 및 구독을 취소하기 위한 Subscription 객체
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(String data) {
Logger.log(LogType.ON_NEXT, data);
}
@Override
public void onError(Throwable error) {
Logger.log(LogType.ON_ERROR, error);
}
@Override
public void onComplete() {
Logger.log(LogType.ON_COMPLETE);
}
});
Thread.sleep(500L);
}
}
Observable 예제 코드
package com.itvillage.chapter03.chapter0302;
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class HelloRxJavaObservableCreateExample {
public static void main(String[] args) throws InterruptedException {
Observable<String> observable =
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String[] datas = {"Hello", "RxJava!"};
for(String data : datas){
if(emitter.isDisposed())
return;
emitter.onNext(data);
}
emitter.onComplete();
}
});
observable.observeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
// 아무 처리도 하지 않음.
}
@Override
public void onNext(String data) {
Logger.log(LogType.ON_NEXT, data);
}
@Override
public void onError(Throwable error) {
Logger.log(LogType.ON_ERROR, error);
}
@Override
public void onComplete() {
Logger.log(LogType.ON_COMPLETE);
}
});
Thread.sleep(500L);
}
}
[참고 자료]
반응형