ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Sequence 필터링 Operator
    Spring Reactive Web Application/Project Reactor 2023. 8. 4. 05:00
    반응형

    filter

      filter() Operator는 Upstream에서 emit된 데이터 중 조건에 일치하는 데이터만 Downstream으로 emit 합니다. 즉, 파라미터로 입력받은 Predicate의 리턴 값이 true인 데이터만 Downstream으로 emit 합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * filter 예제
     */
    @Slf4j
    public class Example14_15 {
        public static void main(String[] args) {
            Flux
                .range(1, 20)
                .filter(num -> num % 2 != 0)
                .subscribe(data -> log.info("# onNext: {}", data));
        }
    }

     

      다음은 비동기적으로 필터링을 수행하는 filterWhen() Operator를 사용한 예제 코드입니다.

    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    import reactor.util.function.Tuple2;
    
    import java.util.Map;
    
    import static chapter14.SampleData.*;
    
    /**
     * filterWhen 예제
     */
    @Slf4j
    public class Example14_17 {
        public static void main(String[] args) throws InterruptedException {
            Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> vaccineMap =
                                                                    getCovidVaccines();
            Flux
                .fromIterable(SampleData.coronaVaccineNames)
                .filterWhen(vaccine -> Mono
                                        .just(vaccineMap.get(vaccine).getT2() >= 3_000_000)
                                        .publishOn(Schedulers.parallel()))
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(1000);
        }
    }

      filterWhen() Operator는 내부에서 Inner Sequence를 통해 조건에 맞는 데이터인지를 비동기적으로 테스트한 후, 테스트 결과가 true라면 filterWhen()의 Upstream으로부터 전달받은 데이터를 Downstream으로 emit 합니다.

     

    skip

      skip() Operator는 Upstream에서 emit된 데이터 중 파라미터로 입력받은 숫자만큼 건너띈 후, 나머지 데이터를 Downstream으로 emit 합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.time.Duration;
    
    /**
     * skip 예제
     */
    @Slf4j
    public class Example14_18 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .interval(Duration.ofSeconds(1))
                .skip(2)
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(5500L);
        }
    }

     

      skip() Operator의 파라미터로 시간을 지정하면 파라미터로 지정한 시간 내에 emit된 데이터를 건너띈 후, 나머지 데이터를 emit 합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.time.Duration;
    
    /**
     * skip 예제
     */
    @Slf4j
    public class Example14_19 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .interval(Duration.ofMillis(300))
                .skip(Duration.ofSeconds(1))
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(2000L);
        }
    }

     

    take

      take() Operator는 Upstream에서 emit되는 데이터 중에서 파라미터로 입력받은 숫자만큼만 Downstream으로 emit 합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.time.Duration;
    
    /**
     * take 예제
     */
    @Slf4j
    public class Example14_21 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .interval(Duration.ofSeconds(1))
                .take(3)
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(4000L);
        }
    }

     

      take() Operator의 파라미터로 시간을 지정하면 Upstream에서 emit되는 데이터 중 파라미터로 입력한 시간 내에 emit된 데이터만 Downstream으로 emit 합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.time.Duration;
    
    /**
     * take 예제
     */
    @Slf4j
    public class Example14_22 {
        public static void main(String[] args) throws InterruptedException {
            Flux
                .interval(Duration.ofSeconds(1))
                .take(Duration.ofMillis(2500))
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(3000L);
        }
    }

     

    takeLast

      takeLast() Operaotr는 Upstream에서 emit된 데이터 중에서 파라미터로 입력한 개수만큼 가장 마지막에 emit된 데이터를 Downstream으로 emit 합니다.

    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * takeLast 예제
     */
    @Slf4j
    public class Example14_23 {
        public static void main(String[] args) {
            Flux
                .fromIterable(SampleData.btcTopPricesPerYear)
                .takeLast(2)
                .subscribe(tuple -> log.info("# onNext: {}, {}",
                                                tuple.getT1(), tuple.getT2()));
        }
    }

     

    takeUntil

      takeIUntil() Operator는 파라미터로 입력한 람다 표현식(Predicate)이 true가 될 때까지 Upstream에서 emit된 데이터를 Downstream으로 emit 합니다. Upstream에서 emit된 데이터에는 Predicate을 평가할 때 사용한 데이터가 포함됩니다. 즉, 다음 예제 코드의 실행 결과를 보면 20,000,000원을 초과한 금액인 22,483,583원까지 emit된 것을 확인할 수 있습니다.

    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * takeUntil 예제
     */
    @Slf4j
    public class Example14_24 {
        public static void main(String[] args) {
            Flux
                .fromIterable(SampleData.btcTopPricesPerYear)
                .takeUntil(tuple -> tuple.getT2() > 20_000_000)
                .subscribe(tuple -> log.info("# onNext: {}, {}",
                                                tuple.getT1(), tuple.getT2()));
        }
    }

     

    takeWhile

      takeWhile() Operator는 takeUntil() Operator와 달리 파라미터로 입력한 람다 표현식(Predicate)이 true가 되는 동안에만 Upstream에서 emit된 데이터를 Downstream으로 emit 합니다. Predicate을 평가할 때 사용한 데이터가 Downstream으로 emit되지 않습니다. 즉, 다음 예제 코드의 실행 결과를 보면 20,000,000원 미만인 1,111,811원까지 emit된 것을 확인할 수 있습니다.

    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * takeWhile 예제
     */
    @Slf4j
    public class Example14_25 {
        public static void main(String[] args) {
            Flux
                .fromIterable(SampleData.btcTopPricesPerYear)
                .takeWhile(tuple -> tuple.getT2() < 20_000_000)
                .subscribe(tuple -> log.info("# onNext: {}, {}",
                                                    tuple.getT1(), tuple.getT2()));
        }
    }

     

    next

      next() Operator는 Upstream에서 emit되는 데이터 중 첫 번째 데이터만 Downstream으로 emit 합니다. 만일 Upstream에서 emit되는 데이터가 empty라면 Downstream으로 empty Mono를 emit 합니다. 다음은 연도별 BTC 최고가 중 가장 첫 해의 최고가를 출력하는 예제 코드입니다.

     

    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * next 예제
     */
    @Slf4j
    public class Example14_26 {
        public static void main(String[] args) {
            Flux
                .fromIterable(SampleData.btcTopPricesPerYear)
                .next()
                .subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2()));
        }
    }

     

     

     

    [참고 정보]

    반응형

    'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글

    Sequence의 내부 동작 확인을 위한 Operator  (0) 2023.08.05
    Sequence 변환 Operator  (0) 2023.08.05
    Sequence 생성 Operator  (0) 2023.08.03
    Testing  (0) 2023.07.29
    Debugging  (0) 2023.07.28

    댓글

Designed by Tistory.