ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Context
    Spring Reactive Web Application/Project Reactor 2023. 7. 26. 05:00
    반응형

    Context

      Context는 일반적으로 어떠한 상황에서 그 상황을 처리하기 위해 필요한 정보를 의미합니다. Reactor API 문서에서는 Context를 다음과 같이 정의합니다.

    A key/value store that is propagated between components such as operators via the context protocol.

      즉, Reactor의 Context는 Operator 같은 Reactor 구성요소 간에 전파되는 key/value 형태의 저장소라고 정의합니다. 여기서의 '전파'는 Downstream에서 Upstream으로 Context가 전파되어 Operator 체인상의 각 Operator가 해당 Context의 정보를 동일하게 이용할 수 있음을 의미합니다.

     

      Reactor의 Contextsms ThreadLocal과 다소 유사한 면이 있지만, 각각의 실행 스레드와 매핑되는 ThreadLocal과 달리 실행 스레드와 매핑되는 것이 아니라 Subscriber와 매핑됩니다. 즉, 구독이 발생할 때마다 해당 구독과 연결된 하나의 Context가 생긴다고 보면 됩니다.

     

    Context에 데이터 쓰기

      contextWrite() Operator를 통해 Context에 데이터를 씁니다. contextWrite() Operator의 파라미터는 Function 타입의 함수형 인터페이스인데, 람다 표현식으로 표현할 경우 람다 파라미터의 타입이 Context이고, 리턴 값 역시 Context인 것을 알 수 있습니다. 실제로 데이터를 쓰는 동작은 Context API 중 하나인 put()을 통해 쓸 수 있습니다.

     

    Context에 쓰인 데이터 읽기

      Context에서 값을 읽어오기 위해서는 읽기 전용 뷰인 ContextView를 사용합니다. 다음 코드의 deferContextual()의 파라미터로 정의된 람다 표현식의 람다 파라미터(ctx)는 Context 타입의 객체가 아니라 ContextView 타입의 객체입니다.

    • deferContextual() Operator: 원본 데이터 소스레벨에서 읽는 방식입니다.
    • transformDeferredContextual() Operator: Operator 체인의 중간에서 읽는 방식입니다.

     

      다음의 코드는 subscribeOn()과 publishOn()을 사용해서 데이터를 emit하는 스레드와 데이터를 처리하는 스레드를 분리했습니다. 이처럼 Reactor에서는 Operator 체인상의 서로 다른 스레드들이 Context의 저장된 데이터에 손쉽게 접근할 수 있습니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    
    /**
     * Context 기본 예제
     *  - contextWrite() Operator로 Context에 데이터 쓰기 작업을 할 수 있다.
     *  - Context.put()으로 Context에 데이터를 쓸 수 있다.
     *  - deferContextual() Operator로 Context에 데이터 읽기 작업을 할 수 있다.
     *  - Context.get()으로 Context에서 데이터를 읽을 수 있다.
     *  - transformDeferredContextual() Operator로 Operator 중간에서 Context에 데이터 읽기 작업을 할 수 있다.
     */
    @Slf4j
    public class Example11_1 {
        public static void main(String[] args) throws InterruptedException {
            Mono
                .deferContextual(ctx ->
                    Mono
                        .just("Hello" + " " + ctx.get("firstName"))
                        .doOnNext(data -> log.info("# just doOnNext : {}", data))
                )
                .subscribeOn(Schedulers.boundedElastic())
                .publishOn(Schedulers.parallel())
                .transformDeferredContextual(
                        (mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName"))
                )
                .contextWrite(context -> context.put("lastName", "Jobs"))
                .contextWrite(context -> context.put("firstName", "Steve"))
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(100L);
        }
    }

     

    Context API

    Context API 설명
    put(key, value) key/value 형태로 Context에 값을 씁니다.
    of(key1, value1, key2, value2, ...) key/value 형태로 Context에 여러 개의 값을 씁니다. (최대 5개)
    putAll(ContextView) 현재 Context와 파라미터로 입력된 ContextView를 merge합니다.
    delete(key) Context에서 key에 해당하는 value를 삭제합니다.

     

      다음 코드는 총 세 개의 데이터를 Context에 쓰는 예제입니다. 먼저 put()을 이용해 데이터 한 개를 쓰고, putAll()을 이용해 나머지 두 개의 데이터를 씁니다. 두 개의 데이터는 Context.of()를 사용해서 putAll()의 파라미터로 전달합니다(readOnly() API를 통해 Context 객체를 ContextView 객체로 변환 후 전달합니다).

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    import reactor.util.context.Context;
    
    /**
     * Context API 사용 예제
     */
    @Slf4j
    public class Example11_3 {
        public static void main(String[] args) throws InterruptedException {
            final String key1 = "company";
            final String key2 = "firstName";
            final String key3 = "lastName";
    
            Mono
                .deferContextual(ctx ->
                        Mono.just(ctx.get(key1) + ", " + ctx.get(key2) + " " + ctx.get(key3))
                )
                .publishOn(Schedulers.parallel())
                .contextWrite(context ->
                        context.putAll(Context.of(key2, "Steve", key3, "Jobs").readOnly())
                )
                .contextWrite(context -> context.put(key1, "Apple"))
                .subscribe(data -> log.info("# onNext: {}" , data));
    
            Thread.sleep(100L);
        }
    }

     

    ContextView API

    ContextView API 설명
    get(key) ContextView에서 key에 해당하는 value를 반환합니다.
    getOrEmpty(key) ContextView에서 key에 해당하는 value를 Optional로 래핑해서 반환합니다.
    getOrDefault(key, default value) ContextView에서 key에 해당하는 value를 가져옵니다. key에 해당하는 value가 없으면 default value를 가져옵니다.
    hasKey(key) ContextView에서 특정 key가 존재하는지를 확인합니다.
    isEmpty() Context가 비어 있는지 확인합니다.
    size() Context 내에 있는 key/value의 개수를 반환합니다.
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    
    /**
     * ContextView API 사용 예제
     */
    @Slf4j
    public class Example11_4 {
        public static void main(String[] args) throws InterruptedException {
            final String key1 = "company";
            final String key2 = "firstName";
            final String key3 = "lastName";
    
            Mono
                .deferContextual(ctx ->
                        Mono.just(ctx.get(key1) + ", " +
                                ctx.getOrEmpty(key2).orElse("no firstName") + " " +
                                ctx.getOrDefault(key3, "no lastName"))
                )
                .publishOn(Schedulers.parallel())
                .contextWrite(context -> context.put(key1, "Apple"))
                .subscribe(data -> log.info("# onNext: {}" , data));
    
            Thread.sleep(100L);
        }
    }

     

    Context 특징

    • Context는 구독이 발생할 때마다 하나의 Context가 해당 구독에 연결됩니다.
    • Context는 Operator 체인의 아래에서 위로 전파됩니다.
    • 동일한 키에 대한 값을 중복해서 저장하면 Operator 체인에서 가장 위쪽에 위치한 contextWrite()이 저장한 값으로 덮어씁니다.
    • Inner Sequence 내부에서는 외부 Context에 저장된 데이터를 읽을 수 있습니다. 반면, Inner Sequence 외부에서는 Inner Sequence 내부 Context에 저장된 데이터를 읽을 수 없습니다.
    • Context는 인증 정보 같은 직교성(독립성)을 가지는 정보를 전송하는데 적합합니다.

     

      다음 코드는 얼핏 보면 두 개의 데이터가 하나의 Context에 저장될 것 같지만, Context는 구독별로 연결되는 특징이 있기 때문에 구독이 발생할 때마다 해당하는 하나의 Context가 하나의 구독에 연결됩니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    
    /**
     * Context의 특징 예제
     *  - Context는 각각의 구독을 통해 Reactor Sequence에 연결 되며 체인의 각 Operator는 연결된 Context에 접근할 수 있어야 한다.
     */
    @Slf4j
    public class Example11_5 {
        public static void main(String[] args) throws InterruptedException {
            final String key1 = "company";
    
            Mono<String> mono = Mono.deferContextual(ctx ->
                            Mono.just("Company: " + " " + ctx.get(key1))
                    )
                    .publishOn(Schedulers.parallel());
    
    
            mono.contextWrite(context -> context.put(key1, "Apple"))
                    .subscribe(data -> log.info("# subscribe1 onNext: {}", data)); // Apple
    
            mono.contextWrite(context -> context.put(key1, "Microsoft"))
                    .subscribe(data -> log.info("# subscribe2 onNext: {}", data)); // Microsoft
    
            Thread.sleep(100L);
        }
    }

     

      다음 코드의 실행 결과는 Apple, Steve입니다. contextWrite를 통해 Context에 key2/"Bill"을 저장했지만 "Steve"가 출력되는 이유는 Context의 경우, Operator 체인상의 아래에서 위로 전파되는 특징이 있기 때문입니다. 따라서 일반적으로 모든 Operator에서 Context에 저장된 데이터를 읽을 수 있도록 contextWrite()을 Operator 체인의 맨 마지막에 둡니다.

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    
    /**
     * Context의 특징 예제
     *  - Context는 Operator 체인의 아래에서부터 위로 전파된다.
     *      - 따라서 Operator 체인 상에서 Context read 메서드가 Context write 메서드 밑에 있을 경우에는 write된 값을 read할 수 없다.
     */
    @Slf4j
    public class Example11_6 {
        public static void main(String[] args) throws InterruptedException {
            String key1 = "company";
            String key2 = "name";
    
            Mono
                .deferContextual(ctx ->
                    Mono.just(ctx.get(key1))
                )
                .publishOn(Schedulers.parallel())
                .contextWrite(context -> context.put(key2, "Bill"))
                .transformDeferredContextual((mono, ctx) ->
                        mono.map(data -> data + ", " + ctx.getOrDefault(key2, "Steve"))
                )
                .contextWrite(context -> context.put(key1, "Apple"))
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(100L);
        }
    }

     

      다음 코드는 Context에 데이터를 두 번 쓰고 있습니다. 한 번은 Operator 체인의 제일 마지막에 쓰고, 또 한 번은 flatMap() Operator 내부에 존재하는 Operator 체인에서 값을 쓰고 있습니다. flatMap() Operator 내부에 있는 Sequence를 Inner Sequence라고 하는데, Inner Sequence에서는 바깥쪽 Sequence에 연결된 Context의 값을 읽을 수 있습니다. 반면, Inner Sequence 외부에서는 Inner Sequence 내부 Context에 저장된 데이터를 읽을 수 없습니다(주석 처리된 코드).

    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    
    /**
     * Context의 특징
     *  - inner Sequence 내부에서는 외부 Context에 저장된 데이터를 읽을 수 있다.
     *  - inner Sequence 외부에서는 inner Sequence 내부 Context에 저장된 데이터를  읽을 수 없다.
     */
    @Slf4j
    public class Example11_7 {
        public static void main(String[] args) throws InterruptedException {
            String key1 = "company";
            Mono
                .just("Steve")
    //            .transformDeferredContextual((stringMono, ctx) ->
    //                    ctx.get("role"))
                .flatMap(name ->
                    Mono.deferContextual(ctx ->
                        Mono
                            .just(ctx.get(key1) + ", " + name)
                            .transformDeferredContextual((mono, innerCtx) ->
                                    mono.map(data -> data + ", " + innerCtx.get("role"))
                            )
                            .contextWrite(context -> context.put("role", "CEO"))
                    )
                )
                .publishOn(Schedulers.parallel())
                .contextWrite(context -> context.put(key1, "Apple"))
                .subscribe(data -> log.info("# onNext: {}", data));
    
            Thread.sleep(100L);
        }
    }

     

     

     

    [참고 정보]

    반응형

    'Spring Reactive Web Application > Project Reactor' 카테고리의 다른 글

    Testing  (0) 2023.07.29
    Debugging  (0) 2023.07.28
    Scheduler  (0) 2023.07.25
    Sinks  (0) 2023.07.24
    Backpressure  (0) 2023.07.23

    댓글

Designed by Tistory.