Sequence 필터링 Operator
filter
filter() Operator는 Upstream에서 emit된 데이터 중 조건에 일치하는 데이터만 Downstream으로 emit 합니다. 즉, 파라미터로 입력받은 Predicate의 리턴 값이 true인 데이터만 Downstream으로 emit 합니다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
/**
* filter 예제
*/
@Slf4j
public class Example14_15 {
public static void main(String[] args) {
Flux
.range(1, 20)
.filter(num -> num % 2 != 0)
.subscribe(data -> log.info("# onNext: {}", data));
}
}
다음은 비동기적으로 필터링을 수행하는 filterWhen() Operator를 사용한 예제 코드입니다.
import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import java.util.Map;
import static chapter14.SampleData.*;
/**
* filterWhen 예제
*/
@Slf4j
public class Example14_17 {
public static void main(String[] args) throws InterruptedException {
Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> vaccineMap =
getCovidVaccines();
Flux
.fromIterable(SampleData.coronaVaccineNames)
.filterWhen(vaccine -> Mono
.just(vaccineMap.get(vaccine).getT2() >= 3_000_000)
.publishOn(Schedulers.parallel()))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(1000);
}
}
filterWhen() Operator는 내부에서 Inner Sequence를 통해 조건에 맞는 데이터인지를 비동기적으로 테스트한 후, 테스트 결과가 true라면 filterWhen()의 Upstream으로부터 전달받은 데이터를 Downstream으로 emit 합니다.
skip
skip() Operator는 Upstream에서 emit된 데이터 중 파라미터로 입력받은 숫자만큼 건너띈 후, 나머지 데이터를 Downstream으로 emit 합니다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Duration;
/**
* skip 예제
*/
@Slf4j
public class Example14_18 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofSeconds(1))
.skip(2)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(5500L);
}
}
skip() Operator의 파라미터로 시간을 지정하면 파라미터로 지정한 시간 내에 emit된 데이터를 건너띈 후, 나머지 데이터를 emit 합니다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Duration;
/**
* skip 예제
*/
@Slf4j
public class Example14_19 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(300))
.skip(Duration.ofSeconds(1))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(2000L);
}
}
take
take() Operator는 Upstream에서 emit되는 데이터 중에서 파라미터로 입력받은 숫자만큼만 Downstream으로 emit 합니다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Duration;
/**
* take 예제
*/
@Slf4j
public class Example14_21 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofSeconds(1))
.take(3)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(4000L);
}
}
take() Operator의 파라미터로 시간을 지정하면 Upstream에서 emit되는 데이터 중 파라미터로 입력한 시간 내에 emit된 데이터만 Downstream으로 emit 합니다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Duration;
/**
* take 예제
*/
@Slf4j
public class Example14_22 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofSeconds(1))
.take(Duration.ofMillis(2500))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(3000L);
}
}
takeLast
takeLast() Operaotr는 Upstream에서 emit된 데이터 중에서 파라미터로 입력한 개수만큼 가장 마지막에 emit된 데이터를 Downstream으로 emit 합니다.
import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
/**
* takeLast 예제
*/
@Slf4j
public class Example14_23 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.takeLast(2)
.subscribe(tuple -> log.info("# onNext: {}, {}",
tuple.getT1(), tuple.getT2()));
}
}
takeUntil
takeIUntil() Operator는 파라미터로 입력한 람다 표현식(Predicate)이 true가 될 때까지 Upstream에서 emit된 데이터를 Downstream으로 emit 합니다. Upstream에서 emit된 데이터에는 Predicate을 평가할 때 사용한 데이터가 포함됩니다. 즉, 다음 예제 코드의 실행 결과를 보면 20,000,000원을 초과한 금액인 22,483,583원까지 emit된 것을 확인할 수 있습니다.
import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
/**
* takeUntil 예제
*/
@Slf4j
public class Example14_24 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.takeUntil(tuple -> tuple.getT2() > 20_000_000)
.subscribe(tuple -> log.info("# onNext: {}, {}",
tuple.getT1(), tuple.getT2()));
}
}
takeWhile
takeWhile() Operator는 takeUntil() Operator와 달리 파라미터로 입력한 람다 표현식(Predicate)이 true가 되는 동안에만 Upstream에서 emit된 데이터를 Downstream으로 emit 합니다. Predicate을 평가할 때 사용한 데이터가 Downstream으로 emit되지 않습니다. 즉, 다음 예제 코드의 실행 결과를 보면 20,000,000원 미만인 1,111,811원까지 emit된 것을 확인할 수 있습니다.
import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
/**
* takeWhile 예제
*/
@Slf4j
public class Example14_25 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.takeWhile(tuple -> tuple.getT2() < 20_000_000)
.subscribe(tuple -> log.info("# onNext: {}, {}",
tuple.getT1(), tuple.getT2()));
}
}
next
next() Operator는 Upstream에서 emit되는 데이터 중 첫 번째 데이터만 Downstream으로 emit 합니다. 만일 Upstream에서 emit되는 데이터가 empty라면 Downstream으로 empty Mono를 emit 합니다. 다음은 연도별 BTC 최고가 중 가장 첫 해의 최고가를 출력하는 예제 코드입니다.
import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
/**
* next 예제
*/
@Slf4j
public class Example14_26 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.next()
.subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2()));
}
}
[참고 정보]