Kafka Streams DSL은 레코드의 흐름을 추상화한 개념으로 KStream, KTable, GlobalKTable이 존재합니다.
- 이 개념들은 컨슈머, 프로듀서, 프로세서 API에서 사용되지 않고, 스트림즈 DSL 에서만 사용되는 개념입니다.
KStream
레코드의 흐름을 표현한 것으로 메시지 키와 메시지 값으로 구성되어 있습니다.
- 카프카 컨슈머로 토픽을 구독하는 것처럼 KStream에 존재하는 모든 레코드가 출력됩니다.
KTable
KStream은 모든 레코드를 조회할 수 있지만 KTable은 유니크한 메시지 키를 기준으로 가장 최신 레코드를 사용합니다.
- Java Collection의 Map 자료구조처럼 토픽에 있는 데이터를 key-value store 처럼 사용하는 것 이라고 생각하면 된다.
- 사용자 마지막에 업데이트한 주소 데이터가 필요한 경우, 혹은 최신으로 Join된 데이터를 활용할 경우 등
간단한 예시 (Stream - Table Join)
사용자가 검색한 키워드를 가지고 있는 토픽(user-search-keywords)과, 사용자의 관심사(user-interests)를 가지고 있는 토픽을 "관심사-검색 키워드" 문자열로 조합해서 결과 토픽(result)에 쌓아보자.
- 사용자가 검색한 모든 키워드를 확인하기 위해 KStream으로, 사용자의 관심사는 검색했을 때 최신의 데이터로 보여주기 위해서 KTable로 구성했습니다.
먼저 저희가 사용해야 하는 input topic 2개와 조합된 결과를 가지는 output topic 1개를 생성합니다.
- 간단한 예시로 replication-factor는 1로 partition 개수는 3개로 지정한 토픽 3개를 생성합니다.
✅ input topic
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092
--replication-factor 1 --partitions 3 --topic user-search-keywords --create
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092
--replication-factor 1 --partitions 3 --topic user-interests --create
✅ output topic
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092
--replication-factor 1 --partitions 3 --topic result --create
정상적으로 토픽이 생성된 것을 볼 수 있습니다.
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
토픽이 정상적으로 생성되었다면 console-producer로 간단하게 테스트할 것으로 Key(userId), Value는 각각 String으로 사용합니다.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-join-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
스트림즈 DSL의 주요옵션
필수옵션
옵션 | 설명 |
bootstrap.servers | 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름으로 포트를 1개이상 작성할 수 있습니다. |
Application.id | 스트림즈 어플리케이션을 구분하기 위한 고유한 아이디를 설정. 다른 로직을 가진 스트림즈 어플리케이션은 서로 다른 application.id 값을 가져아한다. |
선택 옵션
옵션 | 설명 |
default.key.serde | 레코드의 메시지키를 직렬화, 역직렬화하는 클래스를 지정한다. (기본 값: 바이트 직렬화, 역직렬화 클래스인 Serdes.ByteArray().getClass().getName()) |
default.value.serde | 레코드의 메시지 값을 직렬화, 역직렬화하는 클래스를 지정한다. (기본값: 바이트 직렬화, 역직렬화 클래스인 Serdes.ByteArray().getClass().getName()) |
num.stream.threads | 스트림 프로세싱 실행 시 실행될 스레드 개수를 지정한다. (기본값: 1) |
state.dir | 상태기반 데이터 처리를 할 때 데이터를 저장할 디렉토리를 지정한다. (기본값: tmp/kafka-streams) |
코드 예제
사용자가 검색한 키워드(user-search-keywords)와 사용자의 관심사(user-interests)를 사용자 ID를 기준으로 Join을 진행합니다.
@Slf4j
public class SimpleJoinExam {
public static void main(String[] args) throws InterruptedException {
Properties props = Constants.getProperties(); // 아까 위에서 정의한 props
StreamsBuilder builder = new StreamsBuilder(); // Kafka Streams의 토폴로지를 정의하는 객체
// key,value를 역직렬화 할 때 string으로 처리
KStream<String, String> search = builder.stream("user-search-keywords",
Consumed.with(Serdes.String(), Serdes.String()));
KTable<String, String> interests = builder.table("user-interests", Consumed.with(Serdes.String(), Serdes.String()));
// 서로 일치하는 값이 있다면 "관심사-검색어" 문자열로 조합 없다면 간단하게 "fail"
KStream<String, String> streamsTableJoin = search.leftJoin(interests,
(stream, table) -> {
log.info("[MyTest] stream={} table={}", stream, table);
if (table != null) {
return String.format("%s-%s", table, stream);
} else {
return "fail";
}
});
// join된 결과를 토픽에 기록
streamsTableJoin.to("result", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.cleanUp();
streams.start();
Thread.sleep(60000L);
streams.close();
}
}
console-producer.sh로 input topic 데이터 넣어주기
user-interests 토픽에 UserA, UserB에 관심사를 console-producer.sh를 통해서 입력한 뒤
user-search-keywords 토픽에 UserA, UserB의 키워드를 console-producer.sh를 통해서 입력해주면
console-consumer.sh로 output topic 결과 확인하기
- --from-beginning이라서 이전에 넣었던 데이터가 있었고 빨간색 박스에 조합된 문자열이 처리되었음을 알 수 있습니다.
✅ 최신 관심사가 반영되는지 확인해보기
UserA에 관심사를 Investment & Finance ➡️ new-interests로 변경하면 KTable에 데이터는 유니크하게 최신 데이터로 반영이 되는 것을 예상할 수 있습니다.
키워드에 값을 search-keyword라고 입력하면
결과 토픽에 새롭게 업데이트된 관심사로 입력받은 키워드와 조합되어 출력되는 것을 확인할 수 있습니다.
결과적으로 우리는 사용자의 ID값 즉 레코드의 Key값을 기준으로 Join하는 것을 알 수 있었습니다.
🤔 Key값이 Null인 경우는 어떻게 처리되는 걸까?
기존에 console-producer에서 separator 옵션을 통해서 :를 키준으로 key-value를 인식하고 있었습니다.
$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092
--topic user-search-keywords --property parse.key=true --property key.separator=:
$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092
--topic user-search-keywords
- key값 없이 메시지만 전송해서 확인해보겠습니다.
KStream에 user-search-keywords 토픽으로 확인을 해보니 log로 확인해보니 "test-keywords1", "test-keywords2"에 해당하는 table을 찾지 못한것을 볼 수 있습니다.
- 즉, KStream에 메시지 키값이 없어서 해당하는 KTable을 찾지 못한것을 알 수 있습니다.
우리가 코드 레벨에서 조건문으로 table이 없는 경우 fail을 기록했기 때문에 output topic에서 확인할 수 있습니다.
🤔 그렇다면 테이블에 key값이 null이라면 처리할 수 있을까요?
- KStream, KTable이 둘 다 null이라서 처리가 될까요?
- KTable에 key값이 null이라서 skip된 것을 WARN 로그를 통해서 확인할 수 있습니다.
지금 로그를 확인해보니 특정 파티션(2)이 지정되는 것을 볼 수 있습니다.
그렇다면 우리가 Producer를 공부할 때 Record Key값을 통해서 파티션이 지정되는 것을 우리는 알고 있는데 Join이 진행될 때 토픽 파티션 개수가 영향이 있을까요? 🤔
user-search-keywords 토픽을 생성할 때 파티션 개수를 3개로 지정했는데 4개로 변경해보겠습니다.
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic user-search-keywords --partitions 4
이후 console-producer.sh를 통해서 record를 전송하고
로그를 확인해보니 table에 값이 null이 찍히고 결과 토픽에 fail값이 기록되는 것을 볼 수 있습니다.
⚠️ 어라라? 이후에 실행되었던 코드가 StreamsException이 발생하면서 종료되는것을 볼 수 있었습니다.
이러한 에러가 발생하는 이유는 Kafka Streams는 상태 저장소를 관리할 때 내부적으로 changelog 토픽을 사용하는데 내부 토픽의 파티션 수가 4가 처음에 생성할 때 설정했던 3이랑 일치하지 않아서 발생하는 문제입니다.
- 예제인 만큼 changelog를 지워보고 다시 확인해보겠습니다.
Kafka Streams의 Consumer Coordinator가 파티션 할당 상태를 업데이트한다는 메시지입니다.
- simple-join-example-80fc6978-4da0-44b2-9672-cfa09b5b1c51-StreamThread-1라는 스트림 쓰레드에서 파티션 할당을 처리합니다.
- user-interests-0, user-interests-1, user-interests-2와 user-search-keywords-0, user-search-keywords-1, user-search-keywords-2, user-search-keywords-3라는 총 7개의 파티션이 할당되었습니다.
정상적으로 실행은 되지만 Record의 Key값에 대한 파티션 정책이 서로 다르기 때문에 KStream에 KTable이 가지고 있는 Key에 해당한 Record를 전송해도 Join할 수 없습니다.
Co-Partitioning
- 위와 같은 문제가 발생했던 이유는 Co-Partitioning 되지 않았기 때문이다.
코파티셔닝이란 조인을 하는 2개 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 동일하게 맞추는 작업이다.
Kafka Streams에서 KStream-KStream, KTable-KTable, KStream-KTable Join을 수행할 때, 같은 키를 가진 레코드가 같은 스트림 태스크(stream task)에서 처리되도록 보장해야 합니다.
이미 DefaultPartitioner는 아래와 같은 규칙을 가지는 것을 알고 있습니다.
partition = hash(key) % numPartitions
동일한 메시지 키를 가진 데이터가 동일한 파티션에 들어갈 수 있도록 보장해야 합니다.
- 위에 규칙에 동일한 partition이 나오도록 파티션 개수와 key가 동일해야 한다.
- 파티션 개수가 같더라도 Partition Assignment Strategy가 다른 경우, 다른 파티션에 도달할 수 있기 때문에 동일한 테스크에 도달하는 것을 보장할 수 없다.
즉, 파티션 개수가 다르거나 Partition Assignment Strategy가 다른 경우에도 Join에 실패할 수 있습니다.
다음 두 경우에는 co-partitioning이 필요하지 않습니다. 😎
✅ GlobalKTable Join
- GlobalKTable은 모든 Kafka Streams 인스턴스에 전체 데이터를 복제하므로, 모든 인스턴스가 데이터를 조회할 수 있다.
- 따라서 같은 파티션에 있을 필요가 없고, 다른 파티션에 있는 데이터를 Join하는 것도 가능하기 때문이다.
✅ KTable-KTable Foreign-Key Join
- Kafka Streams 내부적으로 Foreign-Key Join을 위해 자동으로 co-partitioning을 보장해 주기 때문에 가능하다.
👊🏻 해결 방법
따라서 GlobalKTable을 사용하거나 파티션 정책이 동일한 토픽으로 리파티셔닝을 진행하고 난 뒤 Join을 해서 해결할 수 있습니다.
- 리파티셔닝은 join 하고자 하는 토픽을 생성해서 파티션 개수를 맞춰주는 작업으로 새로 토픽을 만들어야 하고 중복되서 데이터가 추가된다는 문제가 있습니다.
GlobalKTable
GlobalKTable은 KTable과 유사하지만, 클러스터 내의 모든 인스턴스에 동일한 데이터가 복제되어 유지되는 테이블이다.
- 즉, GlobalKTable에 데이터가 많으면 stream 파티션에 있는 모든 데이터에 대해서 조인하기 때문에 부하가 많아지게 된다.
- 데이터가 많지 않다면 GlobalTable로 활용하는것도 좋은 방법이다.
KTable과 동일하게 메시지 키를 기준으로 처리하지만 GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용되게 됩니다.
- KTable로 선언된 토픽은 1개 파티션은 1개 태스크에 할당
GlobalKTable은 모든 애플리케이션 인스턴스에서 동일한 데이터를 사용하는 상황에서 주로 사용될 수 있다.
- 모든 인스턴스가 참조할 수 있는 공통된 참조 데이터나 정적 데이터를 저장하는 데 유용하다.
코드 예제
기존 Ktable ➡️ GlobalKTable로 변경한 뒤 Record를 어떤 기준으로 조인할지 정해주시면 됩니다. 😎
@Slf4j
public class SimpleJoinExam {
public static void main(String[] args) throws InterruptedException {
Properties props = Constants.getProperties();
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> search = builder.stream("user-search-keywords",
Consumed.with(Serdes.String(), Serdes.String()));
// KTable<String, String> interests = builder.table("user-interests", Consumed.with(Serdes.String(), Serdes.String()));
// KTable -> GlobalKTable로 설정
GlobalKTable<String, String> globalUserInterests = builder.globalTable("user-interests",
Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, String> joinTable = search.leftJoin(globalUserInterests,
(key, value) -> key, // 추가: 어떤 데이터를 기준으로 조인할 것인지 (여기서는 key)
(stream, table) -> {
log.info("[MyTest] stream={} table={}", stream, table);
if (table != null) {
return String.format("%s-%s", table, stream);
} else {
return "fail";
}
});
joinTable.to("result", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.cleanUp();
streams.start();
}
}
- output topic을 확인해보면 정상적으로 동작하는것을 확인할 수 있습니다.
📚 Reference
Apache Kafka Streams Support :: Spring Kafka
Starting with version 3.2, Spring for Apache Kafka provides basic facilities required for interactive queries in Kafka Streams. Interactive queries are useful in stateful Kafka Streams applications since they provide a way to constantly query the stateful
docs.spring.io
Real-Time Data Enrichment with Kafka Streams: Introducing Foreign-Key Joins
Foreign-key joins allow for data enrichment from multiple sources using joins, moving the incremental materialized view maintenance workload outside of single-node databases into a scalable distributed system by making use of streaming data.
www.confluent.io
'개발' 카테고리의 다른 글
Spring에서 @Async 사용하기 (0) | 2024.06.17 |
---|---|
java에서 이미지 작업 및 최적화하기 (0) | 2024.06.11 |
도메인 주도 설계의 사실과 오해 (0) | 2024.04.29 |
Getter 없이 Test해보기 (1) | 2023.11.24 |
EnumMap 적용하기! (1) | 2023.11.08 |