Spring Reactive Web Application/Project Reactor

에러 처리를 위한 Operator

hanseom 2023. 8. 6. 09:00
반응형

error

  error() Operator는 파라미터로 지정된 에러로 종료하는 Flux를 생성합니다. error() Operator는 Java의 throw 키워드를 사용해서 예외를 의도적으로 던지는 것 같은 역할을 하는데 주로 체크 예외(Checked Exception)를 캐치해서 다시 던져야 하는 경우 사용할 수 있습니다.

 

Note. 체크 예외(Checked Exception)

  체크 예외는 Java에서 Exception 클래스를 상속한 클래스들을 의미하며 try ~ catch 문으로 반드시 처리해야 되는 예외입니다.

 

  다음은 Upstream에서 emit되는 숫자에 2를 곱한 값이 3의 배수가 되는 경우 Downstream으로 emit을 허용하지 않기 위해 error() Operator를 사용한 예제 코드입니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * error 처리 예제
 *  - error Operator
 *      - 명시적으로 error 이벤트를 발생시켜야 하는 경우
 */
@Slf4j
public class Example14_43 {
    public static void main(String[] args) {
        Flux
            .range(1, 5)
            .flatMap(num -> {
                if ((num * 2) % 3 == 0) {
                    return Flux.error(
                            new IllegalArgumentException("Not allowed multiple of 3"));
                } else {
                    return Mono.just(num * 2);
                }
            })
            .subscribe(data -> log.info("# onNext: {}", data),
                    error -> log.error("# onError: ", error));
    }
}

 

onErrorReturn

  onErrorReturn() Operator는 에러 이벤트가 발생했을 때, 에러 이벤트를 Downstream으로 전파하지 않고 대체 값을 emit 합니다. Java에서 예외가 발생했을 때 try ~ catch 문의 catch 블록에서 예외에 해당하는 대체 값을 리턴하는 방식과 유사합니다.

 

  다음은 onErrorReturn() Operator를 이용해 Exception이 발생하면 "No pen name"이라는 문자열 값으로 대체해서 emit 하는 예제 코드입니다.

import chapter14.Book;
import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * error 처리 예제
 *  - onErrorReturn Operator
 *      - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, default value로 대체해서 emit하고자 할 경우
 *      - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다.
 */
@Slf4j
public class Example14_45 {
    public static void main(String[] args) {
        getBooks()
                .map(book -> book.getPenName().toUpperCase())
                .onErrorReturn("No pen name")
                .subscribe(log::info);
    }

    public static Flux<Book> getBooks() {
        return Flux.fromIterable(SampleData.books);
    }
}

 

  다음은 첫 번째 파라미터로 특정 예외 타입을 지정해서 지정된 타입의 예외가 발생할 경우에만 대체 값을 emit 하도록 하는 예제 코드입니다.

import chapter14.Book;
import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.util.IllegalFormatException;

/**
 * error 처리 예제
 *  - onErrorReturn Operator
 *      - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, default value로 대체해서 emit하고자 할 경우
 *      - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다.
 */
@Slf4j
public class Example14_46 {
    public static void main(String[] args) {
        getBooks()
                .map(book -> book.getPenName().toUpperCase())
                .onErrorReturn(NullPointerException.class, "no pen name")
                .onErrorReturn(IllegalFormatException.class, "Illegal pen name")
                .subscribe(log::info);
    }

    public static Flux<Book> getBooks() {
        return Flux.fromIterable(SampleData.books);
    }
}

 

onErrorResume

  onErrorResume() Operator는 에러 이벤트가 발생했을 때, 에러 이벤트를 Downstream으로 전파하지 않고 대체 Publisher를 리턴합니다. Java에서 예외가 발생했을 때 try ~ catch 문의 catch 블록에서 예외가 발생한 메서드를 대체할 수 있는 또 다른 메서드를 호출하는 방식으로 볼 수 있습니다.

 

  다음은 특정 키워드로 캐시에서 검색된 도서가 없으면 데이터베이스에서 검색하도록 시뮬레이션 한 예제 코드입니다.

import chapter14.Book;
import chapter14.SampleData;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

/**
 * error 처리 예제
 *  - onErrorResume Operator
 *      - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, 대체 Publisher로 데이터를 emit하고자 할 경우
 *      - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다.
 */
@Slf4j
public class Example14_47 {
    public static void main(String[] args) {
        final String keyword = "DDD";
        getBooksFromCache(keyword)
                .onErrorResume(error -> getBooksFromDatabase(keyword))
                .subscribe(data -> log.info("# onNext: {}", data.getBookName()),
                        error -> log.error("# onError: ", error));
    }

    public static Flux<Book> getBooksFromCache(final String keyword) {
        return Flux
                .fromIterable(SampleData.books)
                .filter(book -> book.getBookName().contains(keyword))
                .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));
    }

    public static Flux<Book> getBooksFromDatabase(final String keyword) {
        List<Book> books = new ArrayList<>(SampleData.books);
        books.add(new Book("DDD: Domain Driven Design",
                "Joy", "ddd-man", 35000, 200));
        return Flux
                .fromIterable(books)
                .filter(book -> book.getBookName().contains(keyword))
                .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));
    }

    private static class NoSuchBookException extends RuntimeException {
        NoSuchBookException(String message) {
            super(message);
        }
    }
}

/**
 * [실행 결과]
 * # onNext: DDD: Domain Driven Design
 */

 

onErrorContinue

  onErrorContinue() Operator는 에러가 발생했을 때, 에러 영역 내에 있는 데이터는 제거하고 Upstream에서 후속 데이터를 emit 합니다. onErrorContinue() Operator의 파라미터인 BiConsumer 함수형 인터페이스를 통해 에러 메시지와 에러가 발생했을 때 emit된 데이터를 전달받아서 로그를 기록하는 등의 후처리를 할 수 있습니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * error 처리 예제
 *  - onErrorContinue Operator
 *      - 예외가 발생했을 때, 예외를 발생시킨 데이터를 건너뛰고 Upstream에서 emit된 다음 데이터를
 *        처리한다.
 */
@Slf4j
public class Example14_48 {
    public static void main(String[] args) {
        Flux
            .just(1, 2, 4, 0, 6, 12)
            .map(num -> 12 / num)
            .onErrorContinue((error, num) ->
                    log.error("error: {}, num: {}", error, num))
            .subscribe(data -> log.info("# onNext: {}", data),
                        error -> log.error("# onError: ", error));
    }
}

/**
 * [실행 결과]
 * # onNext: 12
 * # onNext: 6
 * # onNext: 3
 * # error: java.lang.ArithmeticException: / by zero, num: 0
 * # onNext: 2
 * # onNext: 1
 */

Note. Reactor 공식 문서에서는 onErrorContinue() Operator가 명확하지 않은 Sequence의 동작으로 개발자가 의도하지 않은 상황을 발생시킬 수 있기 때문에 신중하게 사용하기를 권고합니다. 대부분의 에러는 Operator 내부에서 doOnError() Operator를 통해 로그를 기록하고 onErrorResume() Operator 등으로 처리할 수 있다고 명시합니다.

 

retry

  retry() Operator는 Publisher가 데이터를 emit 하는 과정에서 에러가 발생하면 파라미터로 입력한 횟수만큼 원본 Flux의 Sequence를 다시 구독합니다. 만약 파라미터로 Long.MAX_VALUE를 입력하면 재구독을 무한 반복합니다.

 

  retry() Operator는 특히 timeout() Operator와 함께 사용하여 네트워크 지연으로 인해 정해진 시간 안에 응답을 받지 못하면 일정 횟수만큼 재요청해야 하는 상황에서 유용하게 사용할 수 있습니다.

 

  다음은 1.5초 동안 Upstream으로부터 emit 되는 데이터가 없으면 TimeoutException이 발생하고, 1회 재구독하는 예제 코드입니다. range() Operator에서 emit되는 데이터가 1초에 한 번씩 emit 되는데, map() Operator 내부에서 숫자 3일 경우에만 의도적으로 1초의 지연 시간을 더합니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.time.Duration;

/**
 * error 처리 예제
 *  - retry Operator
 *      - 에러가 발생했을 때, 지정된 횟수만큼 Sequence를 다시 구독한다.
 */
@Slf4j
public class Example14_49 {
    public static void main(String[] args) throws InterruptedException {
        final int[] count = {1};
        Flux
            .range(1, 3)
            .delayElements(Duration.ofSeconds(1))
            .map(num -> {
                try {
                    if (num == 3 && count[0] == 1) {
                        count[0]++;
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e) {}

                return num;
            })
            .timeout(Duration.ofMillis(1500))
            .retry(1)
            .subscribe(data -> log.info("# onNext: {}", data),
                    (error -> log.error("# onError: ", error)),
                    () -> log.info("# onComplete"));

        Thread.sleep(7000);
    }
}

/**
 * [실행 결과]
 * # onNext: 1
 * # onNext: 2
 * onNextDropped: 3
 * # onNext: 1
 * # onNext: 2
 * # onNext: 3
 * # onComplete
 */

 

 

 

[참고 정보]

반응형