AdminClient 개요
AdminClinet.createTopics메서드는 토픽이 생성될 때 까지 기다리거나, 생성 상태를 확인하거나 토픽이 생성된 뒤 토픽 설정을 가져올 수 있다.
- 카프카의 각 메서드 요청을 컨트롤러로 전송한 뒤 바로 1개 이상의 Future 객체를 리턴한다.
카프카 컨트롤러부터 브로커로의 메타데이터 전파는 비동기적으로 이루어지기에 AdminClient가 전달하는 Future 객체들은 컨트롤러의 상태가 완전히 업데이트 된 시점에서 완료된 것으로 간주한다.
즉, 모든 브로커가 전부 다 새로운 상태에 대해 알고 있지 못할 수 있다.
- 토픽에 최신 상태를 전달 받지 않은 브로커에 의해 처리될 수 있다.
- 일관성을 지키기 위해 최종적으로 모든 브로커는 모든 토픽에 대해 알게 되어 동기화 처리가 되겠지만 언제 동기화가 될지는 보장할 수 없다.
옵션
AdminClient의 각 메서드는 메서드별로 특정한 Options 객체를 인수로 받는다.
- 이 객체들은 브로커가 요청을 어떻게 처리할지에 대해 서로 다른 설정을 담는다.
- 다른 옵션에는 listTopics()가 내부 토픽을 리턴할지, describeCluster()가 클라이언트가 실행할 수 있는 권한을 가진 작업을 리턴할지의 여부 같은 것들이 있다.
모든 AdminClient 메서드는 timeoutMs 매개변수를 가지는데 클라이언트가 TimeoutException을 발생시키기 전, 클러스터로부터의 응답을 기다리는 시간을 조정한다.
수평 구조
모든 어드민 작업은 kafkaAdminClient에 구현되어 있는 아파치 카프카 프로토콜을 사용해서 이루어지며 객체 간의 의존 관계나 네임스페이스가 없고 꽤 크기가 큰 인터페이스로 되어있다.
추가 참고 사항
클러스터의 상태를 변경하는 모든 작업(create, delete, alter)은 컨트롤러에 의해 수행된다.
클러스터 상태를 읽기만 하는 작업(list, describe)는 아무 브로커에서나 수행될 수 있으며 클라이언트 입장에서 부하가 가장 적어 보이는 브로커로 전달된다.
- 뭔가 예상치 못한 결과가 나왔을 때 염두하자
주키퍼에 저장되어 있는 메타데이터를 직접 수정하는 방식은 절대 안된다.
- 주키퍼를 직접 수정하는 방식으로 어드민 작업을 수행하는 모든 애플리케이션은 수정되어야 한다.
- AdminClient API는 카프카 클러스터 내부가 바뀌어도 그대로 남아있다.
아파치 카프카의 새로운 메커니즘인 KRaft를 사용하면 메타데이터를 효율적으로 관리할 수 있다.
- KRaft는 주키퍼의 의존성을 제거하고, 카프카 클러스터 내 컨트롤러가 선출된 후 메타데이터를 직접 관리한다.
- 이로 인해 성능과 안정성이 향상되며, 유지보수가 단순화되고, 병목현상이 줄어든다.
Admin Client 사용법: 생성, 설정, 닫기
AdminClient 객체를 생성하는 방법
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// TODO: AdminClient를 사용해서 필요한 작업을 수행한다.
admin.close(Duration.ofSeconds(30));
create() 메서드
- 반드시 있어야 하는 설정으로 클러스터에 대한 URI (연결한 브로커 목록을 쉼표로 구분한 목록) 하나다.
- 프로덕션 환경에서는 브로커 중 하나에 장애가 발생할 경우를 대비하여 최소한 3개 이상의 브로커를 지정하는 것이 보통이다.
close() 메서드
- close() 를 호출할 때는 아직 진행 중인 작업이 있을 수 있으므로 타임아웃 매개변수를 받는다.
- close() 를 호출하면 다른 메서드를 호출해서 요청을 보낼 수는 없지만, 클라이언트는 타임아웃이 만료될 때까지 응답을 기다린다.
- 타임아웃이 발생하면 클라이언트는 모든 진행 중인 작업을 멈추고 모든 자원을 해제한다.
- 타임아웃 없이 close() 를 호출하면 얼마가 되었든 모든 진행 중인 작업이 완료될 때까지 대기하게 된다.
Admin Client 에서 중요한 설정 2가지
1. client.dns.lookup
기본적으로 카프카는 부트스트랩 서버 설정에 포함된 호스트명을 기준으로 연결을 검증, 해석, 생성한다.
- 브로커로부터 호스트 정보를 받은 뒤부터는 advertised.listeners 설정에 있는 호스트명을 기준으로 연결한다.
이 단순한 모델은 대부분 제대로 작동하지만 두 가지 맹점이 있다.
- DNS 별칭을 사용할 경우 (특히 부트스트랩 설정에서)
- 2개 이상의 IP 주소로 연결되는 하나의 DNS 항목을 사용할 경우
이 둘은 비슷해보이지만 약간 다른데 동시에 발생할 수 없는 시나리오를 살펴보자
1-1 DNS 별칭을 사용할 경우
broker1.hostname.com, broker2.hostname.com … 등의 브로커들을 가지고 있을 때 이 모든 브로커들을 부트스트랩 서버 설정에 일일이 지정하는 것보다 이 모든 브로커 전체를 가리킬 하나의 DNS 별칭을 만드는 것이 유지 관리하기 쉽다.
어떤 브로커가 클라이언트와 처음으로 연결될지는 그리 중요하지 않기 때문에 부트스트래핑을 위해 all-brokers.hostname.com(as) 을 사용할 수 있다.
- SASL (Simple Authentication and Security Layer) 을 사용하여 인증을 하려고 할 때는 문제가 생긴다.
- SASL 을 사용할 경우 클라이언트는 all-brokers.hostname 에 대해 인증을 하려고 하는데, 서버의 보안 주체 (principal) 는 broker2.hostname.com 이기 때문이다.
만일 호스트명이 일치하지 않을 경우 악의적인 공격일 수도 있기 때문에 SASL 은 인증을 거부하고 연결도 실패한다.
- 이 때는 client.dns.lookup=resolve_canonical_bootstrap_servers_only 설정을 해주면 된다.
- 이 설정이 되어있는 경우 클라이언트는 DNS alias 에 포함된 모든 브로커 이름을 일일이 부트스트랩 서버 목록에 넣어준 것과 동일하게 작동한다.
1-2 다수의 IP 주소로 연결되는 DNS 이름을 사용하는 경우
네트워크 아키텍처에서 모든 브로커를 프록시나 로드 밸런서 뒤로 숨기는 것은 매우 흔하며, 외부로부터 연결을 허용하기 위해 로드 밸런서를 두어야 하는 쿠버네티스를 사용하는 경우는 특히나 더 그렇다.
이 경우, 로드 밸런서가 단일 장애점이 되는 것을 원치 않아 broker1.hostname.com을 여러 개의 IP 로 연결하는 것은 매우 흔한 일이다.
- 이 IP 주소들은 모두 로드 밸런서로 연결되고 따라서 모든 트래픽이 동일한 브로커로 전달된다,
- 이 IP 주소들은 시간이 지남에 따라 변경될 수도 있다.
기본적으로 카프카 클라이언트는 해석된 첫 번째 호스트명으로 연결을 시도할 뿐이다.
- 해석된 IP 주소가 사용 불가능일 경우 브로커가 정상임에도 불구하고 클라이언트는 연결에 실패할 수도 있다.
- client.dns.lookup=use_all_dns_ips 로 잡아서 클라이언트가 로드 밸런싱 계층의 고가용성을 충분히 활용할 수 있도록 하자. (강력히 권장)
2. request.timeout.ms (기본값 2m)
request.timeout.ms 는 애플리케이션이 AdminClient 의 응답을 기다릴 수 있는 최대 시간이다.
- 여기엔 클라이언트가 재시도가 가능한 에러를 받고 재시도하는 시간도 포함된다.
- 기본값이 꽤 길지만, 컨슈머 그룹 관리 기능 같은 몇몇 AdminClient 작업은 응답에 꽤 시간이 걸릴 수도 있다.
각각의 AdminClient 메서드는 해당 메서드에만 해당하는 타임아웃 값을 포함하는 Options 객체를 받는다.
만일 애플리케이션에서 AdminClient 작업이 주요 경로 (critical path) 상에 있을 경우, 타임아웃 값을 낮게 잡아준 뒤 제 시간에 리턴되지 않는 응답은 조금 다른 방식으로 처리해야 할 수도 있다.
- 서비스가 시작될 때 특정한 토픽이 존재하는지 확인 작업
- 브로커가 응답하는데 30s 이상 걸릴 경우, 확인 작업을 건너뛰거나 일단 서버 기동을 계속한 뒤 나중에 토픽의 존재 확인 작업
필수적인 토픽 관리 기능
AdminClient 의 가장 흔한 활용 사례는 토픽 목록 조회, 상세 내역 조회, 생성 및 삭제 등의 토픽 관리이다.
클러스터에 있는 토픽 목록 조회
AdminClient adminClient = AdminClient.create(props);
// Future 객체들을 감싸고 있는 ListTopicsResult 객체를 리턴한다.
ListTopicsResult topics = adminClient.listTopics();
// 토픽 이름 집합에 대한 Future 객체를 리턴
topics.names().get().forEach(System.out::println);
admin.close(Duration.ofSeconds(30));
- Future 객체에서 get() 메서드를 호출하면 실행 스레드는 서버가 토픽 이름 집합을 리턴할 때까지 기다리거나 타임아웃 예외를 발생시킨다.
특정 토픽 처리하기
특정 토픽이 존재하는지 확인하는 방법 중 하나는 모든 토픽의 목록을 받은 후 원하는 토픽이 그 안에 있는지 확인하는 것이다.
- 하지만 큰 클러스터에서 이 방법은 비효율적일수 있고, 때로는 단순히 토픽의 존재 여부 뿐 아니라 해당 토픽이 필요한 만큼의 파티션과 레플리카키를 갖고 있는지 확인하는 등 그 이상의 정보가 필요할 수도 있다.
아래에 예시는 먼저 토픽 리스트 조회하고 이후 아래 동작을 수행하게 된다.
- 해당 토픽 리스트에서 특정 토픽이 있는지 조회
- 해당 토픽이 존재한다면 의도한 파티션 수를 갖고 있는지 확인
- 해당 토픽이 존재하지 않으면 새로운 토픽 생성
- 새로운 토픽 생성 후 의도한 파티션 수를 갖고 있는지 확인
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); // optional
props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 1000); // optional
AdminClient adminClient = AdminClient.create(props);
/**
* 정확한 설정을 갖는 토픽이 존재하는지 확인하려면 확인하려는 토픽의 목록을 인자로 넣어서 describeTopics() 메서드 호출
* 리턴되는 DescribeTopicsResult 객체 안에는 토픽 이름을 key 로, 토픽에 대한 상세 정보를 담는 Future 객체를 value 로 하는 맵이 들어있다.
*/
DescribeTopicsResult sampleTopic = adminClient.describeTopics(TOPIC_LIST); // ["sample-topic"]
try {
/**
* 만일 토픽이 존재하지 않으면 서버가 상세 정보를 보내줄 수 없는데 Future 는 ExecutionException 을 발생시킨다.
* 예외의 cause 에 들어있는 것이 서버가 실제 리턴한 실제 에러다.
* 존재하지 않으면 토픽을 생성할 것이기 때문에 이 예외를 처리해주어야 한다.
*/
TopicDescription topicDescription = sampleTopic.topicNameValues().get(TOPIC_NAME).get();
/**
* 토픽이 존재하는 경우 토픽에 속한 모든 파티션의 목록을 담은 TopicDescription을 리턴
* 파티션별로 어느 브로커가 리더이고, 어디에 레플리카가 있고, in-sync replica 가 무엇인지까지 포함한다.
*/
log.info("Description of sample topic: {}", topicDescription);
if (topicDescription.partitions().size() != NUM_PARTITIONS) {
log.error("Topic has wrong number of partitions: {}", topicDescription.partitions().size());
System.exit(1);
}
} catch (ExecutionException e) {
// 토픽이 존재하지 않은 경우가 아닌 모든 예외에 대해 바로 종료
if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
log.error(e.getMessage());
throw e;
}
log.info("Topic {} does not exist. Going to create it now.", TOPIC_NAME);
/**
* 토픽 생성 시 토픽의 이름만 필수이고, 파티션 수와 레플리카수는 선택사항이다.
* 이 값들을 지정하지 않으면 카프카 브로커에 설정된 기본값이 사용된다.
*
* 잘못된 수의 파티션으로 토픽으로 생성되었는지 확인하려면
* <pre>
* CreateTopicsResult newWrongTopic =
* adminClient.createTopics(
* List.of(new NewTopic(TOPIC_NAME, Optional.empty(), Optional.empty())));
* </pre>
*/
CreateTopicsResult newTopic =
adminClient.createTopics(
List.of(new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REPLICATION_FACTOR)));
/**
* CreateTopic 의 결과물을 확인하기 위해 get() 을 다시 호출하고 있기 때문에 이 메서드가 예외를 발생시킬 수 있으며,
* 이 경우 TopicExistsException 이 발생한다.
* 보통은 설정을 확인하기 위해 토픽 상세 내역을 조회함으로써 처리한다.
*/
if (newTopic.numPartitions(TOPIC_NAME).get() != NUM_PARTITIONS) {
log.error("Topic was created with wrong number of partitions. Exiting.");
System.exit(1);
}
}
토픽 삭제
삭제할 토픽 리스트를 인자로 하여 deleteTopics() 메서드를 호출한 뒤 get() 을 호출해서 작업이 끝날 때까지 기다린다.
- 카프카에서 삭제된 토픽은 복구가 불가능하기 때문에 데이터 유실이 발생할 수 있으므로 토픽을 삭제할 때는 특별히 주의해야 한다.
AdminClient adminClient = AdminClient.create(props);
// 토픽 삭제
adminClient.deleteTopics(TOPIC_LIST).all().get();
try {
// 삭제 작업이 비동기적으로 이루어지므로 이 시점에서 토픽이 여전히 남아있을 수 있다.
sampleTopic.topicNameValues().get(TOPIC_NAME).get();
log.info("Topic {} is still around.");
} catch (ExecutionException e) {
log.info("Topic {} is gone.", TOPIC_NAME);
}
get() 말고 비동기적으로 토픽의 상세 정보 조회하기
어드민 작업은 드물고, 작업이 성공하거나 타임아웃이 날 때까지 기다리는 것 또한 대체로 받아들일만 하기 때문에 get() 메서드를 이용해도 된다.
하지만 예외 케이스가 하나 있는데 바로 많은 어드민 요청을 처리할 것으로 예상되는 서버를 개발하는 경우이다.
- 카프카가 응답할 때까지 기다리는 동안 서버 스레드가 블록되는 것보다는, 사용자로부터 계속해서 요청을 받고, 카프카로 요청을 보내고, 카프카가 응답하면 그 때 클라이언트로 응답을 보내는 것이 더 합리적이다.
- 이럴 때 KafkaFuture의 융통성은 매우 도움이 된다.
// 서버 생성
Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(1));
HttpServerOptions options = new HttpServerOptions().setLogActivity(true);
vertx
// Vertx 를 사용하여 간단한 HTTP 서버 생성
.createHttpServer(options)
// 이 서버는 요청을 받을 때마다 requestHandler 호출한다.
.requestHandler(
req -> {
// 요청은 매개 변수로 토픽 이름을 보내고, 응답은 토픽의 상세 설정을 내보냄
String topic = req.getParam("topic");
String timeout = req.getParam("timeout");
int timeoutMs = NumberUtils.toInt(timeout, 1000); // timeout 이 없으면 디폴트 1000ms
// AdminClient.describeTopics() 를 호출하여 응답에 들어있는 Future 객체를 받아옴
DescribeTopicsResult demoTopic =
adminClient.describeTopics(
List.of(topic), new DescribeTopicsOptions().timeoutMs(timeoutMs));
demoTopic
.topicNameValues() // Map<String, KafkaFuture<TopicDescription>> 반환
.get(topic) // KafkaFuture<TopicDescription> 반환
// 호출 시 블록되는 get() 을 호출하는 대신 Future 객체의 작업이 완료되면 호출될 함수 생성
.whenComplete(
new KafkaFuture.BiConsumer<TopicDescription, Throwable>() {
@Override
public void accept(TopicDescription topicDescription, Throwable throwable) {
if (throwable != null) {
log.info("got exception");
// Future 가 예외를 발생시키면서 완료될 경우 HTTP 클라이언트에 에러를 전송
req.response()
.end(
"Error trying to describe topic "
+ topic
+ " due to "
+ throwable.getMessage());
} else {
// Future 가 성공적으로 완료될 경우 HTTP 클라이언트에 토픽의 상세 정보를 전송
req.response().end(topicDescription.toString());
}
}
});
})
.listen(8080);
호출 시 블록되는 get() 을 호출하는 대신 Future객체의 작업이 완료되면 호출되는 whenComplete()를 호출하므로 카프카로부터의 응답을 기다리지 않는다.
- 카프카로부터 응답이 도착하면 DescribeTopicResult 가 HTTP 클라이언트에게 응답을 보내고 그 사이에 HTTP 서버는 다른 요청을 처리할 수 있다.
설정 관리
설정 관리는 ConfigResource 객체를 사용해서 할 수 있다.
- 브로커, 브로커 로그, 토픽
브로커와 브로커 로깅 설정은 kafka-configs.sh 혹은 다른 카프카 관리 툴을 사용하는 것이 보통이지만, 애플리케이션에서 사용하는 토픽의 설정을 확인하거나 수정하는 것은 상당히 빈번하다.
많은 애플리케이션들은 정확한 작동을 위해 압착 설정이 된 토픽을 사용하는데, 애플리케이션이 주기적으로 해당 토픽에 실제로 압착 설정이 되어있는지를 확인한 후, 설정이 안되어 있다면 설정을 교정해주는 것이 합리적이다.
- 보존 기한 기본값보다 짧은 주기로 하는 것이 안전하다.
// ======= 토픽 압착(compacted) 설정 확인 및 교정
// 특정한 토픽의 설정 확인
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
// describeConfig() 의 결과물인 DescribeConfigsResult 는 ConfigResource 를 key 로, 설정값의 모음을 value 로 갖는 맵
DescribeConfigsResult configsResult = adminClient.describeConfigs(List.of(configResource));
Config configs = configsResult.all().get().get(configResource);
/**
* 토픽 설정이 기본값이 아닌 것으로 취급되는 경우는 2 가지 경우가 있다.
* 1. 사용자가 토픽의 설정값을 기본값이 아닌 것으로 잡아준 경우
* 2. 브로커 단위 설정이 수정된 상태에서 토픽이 생성되어 기본값이 아닌 값을 브로커 설정으로부터 상속받은 경우
*/
configs.entries().stream()
// 각 설정 항목은 해당 값이 기본값에서 변경되었는지 확인할 수 있게 해주는 isDefault() 메서드를 가짐
.filter(entry -> !entry.isDefault())
.forEach(System.out::println);
// 설정의 이름과 설정값. 여기서는 cleanup.policy 가 설정 이름이고, compact 가 설정값이다.
ConfigEntry compaction =
new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT);
// 토픽에 압착 (compacted) 설정이 되어있는지 확인하고 설정이 되어있지 않을 경우 압착 설정해 준다.
if (!configs.entries().contains(compaction)) {
Collection<AlterConfigOp> configOp = new ArrayList<>();
/**
* 각각의 설정 변경 작업은 설정 항목과 작업 유형으로 이루어지며 카프카에서는 4가지 형태의 설정 변경이 가능하다.
* 설정값을 잡아주는 SET, 현재 설정값을 삭제하고 기본값으로 되돌리는 DELETE, 그리고 APPEND, SUBSTRACT
* APPEND 와 SUBSTRACT 는 목록 형태의 설정에만 사용 가능하며, 이걸 사용하면 전체 목록을 주고받을 필요없이 필요한 설정만 추가/삭제 가능하다.
*/
configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET));
// 설정을 변경하려면 변경하고자 하는 ConfigResource 를 key 로, 바꾸고자 하는 설정값 모음을 value 로 하는 맵을 지정함
Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = new HashMap<>();
alterConfigs.put(configResource, configOp);
adminClient.incrementalAlterConfigs(alterConfigs).all().get();
} else {
log.info("Topic {} is compacted topic.", TOPIC_NAME);
}
}
🥹 상세한 설정값을 얻어오는 것은 비상 상황에서 큰 도움이 된다.
업그레이드를 하다가 실수로 브로커 설정 파일이 깨졌을 때, 첫 번째 브로커의 재시작 실패를 보고 깨지기 전 원본 파일을 복구할 방법이 없어 난감할 수 있다.
- 이 때 AdminClient 를 사용하여 아직 남아있던 브로커 중 하나의 설정값을 통째로 덤프를 떠서 설정 파일을 복구할 수 있다.
컨슈머 그룹 관리
seekToBeginning(), seekToEnd(), seek(), assignment(), offsetsForTimes() 컨슈머 API를 사용하여 처리 지점을 되돌려서 오래된 메시지를 다시 토픽으로부터 읽어올 수 있다.
AdminClient 를 사용하여 프로그램적으로 컨슈머 그룹과 이 그룹들이 커밋한 오프셋을 조회,수정하는 방법을 알아보자
컨슈머 그룹 목록 조회 / 특정 그룹 상세 조회
컨슈머 그룹의 목록 조회
AdminClient adminClient = AdminClient.create(props);
adminClient.listConsumerGroups().valid().get().forEach(System.out::println);
주의할 점은 valid() 와 get() 메서드를 호출함으로써 리턴되는 모음은 클러스터가 에러없이 리턴한 컨슈머 그룹만을 포함한다
- 이 과정에서 발생한 에러가 예외의 형태로 발생하지는 않는데, errors() 메서드를 사용하여 모든 예외를 가져올 수 있다.
- 만일 valid() 가 아닌 all() 메서드를 호출하면 클러스터가 리턴한 에러 중 맨 첫 번째 것만 예외 형태로 발생한다.
🤔 특정 그룹 상세 조회
final String CONSUMER_GROUP = "ConsumerGroup";
final List<String> CONSUMER_GROUP_LIST = List.of(CONSUMER_GROUP);
AdminClient adminClient = AdminClient.create(props);
ConsumerGroupDescription groupDescription =
adminClient
.describeConsumerGroups(CONSUMER_GROUP_LIST) // DescribeConsumerGroupsResult 반환
.describedGroups() // Map<String, KafkaFuture<ConsumerGroupDescription>> 반환
.get(CONSUMER_GROUP) // KafkaFuture<ConsumerGroupDescription> 반환
.get();
log.info("Description of Consumer group: {} - {}", CONSUMER_GROUP, groupDescription);
ConsumerGroupDescription 는 해당 그룹에 대한 상세한 정보를 가진다.
- 그룹 멤버와 멤버별 식별자
- 호스트명
- 멤버별로 할당된 파티션
- 할당 알고리즘
- 그룹 코디네이터의 호스트명
하지만 컨슈머 그룹이 읽고 있는 각 파티션에 대해 마지막으로 커밋된 오프셋 값이 무엇인지, 최신 메시지에서 얼마나 뒤떨어졌는지(lag) 에 대한 정보는 없다.
- 예전에는 커밋 정보를 얻어오는 유일한 방법으로 컨슈머 그룹이 카프카 내부 토픽에 쓴 커밋 메시지를 가져와서 파싱하는 것 뿐이었다.
- 카프카가 내부 메시지 형식의 호환성 같은 것에 대해 아무런 보장을 하지 않기 때문에 권장되지 않는다.
커밋 정보 얻어오기
AdminClient adminClient = AdminClient.create(props);
// 컨슈머 그룹이 사용 중인 모든 토픽 파티션을 key 로, 각각의 토픽 파티션에 대해 마지막으로 커밋된 오프셋을 value인 Map
Map<TopicPartition, OffsetAndMetadata> offsets =
adminClient.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> reqLatestOffsets = new HashMap<>();
Map<TopicPartition, OffsetSpec> reqEarliestOffsets = new HashMap<>();
Map<TopicPartition, OffsetSpec> reqOlderOffsets = new HashMap<>();
Instant resetTo = ZonedDateTime.now(ZoneId.of("Asia/Seoul")).minusHours(2).toInstant();
// 컨슈머 그룹에서 커밋한 토픽의 모든 파티션에 대해 최신 오프셋, 가장 오래된 오프셋, 2시간 전의 오프셋 조회
for (TopicPartition tp : offsets.keySet()) {
reqLatestOffsets.put(tp, OffsetSpec.latest());
reqEarliestOffsets.put(tp, OffsetSpec.earliest());
reqOlderOffsets.put(tp, OffsetSpec.forTimestamp(resetTo.toEpochMilli()));
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
adminClient.listOffsets(reqLatestOffsets).all().get();
// Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets =
// adminClient.listOffsets(reqEarliestOffsets).all().get();
// Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> orderOffsets =
// adminClient.listOffsets(reqOlderOffsets).all().get();
// 모든 파티션을 반복해서 각각의 파티션에 대해 마지막으로 커밋된 오프셋, 파티션의 마지막 오프셋, 둘 사이의 lag 출력
for (Map.Entry<TopicPartition, OffsetAndMetadata> e : offsets.entrySet()) {
String topic = e.getKey().topic();
int partition = e.getKey().partition();
long committedOffset = e.getValue().offset();
long latestOffset = latestOffsets.get(e.getKey()).offset();
log.info(
"Consumer group {} has committed offset {} to topic {}, partition {}.",
CONSUMER_GROUP,
committedOffset,
topic,
partition);
log.info(
"The latest offset in the partition is {} to consumer group is {} records behind.",
latestOffset,
(latestOffset - committedOffset));
}
- describeConsumerGroups()와 달리 listConsumerGroupOffsets()은 컨슈머 그룹의 모음이 아닌 하나의 컨슈머 그룹을 받는다.
🤔 컨슈머 그룹에서 커밋한 모든 토픽의 파티션에 대해 오프셋 조회하는 방법 (OffsetSpec 정적 메서드)
- latest(): 해당 파티션의 마지막 레코드의 오프셋 조회
- earliest(): 해당 파티션의 첫 번째 레코드의 오프셋 조회
- forTimestamp(): 해당 파티션의 지정된 시각 이후에 쓰여진 레코드의 오프셋 조회
컨슈머 그룹 수정하기
AdminClient는 컨슈머 그룹을 수정하기 위한 메서드들을 가지고 있다.
- 그룹 삭제, 멤버 제외, 커밋된 오프셋 삭제 혹은 변경
- 비상 상황에서 복구를 위한 툴을 제작할 때 자주 사용된다.
오프셋 삭제는 컨슈머를 맨 처음부터 실행시키는 가장 간단한 방법으로 보일수도 있지만, 이것은 컨슈머 설정에 의존한다.
- 컨슈머가 시작되었는데 커밋된 오프셋을 못 찾을 경우 어떻게 동작할지는 auto.offset.reset 설정값을 갖고 있지 않는 한 알 수 없다.
⚠️ 주의점
1️⃣ 오프셋 토픽의 오프셋 값을 변경한다고 해도 컨슈머 그룹에 변경 여부는 전달되지 않는다.
- 컨슈머 그룹은 컨슈머가 새로운 파티션을 할당받거나, 새로 시작할 때만 오프셋 토픽에 저장된 값을 읽어올 뿐이다.
- 컨슈머가 모르는 오프셋 변경을 방지하지 위해 카프카에서는 현재 작업이 돌아가고 있는 컨슈머 그룹에 대한 오프셋을 수정하는 것을 허용하지 않는다.
2️⃣ 대부분의 스트림 처리 애플리케이션에서는 상태를 가지고 있는 컨슈머 애플리케이션에서 오프셋을 리셋하고, 해당 컨슈머 그룹이 토픽의 맨 처음부터 처리를 시작하도록 할 경우 저장된 상태가 깨질 수 있다.
예를 들어 한 상점 스트림 애플리케이션에 문제가 있다는 것을 오전 8시에 발견해서 오전 3시부터 다시 판매 내역을 집계해야 한다면?
- 만일 저장된 집계값을 변경하지 않고 오프셋만 오전 3시로 되돌린다면 오전 3시~8시까지의 판매 내역은 두 번씩 계산되게 된다.
- 이런 이유로 상태 저장소를 적절히 변경해 줄 필요가 있다.
AdminClient adminClient = AdminClient.create(props);
// ======= 컨슈머 그룹에서 오프셋 커밋 정보 조회
Map<TopicPartition, OffsetAndMetadata> offsets =
adminClient.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> reqLatestOffsets = new HashMap<>();
Map<TopicPartition, OffsetSpec> reqEarliestOffsets = new HashMap<>();
Map<TopicPartition, OffsetSpec> reqOlderOffsets = new HashMap<>();
Instant resetTo = ZonedDateTime.now(ZoneId.of("Asia/Seoul")).minusHours(5).toInstant();
for (TopicPartition tp : offsets.keySet()) {
reqLatestOffsets.put(tp, OffsetSpec.latest());
reqEarliestOffsets.put(tp, OffsetSpec.earliest());
reqOlderOffsets.put(tp, OffsetSpec.forTimestamp(resetTo.toEpochMilli()));
}
/**
* 컨슈머 그룹을 특정 오프셋으로 리셋
* 맨 앞 오프셋부터 처리하도록 컨슈머 그룹을 리셋하기 위해 토픽의 맨 앞 오프셋 값 조회
* reqOlderOffsets 를 사용하면 5시간(8-5=3시 장애 point) 전으로 오프셋 리셋 가능
*/
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets =
adminClient.listOffsets(reqEarliestOffsets).all().get();
/**
* listOffsets() 가 리턴한 ListOffsetsResultInfo 의 맵 객체 변환
* alterConsumerGroupOffsets()를 호출하기 위해 OffsetAndMetadata의 맵 객체로 변환
*/
Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e :
earliestOffsets.entrySet()) {
log.info("Will reset topic-partition: {}, to offset: {}", e.getKey(), e.getValue().offset());
resetOffsets.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset()));
}
try {
// alterConsumerGroupOffsets()를 호출한 뒤 get() 을 호출하여 Future 객체가 작업을 성공적으로 완료할 때까지 기다린다.
adminClient.alterConsumerGroupOffsets(CONSUMER_GROUP, resetOffsets).all().get();
} catch (ExecutionException e) {
log.error(
"Failed to update the offsets committed by group: {}, with error: {}",
CONSUMER_GROUP,
e.getMessage());
/**
* alterConsumerGroupOffsets()가 실패하는 가장 흔한 이유 중 하나는 컨슈머 그룹을 미리 정지시키지 않아서다.
* 이때는 특정 컨슈머 그룹을 정지시키는 어드민 명령은 없기 때문에 컨슈머 애플리케이션을 정지시키는 것 외에는 방법이 없다.
*
* 만일 컨슈머 그룹이 여전히 돌아가고 있는 중이라면, 컨슈머 코디네이터 입장에서는 컨슈머 그룹에 대한 오프셋 변경 시도를
* 그룹의 멤버가 아닌 클라이언트가 오프셋을 커밋하려는 것으로 간주하므로 UnknownMemberIdException이 발생된다.
*/
if (e.getCause() instanceof UnknownMemberIdException) {
log.error("Check if consumer group is still active.");
}
}
클러스터 메타데이터
애플리케이션이 연결된 클러스터에 대한 정보를 명시적으로 읽어와야 하는 경우는 드물다.
- 클라이언트는 얼마나 많은 브로커가 있고, 어느 브로커가 컨트롤러인지 알 필요없이 토픽과 파티션에 대한 정보만 알면 메시지를 읽거나 쓸 수 있기 때문이다.
하지만 궁금하다면 아래처럼 클러스터 id, 클러스터 안의 브로커들, 컨트롤러를 조회할 수 있다.
DescribeClusterResult cluster = adminClient.describeCluster();
log.info("Connected to cluster: {}", cluster.clusterId().get());
log.info("The brokers in the cluster are: ");
cluster.nodes().get().forEach(node -> log.info(" * {}", node));
log.info("The controller is {}", cluster.controller().get());
- 클러스터 식별자는 GUID 이기 때문에 사람이 읽을 수 없지만 클라이언트가 정확한 클러스터에 연결되었는지 확인하는 용도로는 여전히 유용하다.
고급 어드민 작업
여기서는 잘 쓰이지도 않고 위험하기도 하지만 필요할 때 사용하면 매우 유용한 메서드 몇 개에 대해 알아보자
이 메서드들은 사고에 대응 중인 SRE 에게 매우 유용할 것이다.
- createPartitions()
- deleteRecords()
- elecLeader()
- alterPartitionReassignments()
토픽에 파티션 추가하기
대체로 토픽의 파티션 수는 토픽이 생성될 때 결정된다.
- 각 파티션은 매우 높은 처리량을 받아낼 수 있기 때문에 파티션 수를 늘리는 것을 드물다.
- 토픽의 메시지들이 키를 갖고 있는 경우 같은 키를 가진 메시지들은 모두 동일한 파티션에 들어가 동일한 컨슈머에 의해 동일한 순서로 처리될 것으로 예상할 수 있다.
현재 파티션이 처리할 수 있는 최대 처리량까지 부하가 차서 파티션 추가 외에는 선택지가 없는 경우도 있다.
- createPartitions() 메서드를 이용하여 지정된 토픽들에 파티션을 추가할 수 있다.
- 토픽에 파티션을 추가해야 한다면 파티션을 추가함으로써 토픽을 읽고 있는 애플리케이션들이 깨지지는 않을지 확인해봐야 한다.
- 여러 토픽을 한 번에 확장할 경우 일부 토픽은 성공하고, 나머지는 실패할 수도 있다는 점을 주의해야 한다.
AdminClient adminClient = AdminClient.create(props);
// 토픽의 파티션을 확장할 때는 새로 추가될 파티션 수가 아닌 파티션이 추가된 뒤의 파티션 수를 지정해야 한다.
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS+2));
adminClient.createPartitions(newPartitions).all().get();
토픽에서 레코드 삭제하기
카프카는 토픽에 대해 데이터 보존 정책을 설정할 수 있도록 되어있지만, 법적인 요구 조건을 보장할 수 있을 수준의 기능이 구현되어 있지는 않다.
- 토픽에 30일간의 보존 기한이 설정되어 있더라도 파티션 별로 모든 데이터가 하나의 세그먼트에 저장되어 있다면 보존 기한을 넘긴 데이터라도 삭제되지 않을 수 있다.
deleteRecords() 메서드는 호출 시점을 기준으로 지정된 오프셋보다 더 오래된 모든 레코드에 삭제 표시를 함으로써 컨슈머가 접근할 수 없도록 한다.
- deleteRecords() 메서드는 삭제된 레코드의 오프셋 중 가장 큰 값을 리턴하므로 의도한 대로 삭제가 이루어졌는데 확인할 수 있다.
- 이렇게 삭제 표시된 레코드를 디스크에서 실제로 삭제하는 작업은 비동기적으로 일어난다.
Map<TopicPartition, OffsetAndMetadata> offsets =
adminClient.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> reqOrderOffsets = new HashMap<>();
Instant resetTo = ZonedDateTime.now(ZoneId.of("Asia/Seoul")).minusHours(2).toInstant();
for (TopicPartition tp : offsets.keySet()) {
reqOrderOffsets.put(tp, OffsetSpec.forTimestamp(resetTo.toEpochMilli()));
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> olderOffsets =
adminClient.listOffsets(reqOrderOffsets).all().get();
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e :
olderOffsets.entrySet()) {
recordsToDelete.put(e.getKey(), RecordsToDelete.beforeOffset(e.getValue().offset()));
}
adminClient.deleteRecords(recordsToDelete).all().get();
리더 선출
elecLeader() 메서드는 2 가지 서로 다른 형태의 리더 선출을 할 수 있게 해준다.
1️⃣ 선호 리더 선출 (preferred leader election)
- 각 파티션은 선호 리더 (preferred leader) 라 불리는 레플리카를 하나씩 가진다.
- preferred 라는 이름이 붙은 이유는 모든 파티션이 preferred leader 레플리카를 리더로 삼을 경우 각 브로커마다 할당되는 리더의 개수가 균형을 이룬다.
- 기본적으로 카프카는 5분마다 선호 리더 레플리카가 실제로 리더를 맡고 있는지 확인하여 리더를 맡을 수 있는데도 맡고 있지 않은 경우 해당 레플리카를 리더로 삼는다.
- auto.leader.rebalance.enable 가 false로 설정되어 있거나, 좀 더 빨리 이 과정을 작동시키고 싶을 때 electLeader() 메서드 호출하면 된다.
2️⃣ 언클린 리더 선출 (unclean leader election)
어느 파티션의 리더 레플리카가 사용 불능 상태가 되었는데 다른 레플리카들은 리더를 맡을 수 없는 상황일 경우 해당 파티션은 리더가 없게 되고, 따라서 사용 불능 상태가 된다.
- 보통 데이터가 없어서 그런데 문제를 해결하는 방법 중 하나는 리더가 될 수 없는 레플리카를 그냥 리더로 삼아버리는 언클린 리더 선출을 작동시키는 것이다.
- 예전 리더에 쓰여졌지만 새 리더로 복제되지 않은 모든 이벤트는 유실되므로 데이터 유실을 초래한다.
- 이렇게 언클린 리더 선출을 작동시키기 위해서도 elecLeader() 메서드 사용 가능하다.
Set<TopicPartition> electableTopics = new HashSet<>();
electableTopics.add(new TopicPartition(TOPIC_NAME, 0));
try {
/**
* 특정 토픽의 한 파티션에 대해 선호 리더 선출한다.
* 지정할 수 있는 토픽과 파티션 수에는 제한이 없다.
* null 값을 지정하여 아래 명령어를 실행할 경우 모든 파티션에 대해 지정된 선출 유형 작업을 시작한다.
*/
adminClient.electLeaders(ElectionType.PREFERRED, electableTopics).all().get();
} catch (ExecutionException e) {
/**
* 선호 리더 선출과 언클린 리더 선출은 선호 리더가 아닌 레플리카가 현재 리더를 맡고 있을 경우에만 수행된다.
* 즉, 클러스터 상태가 좋다면 아무런 작업도 일어나지 않는다.
*/
if (e.getCause() instanceof ElectionNotNeededException) {
log.error("All leaders are preferred leaders, no need to do anything.");
}
}
레플리카 재할당
레플리카의 현재 위치를 다른 브로커로 옮겨야 할 경우가 있다.
- 브로커에 너무 많은 레플리카가 올라가 있어서 몇 개를 다른 곳으로 옮겨야 할 경우
- 레플리카를 추가할 경우
- 장비를 내리기 위해 모든 레플리카를 다른 장비로 내보내야 하는 경우
- 몇몇 토픽에 대한 요청이 너무 많아서 나머지에서 따로 분리해야 하는 경우
alterPartitionReassignment() 메서드를 사용하면 파티션에 속한 각각의 레플리카의 위치를 정밀하게 제어할 수 있다.
⚠️ 주의점
레플리카를 하나의 브로커에서 다른 브로커로 재할당하는 일은 브로커 간에 대량의 데이터 복제가 일어난다.
- 사용 가능한 네트워크 대역폭에 주의하고, 필요할 경우 쿼터를 설정하여 복제 작업을 스로틀링 해주는 것이 좋다.
- 쿼터 역시 브로커의 설정이기 때문에 AdminClient 를 사용하여 조회/수정 가능하다.
💻 예제
토픽에는 여러 개의 파티션이 있고 각각의 파티션은 ID가 0인 브로커에 하나의 레플리카를 갖고 있다.
- 새로운 브로커를 추가해 준 다음 이 토픽의 레플리카 일부를 새 브로커에 저장하며 이 토픽의 각 파티션을 약간 다른 방법으로 배치한다.
AdminClient adminClient = AdminClient.create(props);
// ======= 새로운 브로커로 파티션 재할당 (레플리카 재할당)
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment = new HashMap<>();
// 파티션 0 에 새로운 레플리카 를 추가하고, 새 레플리카를 ID 가 1인 새 브로커에 배치하고 리더는 변경하지 않는다.
reassignment.put(
new TopicPartition(TOPIC_NAME, 0),
Optional.of(new NewPartitionReassignment(Arrays.asList(0, 1))));
/**
* 파티션 1 에는 아무런 레플리카도 추가하지 않고 이미 있던 레플리카를 새 브로커로 옮긴다.
* 레플리카가 하나뿐인만큼 이것이 리더가 된다.
*/
reassignment.put(
new TopicPartition(TOPIC_NAME, 1),
Optional.of(new NewPartitionReassignment(Arrays.asList(0))));
/**
* 파티션 2 에 새로운 레플리카를 추가하고 이것을 선호 리더로 설정한다.
* 다음 선호 리더 선출 시 새로운 브로커에 있는 새로운 레플리카로 리더가 바뀌고 이전 레플리카는 팔로워가 된다.
*/
reassignment.put(
new TopicPartition(TOPIC_NAME, 2),
Optional.of(new NewPartitionReassignment(Arrays.asList(1, 0))));
/**
* 파티션 3 에 대해서는 진행중인 재할당 작업이 없다.
* 하지만 그런게 있다면 작업이 취소되고 재할당 작업이 시작되기 전 상태로 원상복구된다.
*/
reassignment.put(new TopicPartition(TOPIC_NAME, 3), Optional.empty());
try {
adminClient.alterPartitionReassignments(reassignment).all().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof NoReassignmentInProgressException) {
log.error(
"We tried cancelling a reassignment that was not happening anyway. Let's ignore this.");
}
}
// 현재 진행중인 재할당을 보여준다.
log.info(
"Currently reassigning: {}",
adminClient.listPartitionReassignments().reassignments().get());
DescribeTopicsResult sampleTopic = adminClient.describeTopics(TOPIC_LIST);
TopicDescription topicDescription = sampleTopic.topicNameValues().get(TOPIC_NAME).get();
// 새로운 상태를 보여주는데, 일관적인 결과가 보일 때까지는 잠시 시간이 걸릴 수 있다.
log.info("Description of sample topic: {}", topicDescription);
adminClient.close(Duration.ofSeconds(30));
테스트하기
아파치 카프카는 원하는 수만큼의 브로커를 설정해서 초기화할 수 있는 MockAdminClient 테스트 클래스를 제공한다.
- MockAdminClient 클래스를 사용하면 실제 카프카 클러스터를 돌려서 거기에 실제 어드민 작업을 수행할 필요없이 애플리케이션이 정상 작동하는지 확인할 수 있다.
MockAdminClient 클래스의 편리한 점은 자주 사용되는 메서드가 매우 포괄적인 mock-up 기능을 제공한다.
- MockAdminClient 의 토픽 생성 메서드를 호출한 후 listTopics() 를 호출하면 방금 전에 생성한 토픽이 리턴된다.
⚠️ 주의점
MockAdminClient는 카프카 API의 일부가 아닌만큼 언제고 사전 경고없이 변경될 수 있다.
- 공개된 메서드에 대한 mock-up 이기 때문에 메서드 시그니처는 여전히 호환성을 유지하지만, MockAdminClient 를 이용하여 사용중인 API 의 실제 API 가 변경되어서 테스트가 깨질 위험성이 있다.
💻 예제
먼저 MockAdminClient 를 사용하여 테스트하는 방법에 대해 알아보기 위해 AdminClient 를 사용하여 토픽을 생성하는 클래스를 하나 정의한다.
// 토픽 이름이 test 로 시작할 경우 토픽을 생성하는 메서드 (실제 쓸법하지는 않은데 예제임)
public void maybeCreateTopic(String topicName) throws ExecutionException, InterruptedException {
Collection<NewTopic> topics = new ArrayList<>();
// 파티션 1개, 레플리카 1개인 토픽 생성
topics.add(new NewTopic(topicName, 1, (short) 1));
// 토픽이 test로 시작하지 않으면 incrementalAlterConfigs 호출이 되지 않는다.
if (topicName.toLowerCase().startsWith("test")) {
adminClient.createTopics(topics);
// 설정 변경
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
ConfigEntry compaction =
new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
Collection<AlterConfigOp> configOp = new ArrayList<>();
configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = new HashMap<>();
alterConfigs.put(configResource, configOp);
adminClient.incrementalAlterConfigs(alterConfigs).all().get();
}
}
- 토픽 설정을 변경하는 부분은 mock-up 클라이언트에 구현되어 있지 않기 때문에 이 부분은 Mockito 테스트 프레임워크를 사용할 것이다.
mock-up 클라이언트를 생성함으로써 테스트를 시작
private AdminClient adminClient;
@BeforeEach
void setUp() {
// id 가 0 인 가짜 브로커를 생성한다. 테스트를 실행하는 도중에 실제로 실행되는 브로커는 없다.
Node broker = new Node(0, "localhost", 9092);
/**
* 브로커 목록과 컨트롤러를 지정하여 MockAdminClient 객체 생성한다. (여기서는 하나만 지정함)
* 나중에 TopicCreator 가 제대로 실행되었는지 확인하기 위해 spy() 메서드의 주입 기능을 사용한다.
*/
this.adminClient = spy(new MockAdminClient(List.of(broker), broker));
/**
* 아래 내용이 없으면 테스트 시
* java.lang.UnsupportedOperationException: Not implemented yet 예외가 발생되는데 버전에 따라서 안날 수 있다.
*
* Mockito 의 doReturn() 메서드를 사용하여 mock-up 클라이언트가 예외를 발생시키지 않도록 함
* 테스트하고자 하는 메서드는 AlterConfigResult 객체가 필요하고, 이 객체는 KafkaFuture 객체를 리턴하는 all() 메서드가 있어야 한다.
* 가짜 incrementalAlterConfigs() 메서드는 정확히 이것을 리턴해야 한다.
*/
AlterConfigsResult emptyResult = mock(AlterConfigsResult.class);
doReturn(KafkaFuture.completedFuture(null)).when(emptyResult).all();
doReturn(emptyResult).when(adminClient).incrementalAlterConfigs(any());
}
가짜 AdminClient 를 만들었으니 maybeCreateTopic() 메서드가 정상적으로 작동하는지 확인해본다.
private AdminClient adminClient;
@Test
public void testCreateTopic() throws ExecutionException, InterruptedException {
TopicCreator tc = new TopicCreator(adminClient);
tc.maybeCreateTopic("test.is.a.test.topic");
// createTopics() 가 1번 호출되었는지 확인한다.
verify(adminClient, times(1)).createTopics(any());
}
@Test
public void tetNotTopic() throws ExecutionException, InterruptedException {
TopicCreator tc = new TopicCreator(adminClient);
tc.maybeCreateTopic("not.a.test");
// 토픽 이름이 test로 시작하지 않을 경우 createTopics()가 호출이 안되었는지 확인한다.
verify(adminClient, never()).createTopics(any());
}
아파치 카프카는 MockAdminClient를 test jar 에 담아서 공개하므로 아래와 같은 dependency를 추가해야 한다.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
📚 reference
https://dongjinleekr.github.io/kafka-the-definitive-guide-v2/example/
'책 > 카프카 핵심 가이드' 카테고리의 다른 글
신뢰성 있는 데이터 전달(Kafka) (0) | 2025.01.14 |
---|---|
카프카 내부 메커니즘 (1) | 2024.12.22 |
카프카 컨슈머 : 카프카에서 데이터 읽기 (1) | 2024.11.24 |
카프카 프로듀서: 카프카에 메시지 쓰기 (1) | 2024.11.09 |