ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 에러 처리를 위한 Operator
    Spring Reactive Web Application/Project Reactor 2023. 8. 6. 09:00
    반응형

    error

      error() Operator는 파라미터로 지정된 에러로 종료하는 Flux를 생성합니다. error() Operator는 Java의 throw 키워드를 사용해서 예외를 의도적으로 던지는 것 같은 역할을 하는데 주로 체크 예외(Checked Exception)를 캐치해서 다시 던져야 하는 경우 사용할 수 있습니다.

     

    Note. 체크 예외(Checked Exception)

      체크 예외는 Java에서 Exception 클래스를 상속한 클래스들을 의미하며 try ~ catch 문으로 반드시 처리해야 되는 예외입니다.

     

      다음은 Upstream에서 emit되는 숫자에 2를 곱한 값이 3의 배수가 되는 경우 Downstream으로 emit을 허용하지 않기 위해 error() Operator를 사용한 예제 코드입니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    /**
     * error 처리 예제
     *  - error Operator
     *      - 명시적으로 error 이벤트를 발생시켜야 하는 경우
     */
    @Slf4j
    public class Example14_43 {
        public static void main(String[] args) {
            Flux
                .range(1, 5)
                .flatMap(num -> {
                    if ((num * 2) % 3 == 0) {
                        return Flux.error(
                                new IllegalArgumentException("Not allowed multiple of 3"));
                    } else {
                        return Mono.just(num * 2);
                    }
                })
                .subscribe(data -> log.info("# onNext: {}", data),
                        error -> log.error("# onError: ", error));
        }
    }

     

    onErrorReturn

      onErrorReturn() Operator는 에러 이벤트가 발생했을 때, 에러 이벤트를 Downstream으로 전파하지 않고 대체 값을 emit 합니다. Java에서 예외가 발생했을 때 try ~ catch 문의 catch 블록에서 예외에 해당하는 대체 값을 리턴하는 방식과 유사합니다.

     

      다음은 onErrorReturn() Operator를 이용해 Exception이 발생하면 "No pen name"이라는 문자열 값으로 대체해서 emit 하는 예제 코드입니다.

    import chapter14.Book;
    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * error 처리 예제
     *  - onErrorReturn Operator
     *      - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, default value로 대체해서 emit하고자 할 경우
     *      - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다.
     */
    @Slf4j
    public class Example14_45 {
        public static void main(String[] args) {
            getBooks()
                    .map(book -> book.getPenName().toUpperCase())
                    .onErrorReturn("No pen name")
                    .subscribe(log::info);
        }
    
        public static Flux<Book> getBooks() {
            return Flux.fromIterable(SampleData.books);
        }
    }

     

      다음은 첫 번째 파라미터로 특정 예외 타입을 지정해서 지정된 타입의 예외가 발생할 경우에만 대체 값을 emit 하도록 하는 예제 코드입니다.

    import chapter14.Book;
    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.util.IllegalFormatException;
    
    /**
     * error 처리 예제
     *  - onErrorReturn Operator
     *      - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, default value로 대체해서 emit하고자 할 경우
     *      - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다.
     */
    @Slf4j
    public class Example14_46 {
        public static void main(String[] args) {
            getBooks()
                    .map(book -> book.getPenName().toUpperCase())
                    .onErrorReturn(NullPointerException.class, "no pen name")
                    .onErrorReturn(IllegalFormatException.class, "Illegal pen name")
                    .subscribe(log::info);
        }
    
        public static Flux<Book> getBooks() {
            return Flux.fromIterable(SampleData.books);
        }
    }

     

    onErrorResume

      onErrorResume() Operator는 에러 이벤트가 발생했을 때, 에러 이벤트를 Downstream으로 전파하지 않고 대체 Publisher를 리턴합니다. Java에서 예외가 발생했을 때 try ~ catch 문의 catch 블록에서 예외가 발생한 메서드를 대체할 수 있는 또 다른 메서드를 호출하는 방식으로 볼 수 있습니다.

     

      다음은 특정 키워드로 캐시에서 검색된 도서가 없으면 데이터베이스에서 검색하도록 시뮬레이션 한 예제 코드입니다.

    import chapter14.Book;
    import chapter14.SampleData;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * error 처리 예제
     *  - onErrorResume Operator
     *      - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, 대체 Publisher로 데이터를 emit하고자 할 경우
     *      - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다.
     */
    @Slf4j
    public class Example14_47 {
        public static void main(String[] args) {
            final String keyword = "DDD";
            getBooksFromCache(keyword)
                    .onErrorResume(error -> getBooksFromDatabase(keyword))
                    .subscribe(data -> log.info("# onNext: {}", data.getBookName()),
                            error -> log.error("# onError: ", error));
        }
    
        public static Flux<Book> getBooksFromCache(final String keyword) {
            return Flux
                    .fromIterable(SampleData.books)
                    .filter(book -> book.getBookName().contains(keyword))
                    .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));
        }
    
        public static Flux<Book> getBooksFromDatabase(final String keyword) {
            List<Book> books = new ArrayList<>(SampleData.books);
            books.add(new Book("DDD: Domain Driven Design",
                    "Joy", "ddd-man", 35000, 200));
            return Flux
                    .fromIterable(books)
                    .filter(book -> book.getBookName().contains(keyword))
                    .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));
        }
    
        private static class NoSuchBookException extends RuntimeException {
            NoSuchBookException(String message) {
                super(message);
            }
        }
    }
    
    /**
     * [실행 결과]
     * # onNext: DDD: Domain Driven Design
     */

     

    onErrorContinue

      onErrorContinue() Operator는 에러가 발생했을 때, 에러 영역 내에 있는 데이터는 제거하고 Upstream에서 후속 데이터를 emit 합니다. onErrorContinue() Operator의 파라미터인 BiConsumer 함수형 인터페이스를 통해 에러 메시지와 에러가 발생했을 때 emit된 데이터를 전달받아서 로그를 기록하는 등의 후처리를 할 수 있습니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    /**
     * error 처리 예제
     *  - onErrorContinue Operator
     *      - 예외가 발생했을 때, 예외를 발생시킨 데이터를 건너뛰고 Upstream에서 emit된 다음 데이터를
     *        처리한다.
     */
    @Slf4j
    public class Example14_48 {
        public static void main(String[] args) {
            Flux
                .just(1, 2, 4, 0, 6, 12)
                .map(num -> 12 / num)
                .onErrorContinue((error, num) ->
                        log.error("error: {}, num: {}", error, num))
                .subscribe(data -> log.info("# onNext: {}", data),
                            error -> log.error("# onError: ", error));
        }
    }
    
    /**
     * [실행 결과]
     * # onNext: 12
     * # onNext: 6
     * # onNext: 3
     * # error: java.lang.ArithmeticException: / by zero, num: 0
     * # onNext: 2
     * # onNext: 1
     */

    Note. Reactor 공식 문서에서는 onErrorContinue() Operator가 명확하지 않은 Sequence의 동작으로 개발자가 의도하지 않은 상황을 발생시킬 수 있기 때문에 신중하게 사용하기를 권고합니다. 대부분의 에러는 Operator 내부에서 doOnError() Operator를 통해 로그를 기록하고 onErrorResume() Operator 등으로 처리할 수 있다고 명시합니다.

     

    retry

      retry() Operator는 Publisher가 데이터를 emit 하는 과정에서 에러가 발생하면 파라미터로 입력한 횟수만큼 원본 Flux의 Sequence를 다시 구독합니다. 만약 파라미터로 Long.MAX_VALUE를 입력하면 재구독을 무한 반복합니다.

     

      retry() Operator는 특히 timeout() Operator와 함께 사용하여 네트워크 지연으로 인해 정해진 시간 안에 응답을 받지 못하면 일정 횟수만큼 재요청해야 하는 상황에서 유용하게 사용할 수 있습니다.

     

      다음은 1.5초 동안 Upstream으로부터 emit 되는 데이터가 없으면 TimeoutException이 발생하고, 1회 재구독하는 예제 코드입니다. range() Operator에서 emit되는 데이터가 1초에 한 번씩 emit 되는데, map() Operator 내부에서 숫자 3일 경우에만 의도적으로 1초의 지연 시간을 더합니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    import java.time.Duration;
    
    /**
     * error 처리 예제
     *  - retry Operator
     *      - 에러가 발생했을 때, 지정된 횟수만큼 Sequence를 다시 구독한다.
     */
    @Slf4j
    public class Example14_49 {
        public static void main(String[] args) throws InterruptedException {
            final int[] count = {1};
            Flux
                .range(1, 3)
                .delayElements(Duration.ofSeconds(1))
                .map(num -> {
                    try {
                        if (num == 3 && count[0] == 1) {
                            count[0]++;
                            Thread.sleep(1000);
                        }
                    } catch (InterruptedException e) {}
    
                    return num;
                })
                .timeout(Duration.ofMillis(1500))
                .retry(1)
                .subscribe(data -> log.info("# onNext: {}", data),
                        (error -> log.error("# onError: ", error)),
                        () -> log.info("# onComplete"));
    
            Thread.sleep(7000);
        }
    }
    
    /**
     * [실행 결과]
     * # onNext: 1
     * # onNext: 2
     * onNextDropped: 3
     * # onNext: 1
     * # onNext: 2
     * # onNext: 3
     * # onComplete
     */

     

     

     

    [참고 정보]

    반응형

    댓글

Designed by Tistory.