-
에러 처리를 위한 OperatorSpring Reactive Web Application/Project Reactor 2023. 8. 6. 09:00반응형
error
error() Operator는 파라미터로 지정된 에러로 종료하는 Flux를 생성합니다. error() Operator는 Java의 throw 키워드를 사용해서 예외를 의도적으로 던지는 것 같은 역할을 하는데 주로 체크 예외(Checked Exception)를 캐치해서 다시 던져야 하는 경우 사용할 수 있습니다.
Note. 체크 예외(Checked Exception)
체크 예외는 Java에서 Exception 클래스를 상속한 클래스들을 의미하며 try ~ catch 문으로 반드시 처리해야 되는 예외입니다.
다음은 Upstream에서 emit되는 숫자에 2를 곱한 값이 3의 배수가 되는 경우 Downstream으로 emit을 허용하지 않기 위해 error() Operator를 사용한 예제 코드입니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** * error 처리 예제 * - error Operator * - 명시적으로 error 이벤트를 발생시켜야 하는 경우 */ @Slf4j public class Example14_43 { public static void main(String[] args) { Flux .range(1, 5) .flatMap(num -> { if ((num * 2) % 3 == 0) { return Flux.error( new IllegalArgumentException("Not allowed multiple of 3")); } else { return Mono.just(num * 2); } }) .subscribe(data -> log.info("# onNext: {}", data), error -> log.error("# onError: ", error)); } }
onErrorReturn
onErrorReturn() Operator는 에러 이벤트가 발생했을 때, 에러 이벤트를 Downstream으로 전파하지 않고 대체 값을 emit 합니다. Java에서 예외가 발생했을 때 try ~ catch 문의 catch 블록에서 예외에 해당하는 대체 값을 리턴하는 방식과 유사합니다.
다음은 onErrorReturn() Operator를 이용해 Exception이 발생하면 "No pen name"이라는 문자열 값으로 대체해서 emit 하는 예제 코드입니다.
import chapter14.Book; import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * error 처리 예제 * - onErrorReturn Operator * - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, default value로 대체해서 emit하고자 할 경우 * - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다. */ @Slf4j public class Example14_45 { public static void main(String[] args) { getBooks() .map(book -> book.getPenName().toUpperCase()) .onErrorReturn("No pen name") .subscribe(log::info); } public static Flux<Book> getBooks() { return Flux.fromIterable(SampleData.books); } }
다음은 첫 번째 파라미터로 특정 예외 타입을 지정해서 지정된 타입의 예외가 발생할 경우에만 대체 값을 emit 하도록 하는 예제 코드입니다.
import chapter14.Book; import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.util.IllegalFormatException; /** * error 처리 예제 * - onErrorReturn Operator * - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, default value로 대체해서 emit하고자 할 경우 * - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다. */ @Slf4j public class Example14_46 { public static void main(String[] args) { getBooks() .map(book -> book.getPenName().toUpperCase()) .onErrorReturn(NullPointerException.class, "no pen name") .onErrorReturn(IllegalFormatException.class, "Illegal pen name") .subscribe(log::info); } public static Flux<Book> getBooks() { return Flux.fromIterable(SampleData.books); } }
onErrorResume
onErrorResume() Operator는 에러 이벤트가 발생했을 때, 에러 이벤트를 Downstream으로 전파하지 않고 대체 Publisher를 리턴합니다. Java에서 예외가 발생했을 때 try ~ catch 문의 catch 블록에서 예외가 발생한 메서드를 대체할 수 있는 또 다른 메서드를 호출하는 방식으로 볼 수 있습니다.
다음은 특정 키워드로 캐시에서 검색된 도서가 없으면 데이터베이스에서 검색하도록 시뮬레이션 한 예제 코드입니다.
import chapter14.Book; import chapter14.SampleData; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.util.ArrayList; import java.util.List; /** * error 처리 예제 * - onErrorResume Operator * - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, 대체 Publisher로 데이터를 emit하고자 할 경우 * - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다. */ @Slf4j public class Example14_47 { public static void main(String[] args) { final String keyword = "DDD"; getBooksFromCache(keyword) .onErrorResume(error -> getBooksFromDatabase(keyword)) .subscribe(data -> log.info("# onNext: {}", data.getBookName()), error -> log.error("# onError: ", error)); } public static Flux<Book> getBooksFromCache(final String keyword) { return Flux .fromIterable(SampleData.books) .filter(book -> book.getBookName().contains(keyword)) .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book"))); } public static Flux<Book> getBooksFromDatabase(final String keyword) { List<Book> books = new ArrayList<>(SampleData.books); books.add(new Book("DDD: Domain Driven Design", "Joy", "ddd-man", 35000, 200)); return Flux .fromIterable(books) .filter(book -> book.getBookName().contains(keyword)) .switchIfEmpty(Flux.error(new NoSuchBookException("No such Book"))); } private static class NoSuchBookException extends RuntimeException { NoSuchBookException(String message) { super(message); } } } /** * [실행 결과] * # onNext: DDD: Domain Driven Design */
onErrorContinue
onErrorContinue() Operator는 에러가 발생했을 때, 에러 영역 내에 있는 데이터는 제거하고 Upstream에서 후속 데이터를 emit 합니다. onErrorContinue() Operator의 파라미터인 BiConsumer 함수형 인터페이스를 통해 에러 메시지와 에러가 발생했을 때 emit된 데이터를 전달받아서 로그를 기록하는 등의 후처리를 할 수 있습니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; /** * error 처리 예제 * - onErrorContinue Operator * - 예외가 발생했을 때, 예외를 발생시킨 데이터를 건너뛰고 Upstream에서 emit된 다음 데이터를 * 처리한다. */ @Slf4j public class Example14_48 { public static void main(String[] args) { Flux .just(1, 2, 4, 0, 6, 12) .map(num -> 12 / num) .onErrorContinue((error, num) -> log.error("error: {}, num: {}", error, num)) .subscribe(data -> log.info("# onNext: {}", data), error -> log.error("# onError: ", error)); } } /** * [실행 결과] * # onNext: 12 * # onNext: 6 * # onNext: 3 * # error: java.lang.ArithmeticException: / by zero, num: 0 * # onNext: 2 * # onNext: 1 */
Note. Reactor 공식 문서에서는 onErrorContinue() Operator가 명확하지 않은 Sequence의 동작으로 개발자가 의도하지 않은 상황을 발생시킬 수 있기 때문에 신중하게 사용하기를 권고합니다. 대부분의 에러는 Operator 내부에서 doOnError() Operator를 통해 로그를 기록하고 onErrorResume() Operator 등으로 처리할 수 있다고 명시합니다.
retry
retry() Operator는 Publisher가 데이터를 emit 하는 과정에서 에러가 발생하면 파라미터로 입력한 횟수만큼 원본 Flux의 Sequence를 다시 구독합니다. 만약 파라미터로 Long.MAX_VALUE를 입력하면 재구독을 무한 반복합니다.
retry() Operator는 특히 timeout() Operator와 함께 사용하여 네트워크 지연으로 인해 정해진 시간 안에 응답을 받지 못하면 일정 횟수만큼 재요청해야 하는 상황에서 유용하게 사용할 수 있습니다.
다음은 1.5초 동안 Upstream으로부터 emit 되는 데이터가 없으면 TimeoutException이 발생하고, 1회 재구독하는 예제 코드입니다. range() Operator에서 emit되는 데이터가 1초에 한 번씩 emit 되는데, map() Operator 내부에서 숫자 3일 경우에만 의도적으로 1초의 지연 시간을 더합니다.
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.time.Duration; /** * error 처리 예제 * - retry Operator * - 에러가 발생했을 때, 지정된 횟수만큼 Sequence를 다시 구독한다. */ @Slf4j public class Example14_49 { public static void main(String[] args) throws InterruptedException { final int[] count = {1}; Flux .range(1, 3) .delayElements(Duration.ofSeconds(1)) .map(num -> { try { if (num == 3 && count[0] == 1) { count[0]++; Thread.sleep(1000); } } catch (InterruptedException e) {} return num; }) .timeout(Duration.ofMillis(1500)) .retry(1) .subscribe(data -> log.info("# onNext: {}", data), (error -> log.error("# onError: ", error)), () -> log.info("# onComplete")); Thread.sleep(7000); } } /** * [실행 결과] * # onNext: 1 * # onNext: 2 * onNextDropped: 3 * # onNext: 1 * # onNext: 2 * # onNext: 3 * # onComplete */
[참고 정보]
반응형'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글
Flux Sequence 분할을 위한 Operator (0) 2023.08.09 Sequence의 동작 시간 측정을 위한 Operator (0) 2023.08.08 Sequence의 내부 동작 확인을 위한 Operator (0) 2023.08.05 Sequence 변환 Operator (0) 2023.08.05 Sequence 필터링 Operator (0) 2023.08.04