Reactor에서의 Context는 Operator 같은 Reactor 구성요소 간에 전파되는 key/value 형태의 저장소라고 정의한다.
여기서의 전파란?
Downstream에서 Upstream으로 Context가 전파되어 Operator 체인상의 각 Operator가 해당 Context의 정보를 동일하게 이용할 수 있음을 의미한다.
- Reactor의 Context는 ThreadLocal과 다소 유사하지만 각각의 실행 스레드와 매핑되는 ThreadLocal과 달리 실행 스레드와 매핑되는 것이 아니라 Subscriber와 매핑된다.
- 구독이 발생할때마다 해당 구독과 연결된 하나의 Context가 생긴다.
Context 기본 예제
@Slf4j
class ContextBasicExam {
public static void main(String[] args) throws InterruptedException {
Mono
.deferContextual(ctx ->
Mono
.just("Hello" + " " + ctx.get("firstName"))
.doOnNext(data -> log.info("# just doOnNext : {}", data))
)
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())
.transformDeferredContextual(
(mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName"))
)
.contextWrite(context -> context.put("lastName", "Jobs"))
.contextWrite(context -> context.put("firstName", "Steve"))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
Context에 데이터 쓰기
contextWrite() Operator에 Function 함수형 인터페이스를 전달해서 컨텍스트에 데이터 쓸 수 있다.
public final Mono<T> contextWrite(Function<Context, Context> contextModifier) {
return ContextPropagationSupport.shouldPropagateContextToThreadLocals()
? onAssembly(new MonoContextWriteRestoringThreadLocals(this, contextModifier))
: onAssembly(new MonoContextWrite(this, contextModifier));
}
실제로 데이터를 쓰는 동작은 Context API 중 하나인 put()을 통해서 쓸 수 있다.
.contextWrite(context -> context.put("lastName", "Jobs"))
.contextWrite(context -> context.put("firstName", "Steve"))
- context.put()을 통해 Context에 데이터를 쓴 후에 매번 불편 객체를 다음 contextWrite() Operator로 전달함으로써 스레드 안정성을 보장한다.
Context에 데이터 읽기
Context에 쓰인 데이터를 읽는 방식은 크게 두 가지가 있다.
1. deferContextual() Operator를 통해서 원본 데이터 소스 레벨에서 읽는 방식
.deferContextual(ctx ->
Mono
.just("Hello" + " " + ctx.get("firstName"))
.doOnNext(data -> log.info("# just doOnNext : {}", data))
)
- deferContextual()의 람다 표현식의 람다 파라미터 ctx는 Context 타입이 아니라 ContextView 타입 객체다. (ContextView API 중에서 get() 사용)
- 데이터를 쓸 때는 Context를 사용하지만 읽을 때는 ContextView를 사용한다.
2. transformDeferredContextual() Operator를 통해서 Operator 체인의 중간에서 읽는 방식
.transformDeferredContextual(
(mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName"))
)
- 여기서도 ContextView 타입 객체의 get()을 통해서 읽는것을 확인할 수 있다.
실행 결과
21:20:39.314 [boundedElastic-1] -- # just doOnNext : Hello Steve
21:20:39.332 [parallel-1] INFO -- # onNext: Hello Steve Jobs
- subscribeOn()과 publishOn()을 사용해서 데이터를 emit하는 스레드와 데이터를 처리하는 스레드를 분리했으므로 Context에서 데이터를 읽는 작업을 각각 다른 스레드에서 수행한다.
- 이처럼 Reactor에서는 Operator 체인상의 서로 다른 스레드들이 Context의 저장된 데이터에 손쉽게 접근할 수 있다.
자주 사용되는 Context 관련 API
Context API | 요약 | 설명 |
put(key, value) | key/value 형태로 Context에 값을 쓴다. | Context에 하나의 데이터를 쓰는 API |
of(key1, value2, key2, value2, ...) | key/value 형태로 Context에 여러 개의 값을 쓴다. | 한번의 API 호출로 여러개의 데이터를 Context에 쓸수 있는데, 최대 5개의 데이터를 파라미터로 입력할 수 있다. 6개 이상의 데이터를 쓰기 위해서는 아래의 putAll()을 사용해야 한다. |
putAll(ContextView) | 현재 Context와 파라미터로 입력된 ContextView를 merge 한다. | 현재 Context의 데이터와 파라미터로 입력된 ContextView의 데이터를 합친 후, 새로운 Context를 생성한다. |
delete(key) | Context에서 key에 해당하는 value를 삭제한다. | key에 해당되는 데이터를 삭제한다. |
예제 코드
@Slf4j
class ContextApiExam {
public static void main(String[] args) throws InterruptedException {
final String key1 = "company";
final String key2 = "firstName";
final String key3 = "lastName";
Mono
.deferContextual(ctx ->
// get()에 해당하는 key에 value가 없는 경우 NoSuchElementException 예외 발생
Mono.just(ctx.getOrDefault(key1, "company") + ", " + ctx.getOrDefault(key2, "firstName") + " " + ctx.get(key3))
.doOnNext(data -> log.info("test")) // subscribeOn()이 없으므로 main thread
)
.publishOn(Schedulers.parallel())
.contextWrite(context -> context.delete(key1)) // contextWrite는 체이닝 방식으로 구성되며, 마지막에 선언된 것이 먼저 적용된다.
.contextWrite(context -> context.delete(key2))
.contextWrite(context ->
context.putAll(Context.of(key2, "Steve", key3, "Jobs").readOnly())
)
.contextWrite(context -> context.put(key1, "Apple"))
// .contextWrite(context -> context.delete(key1)) 여기에 위치시키면 delete 되고 다시 put 되기 때문에 delete 되지 않는다.
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
put()을 통해서 1개의 데이터를 쓴다.
.contextWrite(context -> context.put(key1, "Apple"))
두 개의 데이터를 Context.of()를 사용해서 putAll() 파라미터로 전달한다.
.contextWrite(context ->
context.putAll(Context.of(key2, "Steve", key3, "Jobs").readOnly())
)
- context.putAll()은 파라미터로 ContextView를 받는다.
- Context를 받는건 Deprecate 되었다. 불변 객체와 관련된 이슈가 있었던게 아닌가 싶다.
- context.readOnly()를 통해서 Context를 읽기 작업만 가능한 ContextView로 변환할 수 있다.
getOrDefault()
.deferContextual(ctx ->
Mono.just(ctx.getOrDefault(key1, "company") + ", " + ctx.getOrDefault(key2, "firstName") + " " + ctx.get(key3))
.doOnNext(data -> log.info("test")) // subscribeOn()이 없으므로 main thread
)
- get() 메서드 호출할 때 key에 해당하는 value가 없는 경우 NoSuchElementException 예외가 발생한다.
- Map 자료구조 처럼 getOrDefault()를 통해서 기본값 설정이 가능하다.
context.delete()
.contextWrite(context -> context.delete(key2))
.contextWrite(context ->
context.putAll(Context.of(key2, "Steve", key3, "Jobs").readOnly())
)
//.contextWrite(context -> context.delete(key2))
- contextWrite는 체이닝 방식으로 구성되어, 마지막에 선언된 것이 먼저 적용되는 것 같다.
- 주석 처리된 곳에 있다면 delete 처리를 하고 putAll()을 수행하므로 삭제되지 않는것 같다.
- readOnly()를 사용하면 delete 할 때 예외가 발생할 것이라고 생각하고 실행했는데 삭제가 되는 것을 아래 실행 결과를 통해서 볼 수 있었다.
실행 결과
22:12:11.170 [main] -- test
22:12:11.178 [parallel-1] -- # onNext: company, firstName Jobs
- subscribeOn()이 없으므로 main thread에서 doOnNext()가 실행되는걸 볼 수 있다.
자주 사용되는 ContextView API
- Context에 저장된 데이터를 읽기 위해서는 ContextView API를 사용해야 한다.
ContextView API | 설명 |
get(key) | ContextView에서 key에 해당하는 value를 반환한다. |
getOrEmpty(key) | ContextView에서 key에 해당하는 value를 Optional로 래핑해서 반환한다. |
getOrDefault(key, default value) | ContextView에서 key에 해당하는 value를 가져온다.key에 해당하는 value가 없으면 default value를 가져온다. |
hasKey(key) | ContextView에서 특정 key가 존재하는지를 확인한다. |
isEmpty() | Context가 비어있는지 확인한다. |
size() | Context 내에 있는 key/value의 개수를 반환한다. |
예제 코드
@Slf4j
class ContextViewApiExam {
public static void main(String[] args) throws InterruptedException {
final String key1 = "company";
final String key2 = "firstName";
final String key3 = "lastName";
Mono
.deferContextual(ctx ->
Mono.just(ctx.get(key1) + ", " +
ctx.getOrEmpty(key2).orElse("no firstName") + " " +
ctx.getOrDefault(key3, "no lastName"))
)
.transformDeferredContextual(
(stringMono, contextView) -> {
log.info("size={}",contextView.size());
log.info("haskey(key1)={}",contextView.hasKey(key1));
log.info("haskey(key2)={}",contextView.hasKey(key2));
log.info("isEmpty={}",contextView.isEmpty());
return stringMono;
}
)
.publishOn(Schedulers.parallel())
.contextWrite(context -> context.put(key1, "Apple"))
.subscribe(data -> log.info("# onNext: {}" , data));
Thread.sleep(100L);
}
}
데이터 읽는 방법
.deferContextual(ctx ->
Mono.just(ctx.get(key1) + ", " +
ctx.getOrEmpty(key2).orElse("no firstName") + " " +
ctx.getOrDefault(key3, "no lastName"))
)
- 위에 예제는 key1 만 존재한다 Context API 예제에서도 말했지만 get(key)에 해당하는 value가 없는 경우 NoSuchElementException이 발생한다.
- getOrEmpty()는 Optional을 반환하므로 orElse()를 사용할 수 있다.
그외 ContextView API
.transformDeferredContextual(
(stringMono, contextView) -> {
log.info("size={}",contextView.size());
log.info("haskey(key1)={}",contextView.hasKey(key1));
log.info("haskey(key2)={}",contextView.hasKey(key2));
log.info("isEmpty={}",contextView.isEmpty());
return stringMono;
}
)
- Map 자료구조와 메서드가 비슷하며 실행 결과를 보면 예상 가능하다.
실행 결과
22:42:59.049 [main] -- size=1
22:42:59.054 [main] -- haskey(key1)=true
22:42:59.055 [main] -- haskey(key2)=false
22:42:59.055 [main] -- isEmpty=false
22:42:59.067 [parallel-1] -- # onNext: Apple, no firstName no lastName
Context의 특징
Context는 구독이 발생할 때마다 하나의 Context가 해당 구독에 연결된다.
@Slf4j
class ContextCharacteristicExam {
public static void main(String[] args) throws InterruptedException {
final String key1 = "company";
Mono<String> mono = Mono.deferContextual(ctx ->
Mono.just("Company: " + " " + ctx.get(key1))
)
.publishOn(Schedulers.parallel());
mono.contextWrite(context -> context.put(key1, "Apple"))
.subscribe(data -> log.info("# subscribe1 onNext: {}", data));
mono.contextWrite(context -> context.put(key1, "Microsoft"))
.subscribe(data -> log.info("# subscribe2 onNext: {}", data));
Thread.sleep(100L);
}
}
실행결과
23:38:07.112 [parallel-1] -- # subscribe1 onNext: Company: Apple
23:38:07.112 [parallel-2] -- # subscribe2 onNext: Company: Microsoft
- 얼핏 보면 두 개의 데이터가 하나의 Context에 저장될 것 같지만, 구독이 발생할 때마다 해당하는 하나의 Context가 하나의 구독에 연결된다.
Context는 Operator 체인이 아래에서 위로 전파된다.
동일한 키에 대한 값을 중복해서 저장하면 Operator 체인에서 가장 위쪽에 위치한 contextWrite()이 저장한 값으로 덮어쓴다. (저번 context.delete() 호출이 안 먹혔던 이유)
- 일반적으로 모든 Operator에서 Context에 저장된 데이터를 읽을 수 있도록 contextWrite()을 Operator 체인의 맨 마지막에 둔다.
@Slf4j
class ContextCharacteristicExam2 {
public static void main(String[] args) throws InterruptedException {
String key1 = "company";
String key2 = "name";
Mono
.deferContextual(ctx ->
Mono.just(ctx.get(key1))
)
.publishOn(Schedulers.parallel())
.contextWrite(context -> context.put(key2, "Bill"))
.transformDeferredContextual((mono, ctx) ->
mono.map(data -> data + ", " + ctx.getOrDefault(key2, "Steve"))
)
.contextWrite(context -> context.put(key1, "Apple"))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
실행 결과
23:53:41.616 [parallel-1] -- # onNext: Apple, Steve
- Context의 경우 Operator 체인이 아래에서 위로 전파되기 때문에 key2에 해당하는 값이 없어 default인 Steve가 Subscribe에 전달된다.
Inner Sequence 내부에서는 외부 Context에 저장된 데이터를 읽을 수 있다.
- Inner Sequence 외부에서는 Inner Sequence 내부 Context에 저장된 데이터를 읽을 수 없다.
@Slf4j
class ReadableInnerContext {
public static void main(String[] args) throws InterruptedException {
String key1 = "company";
Mono
.just("Steve")
.transformDeferredContextual((stringMono, ctx) ->
stringMono.mapNotNull(str -> ctx.getOrDefault("role", "test"))
.doOnNext(str -> log.info("str={}", str))
)
.flatMap(name ->
Mono.deferContextual(ctx ->
Mono
.just(ctx.get(key1) + ", " + name)
.transformDeferredContextual((mono, innerCtx) ->
mono.map(data -> data + ", " + innerCtx.get("role"))
)
.contextWrite(context -> context.put("role", "CEO"))
)
)
.publishOn(Schedulers.parallel())
.contextWrite(context -> context.put(key1, "Apple"))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
- Operator 체인의 제일 마지막과 flatMap() Operator 내부에 존재하는 Operator 체인에서 값을 쓰고 있다.
- flatMap() Operator 내부에 있는 Sequence를 Inner Sequence라고 한다.
- Inner Sequence에서는 Inner Sequence 바깥쪽 Sequence에 연결된 Context 값을 읽을 수 있다.
실행 결과
00:11:48.008 [main] -- str=test
00:11:48.022 [parallel-1] -- # onNext: Apple, test, CEO
- 바깥쪽 Sequence에 연결된 Context에 쓴 값인 Apple을 Inner Sequence에서 읽을 수 있다.
.transformDeferredContextual((stringMono, ctx) ->
stringMono.mapNotNull(str -> ctx.getOrDefault("role", "test"))
.doOnNext(str -> log.info("str={}", str))
)
- doOnNext를 통해서 확인해보니 기본값인 test가 출력되는 것을 볼 수 있다.
- 즉, Inner Sequence 외부에서는 Inner Sequence 내부 Context에 저장된 데이터를 읽을 수 없다.
Context 활용 예제
Context는 인증 정보 같은 직교성(독립성)을 가지는 정보를 전송하는 데 적합하다.
- 인증된 도서 관리자가 신규 도서를 등록하기 위해 도서 정보와 인증 토큰을 서버로 전송한다고 가정하자.
@Slf4j
public class ContextExam {
public static final String HEADER_AUTH_TOKEN = "authToken";
public static void main(String[] args) {
Mono<String> mono =
postBook(Mono.just(
new Book("abcd-1111-3533-2809"
, "Reactor's Bible"
, "Kevin"))
)
.contextWrite(Context.of(HEADER_AUTH_TOKEN, "eyJhbGciOi"));
mono.subscribe(data -> log.info("# onNext: {}", data));
}
private static Mono<String> postBook(Mono<Book> book) {
return Mono
.zip(book,
Mono
.deferContextual(ctx ->
Mono.just(ctx.get(HEADER_AUTH_TOKEN)))
)
.flatMap(tuple -> {
String response = "POST the book(" + tuple.getT1().getBookName() +
"," + tuple.getT1().getAuthor() + ") with token: " +
tuple.getT2();
return Mono.just(response); // HTTP POST 전송을 했다고 가정
});
}
}
@AllArgsConstructor
@Data
class Book {
private String isbn;
private String bookName;
private String author;
}
zip() Operator
.zip(book,
Mono
.deferContextual(ctx ->
Mono.just(ctx.get(HEADER_AUTH_TOKEN)))
)
- Mono<Book> 객체와 인증 토큰 정보를 의미하는 Mono<String> 객체를 하나의 Mono로 합친다.
- 이때 합쳐진 Mono는 Mono<Tuple2>의 객체가 된다.
핵심 로직
public static void main(String[] args) {
Mono<String> mono =
postBook(Mono.just(book)
.contextWrite(Context.of(HEADER_AUTH_TOKEN, "eyJhbGciOi"));
mono.subscribe(data -> log.info("# onNext: {}", data));
}
private static Mono<String> postBook(Mono<Book> book) {
return Mono
.zip(book,
Mono
.deferContextual(ctx -> Mono.just(ctx.get(HEADER_AUTH_TOKEN)))
)
...
}
- Mono가 어떤 과정을 거치든 상관없이 가장 마지막에 리턴된 Mono를 구독하기 직전에 contextWrite()으로 데이터를 저장한다.
- 따라서 Operator 체인의 위쪽으로 전파되고, Operator 체인 어느 위치에서든 Context에 접근할 수 있다.
- Inner Sequence 내부에서는 외부 Context에 저장된 데이터를 읽을 수 있다.
실행 결과
00:24:45.288 [main] -- # onNext: POST the book(Reactor's Bible,Kevin) with token: eyJhbGciOi
'Reactive Programming' 카테고리의 다른 글
Testing (1) | 2025.01.02 |
---|---|
Debugging (0) | 2025.01.02 |
Scheduler (0) | 2024.12.16 |
Sinks (1) | 2024.12.04 |
Backpressure (0) | 2024.11.19 |