컴퓨터 시스템에서 Hot과 Cold의 의미
Hot은 무언가 처음부터 다시 시작하지 않고, 같은 작업이 반복되지 않는 작업을 이야기합니다.
- 서비스나 시스템을 재가동할 필요가 없고, 인터넷에 다시 연결할 필요 없이 바로 사용 가능을 의미 (Hot Swap, Hot Deploy)
Cold는 처음부터 새로 시작해야 하고, 새로 시작하기 때문에 같은 작업이 반복되게 됩니다.
- 서버나 시스템을 부팅할 때마다 초기화 작업을 매번 하거나 인터넷에 다시 연결해야 하는 상황등
즉 Cold는 무언가를 새로 시작하고, Hot은 무언가를 새로 시작하지 않는다라고 말할 수 있습니다.
Sequence
Publisher가 emit하는 데이터의 연속적인 흐름을 정의해 놓은 것으로 표현하면 Operator 체인 형태로 정의된다.
Cold Sequence
Subscriber가 구독할 때마다 데이터 흐름이 처음부터 다시 시작되는 Sequence를 의미한다.
Subscriber A가 구독을 하면 Publisher는 4개(1, 3, 5, 7) 데이터를 emit하고 그리고 아래에 있는 Subscriber B가 구독을 해도 Publisher는 4개(1, 3, 5, 7) 데이터를 emit 합니다.
- 마블 다이어그램을 보면 모두 동일한 데이터를 전달 받지만 Subscriber A의 구독 시점이 B의 구독시점보다 빠른것을 볼 수 있습니다.
즉, Cold Sequence란, Subscriber의 구독 시점이 달라도 구독을 할 때마다 Publisher가 데이터를 emit하는 과정을 처음부터 다시 시작하는 데이터의 흐름을 말합니다.
- Cold Sequence 흐름으로 동작하는 Publisher를 Cold Publisher라고 한다.
예제 코드
public static void main(String[] args) throws InterruptedException {
Flux<String> coldFlux = Flux.fromIterable(List.of("Korea", "Japan", "Chinese"))
.map(String::toUpperCase);
coldFlux.subscribe(country -> System.out.println("1 country" + country));
Thread.sleep(2000L);
coldFlux.subscribe(country -> System.out.println("2 country" + country));
}
실행 결과
첫 번째 구독이 발생하고 2초 뒤에 두 번째 구독이 발생하는 것을 볼 수 있습니다.
- 구독이 발생할 때마다 emit된 데이터를 처음부터 다시 전달받고 있음을 확인할 수 있다.
Hot Sequence
Hot Sequence의 경우 구독이 발생한 시점 이전에 Publisher로 부터 emit된 데이터는 Subscriber가 전달받지 못하고 구독이 발생한 시점 이후에 emit된 데이터만 전달받을 수 있습니다.
- Cold Sequence는 구독 시점과 상관없이 데이터를 처음부터 다시 전달받을 수 있었다.
이미지를 보면 구독이 3번 발생했지만 타임라인은 하나 밖에 생기지 않습니다.
- 즉, Hot Sequence의 경우 구독이 아무리 많이 발생해도 Publisher가 데이터를 처음부터 emit하지 않는다.
첫 번째 Subscriber는 1, 3, 5, 7
두 번째 Subscriber는 5, 7
세 번째 Subscriber는 7
Publisher가 데이터를 emit하는 과정이 한 번만 일어나고 Subscriber는 각각의 구독 시점 이후에 emit된 데이터만 전달받는 데이터의 흐름을 Hot Sequence라고 합니다.
예제 코드
@Slf4j
class HotSequence {
public static void main(String[] args) throws InterruptedException {
String[] singers = {"A", "B", "C", "D", "E"};
System.out.println("begin");
Flux<String> concertFlux = Flux.fromArray(singers)
.delayElements(Duration.ofMillis(1000))
.share();
concertFlux.subscribe(
singer -> log.info("1 {}", singer)
);
Thread.sleep(2500);
concertFlux.subscribe(
singer -> log.info("2 {}", singer)
);
Thread.sleep(3000);
}
}
delayElements()는 데이터 소스로 입력된 각 데이터의 emit을 일정시간 동안 지연시키는 Operator입니다.
- 예제에서 Duration.ofSeconds(1)를 통해서 데이터의 emit이 1초씩 지연될 수 있도록한다.
share()는 Cold Sequence를 Hot Sequence로 동작하게 해주는 Operator이다.
- 원본 Flux를 멀티캐스트(공유)하는 새로운 Flux를 리턴한다는 뜻으로 이해하면 된다.
- 원본 Flux: Operator를 통해 가공되지 않은 원본 데이터 소스를 처음으로 emit하는 Flux
정리하면 원본 Flux를 공유해서 다 같이 사용하기 때문에 어떤 subscriber가 이 원본 Flux를 먼저 구독해 버리면 데이터 emit을 시작하게 되고, 이후에 다른 subscriber가 구독하는 시점에는 원본 Flux에서 이미 emit된 데이터를 전달받을 수 없게 됩니다.
- 두번째 구독의 경우, 원본 Flux가 emit한 데이터 중에서 A, B는 전달받지 못했다.
- 2.5초의 지연 시간 동안 원본 Flux가 이미 Singer A, Singer B 데이터를 emit해서 Hot Sequence처럼 동작했기 때문이다.
- delayElements() Operator의 디폴트 스레드 스케줄러가 parallel이기 때문에 위 예제코드는 main 스레드, parallel-1 부터 parallel-5 스레드가 실행되는 것을 볼 수 있습니다.
HTTP 요청과 응답에서의 Cold Sequence
@Slf4j
class HttpColdSequenceExample {
public static void main(String[] args) throws InterruptedException {
URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
.host("worldtimeapi.org")
.port(80)
.path("/api/timezone/Asia/Seoul")
.build()
.encode()
.toUri();
Mono<String> mono = getWorldTime(worldTimeUri); // 리턴 값으로 Mono를 전달
mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
Thread.sleep(2000);
mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));
Thread.sleep(2000);
}
private static Mono<String> getWorldTime(URI worldTimeUri) {
return WebClient.create() //non-blocking 통신을 위해서 사용
.get()
.uri(worldTimeUri)
.retrieve()
.bodyToMono(String.class)
.map(response -> {
DocumentContext jsonContext = JsonPath.parse(response);
String dateTime = jsonContext.read("$.datetime");
return dateTime;
});
}
}
2초의 지연을 두고 2번의 구독을 하는 경우
mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
Thread.sleep(2000);
mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));
Thread.sleep(2000);
- 구동이 발생할 때마다 데이터를 emit 과정이 처음부터 새로 시작되는 Cold Sequence의 특징으로 인해 두 번의 구동이 발생했으므로 두 번의 새로운 HTTP 요청이 발생한다.
- 응답에서 두 결과의 시간이 2초 정도 차이가 났다.
HTTP 요청과 응답에서의 Hot Sequence
@Slf4j
class HttpHotSequenceExample {
public static void main(String[] args) throws InterruptedException {
URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
.host("worldtimeapi.org")
.port(80)
.path("/api/timezone/Asia/Seoul")
.build()
.encode()
.toUri();
Mono<String> mono = getWorldTime(worldTimeUri)
.cache(); // Cold Sequence가 Hot Sequence로 동작
mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
Thread.sleep(2000);
mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));
Thread.sleep(2000);
}
private static Mono<String> getWorldTime(URI worldTimeUri) {
return WebClient.create()
.get()
.uri(worldTimeUri)
.retrieve()
.bodyToMono(String.class)
.map(response -> {
DocumentContext jsonContext = JsonPath.parse(response);
String dateTime = jsonContext.read("$.datetime");
return dateTime;
});
}
}
cache() 메서드로 인해 Cold Sequence가 Hot Sequence로 동작하게 된다.
- cache() Operator는 Cold Sequence로 동작하는 Mono를 Hot Sequence로 변경해주고 emit된 데이터를 캐시한 뒤, 구독이 발생할 때마다 캐시된 데이터를 전달한다.
결과적으로 캐시된 데이터를 전달하기 때문에 구독이 발생할때마다 Subscriber는 동일한 데이터를 전달받게 된다.
- 출력된 시간이 동일한 것을 볼 수 있다. (캐시처리 되었기 때문에 시간이 같음)
cache() Operator 사용 예시
대표적인 예로는 REST API 요청을 위해서 인증 토큰이 필요한 경우 사용될 수 있다.
- getAuthToken() 메서드를 호출할 때마다 API 서버 쪽에서 매번 새로운 인증 토큰을 전송하게 되어 불필요한 HTTP 요청이 발생된다.
- 이를 방지하기 위해서 cache() Operator를 사용해서 캐시된 인증 토큰을 사용하여 효율적인 동작 과정을 구성할 수 있다.
정리
Reactor는 최초 구독이 발생하기 전까지는 데이터 emit이 발생하지 않는 Warm up과 구독 여부와 상관없이 데이터가 emit되는 Hot으로 구분될 수 있다.
- share()의 경우, 최초 구독이 발생했을 때 데이터를 emit하는 Warm up의 의미를 가지는 Hot Sequence라고 할 수 있다.
- share(), cache() 등의 Operator를 사용해서 Cold Sequence를 Hot Sequence로 변환할 수 있다.
'Reactive Programming' 카테고리의 다른 글
Debugging (0) | 2025.01.02 |
---|---|
Context (0) | 2024.12.22 |
Scheduler (0) | 2024.12.16 |
Sinks (1) | 2024.12.04 |
Backpressure (0) | 2024.11.19 |