hanseom 2023. 8. 4. 05:00
반응형

filter

  filter() Operator는 Upstream에서 emit된 데이터 중 조건에 일치하는 데이터만 Downstream으로 emit 합니다. 즉, 파라미터로 입력받은 Predicate의 리턴 값이 true인 데이터만 Downstream으로 emit 합니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * filter 예제
 */
@Slf4j
public class Example14_15 {
    public static void main(String[] args) {
        Flux
            .range(1, 20)
            .filter(num -> num % 2 != 0)
            .subscribe(data -> log.info("# onNext: {}", data));
    }
}

 

  다음은 비동기적으로 필터링을 수행하는 filterWhen() Operator를 사용한 예제 코드입니다.

import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

import java.util.Map;

import static chapter14.SampleData.*;

/**
 * filterWhen 예제
 */
@Slf4j
public class Example14_17 {
    public static void main(String[] args) throws InterruptedException {
        Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> vaccineMap =
                                                                getCovidVaccines();
        Flux
            .fromIterable(SampleData.coronaVaccineNames)
            .filterWhen(vaccine -> Mono
                                    .just(vaccineMap.get(vaccine).getT2() >= 3_000_000)
                                    .publishOn(Schedulers.parallel()))
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(1000);
    }
}

  filterWhen() Operator는 내부에서 Inner Sequence를 통해 조건에 맞는 데이터인지를 비동기적으로 테스트한 후, 테스트 결과가 true라면 filterWhen()의 Upstream으로부터 전달받은 데이터를 Downstream으로 emit 합니다.

 

skip

  skip() Operator는 Upstream에서 emit된 데이터 중 파라미터로 입력받은 숫자만큼 건너띈 후, 나머지 데이터를 Downstream으로 emit 합니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.time.Duration;

/**
 * skip 예제
 */
@Slf4j
public class Example14_18 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofSeconds(1))
            .skip(2)
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(5500L);
    }
}

 

  skip() Operator의 파라미터로 시간을 지정하면 파라미터로 지정한 시간 내에 emit된 데이터를 건너띈 후, 나머지 데이터를 emit 합니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.time.Duration;

/**
 * skip 예제
 */
@Slf4j
public class Example14_19 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofMillis(300))
            .skip(Duration.ofSeconds(1))
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(2000L);
    }
}

 

take

  take() Operator는 Upstream에서 emit되는 데이터 중에서 파라미터로 입력받은 숫자만큼만 Downstream으로 emit 합니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.time.Duration;

/**
 * take 예제
 */
@Slf4j
public class Example14_21 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofSeconds(1))
            .take(3)
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(4000L);
    }
}

 

  take() Operator의 파라미터로 시간을 지정하면 Upstream에서 emit되는 데이터 중 파라미터로 입력한 시간 내에 emit된 데이터만 Downstream으로 emit 합니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.time.Duration;

/**
 * take 예제
 */
@Slf4j
public class Example14_22 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .interval(Duration.ofSeconds(1))
            .take(Duration.ofMillis(2500))
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(3000L);
    }
}

 

takeLast

  takeLast() Operaotr는 Upstream에서 emit된 데이터 중에서 파라미터로 입력한 개수만큼 가장 마지막에 emit된 데이터를 Downstream으로 emit 합니다.

import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * takeLast 예제
 */
@Slf4j
public class Example14_23 {
    public static void main(String[] args) {
        Flux
            .fromIterable(SampleData.btcTopPricesPerYear)
            .takeLast(2)
            .subscribe(tuple -> log.info("# onNext: {}, {}",
                                            tuple.getT1(), tuple.getT2()));
    }
}

 

takeUntil

  takeIUntil() Operator는 파라미터로 입력한 람다 표현식(Predicate)이 true가 될 때까지 Upstream에서 emit된 데이터를 Downstream으로 emit 합니다. Upstream에서 emit된 데이터에는 Predicate을 평가할 때 사용한 데이터가 포함됩니다. 즉, 다음 예제 코드의 실행 결과를 보면 20,000,000원을 초과한 금액인 22,483,583원까지 emit된 것을 확인할 수 있습니다.

import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * takeUntil 예제
 */
@Slf4j
public class Example14_24 {
    public static void main(String[] args) {
        Flux
            .fromIterable(SampleData.btcTopPricesPerYear)
            .takeUntil(tuple -> tuple.getT2() > 20_000_000)
            .subscribe(tuple -> log.info("# onNext: {}, {}",
                                            tuple.getT1(), tuple.getT2()));
    }
}

 

takeWhile

  takeWhile() Operator는 takeUntil() Operator와 달리 파라미터로 입력한 람다 표현식(Predicate)이 true가 되는 동안에만 Upstream에서 emit된 데이터를 Downstream으로 emit 합니다. Predicate을 평가할 때 사용한 데이터가 Downstream으로 emit되지 않습니다. 즉, 다음 예제 코드의 실행 결과를 보면 20,000,000원 미만인 1,111,811원까지 emit된 것을 확인할 수 있습니다.

import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * takeWhile 예제
 */
@Slf4j
public class Example14_25 {
    public static void main(String[] args) {
        Flux
            .fromIterable(SampleData.btcTopPricesPerYear)
            .takeWhile(tuple -> tuple.getT2() < 20_000_000)
            .subscribe(tuple -> log.info("# onNext: {}, {}",
                                                tuple.getT1(), tuple.getT2()));
    }
}

 

next

  next() Operator는 Upstream에서 emit되는 데이터 중 첫 번째 데이터만 Downstream으로 emit 합니다. 만일 Upstream에서 emit되는 데이터가 empty라면 Downstream으로 empty Mono를 emit 합니다. 다음은 연도별 BTC 최고가 중 가장 첫 해의 최고가를 출력하는 예제 코드입니다.

 

import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * next 예제
 */
@Slf4j
public class Example14_26 {
    public static void main(String[] args) {
        Flux
            .fromIterable(SampleData.btcTopPricesPerYear)
            .next()
            .subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2()));
    }
}

 

 

 

[참고 정보]

반응형