-
Sequence 필터링 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 4. 05:00반응형
filter
filter() Operator는 Upstream에서 emit된 데이터 중 조건에 일치하는 데이터만 Downstream으로 emit 합니다. 즉, 파라미터로 입력받은 Predicate의 리턴 값이 true인 데이터만 Downstream으로 emit 합니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * filter 예제 */ @Slf4j public class Example14_15 { public static void main(String[] args) { Flux .range(1, 20) .filter(num -> num % 2 != 0) .subscribe(data -> log.info("# onNext: {}", data)); } }
다음은 비동기적으로 필터링을 수행하는 filterWhen() Operator를 사용한 예제 코드입니다.
import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import java.util.Map; import static chapter14.SampleData.*; /** * filterWhen 예제 */ @Slf4j public class Example14_17 { public static void main(String[] args) throws InterruptedException { Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> vaccineMap = getCovidVaccines(); Flux .fromIterable(SampleData.coronaVaccineNames) .filterWhen(vaccine -> Mono .just(vaccineMap.get(vaccine).getT2() >= 3_000_000) .publishOn(Schedulers.parallel())) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(1000); } }
filterWhen() Operator는 내부에서 Inner Sequence를 통해 조건에 맞는 데이터인지를 비동기적으로 테스트한 후, 테스트 결과가 true라면 filterWhen()의 Upstream으로부터 전달받은 데이터를 Downstream으로 emit 합니다.
skip
skip() Operator는 Upstream에서 emit된 데이터 중 파라미터로 입력받은 숫자만큼 건너띈 후, 나머지 데이터를 Downstream으로 emit 합니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.time.Duration; /** * skip 예제 */ @Slf4j public class Example14_18 { public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofSeconds(1)) .skip(2) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(5500L); } }
skip() Operator의 파라미터로 시간을 지정하면 파라미터로 지정한 시간 내에 emit된 데이터를 건너띈 후, 나머지 데이터를 emit 합니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.time.Duration; /** * skip 예제 */ @Slf4j public class Example14_19 { public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(300)) .skip(Duration.ofSeconds(1)) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(2000L); } }
take
take() Operator는 Upstream에서 emit되는 데이터 중에서 파라미터로 입력받은 숫자만큼만 Downstream으로 emit 합니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.time.Duration; /** * take 예제 */ @Slf4j public class Example14_21 { public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofSeconds(1)) .take(3) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(4000L); } }
take() Operator의 파라미터로 시간을 지정하면 Upstream에서 emit되는 데이터 중 파라미터로 입력한 시간 내에 emit된 데이터만 Downstream으로 emit 합니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.time.Duration; /** * take 예제 */ @Slf4j public class Example14_22 { public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofSeconds(1)) .take(Duration.ofMillis(2500)) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(3000L); } }
takeLast
takeLast() Operaotr는 Upstream에서 emit된 데이터 중에서 파라미터로 입력한 개수만큼 가장 마지막에 emit된 데이터를 Downstream으로 emit 합니다.
import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * takeLast 예제 */ @Slf4j public class Example14_23 { public static void main(String[] args) { Flux .fromIterable(SampleData.btcTopPricesPerYear) .takeLast(2) .subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2())); } }
takeUntil
takeIUntil() Operator는 파라미터로 입력한 람다 표현식(Predicate)이 true가 될 때까지 Upstream에서 emit된 데이터를 Downstream으로 emit 합니다. Upstream에서 emit된 데이터에는 Predicate을 평가할 때 사용한 데이터가 포함됩니다. 즉, 다음 예제 코드의 실행 결과를 보면 20,000,000원을 초과한 금액인 22,483,583원까지 emit된 것을 확인할 수 있습니다.
import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * takeUntil 예제 */ @Slf4j public class Example14_24 { public static void main(String[] args) { Flux .fromIterable(SampleData.btcTopPricesPerYear) .takeUntil(tuple -> tuple.getT2() > 20_000_000) .subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2())); } }
takeWhile
takeWhile() Operator는 takeUntil() Operator와 달리 파라미터로 입력한 람다 표현식(Predicate)이 true가 되는 동안에만 Upstream에서 emit된 데이터를 Downstream으로 emit 합니다. Predicate을 평가할 때 사용한 데이터가 Downstream으로 emit되지 않습니다. 즉, 다음 예제 코드의 실행 결과를 보면 20,000,000원 미만인 1,111,811원까지 emit된 것을 확인할 수 있습니다.
import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * takeWhile 예제 */ @Slf4j public class Example14_25 { public static void main(String[] args) { Flux .fromIterable(SampleData.btcTopPricesPerYear) .takeWhile(tuple -> tuple.getT2() < 20_000_000) .subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2())); } }
next
next() Operator는 Upstream에서 emit되는 데이터 중 첫 번째 데이터만 Downstream으로 emit 합니다. 만일 Upstream에서 emit되는 데이터가 empty라면 Downstream으로 empty Mono를 emit 합니다. 다음은 연도별 BTC 최고가 중 가장 첫 해의 최고가를 출력하는 예제 코드입니다.
import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * next 예제 */ @Slf4j public class Example14_26 { public static void main(String[] args) { Flux .fromIterable(SampleData.btcTopPricesPerYear) .next() .subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2())); } }
[참고 정보]
반응형'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글
Sequence의 내부 동작 확인을 위한 Operator (0) 2023.08.05 Sequence 변환 Operator (0) 2023.08.05 Sequence 생성 Operator (0) 2023.08.03 Testing (0) 2023.07.29 Debugging (0) 2023.07.28