프로서 개요
애플리케이션이 카프카에 메시지를 써야하는 상황들
- 감사 혹은 분석을 목적으로 하는 사용자 행동 기록
- 성능 매트릭 기록
- 로그 메시지 저장 (ELK 스택에서 Filebeat Kafka output 설정등)
- 스마트 가전 정보 수집
- 다른 애플리케이션과의 비동기적 통신 수행
- 임의의 정보 데이터베이스 저장 전 버퍼링
아래는 카프카에 데이터를 전송할 때 수행되는 주요 단계들이다.
카프카에 메시지를 쓰는 작업은 ProducerRecord객체를 생성함으로써 시작되며 토픽과 밸류 지정은 필수 사항이지만, 키와 파티션 지정은 선택사항입니다.
ProducerRecord를 전송하는 API를 호출했을 때(send 메서드)
프로듀서는 가장 먼저 키와 값 객체가 네트워크 상에서 전송될 수 있도록 직렬화해서 바이트 배열로 변환합니다.
@Async("threadPoolTaskExecutor")
@EventListener
public void subscribedEvent(UserSubscribedEvent subscribedEvent) {
ProducerRecord<String, SubscribeVo> record = new ProducerRecord<>(
subscribeTopicName,
subscribedEvent.userEmail(),
new SubscribeVo(subscribedEvent.newsletterEmail(), subscribedEvent.token())
);
try {
subscribeKafkaTemplate.send(record);
} catch (Exception e) {
logger.error(e.getMessage(),e);
} finally {
subscribeKafkaTemplate.flush();
subscribeKafkaTemplate.destroy();
}
}
Tip - 프로듀서의 안전한 종료
프로듀서를 안전하게 종료하기 위해서는 강제 종료하는 destroy() 메서드를 호출 하기 보다 close() 메서드를 사용하여 Accumulator에 저장되어 있는 모든 데이터를 카프카 클러스터로 전송해야 한다.
flush 메서드도 함께 사용하면 좋다.
ProducerRecord 객체의 키를 통해서 파티션이 결정되어 메시지가 전송될 토픽과 파티션이 확정되면 프로듀서는 이 레코드를 같은 토픽 파티션으로 전송될 레코드들을 모은 레코드 배치(record batch)에 추가하고 별도의 스레드가 이 레코드 배치를 적절한 카프카 브로커에게 전송합니다.
이후 브로커가 메시지를 성공적으로 저장되면 브로커는 토픽, 파티션 그리고 해당 파티션 안에서의 레코드의 오프셋을 담은 RecordMetadata 객체를 반환합니다. (반대로 실패하게 되면 에러를 반환)
- 프로듀서가 에러를 수신했을 경우 메시지 쓰기를 포기하고 사용자에게 에러를 리턴하기 전까지 몇 번 더 재전송을 시도할 수 있다.
프로듀서 필수 값 3가지
bootstrap.servers, key.serializer, value.serializer
@Bean
public ProducerFactory<String, SubscribeVo> factory() {
Map<String, Object> props = new HashMap<>();
// Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
1️⃣ bootstrap.servers
카프카 클러스터와 첫 연결을 하기 위해 프로듀서가 사용할 브로커의 host:port 목록을 의미합니다.
프로듀서가 첫 연결을 생성한 뒤 추가 정보를 받아오게 되어 있기 떄문에 이 주소에 모든 브로커가 포함될 필요는 없다.
Properties kafkaProps = new Properties();
kafkaPrps.put("**bootstrap.servers", "broker1:9092", "broker2:9092")**
- 브로커 중 하나가 작동을 정지하는 경우에도 프로듀서가 클러스터에 연결할 수 있도록 최소 2개 이상을 지정할 것을 권장합니다.
2️⃣ key.serializer
카프카에 쓸 레코드의 키 값을 직렬화 하기 위해 사용하는 시리얼라이저 클래스의 이름이다.
카프카 브로커는 메시지의 키값, 밸류값으로 바이트 배열을 받는데 임의의 객체를 키 혹은 밸류로 전송할 수 있도록 매개변수화된 타입을 사용할 수 있다.
- 카프카의 Client 패키지에 ByteArraySerializer, StringSerializer 등 자주 사용되는 타입이 정의되어 있으며 키값 없이 밸류값만 보내도 key.serializer가 필요한데 VoidSerializer를 사용할 수 있다.
3️⃣ value.serializer
키값으로 쓰일 객체를 직렬화 하는것처럼 밸류값으로 쓰일 객체를 직렬화하는 클래스 이름을 설정해주면 된다.
메시지 전송 방법 💬
파이어 앤 포켓(Fire and forget)
메시지를 서버에 전송만 하고 성공 혹은 실패 여부에는 신경 쓰지 않기 때문에 카프카에서 재시도를 할 수 없는 에러가 발생하거나 타임아웃이 발생하는 경우 데이터가 유실되며 Application은 여기에 대해 아무런 정보나 예외를 전달받지 않는다.
동기적 전송(Synchronous send)
- 실제 애플리케이션에서 잘 사용되지 않는다.
카프카 프로듀서는 언제나 비동기적으로 작동하는데 메시지를 보내면 send() 메서드는 Future 객체를 리턴한다.
₩var record = new ProducerRecord<String, String>("topic", "key", "value")
try {
producer.send(record).get()
} catch (e: Exception) {
e.printStacktrace()
}
- 다음 메시지를 전송하기 전에 get() 메서드를 통해서 동기적으로 전송하며 성공적으로 전송되지 않았을 경우 예외가 발생한다. (카프카에 메시지를 전송하기 전이나 전송하는 도중에 에러가 발생해도 예외 발생)
- 예외가 발생하지 발생하지 않으면 RecordMetadata 객체를 반환하는데 메시지가 쓰여진 오프셋과 다른 메타데이터를 가져올 수 있다.
Future의 주요 메소드
Future 인터페이스에는 다음과 같은 주요 메소드들이 있습니다.
get(): 연산의 결과를 반환합니다.
만약 연산이 아직 완료되지 않았다면, 완료될 때까지 기다립니다.
이 메소드는 Future가 가지는 제네릭 타입의 객체를 반환합니다.
get(long timeout, TimeUnit unit): 지정한 시간 동안만 결과를 기다리고, 그 시간이 지나면 TimeoutException을 던집니다. 이 메소드는 시간이 지나면 TimeoutException을 던지며, 그 전에 작업이 완료되면 그 결과를 반환합니다.
isDone(): 연산이 완료되었는지의 여부를 반환합니다.
작업이 완료되었다면 true, 그렇지 않다면 false를 반환합니다.
cancel(boolean mayInterruptIfRunning): 연산을 취소합니다.
매개 변수는 작업이 진행 중일 때 중단해도 되는지를 결정합니다.
isCancelled(): 작업이 취소되었는지의 여부를 반환합니다.
작업이 취소되었다면 true, 그렇지 않다면 false를 반환합니다.
동기적 메시지 전송은 단순하지만 카프카 브로커가 쓰기 요청(produce request)에 에러를 내놓거나 재전송 횟수가 소진되었을 때 발생되는 예외를 받아서 처리해야 한다.
- 카프카 클러스터에 얼마나 작업이 몰리느냐에 따라서 브로커는 쓰기 요청에 응답하기까지 2ms~ 최대 몇 초까지 지연될 수 있어 성능이 크게 낮아지게 된다.
비동기적 전송(Asynchronous send) - 가장 일반적이고 권장되는 방식
- 대부분 Application은 메시지를 전송하고 레코드의 토픽, 파티션, 오프셋 같은 메타데이터가 필요없어 비동기적으로 사용된다.
- 실패 했을 경우에만 처리하면 되는데 프로듀서는 레코드를 전송할 때 콜백을 지정해 예외를 처리할 수 있다.
// org.apache.kafka.client.producer.Callback
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e){
if(e != null) {
// 여기서 로그를 남기던 사후 분석을 위해 에러 파일에 메시지를 쓰게 할 수 있다.
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
- 콜백 함수와 함께 send() 메서드를 호출하면 카프카 브로커로부터 응답을 받는 시점에서 자동으로 콜백 함수가 호출된다.
주의점
콜백은 프로듀서의 메인 스레드에서 실행되는데 두 개의 메시지를 동일한 파티션에 전송한다면 콜백 역시 우리가 보낸 순서대로 실행된다. (메시지 키값이 유니크해야 하는 이유)
- 콜백 안에서 블로킹 작업을 수행하는 것 역시 권장되지 않으며 이 경우 블로킹 작업을 동시에 수행하는 다른 스레드를 사용해야 한다.
메시지 전송시 발생할 수 있는 에러
- SerializationException: 메시지를 직렬화 하는데 실패하는 경우
- TimeoutException: 버퍼가 가득찰 경우
- InterruptException: 작업을 수행하는 스레드에 인터럽트가 걸리는 경우
추가적으로 KafkaProducer는 연결 에러(연결이 회복되면 연결), 메시지를 전송받은 브로커가 해당 파티션리더가 아닐 경우(해당 파티션에 새 리더가 선출되고 클라이언트 메타데이터가 업데이트되면 해결) 재시도하도록 KafkaProducer를 설정할 수 있다.
- 발생하는 오류 등은 재전송 횟수가 소진되고서도 에러가 해결되지 않을 경우 예외가 발생하며 메시지가 너무 클 경우도 KafkaProducer는 재시도 없이 바로 예외를 발생시킵니다.
프로듀서 설정하기
프로듀서는 굉장히 많은 설정값을 가지고 있는데 대부분의 경우 합리적인 기본값을 가지고 있어서 일일이 잡아줄 필요는 없다.
- https://kafka.apache.org/documentation/#producerconfigs
- https://godekdls.github.io/Apache%20Kafka/producer-configuration/#google_vignette
몇몇 설정값의 경우 메모리 사용량이나 성능, 신뢰성 등에 상당한 영향을 미치기 때문에 살펴보자
client.id
프로듀서와 그것을 사용하는 어플리케이션을 구분하기 위한 논리적 식별자를 의미합니다.
- 브로커는 프로듀서가 보내온 메시지를 서로 구분하기 위해서 이 값을 사용한다.
브로커가 로그 메시지를 출력하거나 성능 메트릭 값을 집계할 때 그리고 클라이언트별로 사용량을 할당할 때 사용된다.
"IP 104.23.23.123에서 인증 실패가 발생하고 있네? // client.id 지정했을 때
"주문 확인 인증 서비스가 실패하네?" 로라에게 한 번 봐달라고 해야되나? // client.id를 지정하지 않았을 때
- 이 값을 잘 사용하면 트러블슈팅을 쉽게 할 수 있다.
acks
임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지를 결정합니다.
- 이 매개변수는 메시지가 유실될 가능성에 큰 영향을 미치는데 구체적인 상황에 따라서 기본값이 최적의 선택이 아닐 수 있다.
acks=0
메시지 전달을 보장하지 않고 한번만 전달하기 때문에 브로커가 메시지를 받지 못했을 경우 그대로 유실되지만 프로듀서가 서버로부터 응답을 기다리지 않기 때문에 높은 처리량이 필요할 때 사용될 수 있다.
- 속도는 가장 빠르지만 신뢰도는 가장 낮지만 GPS 같은 기능에서 사용될 수 있다.
acks=1
프로듀서는 리더 레플리카가 메시지를 받는 순간 브로커로부터 성공했다는 응답을 받는다.
- 대부분 모든 데이터를 처리할 수있고 큰 장애가 아닌 이상은 지연이 많이 발생하는 일이 없어 유실이 거의 없고 일반적으로 acks=1 옵션을 많이 사용한다.
주의점
복제 개수를 2 이상으로 운영할 경우 리더 파티션에 적재가 완료되어도 팔로워 파티션에는 아직 데이터가 동기화되지 않을 수 있는데 팔로워 파티션이 데이터를 복제하기 직전에 리더 파티션이 있는 브로커에 장애가 발생하면 동기화되지 못한 데이터가 유실될 수 있다.
acks=all(-1)
브로커는 모든 In-Sync-Replica에 메시지가 전달되어야 프로듀서에게 성공했다는 응답을 보낸다.
- 데이터 처리량이 너무 낮아서 쓰기 힘들다고 하는데 kafka 3.0 이상부터는 기본값이다.
주의점
replication factor에 따라서 모든 팔로워를 체크하는 것이 아니라 min.insync.replicas 옵션에 맞춰진다.
- ReplicationFactor=3, min.insync.replicas=2 인 경우 리더와 팔로워 파티션 2개가 아니라 1개만 확인한다.
- 아무리 극한의 상황이라도 리더 파티션과 확인하는 팔로워 파티션이 동시에 장애가 나는 경우는 거의 일어나지 않아 min.insync.replicas=2로 하더라도 안전하게 데이터를 처리할 수 있다고 한다.
즉 min.insync.replicas=2 부터 acks=all(-1)을 사용하는 의미가 있다.
- min.insync.replicas 옵션은 프로듀서가 리더 파티션과 팔로워 파티션에 데이터가 적재되었는지 하기위한 최소 ISR그룹의 파티션 개수인데 min.insync.replicas=1 이라면 ISR 중 최소 1개 이상의 파티션에 데이터가 적재되었음을 확인하는 것이고 이 경우 acks=1과 동일하게 동작한다.
- ISR 중 가장 처음 적재가 완료되는 파티션은 리더 파티션이기 때문이다.
따라서 min.insync.replicas=2로 설정했을 경우 ISR의 2개 이상의 파티션에 적재되었음을 확인한다는 뜻이고 적어도 리더 파티션과 1개의 팔로워 파티션에 데이터가 정상적으로 적재되었음을 보장한다.
정리
- 처리속도: acks=0 > acks=1 > acks=all(-1)
- 안정성: acks=all(-1) > acks=1 > acks=0
- 단순한 프로듀서 지연이 아니라 레코드가 생성되어 컨슈머가 읽을 수 있을 때까지의 시간을 의미하는 종단 지연이 주로 고려되어야 한다면 가장 신뢰성 있는 설정을 택해도 똑같다. (모든 In-Sync-Replica에 복제 시간은 다른이야기)
메시지 전달 시간
- 사용자가 기다릴 수 있는 시간이며 요청 실패를 인정하고 포기할 때까지 기다릴 수 있는 시간(timeout)
아파치 카프카 2.1부터 개발진은 ProducerRecord를 보낼 때 걸리는 시간을 두 구간으로 나누어 따로 처리할 수 있도록 합니다.
- send()에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간 (이 시간동안 send()를 호출한 스레드는 블록)
- send()에 대한 비동기 호출이 성공적으로 리턴한 시각부터(성공했든 실패했든) 콜백이 호출될 때 까지 걸리는 시간
send()를 동기적으로 호출하는 경우 메시지를 보내는 스레드는 두 구간에 대해 연속적으로 블록되게 된다.
- 각각의 구간이 어느 정도 걸렸는지 알 수 없기 때문에 send()를 콜백과 함께 비동기적으로 호출한다.
필수값이 아닌 설정들
max.block.ms (기본값 60,000ms)
- 프로듀서가 얼마나 오랫동안 블록되는지를 결정합니다.
프로듀서의 전송 버퍼가 가득 차거나, 메타데이터가 아직 사용 가능하지 않을 때 max.block.ms만큼 시간이 지나면 예외가 발생합니다.
delivery.timeout.ms
- 레코드 전송 준비가 완료된 시점(send()가 무사히 리턴되고 레코드가 배치에 저장된 시점)부터 브로커의 응답을 받거나 전송을 포기하게 되는 시점까지의 제한시간을 결정합니다.
delivery.timeout.ms ≥ linger.ms + request.timeout.ms 제약 조건을 벗어난 설정으로 카프카 프로듀서를 생성하면 예외가 발생한다.
- 카프카가 재시도를 하는 도중 delivery.timeout.ms가 넘어가버린다면, 마지막으로 재시도 하기 전에 브로커가 리턴한 에러에 해당하는 예외와 함께 콜백이 호출됩니다.
- 레코드 배치가 전송을 기다리는 와중에 delivery.timeout.ms가 넘어가버리면 타임아웃 예외와 함께 콜백이 호출된다.
request.timeout.ms
- 서버로부터 응답을 받기 위해 얼마나 기다릴지를 정의합니다.
각각의 쓰기 요청 후 전송을 포기하기까지 대기하는 시간으로 재시도 시간이나, 실제 전송 이전에 소요되는 시간 등을 포함하지 않는다.
응답 없이 타임아웃이 발생할 경우, 프로듀서는 재전송을 시도하거나 TimeoutException과 함께 콜백을 호출한다.
retries, retry.backoff.ms
retries는 일시적인 에러인 경우 프로듀서가 메시지 전송을 포기하고 에러를 발생시킬 때까지 메시지를 재전송하는 횟수를 결정한다.
- 기본적으로 프로듀서는 각각의 재시도 사이에 100ms 동안 대기하는데 retry.backoff.ms 매개변수를 사용해서 이 간격을 조정할 수 있다.
이 값들을 조정하려면 재전송을 시도하는 전체 시간이 카프카 클러스터가 크래시로부터 복구되기까지의 시간(모든 파티션에 대해 새 리더가 선출되는 데 걸리는 시간)보다 더 길게 잡아줘야 한다.
- 프로듀서는 너무 일찍 메시지 전송을 포기하게 되기 때문이다.
- 이 값들을 수정하기 보다 delivery.timeout.ms를 조정하자.
에러에 대해서 재시도 가능한지 아닌지는 프로듀서가 알아서 재전송 할지 말지를 처리해주기 때문에 Application 코드에는 관련 처리를 수행하는 코드가 필요없다.
- 개발자는 재시도 불가능한 에러를 처리하거나 재시도 횟수가 고갈되었을 경우에 대한 처리에만 집중하면 된다.
- 재전송 기능을 끄는 방법은 retries=0 으로 설정하는 것뿐이다.
이전 재시도 옵션에 대한 이슈 🥹
retries 옵션은 기본값으로 2147483647인데 재시도 할 필요가 없거나 조금만 재시도 하고 싶을 때 설정해줄 수 있어 간단한 줄 알았다.
- 재시도를 5회로 수정한다는등...
delivery.timeout.ms는 레코드를 send()하고 시간에 따라서 성공/실패를 결정하는 옵션이다.
- 브로커로부터 ack를 받기 위해 대기하는 시간이며 시간 범위 안에서는 실패 시 재전송을 허용된다.
- 복구할수 없는 에러나 재시도 횟수를 다 소모하게되면 delivery.timeout.ms 설정시간보다 적어도 에러가 발생할 수 있다.
필수값 옵션 이미지 공식처럼 delivery.timeout.ms는 request.timeout.ms, retry.backoff.ms, linger.ms 옵션들의 시간보다 같거나 커야한다.
- 해당하는 3가지 옵션과 retries 옵션을 수정해서 적절하게 처리해야 한다.
lingers.ms
- 현재 배치를 전송하기 전까지 대기하는 시간을 정의합니다.
KafkaProducer는 현재 배치가 가득 차거나 linger.ms에 설정된 제한 시간이 되었을 때 메시지 배치를 전송한다.
- 프로듀서는 메시지 전송에 사용할 수 있는 스레드가 있을 때 곧바로 전송하도록 기본값으로 되어있다.
linger.ms를 0보다 큰 값으로 설정하면 프로듀서가 브로커에 메시지 배치를 전송하기 전에 메시지를 추가할 수 있도록 설정한 ms가량 더 기다리도록 할 수 있다.
- 지연을 조금 증가시키는 대신 처리율을 크게 증대시킨다.
- 단위 메시지당 추가적으로 드는 시간은 매우 작지만 압축이 설정되어 있거나 할 경우 훨씬 더 효율적이다.
buffer.memory (기본값 32MB)
- 프로듀서가 메시지를 전송하기 전 메시지를 대기시키는 버퍼의 크기(메모리의 양)를 결정한다.
send()는 max.block.ms 동안 블록되어 버퍼 메모리에 공간이 생기기를 기다리는데 해당 시간 동안 대기하고서도 공간이 확보되지 않으면 예외를 발생시킨다.
- 대부분 프로듀서 예외와 다르게 이 타임아웃은 send() 메서드에서 발생하며 send() 메서드가 리턴하는 Future 객체에서 발생하지 않는다.
compression.type
기본적으로 메시지는 압축되지 않은 상태로 전송되는데 이 매개변수를 설정하면 해당 압축 알고리즘을 사용해서 메시지를 압축한 뒤 브로커로 전송된다. (lingers.ms 로 처리율 증가시킬 때 좋을 듯 테스트 필수)
- snappy: CPU의 부하가 적으면서 좋은 압축률을 보여주기 때문에 압축 성능과 네트워크 대역폭 모두가 중요할 때 권장된다.
- gzip: CPU의 부하가 높지만, 더 좋은 압축률을 가지며 네트워크 대역폭이 제한적일 때 사용하면 좋다.
- lz4 , zstd
batch.size (기본값 16KB)
같은 파티션에 다수의 레코드가 전송될 경우 프로듀서는 이것들을 배치 단위로 모아서 한꺼번에 전송한다.
- 각각의 배치에 사용될 메모리 양을 결정하는데 개수가 아니라 바이트 단위다.
배치가 가득 차면 배치에 들어 있는 모든 메시지가 한꺼번에 전송되지만, 각각의(파티션으로 분류) 배치가 가득 차야지 전송되는 것이 아니다. (linger.ms만큼 기다리고 전송함)
- batch.size를 큰 값으로 유지한다고 해서 메시지 전송 지연이 발생하지 않으며 오히려 작은 경우 배치가 가득차서 자주 메시지를 전송해야 하기 때문에 오버헤드가 발생할 수 있다.
batch.size, buffer.memory 정리
batch.size: 단일 요청에서 보낼 수 있는 최대 데이터 양. batch.size가 (32*1024)이면 단일 요청에서 32KB를 보낼 수 있다는 의미입니다.
buffer.memory: Kafka Producer가 Kafka 브로커에 메시지(배치)를 보낼 수 없는 경우(브로커가 다운된 경우) 버퍼 메모리(기본값 32MB)에 메시지 배치를 축적하기 시작합니다.
- 버퍼가 가득 차면 버퍼를 비울 수 있도록 max.block.ms(기본값 60,000ms)를 기다립니다. 버퍼 공간이 없는 경우 예외를 throw합니다.
max.in.flight.requests.per.connection (기본값 5)
프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지 수를 결정한다.
- 이 값을 올려주면 메모리 사용량은 증가되지만 처리량 역시 증가한다.
순서 보장
retreies 값이 0보다 크고 max.in.flight.requests.per.connection을 1 이상으로 잡아줄 경우 메시지의 순서가 뒤집어질 수 있다.
- 브로커가 첫 번째 배치를 받아서 쓰려다 실패했는데, 두 번째 배치(in-flight 상태)를 쓸 때는 성공한 경우 다시 첫 번째 배치가 재전송 시도되어 성공하면 메시지의 순서가 뒤집어진다.
성능상의 고려 때문에 in.flight 요청이 최소 2이상은 되어야 하고 신뢰성을 보장하기 위해 재시도 횟수 또한 높아야 한다면?
enable.idepotence=ture 설정을 통해서 최대 5개의 in-flight 요청을 허용하면서도 순서를 보장하고, 재전송이 발생하더라도 중복이 발생하는 것 또한 방지해준다. (멱등적 프로듀서)
- max.in.flight.requests.per.connection 매개변수는 5이하, retries는 1이상 acks=all로 잡아주어야 설정되며 만족하지 않으면 ConfigException이 발생한다.
max.request.size (기본값 1MB)
프로듀서가 한번에 전송하는 쓰기 요청의 크기를 결정한다.
- 메시지의 최대 크기를 제한하기도 하지만 한 번의 요청에 보낼 수 있는 최대 메시지의 개수 역시 제한한다.
- 기본값 기준 한번에 보낼 수 있는 1KB 크기의 메시지 개수는 1024개
브로커에는 브로커가 받아들일 수 있는 최대 메시지 크기를 결정하는 message.max.bytes가 있으므로 max.request.size와 동일하게 맞춰 프로듀서에서 브로커가 받아들이지 못하는 크기의 메시지를 전송하려 하지 않게 하는게 좋다.
receive.buffer.bytes, send.buffer.bytes
데이터를 읽거나 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기를 결정한다.
- 각각의 값이 -1일 경우 운영체제의 기본값이 사용된다.
- 프로듀서나 컨슈머가 다른 데이터센터에 위치한 브로커와 통신할 경우 네트워크 대역폭은 낮고 지연은 길어지는 것이 보통이기 때문에 이 값들을 올려잡아 주는 것이 좋다.
enable.idepotence
- 정확히 한 번 전송할 수 있는 기능을 제공합니다.
acks=all이기 때문에 모든 브로커가 성공적으로 프로듀서로부터 레코드를 받아 로컬 디스크에 썻다고 할 때 첫 번째 브로커가 프로듀서로 응답을 보내기 전에 크래시가 났다고 가정해보자
- request.timeout.ms 만큼 대기한 뒤 재전송을 시도하게 되는데 이때 메시지는 이미 메시지를 받은 새 리더 브로커에게 전달되어 메시지가 중복되어 저장된다.
- enable.idepotence=true로 설정을 잡아주면 위에 상황을 방지할 수 있다.
멱등적 프로듀서 기능이 활성화되면 프로듀서는 레코드를 보낼 때마다 순차적인 번호를 붙여서 보내게 되는데 브로커가 동일한 번호를 가진 레코드를 2개 이상 받을 경우 하나만 저장하고 프로듀서는 별다른 문제를 발생시키지 않는 DuplicateSequenceException을 받게된다.
시리얼라이저
기본으로 제공하는 시리얼라이저로는 모든 데이터를 직렬화할 수 없다.
커스텀 시리얼라이저
전송하는 객체가 문자열이나 정수값이 아닐 경우 더 일반적인 레코드를 직렬화하는 두 가지 선택지
1️⃣ 레코드를 생성하기 위해 에이브로(Avro), 스리프트(Thrift), 프로토버프(Protofuf)와 같은 범용 직렬화 라이브러리를 사용한다.
(강력하게 권장한다.)
2️⃣ 사용하고 있는 객체를 직렬화하기 위한 커스텀 직렬화(Serializer<T> 구현) 로직을 작성한다.
- 기존 형식과 새 형식 사이의 호환성을 유지해야 하는 심각한 문제가 있음
- 디버깅하기 위해서는 서로 다른 바이트 뭉치를 일일이 비교해야 한다.
- 같은 회사의 여러 팀에서 같은 타입 데이터를 카프카로 쓰고 있다면 모두가 같은 로직을 사용하고 있어야 하기 때문에 코드를 동시에 변경해야 한다.
아파치 에이브로
주의점
- 호환성 규칙 참고하기
- 역직렬화를 할 때는 데이터를 쓸 때 사용했던 스키마에 접근이 가능해야 한다.
파일 안에 전체 스키마를 저장함으로써 오버헤드를 감수하는 에이브로 파일도 있지만 오픈소스 구현체 중 하나로 골라서 스키마 레지스트리를 사용할 수 있다.
- 카프카에 데이터를 쓰기 위해 사용되는 모든 스키마를 레지스트리에 저장한다.
- 카프카에 쓰는 레코드에는 사용된 스키마의 고유 식별자만 심어주면 되는데 이 식별자를 통해서 컨슈머는 스키마 레지스트리에서 스키마를 가져와서 데이터를 역직렬화 할 수 있다.
파티션
ProduceRecord객체는 토픽, 키, 밸류의 값을 포함합니다. 카프카 메시지는 키-밸류 순서쌍이라고 할 수 있는데, 키의 기본값이 null인만큼 토픽과 밸류의 값만 있어도 객체를 생성하고 파티션에 할당할 수 있습니다.
키 값이 null인 경우
레코드는 현재 사용 가능한 토픽의 파티션 중 랜덤하게 저장된다.
- 균형을 맞추기 위해서 RB알고리즘이 사용된다.
아파치 카프카 2.4 프로듀서부터 기본 파티셔너는 키 값이 null인 경우 sticky 처리를 하기 위해 RB 알고리즘을 사용한다.
- 프로듀서가 메시지 배치를 채울 때 다음 배치로 넘어가기 전에 이전 배치를 먼저 채워서 요청 수를 줄인다.
- 지연시간을 줄이고 브로커의 CPU 사용량을 줄일 수 있다.
키 값이 존재하는 경우
기본적으로 키값을 해시한 결과를 기준으로 메시지를 저장할 파티션을 특정하기 때문에 도중에 파티션 수가 변하지 않는 한 항상 같은 키값을 가진 레코드는 같은 파티션에 저장된다.
- 한쪽 파티션에 레코드들이 몰리지 않으려면 key 값이 유니크해야 한다.
- 키값은 항상 동일한 파티션에 저장되게 하기 위해서 사용 가능한 파티션만 대상으로 하는것이 아니라 모든 파티션을 대상으로 선택하는데 이 때 데이터를 쓰려고 하는 파티션에 장애가 발생하면 에러가 발생한다. (이러한 경우는 드물다.)
- 파티셔너는 자체적인 해싱 알고리즘을 사용하기 때문에 자바 버전이 업그레이드 되어도 해시값은 변하지 않는다.
기본 파티셔너 외에도 RoundRobinPartitioner, UniformStickyPartitioner를 포함하는데 메시지가 키값이 존재해도 각각의 알고리즘으로 파티션에 할당할 수 있다.
- RoundRobinPartitioner는 순회하는 파티셔너로 레코드가 들어오는 대로 파티션을 순회하면서 전송하면서 Accumulator에서 묶이는 정도가 낮아 처리량이 낮다.
- UniformStickyPartitioner는 Accumulator에 배치로 묶일 때까지 기다렸다가 전송해 처리량을 높일 수 있다.
- 이 Partitioner들은 카프카에 저장된 데이터를 RDB로 보낼 때 카프카 레코드의 키값을 기본 키로 사용하는 ETL Application에서 유용하다.
카프카 클라이언트 라이브러리에서는 사용자 지정 파티셔너를 생성하기 위한 Partitioner 인터페이스를 제공한다.
헤더
레코드에 키값, 밸류 값외에도 헤더를 포함할 수 있습니다. 레코드 헤더는 카프카 레코드의 키/밸류를 건드리지 않고 추가 메타데이터를 심을 때 사용합니다.
- 주된 용도 중 하나는 메시지의 전달 내역을 기록하는 것이다.
- 데이터가 생성된 곳의 정보를 헤더에 저장하면 메시지를 파싱할 필요없이 메시지를 라우팅하거나 출처를 추적할 수 있다. (데이터는 암호화 필요)
- 헤더는 키/밸류 쌍의 집합으로 구현되어 있다.
record.headers().add("키는 문자열만","메시지의 벨류값처럼 직렬화된 객체라도 상관없다.")
인터셉터
카프카 클라이언트의 코드를 고치지 않으면서 작동을 변경해야 하거나, 모든 애플리케이션에 동일한 작동을 집어 넣을 때 혹은 원래 코드를 사용할 수 없는 상황에서 ProducerInterceptor를 사용할 수 있다.
- 인터셉터의 일반적인 사용 사례는 모니터링, 정보 추적, 표준 헤더 삽입등이 있으며 민감한 정보를 삭제 처리하는 등의 용도로 활용할 수 있다.
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
- onSend는 프로듀서가 레코드를 브로커로 보내기 전, 직렬화되기 직전에 호출됩니다.
- 보내질 레코드에 담긴 정보를 볼 수 있고 고칠 수 있는데 유효한 ProducerRecord를 꼭 리턴해야 한다.
ProducerRecord<K, V> onAcknowledgemnt(ProducerRecord<K, V> record)
- onAcknowledement는 브로커의 응답을 클라이언트가 받았을 때 호출되는데 변경은 불가능하지만 정보는 읽을 수 있다.
이러한 인터셉터는 클라이언트 코드를 변경하지 않고 아파치 카프카와 함께 배포되는 .sh 툴과 함께 사용될 수 있다.
쿼터(한도), 쓰로틀링(JMX 매트릭를 통해서 작동여부 확인 가능)
카프카 브로커에는 한도(쿼터)를 설정해주면 읽기/쓰기 속도를 제한할 수 있는 기능이 있다.
- 읽기 쿼터/쓰기 쿼터
- 클라이언트가 데이터를 전송하거나 받는 속도를 초당 바이트 수 단위로 제한한다.
- 요청 쿼터
- 브로커가 요청을 처리하는 시간 비율 단위로 제한한다.
쿼터는 기본값으로 설정하거나, 특정한 client.id 값에 대해 설정하거나, 특정한 사용자에 대해 설정할 수 있다.
- 복수 적용도 가능하다.
- 사용자에 대해 설정된 쿼터는 보안 기능과 클라이언트 인증 기능이 활성화되어 있는 클라이언트에서만 작동
모든 클라이언트에 적용되는 읽기/쓰기 쿼터의 기본값은 브로커를 설정할 때 quoto.producer.default=nM 라고 추가하거나 특정 클라이언트에 대한 쿼터값을 정의해서(권장하지 않는 방식이다.) quota.producer.override="clientA:nM,clientB:nM 설정할 수 있다.
- 카프카의 설정 파일에 정의된 쿼터값은 고정되어 있기 때문에 설정 파일을 변경한 뒤 모든 브로커를 재시작 해야하는 문제가 있다.
- 특정한 클라이언트에 쿼터를 적용할 때는 Kafka-config.sh 또는 AdminClient API에서 제공하는 동적 설정 기능을 사용하는 것이 보통이다.
클라이언트가 할당량을 다 채웠을 경우, 브로커는 클라이언트의 요청에 대한 스로틀링을 시작하여 할당량을 초과하지 않도록 한다.
- 대부분의 경우 클라이언트는 응답 대기가 가능한 요청수(max.in.flight.request.per.connection)가 한정되어 있기 때문에 자동적으로 요청 속도를 줄이는 것이 보통이다.
스로틀링되는 와중에도 오작동하는 클라이언트가 추가 요청을 쏟아낼 경우 브로커는 해당 클라이언트와의 커뮤니케이션 채널을 일시적으로 무시함으로써 정해진 할당량을 맞추고 브로커를 보호한다.
브로커가 처리할 수 있는 용량과 프로듀서가 데이터를 전송하는 속도를 모니터링 하는 이유
비동기적으로 Producer.send()를 호출하는데 브로커가 받아들일 수 있는 양 이상의 메시지를 전송한다면?
우선 클라이언트가 사용하는 메모리 상의 큐에 적재된다.
- 이 상태에서 계속해서 브로커가 받아들이는 양 이상으로 전송을 시도할 경우 클라이언트의 버퍼 메모리가 고갈되면서 그 다음 비동기로 호출한 Producer.send()을 블록하게 된다.
브로커가 밀린 메시지를 처리해서 프로듀서 버퍼에 공간이 확보될 때까지 걸리는 시간이 타임아웃 딜레이를 넘어가면 TimeoutException을 발생시킨다.
- TimeoutException이 발생하면서 send() 메서드 호출 때 지정했던 콜백이 호출된다.
- 콜백은 메인 스레드에서 실행되므로 이전에 말했지만 콜백 안에서 블로킹 작업을 수행하는 것 역시 권장되지 않으며 이 경우 블로킹 작업을 동시에 수행하는 다른 스레드를 사용해야 한다.
'책 > 카프카 핵심 가이드' 카테고리의 다른 글
신뢰성 있는 데이터 전달(Kafka) (0) | 2025.01.14 |
---|---|
카프카 내부 메커니즘 (1) | 2024.12.22 |
프로그램 내에서 코드로 카프카 관리하기 (2) | 2024.12.08 |
카프카 컨슈머 : 카프카에서 데이터 읽기 (1) | 2024.11.24 |