Sinks란?
리액티브 스트림즈의 구성요소 중 하나로 Subscriber로서 기능할 땐 다른 Publisher를 구독할 수 있고, Publisher로서 기능할 땐 다른 Subscriber가 구독할 수 있습니다.
- Signal을 프로그래밍 방식으로 푸시할 수 있는 구조 (Flux, Mono의 의미 체계를 가진다.)
- 명시적으로 Signal을 전송할 수 있다.
기존의 Reactor의 Processor 방식을 개선한 Sinks가 지원되면서 Reactor 3.5.0 부터 제거 되었다.
- onNext, onComplete, onError 메서드를 직접적으로 호출함으로써 스레드 안정성이 보장되지 않을 수 있었다.
- Sinks의 경우에는 동시 접근을 감지하고, 동시 접근하는 스레드 중 하나가 빠르게 실패함으로써 스레드 안전성을 보장한다.
Sinks vs Operator
Reactor에서 generate(), create() Operator 등을 통해서 Signal을 일반적으로 전송한다.
- Operator는 싱글스레드 기반에서 Signal을 전송
- Sinks는 멀티스레드 기반에서 Signal을 전송해도 스레드 안정성을 보장한다.
Operator
원본 데이터를 생성하는 create() Operator에서는 subscribeOn() Operator에서 지정한 스레드를 사용해서 생성한 데이터를 emit한다.
@Slf4j
public class CreateMethodExam {
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Flux
.create((FluxSink<String> sink) -> {
IntStream
.range(1, tasks)
// create() Operator가 처리해야 할 작업 만큼 수행
.forEach(n -> sink.next(doTask(n)));
})
.subscribeOn(Schedulers.boundedElastic())
// create() 작업 결과 출력
.doOnNext(n -> log.info("# create() : {}", n))
.publishOn(Schedulers.parallel())
// 가공 처리한 후 최종적으로 subScriber에게 전달
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map() : {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext() : {}", data));
Thread.sleep(500L);
}
static String doTask(int taskNumber) {
// now tasking...
// complete to task.
return "Task " + taskNumber + " result";
}
}
- 작업을 처리하는 스레드는 subscribeOn() Operator
- 처리 결과를 가공하는 스레드는 처음 publishOn() Operator
- 가공된 결과를 Subscriber에게 전달하는 스레드는 마지막 publishOn() Operator
실행 결과
main 스레드를 제외하고 Reactor Sequence는 총 3개의 스레드가 동시에 실행된다.
- create() Operator를 통해서 프로그래밍 방식으로 Signal을 전송할 수 있으며 Reactor Sequence를 단계적으로 나누어서 여러 개의 스레드로 처리할 수 있다.
- doTask() 메서드가 싱글 스레드가 아닌 여러 개의 스레드에서 각각의 전혀 다른 작업들을 처리한 후, 처리 결과를 반환할 때 Sinks를 통해서 적절하게 사용할 수 있다.
Sinks
@Slf4j
class SinksExam {
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> fluxView = unicastSink.asFlux();
IntStream
.range(1, tasks)
.forEach(n -> {
try {
new Thread(() -> {
unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST);
log.info("# emitted : {}", n);
}).start();
Thread.sleep(100L);
} catch (InterruptedException e) {
log.error(e.getMessage());
}
});
fluxView
.publishOn(Schedulers.parallel())
.map(result -> result + " success!!")
.doOnNext(n -> log.info("# map() : {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext() : {}", data));
Thread.sleep(200L);
}
실행 결과
doTask() 메서드가 루프를 돌 때마다 새로운 스레드에서 실행되며 작업 처리 결과를 Sinks를 통해서 Downstream에 emit 한다.
- 2 + 5개 task = 7개 스레드
sinks는 프로그래밍 방식으로 Signal을 전송할 수 있으며 멀티스레드 환경에서도 스레드 안전성을 보장받을 수 있는 장점이 있다.
- 작업을 처리하는 0~4 스레드
- 처리 결과를 가공하는 스레드는 parallel-2 스레드
- 가공된 결과를 Subscriber에게 전달하는 데이터를 처리하는 parallel-1 스레드
Sinks 종류 및 특징
Reactor에서 프로그래밍 방식으로 Signal을 전송하는 방법은 크게 Sinks.One과 Sinks.Many를 사용한다.
- 차이점으로는 Sinks.one()은 Sinks.One<T>을 반환하는데 Sinks.many()는 ManySpec 인터페이스를 리턴한다.
static final class DefaultSinksSpecs implements Sinks.ManySpec, Sinks.MulticastSpec, Sinks.MulticastReplaySpec {
...
// Sinks.one()
<T> Sinks.One<T> one() {
return this.wrapOne(new SinkOneMulticast());
}
<T, ONE extends Sinks.One<T> & ContextHolder> Sinks.One<T> wrapOne(ONE original) {
return new SinkOneSerialized(original, (ContextHolder)original);
}
...
}
- DefaultSinksSpecs에 정의되어 있는데 SinkOneMulticast를 감싼 Sinks.One<T>를(SinkOneSerialized) 반환한다.
Sinks.One
Sinks.One() 메서드를 사용해 한 건의 데이터를 전송하는 방법을 정의해 둔 기능 명세
- 한 건의 데이터를 프로그래밍 방식으로 emit하는 역할
- Mono 방식으로 Subscriber가 데이터를 소비할 수 있도록 해주는 Sinks 클래스 내부에 Sinks 인터페이스 스펙 또는 사양
@Slf4j
class SinksOneExam {
public static void main(String[] args) {
Sinks.One<String> sinkOne = Sinks.one();
// Mono 객체를 통해 emit된 데이터를 전달받을 수 있다.
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("1. Hello Reactor!", Sinks.EmitFailureHandler.FAIL_FAST);
sinkOne.emitValue("2. Hello Reactor!", Sinks.EmitFailureHandler.FAIL_FAST);
mono.subscribe(data -> log.info("# Subscriber A : {}", data));
mono.subscribe(data -> log.info("# Subscriber B : {}", data));
}
}
- FAIL_FAST 값을 지정하면 EmitFailureHandler 객체를 통해서 emit 도중 발생한 에러에 대해 빠르게 실패처리한다.
(재시도를 하지 않고 즉시 실패 처리)
확인해보면 첫 번째 1. Hello Reactor! 문자열은 잘 들어와서 처리되지만
확인해보면 두 번째 2. Hello Reactor! 문자열은 drop 된다.
- 이 덕분에 스레드 간의 경합 등으로 발생하는 교착 상태 등을 미연에 방지할 수 있으며, 스레드 안전성을 보장할 수 있다.
출력 결과를 보면 첫 번째 문자열은 정상적으로 emit되지만 나머지 데이터들은 Drop되는걸 알 수 있다.
Sinks.Many
여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 정의해 둔 기능 명세
ManySpec 인터페이스에 정의된 세 기능은 각각 별도의 Spec으로 정의해두고 있다.
public final class Sinks {
...
public interface ManySpec {
UnicastSpec unicast();
MulticastSpec multicast();
MulticastReplaySpec replay();
}
...
}
unicast() -> UnicastSpec
네트워크 통신에서 사용되는 Broadcast(One to All) 용어 처럼 unicast(One to One)의 의미를 가지는 UnicastSpec의 기능은 단 하나의 Subscribe에게만 데이터를 emit 한다.
@Slf4j
class UnicastSpecExam {
public static void main(String[] args) {
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux(); //Flux 객체 변환
unicastSink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
unicastSink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscribe A : {}", data));
unicastSink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);
// fluxView.subscribe(data -> log.info("# Subscribe B : {}", data));
}
}
UnicastSpec에 정의된 onBackpressureBuffer() 메서드를 호출한다.
주석 해제 전 실행 결과
주석 해제 후 실행 결과
- 하나의 Subscribe만 허용된다고 예외 메시지에서 확인이 가능하다.
- 즉, 단 하나의 Subscriber에게 데이터를 emit하기 위해서 내부적으로 UnicastProcessor를 사용
multicast( ) -> MulticastSpec
multicast(One to Many)은 하나 이상의 Subscriber에게 데이터를 emit 한다.
@Slf4j
class MulticastExam {
public static void main(String[] args) {
Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> fluxView = multicastSink.asFlux();
multicastSink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
multicastSink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscribe A : {}", data));
fluxView.subscribe(data -> log.info("# Subscribe B : {}", data));
multicastSink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscribe C : {}", data));
}
}
이까 Sinks.one()에서 사용되었던 DefaultSinksSpecs을 통해서 MulticastSpec을 반환한다.
static final class DefaultSinksSpecs implements Sinks.ManySpec, Sinks.MulticastSpec, Sinks.MulticastReplaySpec {
...
public Sinks.MulticastSpec multicast() {
return this;
}
public <T> Sinks.Many<T> onBackpressureBuffer() {
return this.wrapMany(new SinkManyEmitterProcessor(true, Queues.SMALL_BUFFER_SIZE));
}
...
}
실행 결과
Sinks가 Publisher의 역할을 할 경우 기본적으로 Hot Publisher로 동작한다.
- 특히 onBackpressureBuffer() 메서드는 Warm up의 특징을 가지는 Hot Sequence로 동작하기 때문에 첫 번째 구독이 발생한 시점에 Downstream 쪽으로 데이터가 전달되어 A는 전부를 B는 하나를 C는 하나도 받지 못한다.
replay( ) -> MulticastReplaySpec
replay 버튼을 누르면 동영상이 처음부터 다시 재생되는 것처럼 동작한다.
@Slf4j
class MulticastReplaySpecExam {
public static void main(String[] args) {
// limit에 인자로 3을 넘겨주게 되면 1번째 emit도 전달받을 수 있다.
Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
Flux<Integer> fluxView = replaySink.asFlux();
replaySink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
replaySink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);
replaySink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscribe A : {}", data));
replaySink.emitNext(4, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscribe B : {}", data));
}
}
- emit된 데이터를 다시 replay해서 구독 전에 이미 emit된 데이터라도 Subscriber가 전달받을 수 있게 하는 다양한 메서드들이 있는데 대표적으로 all() 메서드가 있다.
DefaultSinksSpecs을 통해서 MulticastReplaySpec을 반환한다.
static final class DefaultSinksSpecs implements Sinks.ManySpec, Sinks.MulticastSpec, Sinks.MulticastReplaySpec {
...
public Sinks.MulticastReplaySpec replay() {
return this;
}
public <T> Sinks.Many<T> limit(int historySize) {
if (historySize <= 0) {
throw new IllegalArgumentException("historySize must be > 0");
} else {
SinkManyReplayProcessor<T> original = SinkManyReplayProcessor.create(historySize);
return this.wrapMany(original);
}
}
<T, MANY extends Sinks.Many<T> & ContextHolder> Sinks.Many<T> wrapMany(MANY original) {
return new SinkManySerialized(original, (ContextHolder)original);
}
...
}
limit() 메서드에 인자로 3을 넣었을 때 실행 결과
- 2를 넣으면 가장 처음 받은 emit이 하나씩 없어진다.
- 두 subscribe 모두 emit 이후 호출했기 때문이다. (B는 4부터 시작했으므로 4-3 = 1로 가장 처음 메시지 전달 못받음)
all() 메서드를 사용할 때 실행 결과
Subscribe B도 전부 emit된 데이터를 받은걸 볼 수 있다.
'Reactive Programming' 카테고리의 다른 글
Debugging (0) | 2025.01.02 |
---|---|
Context (0) | 2024.12.22 |
Scheduler (0) | 2024.12.16 |
Backpressure (0) | 2024.11.19 |
Cold Sequence와 Hot Sequence (0) | 2024.11.19 |