Backpressure 의미
Backpressure는 Publisher가 끊임없이 emit하는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 있어 과부하가 걸리지 않도록 제어하는 데이터 처리 방식입니다.
Backpressure가 필요한 이유
Publisher가 빠르게 데이터를 emit하는 경우 Subscriber의 처리속도가 느려서 처리가 끝나기도 전에 계속해서 emit 합니다.
- 처리를 하지 못하고 대기 중인 데이터가 지속적으로 쌓이게되어 오버플로가 발생하거나 최악의 경우에는 시스템이 다운되는 문제가 발생한다.
Reactor에서의 Backpressure 처리 방식
- BaseSubscriber를 사용하여 데이터 요청 개수를 적절하게 제어하는 방식
Subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher에게 요청하는 것이다.
- Subscriber가 request() 메서드를 통해서 적절한 데이터 개수를 요청하는 방식
@Slf4j
public class ReactorBackPressure {
public static void main(String[] args) {
Flux.range(1, 5)
// 여기서 data는 baseSubscriber가 데이터를 몇 개씩 보내는지 로그처리
.doOnRequest(data -> log.info("# doOnRequest: {}", data))
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
this.request(1); // 호출시 doOnRequest 람다 실행
}
@SneakyThrows
@Override
protected void hookOnNext(Integer value) {
Thread.sleep(2000L); // Subscriber가 Publisher 보다 지연이 있다는 것을 표현
log.info("# hookOnNext: {}", value);
request(1);
}
});
}
}
- Reactor에서는 Subscriber가 데이터 요청 개수를 직접 제어하기 위해서 Subscriber 인터페이스의 구현 클래스인 BaseSubscriber를 사용할 수 있다.
hookOnSubscribe(Subscription subscription)
@Override
protected void hookOnSubscribe(Subscription subscription) {
this.request(1); // 호출시 doOnRequest 람다 실행
}
- Reactor에서 Subscriber 인터페이스에 정의된 onSubscribe() 메서드를 대신해 구독 시점에 request() 메서드를 호출해서 최초 데이터 요청 개수를 제어하는 역할을 한다.
hookOnNext(Integer value)
@SneakyThrows
@Override
protected void hookOnNext(Integer value) {
Thread.sleep(2000L); // Subscriber가 Publisher 보다 지연이 있다는 것을 표현
log.info("# hookOnNext: {}", value);
request(1);
}
- Subscriber 인터페이스에 정의된 onNext() 메서드를 대신해 Publisher가 emit한 데이터를 전달받아 처리한 후에 Publisher에게 또다시 데이터를 요청하는 역할을 한다.
- reuqest() 메서드를 호출해서 데이터 요청 개수를 제어한다.
@SneakyThrows: Java에서 메서드 선언부에 Throws 를 정의하지 않고도, 검사 된 예외를 Throw 할 수 있도록 하는 Lombok 에서 제공하는 어노테이션입니다.
실행 결과
18:33:18.459 [main] INFO study.studyspringreactive.ReactorBackPressure -- # doOnRequest: 1
18:33:20.468 [main] INFO study.studyspringreactive.ReactorBackPressure -- # hookOnNext: 1
18:33:20.469 [main] INFO study.studyspringreactive.ReactorBackPressure -- # doOnRequest: 1
18:33:22.473 [main] INFO study.studyspringreactive.ReactorBackPressure -- # hookOnNext: 2
18:33:22.474 [main] INFO study.studyspringreactive.ReactorBackPressure -- # doOnRequest: 1
18:33:24.479 [main] INFO study.studyspringreactive.ReactorBackPressure -- # hookOnNext: 3
18:33:24.480 [main] INFO study.studyspringreactive.ReactorBackPressure -- # doOnRequest: 1
18:33:26.484 [main] INFO study.studyspringreactive.ReactorBackPressure -- # hookOnNext: 4
18:33:26.484 [main] INFO study.studyspringreactive.ReactorBackPressure -- # doOnRequest: 1
18:33:28.489 [main] INFO study.studyspringreactive.ReactorBackPressure -- # hookOnNext: 5
18:33:28.489 [main] INFO study.studyspringreactive.ReactorBackPressure -- # doOnRequest: 1
Reactor에서 제공하는 Backpressure 전략 사용하기
Reactor에서 제공하는 Backpressure 전략 종류
종류 | 설명 |
IGNORE 전략 | Backpressure를 적용하지 않는다. |
ERROR 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, Exception을 발생시킨다. |
DROP 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 먼저 emit된 데이터부터 Drop시킨다. |
LATEST 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 가장 최근에(나중에) emit된 데이터부터 버퍼에 채운다. |
BUFFER 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 있는 데이터부터 Drop시킨다. |
- onBackpressureXXX로 전략을 사용한다.
DownStream
데이터를 처리하는 쪽은 Subscriber뿐만 아니라 Upstream Publisher로 부터 데이터를 전달받는 모든 Downstream Publisher를 포함한다.
- 저자는 각 Operator가 리턴하는 리턴 값이 Reactor의 Publisher인 Flux 또는 Mono이기 때문에 DownStream Publisher라고 칭했다.
- Operator를 거쳐 가공된 데이터를 Downstream으로 다시 전달하는 측면에서는 Publisher가 될 수 있다는 뜻이다.
IGNORE 전략
말 그대로 Backpressure를 적용하지 않는 전략
- Downstream에서의 Backpressure 요청이 무시되기 때문에 IllegalStateException이 발생할 수 있다.
ERROR 전략
Downstream의 데이터 처리 속도가 느려서 Upstream의 emit 속도를 따라가지 못할 경우 IllegalStateException을 발생시킨다.
- Publisher는 Error Signal을 Subscriber에게 전송하고 삭제한 데이터는 폐기한다.
@Slf4j
class ErrorStrategy {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(1L)) //0부터 1씩 증가한 숫자를 0.001초에 한번씩 아주 빠른 속도로 emit한다.
.onBackpressureError() // ERROR 전략을 사용한다.
.doOnNext(data -> log.info("# doOnNext: {}", data)) // Publisher가 emit한 데이터를 디버깅하는 용도
.publishOn(Schedulers.parallel()) // 별도의 스레드를 하나 더 실행
.subscribe(data -> {
try {
Thread.sleep(5L); // Subscriber가 전달받은 데이터를 처리하는데 0.005초 시간이 걸리도록 설정
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2000L);
}
}
실행 결과
18:47:34.805 [parallel-2] INFO study.studyspringreactive.ErrorStrategy -- # doOnNext: 0
18:47:34.808 [parallel-2] INFO study.studyspringreactive.ErrorStrategy -- # doOnNext: 1
18:47:34.808 [parallel-2] INFO study.studyspringreactive.ErrorStrategy -- # doOnNext: 2
...
...
18:47:35.054 [parallel-2] INFO study.studyspringreactive.ErrorStrategy -- # doOnNext: 250
18:47:35.055 [parallel-2] INFO study.studyspringreactive.ErrorStrategy -- # doOnNext: 251
18:47:35.055 [parallel-1] INFO study.studyspringreactive.ErrorStrategy -- # onNext: 39
18:47:35.056 [parallel-2] INFO study.studyspringreactive.ErrorStrategy -- # doOnNext: 252
18:47:35.057 [parallel-2] INFO study.studyspringreactive.ErrorStrategy -- # doOnNext: 253
18:47:35.058 [parallel-2] INFO study.studyspringreactive.ErrorStrategy -- # doOnNext: 254
18:47:35.059 [parallel-2] INFO study.studyspringreactive.ErrorStrategy -- # doOnNext: 255
18:47:35.061 [parallel-1] INFO study.studyspringreactive.ErrorStrategy -- # onNext: 40
18:47:35.067 [parallel-1] INFO study.studyspringreactive.ErrorStrategy -- # onNext: 41
18:47:35.073 [parallel-1] INFO study.studyspringreactive.ErrorStrategy -- # onNext: 42
18:47:35.080 [parallel-1] INFO study.studyspringreactive.ErrorStrategy -- # onNext: 43
18:47:35.086 [parallel-1] INFO study.studyspringreactive.ErrorStrategy -- # onNext: 44
...
...
18:47:36.378 [parallel-1] INFO study.studyspringreactive.ErrorStrategy -- # onNext: 252
18:47:36.384 [parallel-1] INFO study.studyspringreactive.ErrorStrategy -- # onNext: 253
18:47:36.391 [parallel-1] INFO study.studyspringreactive.ErrorStrategy -- # onNext: 254
18:47:36.397 [parallel-1] INFO study.studyspringreactive.ErrorStrategy -- # onNext: 255
18:47:36.399 [parallel-1] ERROR study.studyspringreactive.ErrorStrategy -- # onError
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:238)
at reactor.core.publisher.Flux.lambda$onBackpressureError$27(Flux.java:7060)
at reactor.core.publisher.FluxOnBackpressureDrop$DropSubscriber.onNext(FluxOnBackpressureDrop.java:135)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:124)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
- onNext 람다 표현식에서는 0.005초에 한 번씩 로그를 출력하다가 255라는 숫자를 출력하고 OverflowException이 발생하면서 Sequence가 종료되는 것을 알 수 있다.
- OverflowException은 IllegalStateException을 상속한 하위 클래스이다.
DROP 전략
버퍼 밖에서 대기 중인 데이터 중에서 먼저 emit된 데이터부터 Drop 시키는 전략 (Drop된 데이터는 폐기)
Drop 전략 코드
@Slf4j
class DropStrategy {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(1L))
.onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))
.publishOn(Schedulers.parallel())
.subscribe(data -> {
try {
Thread.sleep(5L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2000L);
}
}
Drop 전략 사용하기
.onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))
- onBackpressureError() Operator와 다르게 onBackpressureDrop()은 DROP된 데이터를 파라미터로 전달받을 수 있기 때문에 Drop된 데이터가 폐기되기 전에 추가 작업을 수행할 수 있다.
실행 결과
18:58:42.290 [parallel-1] INFO study.studyspringreactive.DropStrategy -- # onNext: 0
18:58:42.299 [parallel-1] INFO study.studyspringreactive.DropStrategy -- # onNext: 1
18:58:42.305 [parallel-1] INFO study.studyspringreactive.DropStrategy -- # onNext: 2
18:58:42.312 [parallel-1] INFO study.studyspringreactive.DropStrategy -- # onNext: 3
...
...
18:58:42.530 [parallel-1] INFO study.studyspringreactive.DropStrategy -- # onNext: 38
18:58:42.536 [parallel-1] INFO study.studyspringreactive.DropStrategy -- # onNext: 39
**18:58:42.539 [parallel-2] INFO study.studyspringreactive.DropStrategy -- # dropped: 256**
18:58:42.540 [parallel-2] INFO study.studyspringreactive.DropStrategy -- # dropped: 257
18:58:42.541 [parallel-2] INFO study.studyspringreactive.DropStrategy -- # dropped: 258
18:58:42.542 [parallel-2] INFO study.studyspringreactive.DropStrategy -- # dropped: 259
....
....
18:58:43.437 [parallel-2] INFO study.studyspringreactive.DropStrategy -- # dropped: 1154
18:58:43.438 [parallel-2] INFO study.studyspringreactive.DropStrategy -- # dropped: 1155
18:58:43.439 [parallel-2] INFO study.studyspringreactive.DropStrategy -- # dropped: 1156
18:58:43.440 [parallel-2] INFO study.studyspringreactive.DropStrategy -- # dropped: 1157
18:58:43.441 [parallel-2] INFO study.studyspringreactive.DropStrategy -- # dropped: 1158
**18:58:43.442 [parallel-2] INFO study.studyspringreactive.DropStrategy -- # dropped: 1159**
18:58:43.442 [parallel-1] INFO study.studyspringreactive.DropStrategy -- # onNext: 191
18:58:43.448 [parallel-1] INFO study.studyspringreactive.DropStrategy -- # onNext: 192
첫 번째 Drop이 시작되는 데이터는 256이고 Drop이 끝나는 데이터는 1159이다.
- 이 구간 동안에는 버퍼가 가득 차있다는 것을 알 수 있다.
- 1159까지 Drop되기 때문에 Subscriber 쪽에서는 1160부터 전달받아 처리한다.
버퍼가 가득 찬 상태에서는 버퍼가 비워질 때까지 데이터를 Drop 한다.
LATEST 전략
Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략이다.
- 그림상으로는 Step 4에서 가장 최근에 emit된 숫자 17 이외의 나머지 숫자들이 한꺼번에 폐기되는 것처럼 표현했지만, 실제로는 데이터가 들어올 때마다 이전에 유지하고 있던 데이터가 폐기됩니다.
Latest 전략 코드
@Slf4j
class LatestStrategy {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(1L))
.onBackpressureLatest()
.publishOn(Schedulers.parallel())
.subscribe(data -> {
try {
Thread.sleep(5L);
} catch (InterruptedException ignored) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2000L);
}
}
Latest 전략 사용하기
.onBackpressureLatest()
- Drop 전략은 버퍼가 가득 찰 경우 버퍼 밖에서 대기 중인 데이터를 하나씩 차례대로 Drop 하면서 폐기합니다. 반면 LATEST 전략은 새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남겨 두고 나머지 데이터를 폐기합니다.
실행 결과
19:09:22.184 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 0
19:09:22.194 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 1
19:09:22.199 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 2
19:09:22.205 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 3
19:09:22.211 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 4
....
....
19:09:23.730 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 253
19:09:23.736 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 254
19:09:23.742 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 255
**19:09:23.749 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 1177**
19:09:23.754 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 1178
19:09:23.759 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 1179
19:09:23.765 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 1180
19:09:23.771 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 1181
19:09:23.777 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 1182
19:09:23.783 [parallel-1] INFO study.studyspringreactive.LatestStrategy -- # onNext: 1183
...
...
- Subscriber가 숫자 255를 출력하고 곧바로 그다음에 숫자 1177을 출력하는 것을 볼 수 있다.
- 이는 버퍼가 가득 찼다가 버퍼가 다시 비워지는 시간 동안 emit되는 데이터가 1177이라는 것을 알 수 있다.
- 가장 최근에 emit된 데이터가 된 후, 다음 데이터가 emit되면 다시 폐기되는 과정을 반복한다.
BUFFER 전략
컴퓨터 시스템에서의 버퍼는 입출력을 수행하는 장치들간의 속도 차이를 조절하기 위해 입출력 장치 중간에 위치해서 데이터를 어느정도 쌓아 두었다가 전송하는것 (버퍼링)
Backpressure BUFFER 전략도 이와 비슷하게 아래 전략들을 제공한다.
- 버퍼의 데이터를 폐기하지 않고 버퍼링을 하는 전략
- 버퍼가 가득 차면 버퍼 내의 데이터를 폐기하는 전략
- 버퍼가 가득 차면 에러를 발생시키는 전략
버퍼가 가득 차면 버퍼 내의 데이터를 폐기하는 전략
DROP, LATEST 전략과 다르게 BUFFER 전략에서의 데이터 폐기는 BUFFER가 가득 찼을 때 버퍼 안에 있는 데이터를 폐기한다.
BUFFER 전략 중에서 데이터를 폐기하는 전략 2가지
- DROP_LATEST 전략
- DROP_OLDEST 전략
DROP_LATEST 전략
Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 가장 최근에(나중에) 버퍼 안에 채워진 데이터를 Drop하여 폐기한 후, 이렇게 확보된 공간에 emit된 데이터를 채우는 전략이다.
- 버퍼의 최대 용량이 10이라고 가정했을 때 그림
Drop_Latest 전략 코드
@Slf4j
class BufferDropLatestStrategy {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(300L))
.doOnNext(data -> log.info("# emitted by original Flux: {}", data))
.onBackpressureBuffer(2, // 최대 용량으로 2를 설정
dropped -> log.info("** Overflow & Dropped: {} **", dropped), // drop 되는 데이터를 전달받아 후처리
BufferOverflowStrategy.DROP_LATEST) // 적용할 Backpressure 전략
.doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data)) // 원본 데이터가 emit되는 과정을 확인
.publishOn(Schedulers.parallel(), false, 1)
.subscribe(data -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2500L);
}
}
Drop_Latest 전략 사용하기
.onBackpressureBuffer(2, // 최대 용량으로 2를 설정
dropped -> log.info("** Overflow & Dropped: {} **", dropped), // drop 되는 데이터를 전달받아 후처리
BufferOverflowStrategy.DROP_LATEST) // 적용할 Backpressure 전략
- 3가지 파라미터는 순서대로 최대 용량은 2, 오버플로가 발생했을 때, Drop되는 데이터를 전달받아 후처리 하기 위한 람다, Backpressure 전략을 뜻한다.
publishOn()
.publishOn(Schedulers.parallel(), false, 1)
- 3번째 인자인 prefetch는 Scheduler가 생성하는 스레드의 비동기 경계 시점에 미리 보관할 데이터의 개수를 의미하며 데이터의 요청 개수에 영향을 미친다.
출력 결과
19:31:38.364 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- # emitted by original Flux: 0
19:31:38.367 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- [ # emitted by Buffer: 0 ]
19:31:38.666 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- # emitted by original Flux: 1
19:31:38.967 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- # emitted by original Flux: 2
19:31:39.262 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- # emitted by original Flux: 3
19:31:39.263 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- ** Overflow & Dropped: 3 **
19:31:39.373 [parallel-1] INFO study.studyspringreactive.BufferDropLatestStrategy -- # onNext: 0
19:31:39.373 [parallel-1] INFO study.studyspringreactive.BufferDropLatestStrategy -- [ # emitted by Buffer: 1 ]
19:31:39.566 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- # emitted by original Flux: 4
19:31:39.867 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- # emitted by original Flux: 5
19:31:39.868 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- ** Overflow & Dropped: 5 **
19:31:40.166 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- # emitted by original Flux: 6
19:31:40.167 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- ** Overflow & Dropped: 6 **
19:31:40.376 [parallel-1] INFO study.studyspringreactive.BufferDropLatestStrategy -- # onNext: 1
19:31:40.376 [parallel-1] INFO study.studyspringreactive.BufferDropLatestStrategy -- [ # emitted by Buffer: 2 ]
19:31:40.464 [parallel-2] INFO study.studyspringreactive.BufferDropLatestStrategy -- # emitted by original Flux: 7
- 최대 용량이 2라서 0이 emit되고 [2, 1]이 Buffer에 들어있을 때 3이 emit되자 3을 drop하는걸 볼 수 있다.
DROP_OLDEST 전략
Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 채워진 데이터 중에서 가장 오래된 데이터를 Drop하여 폐기한 후, 확보된 공간에 emit된 데이터를 채우는 전략이다.
Drop_Oldest 전략 코드
@Slf4j
class BufferDropOldestStrategy {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(300L))
.doOnNext(data -> log.info("# emitted by original Flux: {}", data))
.onBackpressureBuffer(2,
dropped -> log.info("** Overflow & Dropped: {} **", dropped),
BufferOverflowStrategy.DROP_OLDEST)
.doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
.publishOn(Schedulers.parallel(), false, 1)
.subscribe(data -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2500L);
}
}
Drop_Oldest 전략 사용하기
.onBackpressureBuffer(2,
dropped -> log.info("** Overflow & Dropped: {} **", dropped),
BufferOverflowStrategy.DROP_OLDEST)
- 세 번째 인자로 전략만 바뀐것을 확인할 수 있다.
실행 결과
19:40:09.216 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- # emitted by original Flux: 0
19:40:09.218 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- [ # emitted by Buffer: 0 ]
19:40:09.512 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- # emitted by original Flux: 1
19:40:09.813 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- # emitted by original Flux: 2
19:40:10.111 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- # emitted by original Flux: 3
19:40:10.112 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- ** Overflow & Dropped: 1 **
19:40:10.224 [parallel-1] INFO study.studyspringreactive.BufferDropOldestStrategy -- # onNext: 0
19:40:10.224 [parallel-1] INFO study.studyspringreactive.BufferDropOldestStrategy -- [ # emitted by Buffer: 2 ]
19:40:10.416 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- # emitted by original Flux: 4
19:40:10.711 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- # emitted by original Flux: 5
19:40:10.712 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- ** Overflow & Dropped: 3 **
19:40:11.013 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- # emitted by original Flux: 6
19:40:11.013 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- ** Overflow & Dropped: 4 **
19:40:11.225 [parallel-1] INFO study.studyspringreactive.BufferDropOldestStrategy -- # onNext: 2
19:40:11.225 [parallel-1] INFO study.studyspringreactive.BufferDropOldestStrategy -- [ # emitted by Buffer: 5 ]
19:40:11.311 [parallel-2] INFO study.studyspringreactive.BufferDropOldestStrategy -- # emitted by original Flux: 7
- 최대 용량이 2라서 0이 emit되고 [2, 1]이 Buffer에 들어있을 때 3이 emit되자 **Drop_Latest**와 다르게 3을 drop하는게 아니라 가장 오래된 데이터인 1을 drop하는 것을 볼 수 있다.
'Reactive Programming' 카테고리의 다른 글
Debugging (0) | 2025.01.02 |
---|---|
Context (0) | 2024.12.22 |
Scheduler (0) | 2024.12.16 |
Sinks (1) | 2024.12.04 |
Cold Sequence와 Hot Sequence (0) | 2024.11.19 |