카프카의 ‘정확히 한 번’ 의미 구조는 멱등적 프로듀서(idempotent producer)와 트랜잭션 의미 구조의 조합으로 이루어진다.
- 멱등적 프로듀서: 프로듀서 재시도로 인해 발생하는 중복을 방지
- 트랜잭션 의미 구조: 스트림 처리 애플리케이션에서 ‘정확히 한 번’ 처리를 보장
멱등적 프로듀서
DB에서 아래 구문은 몇 번을 호출하든 간에 x는 18이기 때문에 멱등적이라고 할 수 있다.
UPDATE t SET x=18 where y=5
가장 고전적인 카프카에서 재시도로 인한 메시지 중복 예시
- 파티션 리더가 프로듀서로부터 레코드를 받아서 팔로워들에게 성공적으로 복제했지만 프로듀서에게 응답을 보내기 전, 파티션 리더가 있는 브로커에 크래시가 발생한 경우
- 프로듀서 입장에서는 응답을 받지 못한 채 타임아웃이 발생하고, 메시지를 재전송하는데 이 때 이 메시지가 새 리더에 중복 저장되게 된다.
카프카의 멱등적 프로듀서 기능은 자동으로 이러한 중복을 탐지하고 처리한다.
멱등적 프로듀서의 작동 원리
멱등적 프로듀서 기능을 켜면 모든 메시지는 고유한 프로듀서 ID와 시퀀스 넘버(sequence ID)를 가지게 된다.
- 대상 토픽 및 파티션과 위 두 값을 합치면 각 메시지의 고유한 식별자가 된다.
max.in.flight.requests.per.connection의 기본값은 5이므로 각 브로커는 해당 브로커에 할당된 모든 파티션들에 쓰여진 마지막 5개 메시지들을 추적하기 위해 고유 식별자를 사용한다.
- 파티션별로 추적되어야 하는 시퀀스 넘버의 수를 제한하고 싶다면 max.in.flight.requests.per.connection 설정값이 5이하로 잡혀 있어야 한다.
- max.in.flight.requests.per.connection 매개변수는 5이하, retries는 1이상 acks=all로 잡아주어야 설정되며, 만족하지 않으면 ConfigException이 발생한다.
브로커가 예전에 받은 적이 있는 메시지를 받게 될 경우, 적절한 에러를 발생시켜 중복 메시지를 거부한다.
- 프로듀서 클라이언트에서는 record-error-rate 지표값을 통해서 에러를 확인할 수 있다.
- 브로커의 경우 RequestMetrics(유형별 에러 수가 기록) 유형의 ErrorsPerSec 지표값에 기록된다.
브로커가 예상보다 높은 시퀀스 넘버를 받으면 브로커는 out of order sequence number 에러를 발생시킨다.
- 트랜잭션 기능 없이 멱등적 프로듀서만 사용하고 있다면 이 에러는 무시해도 좋다.
- 브로커가 2번 메시지 뒤에 27번 메시지를 받으면 3번 부터 26번 메시지까지 뭔가가 일어났다.
- 이 경우 프로듀서와 브로커 설정을 재점검하고 프로듀서 설정이 고신뢰성을 위해 권장되는 값으로 잡혀있는지, 아니면 언클린 리더 선출이 발생했는지의 여부를 확인해 볼 필요가 있다.
작동이 실패한 경우 멱등적 프로듀서의 처리
1️⃣ 프로듀서 재시작
프로듀서에 장애가 발생할 경우, 보통 새 프로듀서를 생성해서 장애가 난 프로듀서를 대체한다.
- 프로듀서가 시작될 때 멱등적 프로듀서 기능이 켜져 있을 경우, 프로듀서는 초기화 과정에서 카프카 브로커로부터 프로듀서 ID를 생성받는다.
- 트랜잭션 기능을 켜지 않았을 경우, 프로듀서를 초기화할 때마다 완전히 새로운 ID가 생성된다.
기존과 다른 프로듀서 ID를 생성받아 새 프로듀서가 기존 프로듀서에서 이미 전송한 메시지를 재전송할 경우
- 브로커는 두 메시지가 서로 다른 프로듀서 ID와 시퀀스 넘버를 갖기 때문에 메시지 중복을 알 수 없다.
- 서로 다른 ID를 가진 서로 다른 프로듀서로 간주되는 만큼 기존 프로듀서는 좀비로 취급되지 않는다.
2️⃣ 브로커 장애
브로커 장애가 발생할 경우, 컨트롤러는 장애가 난 브로커가 리더를 맡았던 파티션들에 대해 새 리더를 선출한다.
- 프로듀서는 메타데이터 프로토콜을 통해 특정 팔로워 레플리카가에 브로커가 새 리더임을 알아차리고 거기로 메시지를 쓰기 시작할 것이다.
리더는 새 메시지가 쓰여질 때마다 인-메모리 프로듀서 상태에 저장된 최근 5개의 시퀀스 넘버를 업데이트 한다.
- 팔로워 레플리카는 리더로부터 새로운 메시지를 복제할 때마다 자체적인 인-메모리 버퍼를 업데이트 한다.
- 팔로워가 리더가 된 시점에 이미 메모리 안에 최근 5개의 시퀀스 넘버를 가지고 있어서, 아무 이슈나 지연 없이 새로 쓰여진 메시지의 유효성 검증이 재개될 수 있다.
예전 리더가 다시 돌아온다면 어떻게 될까?
- 재시작 후에는 인-메모리 프로듀서 상태는 더 이상 메모리 안에 저장되어 있지 않기 때문에, 복구 과정에 도움이 될 수 있도록 브로커는 종료되거나 새 세그먼트가 생성될 때마다 프로듀서 상태에 대한 스냅샷을 파일 형태로 저장한다.
- 브로커가 시작되면 파일에서 최신 상태를 읽어와 현재 리더로부터 복제한 레코드를 사용해서 프로듀서 상태를 업데이트함으로써 이 브로커가 다시 리더를 맡을 준비가 될 시점에는 최신 시퀀스 넘버를 가지게 된다.
브로커가 크래시 나서 최신 스냅샷이 업데이트 되지 않는다면?
- 프로듀서 ID와 시퀀스 넘버는 둘 다 카프카 로그에 저장되는 메시지 형식의 일부다.
- 크래시 복구 작업이 진행되는 동안 프로듀서 상태는 더 오래 된 스냅샷뿐만 아니라 각 파티션 최신 세그먼트의 메시지들 역시 (카프카 로그를 통해서?) 복구된다.
- 복구 작업이 완료되는 대로 새로운 스냅샷 파일이 저장된다.
보존 기한 동안 메시지가 하나도 들어오지 않는 토픽에 경우 어떻게 될까?
- 브로커가 크래시 날 경우, 프로듀서 상태를 복구하기 위해 사용할 수 있는 메시지 역시 없다.
- 하지만 메시지가 없다는 얘기는 중복이 없다는 의미라서 즉시 새 메시지를 받기 시작하며, 새로 들어오는 메시지들을 기준으로 프로듀서 상태를 생성할 수 있다. (프로듀서 상태가 없다는 경고가 로그에 찍힌다.)
멱등적 프로듀서의 한계
멱등적 프로듀서는 프로듀서의 내부 로직으로 인한 재시도가 발생할 경우 생기는 중복만을 방지한다.
- 동일한 메시지를 가지고 producer.send()를 두 번 호출하면 개입하지 않고 중복 메시지 발생
- 프로듀서 입장에서는 전송된 두 개가 실제로는 동일한 레코드인지 알 수 없기 때문이다.
- 파일 디렉토리와 같은 원본에서 데이터를 읽어서 카프카로 쓰는 애플리케이션에서 꽤 흔히 발생한다.
멱등적 프로듀서 사용법
프로듀서 설정에 enable.idempotence=true를 추가하면 된다.
- 프로듀서에서 acks=all 설정이 이미 잡혀 있다면 성능에는 차이가 없다.
💻 멱등적 프로듀서 기능 활성화시 바뀌는 것들
- 프로듀서 ID를 받아오기 위해 프로듀서 시동 과정에서 API를 하나 더 호출한다.
- 전송되는 각각의 레코드 배치에는 프로듀서 ID와 배치 내 첫 메시지의 시퀀스 넘버가 포함된다. (각 메시지의 시퀀스 넘버는 첫 메시지의 시퀀스 넘버에 변화량을 더하면 나온다.)
- 새로 포함되는 필드들은 각 메시지 배치에 96비트를 추가하는데 대부분 작업 부하에 오버헤드도 되지 않는다. (프로듀서 ID는 long 타입이고 시퀀스 넘버는 integer타입)
- 브로커들은 모든 프로듀서 인스턴스에서 들어온 레코드 배치의 시퀀스 넘버를 검증해서 메시지 중복을 방지한다.
- max.in.flight.request.per.connection 설정값이 1보다 크게 잡히더라도, 장애가 발생할 때 각 파티션에 쓰여지는 메시지의 순서는 보장된다.
v2.5 이후부터는 레코드 배치에서 치명적인 에러가 발생할 경우, 에러가 발생한 배치를 포함한 모든 전송중인 배치들은 거부될 것이다.
- 애플리케이션 쪽에서 발생한 예외를 잡아서 이 레코들을 건너뛸지, 중복이나 순서 반전의 위험을 감수하고 재시도할지를 결정할 수 있다.
트랜잭션
트랜잭션 기능은 카프카 스트림즈를 사용해서 개발된 애플리케이션에 정확성을 보장하기 위해 도입되었다.
- 스트림 처리 애플리케이션의 기본 패턴인 ‘읽기-처리-쓰기’ 패턴에서 사용하도록 개발되었다.
- 각 입력 레코드의 처리는 애플리케이션의 내부 상태가 업데이트 되고 결과가 출력 토픽에 성공적으로 쓰여졌을 때 완료된것으로 간주된다. (정확히 한번)
트랜잭션이 필요한 이유
스트림 처리 애플리케이션이 개별 레코드 변환과 필터만을 수행한다면 업데이트할 상태 자체가 없는 만큼 처리 과정에서 중복이 발생하더라도 출력 스트림에서 걸러 내는 것은 상당히 단순할 것이다.
- 스트림 처리 애플리케이션이 다수의 레코드를 직접해서 하나로 만들 경우, 결과 레코드가 잘못되었는지의 여부를 판단하는 것은 훨씬 어렵다.
- 몇 개의 입력 레코드가 한 번 이상 처리되었을 수 있으며 주어진 입력을 다시 처리하지 않는 한 결과를 교정하는 것은 불가능하다.
트랜잭션이 해결하는 문제
원본 토픽으로부터 이벤트를 읽어서 처리를 한 다음, 결과를 토픽에 썻을 때 각 메시지에 대해 결과가 정확히 한번만 쓰여지도록 하고 싶다면 무엇이 잘못될 수 있을까?
1️⃣ 애플리케이션 크래시로 인한 재처리
원본 클러스터로부터 메시지를 읽어서 처리한 뒤, 애플리케이션은 두가지를 해야한다.
- 결과를 출력 토픽에 쓰기
- 우리가 읽어 온 메시지의 오프셋을 커밋하기
출력 토픽에는 이미 결과를 썻는데 입력 오프셋이 커밋되기 전에 애플리케이션이 크래시가 난다면?
- 컨슈머가 읽어오고 있던 파티션들은 다른 컨슈머로 재할당 되고, 컨슈머는 새로 할당된 파티션의 마지막으로 커밋된 오프셋으로 부터 레코드를 읽어올 것이다.
- 커밋된 오프셋에서부터 크래시가 난 시점까지 애플리케이션에 의해 처리된 모든 레코드들은 다시 처리되고 결과는 출력 토픽에 다시 쓰여져 중복이 발생하게 된다.
2️⃣ 좀비 애플리케이션에 의해 발생하는 재처리
애플리케이션이 카프카로부터 레코드 배치를 읽어온 직후 멈추거나 카프카로 연결이 끊어졌다고 해보자
- 컨슈머에 할당되어 있던 파티션들은 컨슈머 그룹 내 다른 컨슈머가 재할당되어 레코드 배치를 다시 읽어서 처리하고, 출력 토픽에 결과를 쓰고, 작업을 계속할 것이다.
리벨런싱 되어 처리되는 사이에 멈췄던 애플리케이션이 다시 동작할 수 있다.
- 멈추기 전에 마지막으로 읽어 왔던 레코드 배치를 처리하고 결과를 출력 토픽에 쓰게 된다.
- 레코드를 읽어오기 위해 새로 카프카를 폴링하거나, 하트비트를 보냈다가 자기가 죽은 것으로 판정되어 다른 인스턴스들이 현재 해당 파티션을 할당받은 상태라는 것을 알아차릴 때까지 작업을 계속할 수 있다.
스스로가 죽은 상태인지 모르는 컨슈머를 좀비라고 부른다.
- 이런 상황을 위해서 추가적인 보장이 없으면 좀비는 출력 토픽으로 데이터를 쓸 수 있으며 중복된 결과가 발생할 수 있다.
트랜잭션은 어떻게 ‘정확히 한 번’을 보장할까?
‘정확히 한 번’ 처리는 읽기, 처리, 쓰기 작업이 원자적으로 이루어진다는 의미다.
- 읽어 온 원본 메시지의 오프셋이 커밋되고 결과가 성공적으로 쓰여지거나, 아니면 둘 다 일어나면 안된다.
원자적으로 이루어지기 위해 카프카 트랜잭션은 원자적 다수 파티션 쓰기 기능을 도입했다.
- 오프셋을 커밋하는 것과 결과를 쓰는 것은 둘 다 파티션에 메시지 쓰는 과정을 수반한다는 점에서 착안되었다.
- 결과는 출력 토픽에, 오프셋은 _consumer_offsets 토픽
- 트랜잭션을 시작해서 양쪽 토픽에 메시지를 쓰고, 둘 다 성공해서 커밋할 수 있거나 재시도하기 위해 중단될 수 있다면, 그 다음부터는 정확히 한 번 의미 구조가 알아서 해준다.
아래는 읽어온 이벤트의 오프셋을 커밋함과 동시에 두 개의 파티션에 원자적 다수 파티션 쓰기를 수행하는 간단한 스트림 처리 애플리케이션을 보여준다.
✅ 트랜잭션을 사용해서 원자적 다수 파티션 쓰기를 수행하기 위해서는 트랜잭션 프로듀서를 사용해야 한다.
- 보통 프로듀서와 차이점은 transactional.id 설정이 잡혀 있고 initTransactions()을 호출해서 초기화한 점이다.
- 카프카 브로커에 의해 자동으로 생성되는 producer.id와 다르게 transactional.id는 프로듀서 설정의 일부이며, 재시작을 하더라도 값이 유지된다.
transactional.id의 주 용도는 재시작 후에도 동일한 프로듀서를 식별하는 것이다.
- 카프카 브로커는 transactional.id에서 producer.id로의 대응 관계를 유지하고 있다.
- 이미 존재하는 transactional.id 프로듀서가 initTransactions()를 다시 호출하면 랜덤값이 아닌 이전 producer.id 값을 할당해 준다.
애플리케이션의 좀비 인스턴가 중복 프로듀서를 생성하는 것을 방지하려면?
좀비 펜싱 혹은 애플리케이션의 좀비 인스턴스가 출력 스트림에 결과를 쓰는 것을 방지할 필요가 있다.
😎 일반적인 좀비 펜싱 방법 (epoch 사용하기)
카프카는 트랜잭션 프로듀서가 초기화를 위해 initTransaction()을 호출하면 transactional.id에 해당하는 에포크 값을 증가한다.
- 같은 transactional.id를 가지고 있지만 에포크 값은 낮은 프로듀서가 메시지 전송, 트랜잭션 커밋, 트랜잭션 중단 요청을 보낼 경우 FencedProducer 에러가 발생하면서 거부된다.
- 오래된 프로듀서는 출력 스트림을 쓰는 것이 불가능하기 때문에 close()로 닫아주는 것 말고는 방법이 없다.
v.2.5 이후부터는 트랜잭션 메타데이터에 컨슈머 그룹 메타데이터를 추가할 수 있는 옵션이 생겼다.
- 메타데이터 역시 펜싱에 사용되어 좀비 인스턴스를 펜싱하면서 서로 다른 트랜잭션 ID를 갖는 프로듀서들이 같은 파티션들에 레코드를 쓸 수 있게 되었다.
트랜잭션은 대부분 프로듀서 쪽 기능이다.
트랜잭션적 프로듀서를 생성하고, 트랜잭션을 시작하고, 다수의 파티션에 레코드를 쓰고, 이미 처리된 레코드들을 표시하기 위해 오프셋을 쓰고, 트랜잭션을 커밋하거나 중단하는 모든 작업이 프로듀서로부터 이루어진다.
- 트랜잭션 기능을 사용해서 쓰여진 레코드는 비록 결과적으로 중단된 트랜잭션에 속할지라도 다른 레코드들과 같이 파티션에 쓰여진다.
- 컨슈머에 올바른 격리 수준이 설정되어 있지 않을 경우 ‘정확히 한 번’ 보장은 이루어 지지 않게 된다.
컨슈머에 격리 수준
isolation.level 설정값을 잡아주면 트랜잭션 기능으로 쓰여진 메시지들을 읽어오는 방식을 제어할 수 있다.
1️⃣ read_committed
토픽들을 구독한 뒤 consumer.poll() 호출하면 커밋된 트랜잭션에 속한 메시지나 처음부터 트랜잭션에 속하지 않은 메시지만 리턴된다.
- 중단된 트랜잭션에 속한 메시지나 아직 진행중인 트랜잭션에 속하는 메시지는 리턴되지 않는다.
2️⃣ read_uncommitted (deafult)
진행중이거나 중단된 트랜잭션에 속하는 것들을 포함해서 모든 레코드가 리턴된다.
- read_committed로 설정한다고 해서 특정 트랜잭션에 속한 모든 메시지가 리턴 된다고 보장되지는 않는다.
- 트랜잭션에 속하는 토픽의 일부만 구독했기 때문에 일부 메시지만 리턴받을 수도 있다.
- 트랜잭션이 언제 시작되고 끝날지, 어느 메시지가 어느 트랜잭션에 속하는지에 대해서 애플리케이션은 알 수 없다.
👊🏻 read_committed와 read_uncommitted 모드 비교
메시지의 읽기 순서를 보장하기 위해 read_committed 모드에서는 아직 진행중인 트랜잭션이 처음으로 시작된 시점
(Last Stable Offset, LSO) 이후에 쓰여진 메시지는 리턴되지 않는다.
- 이 메시지들은 트랜잭션이 프로듀서에 의해 커밋되거나 중단될 때까지, 혹은 transaction.timeout.ms 설정값(기본값 15m)만큼 시간이 지나 브로커가 트랜잭션을 중단시킬 때까지 보류된다.
- ⚠️ 트랜잭션이 오랫동안 닫히지 않고 있으면 컨슈머들이 지체되면서 종단 지연이 길어진다.
- read_committed 모드로 작동 중인 컨슈머는 read_uncommitted모드로 작동하는 컨슈머(기본값)보다 약간 더 뒤에 있는 메시지를 읽는다.
스트림 처리 애플리케이션은 입력 토픽이 트랜잭션 없이 쓰여졌을 경우에도 ‘정확히 한 번’ 출력을 보장한다.
- 원자적 다수 파티션 쓰기 기능은 만약 출력 레코드가 출력 토픽에 커밋되었을 경우, 입력 레코드의 오프셋 역시 해당 컨슈머에 대해 커밋되는 것을 보장한다. (입력 레코드는 다시 처리되지 않는다.)
트랜잭션으로 해결할 수 없는 문제들
트랜잭션 기능은 다수의 파티션에 대한 원자적 쓰기 기능(읽기 기능X)을 제공하고, 스트림 처리 애플리케이션에서 좀비 프로듀서를 방지하기 위한 목적으로 추가되었다.
트랜잭션 기능과 관련해서 자주 하는 실수들이 있다.
- ‘정확히 한 번 보장’이 카프카에 대한 쓰기 이외의 작동에서도 보장된다고 착각한다.
- 컨슈머가 항상 전체 트랜잭션을 읽어 온다고(트랜잭션 간의 경계에 대해 알고 있다고) 가정한다.
트랜잭션 기능이 ‘정확히 한 번’ 보장에 도움이 되지 않는 경우
1️⃣ 스트림 처리에 있어서의 부수 효과(side effect)
스트림 처리 애플리케이션의 처리 단계에서 사용자 이메일을 보내는 작업이 있을 때 ‘정확히 한 번’ 의미 구조를 활성화 하더라도 이 기능은 카프카에 쓰여지는 레코드에만 적용되기 때문에 이메일이 한 번만 발송되지 않는다.
- 레코드 중복을 방지하기 위해 시퀀스 넘버를 사용하는 것은 트랜잭션을 중단 혹은 취소하기 위해 카프카 안에서 마커를 사용해서 작동하는 것이다. (이메일 발송을 취소하지 못한다.)
- 스트림 처리 애플리케이션 안에서 외부 효과를 일으키는 REST API 호출, 파일 쓰기 등에도 해당한다.
2️⃣ 카프카 토픽에서 읽어서 데이터베이스(DB)에 쓰는 경우
레코드는 JDBC와 같은 DB 드라이버를 통해 DB에 쓰여지고, 오프셋은 컨슈머에 의해 카프카에 커밋된다.
- 카프카가 아닌 DB에 결과물을 쓸 경우 프로듀서가 사용되지 않는다.
하나의 트랜잭션에서 외부 DB에는 결과를 쓰고 카프카에 오프셋을 커밋하는 메커니즘은 없지만, 오프셋을 DB에서 저장하도록 할 수 있다.
- 하나의 트랜잭션에서 데이터와 오프셋을 동시에 DB에 저장할 수 있지만 카프카가 아닌, DB 트랜잭션 보장에 의해 달렸다.
마이크로서비스는 ‘아웃박스’라고 불리는 카프카 토픽에 메시지를 쓰는 작업까지만 하고, 별도의 메시지 중계 서비스가 카프카로부터 메시지를 읽어와서 DB를 업데이트 한다. (아웃 박스 패턴)
- 카프카가 DB 업데이트에 대해 ‘정확히 한 번’ 보장을 하지 않는 만큼, 업데이트 작업은 반드시 멱등적이어야 한다.
- 반대로 DB 테이블을 아웃박스로 사용하고 릴레이 서비스가 테이블 업데이트 내역을 카프카에 메시지로 쓸 수 있다.
- 이 패턴은 고유 키나 외래 키와 같이 RDB에 제약 조건이 적용되어야 할 때 유용하다.
3️⃣ 데이터베이스에서 읽어서, 카프카에 쓰고, 여기서 다시 다른 데이터베이스에 쓰는 경우
카프카 트랜잭션은 3️⃣의 내용 종류의 종단 보장(end-to-end guarantee)에 필요한 기능을 가지고 있지 않다.
- 하나의 트랜잭션 안에서 레코드와 오프셋을 함께 커밋하는 문제 외에도 또 다른 문제가 있기 때문이다.
카프카 컨슈머의 read_committed 보장은 DB 트랜잭션을 보존하기엔 너무 약하다.
- 컨슈머가 아직 커밋되지 않은 레코드를 볼 수 없을 뿐만아니라, 일부 토픽에서 랙이 발생했을 수도 있는 만큼 이미 커밋된 트랜잭션의 레코드를 모두 봤을 거라는 보장 또한 없다.
- 트랜잭션 경계를 알 수 있는 방법 역시 없기 때문에 언제 트랜잭션이 시작되었는지, 끝났는지, 레코드 중 어느 정도를 읽었는지도 알 수 없다.
4️⃣ 한 클러스터에서 다른 클러스터로 데이터 복제
하나의 카프카 클러스터에서 다른 클러스터로 데이터를 복사할 때 ‘정확히 한 번’을 보장할 수 있다. (미러메이커 2.0)
애플리케이션이 여러 개의 레코드와 오프셋을 트랜잭션으로 쓰고, 미러메이커 2.0이 이 레코드들을 다른 카프카 클러스터에 복사한다면, 복사 과정에서 트랜잭션 속성이나 보장 같은 것은 유실된다.
- 트랜잭션 원자성을 보장하지 않는다.
카프카에서 데이터를 읽어오는 컨슈머 입장에서는 트랜잭션의 모든 데이터를 읽어왔는지 알 수도 없고 보장할 수도 없다.
- 토픽의 일부만 구독했을 경우 전체 트랜잭션의 일부만 복사할 수 있다.
5️⃣ 발행/구독 패턴
read_committed 모드가 설정된 컨슈머들은 중단된 트랜잭션에 속한 레코드들을 보지 못한다.
- 하지만 이러한 보장은 오프셋 커밋 로직에 따라 컨슈머들은 메시지를 한 번 이상 처리할 수 있어서, ‘정확히 한 번’에 미치지 못한다.
위에 같은 경우 카프카가 보장하는 것은 JMS 트랜잭션에서 보장하는 것과 비슷하지만, 커밋되지 않은 트랜잭션들이 보이지 않도록 컨슈머들에 read_committed 설정이 되어 있어야 한다.
- JMS 브로커들은 모든 컨슈머에 커밋되지 않은 트랜잭션의 레코드를 주지 않는다.
메시지를 쓰고 나서 커밋하기 전에 다른 애플리케이션이 응답하기를 기다리는 패턴은 피하자
- 다른 애플리케이션은 트랜잭션이 커밋될 때까지 메시지를 받지 못해 데드락이 발생한다.
트랜잭션 사용법
트랜잭션은 브로커 기능이기도 하며, 카프카 프로토콜의 일부인 만큼 여러 클라이언트들이 트랜잭션을 지원한다.
카프카 스트림즈에서 exactly-once 보장 활성화 (트랜잭션 기능을 사용하는 가장 일반적이며 권장)
- 트랜잭션 기능을 직접적으로 사용할 일은 전혀 없으며, 카프카 스트림즈가 대신 해당 기능을 사용해서 우리가 필요로 하는 보장을 제공한다. (가장 쉽고 예상했던 결과를 얻을 가능성이 높은 방법)
- 카프카 스트림즈 애플리케이션에서 ‘정확히 한 번’ 보장 기능을 활성화 하려면 processing.guarantee 설정을 exactly_once이나 exactly_once_bata로 잡아주면 된다.
- exactly_once_bata는 크래시가 나거나 트랜잭션 전송중에 멈춘 애플리케이션 인스턴스를 처리하는 방식이 조금 다르며, 트랜잭션적 프로듀서에서 더 많은 파티션을 효율적으로 다룰 수 있다.
💡 exactly_once 와 exactly_once_beta의 차이는 내부 로직에서 사용되는 프로듀서의 수 차이
📌 exactly_once(deprecated) < exactly_once_beta < exactly_once_v2 (권장)
카프카 스트림즈를 사용하지 않을 경우는 트랜잭션 API를 직접 사용하면 된다.
트랜잭션 ID와 펜싱
프로듀서가 사용할 트랜잭션 ID를 잘못 할당해 줄 경우 애플리케이션에 에러가 발생하거나 “정확히 한 번” 보장을 준수할 수 없게 될 수도 있다.
📌 핵심 요구 조건
트랜잭션 ID가 동일 애플리케이션 인스턴스가 재시작 했을 때는 일관적으로 유지하고, 서로 다른 애플리케이션 인스턴스에 대해서는 서로 달라야 한다.
- 그렇지 않다면 브로커는 좀비 인스턴스의 요청을 쳐내지 못하게 된다.
v2.5 이전에는 스레드에 할당되는 트랜잭션 ID는 랜덤하게 결정되고, 동일한 파티션에 쓰기 작업을 할 때 언제나 동일한 트랜잭션 ID가 쓸거라는 보장이 없다.
- 트랜잭션 ID를 파티션에 정적으로 대응시켜야 했었다.
트랜잭션 ID와 컨슈머 그룹 메타데이터를 함께 사용하는 펜싱을 도입하였다. (v2.5)
- 동일한 그룹에 속한 서로 다른 컨슈머가 읽어 온 레코드를 트랜잭션적 프로듀서에게 넘겨준다. (프로듀서 A/B)
컨슈머 A와 프로듀서 A가 포함된 애플리케이션 인스턴스가 좀비가 되고 컨슈머 B가 두 파티션을 처리할 경우
- 애플리케이션은 기존 프로듀서의 쓰기를 펜싱하고 파티션 0에 무사히 쓰기 작업을 수행하기 위해 트랜잭션 ID가 A인 새 프로듀서를 생성해야 한다. (낭비 작업)
트랜잭션에 컨슈머 그룹 정보를 포함하면, 프로듀서 B로부터의 트랜잭션은 다음 세대의 컨슈머 그룹에서 온 것이 명백하므로 문제없이 작업을 진행할 수 있다.
- 좀비가 된 프로듀서 A의 트랜잭션은 이전 세대의 컨슈머 그룹에서 온 것인 만큼 펜싱된다.
트랜잭션의 작동 원리
카프카의 트랜잭션은 다수의 파티션에 대해 트랜잭션이 커밋되었거나 중단되었다는 것을 표시하기 위해 마커 메시지를 사용한다. (찬디-램포트-스냅샷 알고리즘 영향 받음)
- 프로듀서가 트랜잭션을 커밋하기 위해 트랜잭션 코디네이터에 ‘커밋’ 메시지를 보내면 트랜잭션 코디네이터가 트랜잭션에 관련된 모든 파티션에 커밋 마커를 쓴다.
🤔 일부 파티션에만 커밋 메시지가 쓰여진 상태에서 프로듀서가 크래시 발생한 경우
카프카 트랜잭션은 2단계 커밋과 트랜잭션 로그를 사용해서 이 문제를 해결한다.
- transaction_state라는 이름의 내부 토픽을 사용한다. (로그)
- 1️⃣ 현재 진행중인 트랜잭션이 존재함을 로그에 기록한다 (연관된 파티션들 역시 함께 기록한다.)
- 2️⃣ 로그에 커밋 혹은 중단 시도를 기록한다. (일단 로그에 기록이 남으면 최종적으로는 커밋되거나 중단되어야 한다.)
- 3️⃣ 모든 파티션에 트랜잭션 마커를 쓴다.
- 4️⃣ 트랜잭션이 종료되었음을 로그에 쓴다.
코드 레벨에서는 어떻게 흘러갈까?
initTransaction()를 호출해서 자신이 트랜잭션 프로듀서임을 등록해야 한다.
producer.initTransaction();
- 트랜잭션 프로듀서의 트랜잭션 코디네이터 역할을 맡은 브로커로 요청이 보내진다.
- 각 트랜잭션 ID의 트랜잭션 코디네이터는 트랜잭션 ID에 해당하는 트랜잭션 로그 파티션의 리더 브로커가 맡는다.
각 브로커는 전체 프로듀서의 트랜잭션 코디네이터 역할을 나눠서 맡는다.
- 각 브로커가 전체 컨슈머 그룹의 컨슈머 그룹 코디네이터 역할을 나눠서 맡는 것과 비슷하다.
initTransaction() API는 코디네이터에 새 트랜잭션 ID를 등록하거나, 좀비 프로듀서들을 펜싱하기 위해서 기존 트랜잭션 ID의 epoch 값을 증가시킨다.
- epoch 값이 증가되면 아직 완료되지 않은 트랜잭션들은 중단될 것이다.
beginTransaction() API를 호출하면 프로듀서에 현재 진행중인 트랜잭션이 있음을 알려준다.
producer.beginTransaction();
- 프로토콜의 일부는 아니며, 브로커 쪽의 트랜잭션 코디네이터는 트랜잭션이 시작되었다는 사실을 모른다.
그렇다면 트랜잭션이 시작되었다는 것을 어떻게 알 수 있을까?
poducer.sendOffsetsToTransaction() API 메서드 호출을 통해서 프로듀서가 레코드 전송을 시작한다.
- 다른 KafkaProducer 요청들을 봐도 마지막에 producerMetrics에 실행시간을 기록하는걸 볼 수 있다.
TransactionManager의 sendOffsetsToTransaction() 안에서 요청을 만들어서 handler로 감싼 뒤 우선순위 큐에 넣어준다.
- 이 메서드를 호출하면 트랜잭션 코디네이터로 오프셋과 컨슈머 그룹 ID가 포함된 요청이 전송된다.
public synchronized TransactionalRequestResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) {
this.ensureTransactional();
this.throwIfPendingState("sendOffsetsToTransaction");
this.maybeFailWithError();
if (this.currentState != TransactionManager.State.IN_TRANSACTION) {
throw new IllegalStateException("Cannot send offsets if a transaction is not in progress (currentState= " + this.currentState + ")");
} else {
this.log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata);
// 요청을 만들고
AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(
(new AddOffsetsToTxnRequestData())
.setTransactionalId(this.transactionalId)
.setProducerId(this.producerIdAndEpoch.producerId)
.setProducerEpoch(this.producerIdAndEpoch.epoch)
.setGroupId(groupMetadata.groupId()));
AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets, groupMetadata, (SyntheticClass_1)null);
// 요청을 handler로 감싸서 우선순위 큐에 넣어준다.
this.enqueueRequest(handler);
return handler.result;
}
}
- 트랜잭션 코디네이터는 컨슈머 그룹 ID를 사용해서 컨슈머 그룹 코디네이터를 찾은 뒤, 컨슈머 그룹이 보통 하는 것과 같은 방식으로 오프셋을 커밋한다.
즉, 프로듀서가 레코드 전송을 시작하면 프로듀서는 새로운 파티션으로 레코드를 전송할 때 마다 브로커에 트랜잭션임을 알리는 요청을 보낸다. (AddPartitionsToTxn)
- 이 요청에 현재 이 프로듀서에 진행중인 트랜잭션이 있으며 레코드가 추가되는 파티션들이 트랜잭션의 일부임을 알린다.
- 이 정보는 트랜잭션 로그에 기록될 것이다.
트랜잭션이 커밋되기 전에 sendOffsetsToTransaction() 호출로 오프셋을 커밋했으므로 트랜잭션을 커밋 아니면 중단을 해주면 된다.
try {
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction(); // 트랜잭션 중단
resetToLastCommittedPositions(consumer); // 재시도를 위해 컨슈머 위치를 뒤로 돌린다.
}
트랜잭션을 커밋하거나, 중단하면 트랜잭션 매니저의 beginCommit(), beginAbort()이 호출되는데
메서드 내부에서 beginCompletingTransaction()를 호출하고 트랜잭션 코디네이터에 EndTxn 요청이 전송된다.
- 트랜잭션 코디네이터는 트랜잭션 로그에(transaction_state) 커밋 혹은 중단 시도를 기록한다.
- 기록을 성공적으로 한다면 트랜잭션을 커밋하거나, 중단하는 것은 트랜잭션 코디네이터에 달려 있다.
private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
if (!this.newPartitionsInTransaction.isEmpty()) {
this.enqueueRequest(this.addPartitionsToTransactionHandler());
}
if (!(this.lastError instanceof InvalidPidMappingException)) {
EndTxnRequest.Builder builder = new EndTxnRequest.Builder(
(new EndTxnRequestData())
.setTransactionalId(this.transactionalId)
.setProducerId(this.producerIdAndEpoch.producerId)
.setProducerEpoch(this.producerIdAndEpoch.epoch)
.setCommitted(transactionResult.id));
EndTxnHandler handler = new EndTxnHandler(builder, (SyntheticClass_1)null);
this.enqueueRequest(handler);
if (!this.epochBumpRequired) {
return handler.result;
}
}
return this.initializeTransactions(this.producerIdAndEpoch);
}
- 트랜잭션 코디네이터는 우선 트랜잭션에 포함된 모든 파티션에 커밋 마커를 쓴 다음 트랜잭션 로그에 커밋이 성공적으로 완료되었음을 기록해 넣는다.
커밋 시도는 로그에 기록되었지만 전체 과정이 완료되기 전에 트랜잭션 코디네이터가 종료되거나 크래시날 수 있다.
- 이 경우 새로운 트랜잭션 코디네이터가 선출되어 트랜잭션 로그에 대한 커밋 작업을 대신 마무리 짓는다.
- 트랜잭션이 transaction.timout.ms에 설정된 시간 내에 커밋되지도, 중단되지도 않는다면, 트랜잭션 코디네이터는 자동으로 트랜잭션을 중단한다.
트랜잭션 혹은 멱등적 프로듀서로부터 레코드를 전달받는 각 브로커는 메모리상에 프로듀서/트랜잭션 ID와 프로듀서가 전송한 마지막 배치 5개에 연관된 상태(시퀀스 넘버, 오프셋 등)를 저장한다.
- 이 상태는 프로듀서가 정지하고 나서도 transactional.id.expiration.ms 밀리초만큼 저장된다. (기본값: 7일)
애플리케이션을 설계할 때 초기화 과정에서 오랫동안 유지되는 프로듀서는 몇 개만 생성한 뒤 애플리케이션이 종료될 때까지 재사용하길 권장한다.
- 브로커에 메모리 고갈이나 심각한 가비지 수집 문제를 발생시킬 수 있다.
재사용하지 못 한다면 transactional.id.expiration.ms 설정값을 낮춰 잡자.
- 트랜잭션 ID가 더 빨리 만료되도록 하여, 재활용도 불가능한 오래 된 상태가 브로커 메모리의 상당 부분을 차지하는 사태를 방지할 것을 권장한다.
트랜잭션이 멈췄다면?
트랜잭션이 진행중인 파티션의 LSO값이 증가되지 않는 문제가 발생할 수 있다. (hanging transaction)
- 해당 파티션을 읽어오고 있는, 격리 수준이 read-committed로 설정된 컨슈머 읽기 작업이 멈출 수 있다.
- 해당 트랜잭션이 로그 보존 기능(log.retention.ms, 압착)등으로 인해 삭제될 때까지 작업이 재개되지 않는다.
갑자기 정상 동작하던 컨슈머가 멈췄다면 이 현상을 의심해 볼 필요가 있다.
- retention.ms 설정값을 동적으로 잡아 줌으로써 문제가 되는 트랜잭션을 삭제해서 문제를 넘길 수 있다.
- 어느 정도의 데이터 유실은 감수해야 한다.
- 토픽 파티션별로 LastStableOffsetLag 지표를 모니터링 해서 파티션의 LSO가 최신 오프셋 값에서 얼마나 떨어졌는지 확인해서 알 수 있다. (무한히 증가하는 상황이 발생한 경우)
트랜잭션 성능
✅ 프로듀서 입장에서 트랜잭션 오버헤드
- 프로듀서를 생성해서 사용하는 동안 트랜잭션 ID 등록 요청은 단 한 번만 발생한다.
- 파티션들을 등록하는 추가적인 호출은 각 트랜잭션에 있어서 파티션 별로 최대 한 번씩만 이루어진다.
- 각 트랜잭션이 커밋 요청을 전송하면, 파티션마다 커밋 마커가 추가된다.
트랜잭션 초기화와 커밋 요청은 동기적으로 작동한다.
- 성공, 실패, 타임아웃 되거나 할 때까지 어떤 데이터도 전송되지 않아서 오버헤드는 더 증가한다. (동기)
프로듀서에 있어서 트랜잭션 오버헤드는 트랜잭션에 포함된 메시지의 수와는 무관하다.
- 트랜잭션마다 많은 수의 메시지를 집어넣는 쪽이 상대적으로 오버헤드가 적을 뿐 아니라 동기적으로 실행되는 단계의 수도 줄어든다.
- 즉, 메시지를 조금씩 여러번 트랜잭션을 계속 초기화하고 커밋하는 동기작업을 줄일 수 있으므로 전체 처리량은 올라간다.
✅ 컨슈머 입장에서 트랜잭션 오버헤드
- 커밋 마커를 읽어오는 작업에 관련해서 약간의 오버헤드가 있다.
read_committed 모드 컨슈머에서는 아직 완료되지 않은 트랜잭션의 레코드들이 리턴되지 않는다.
- 트랜잭션 커밋 사이의 간격이 길어질수록 컨슈머는 메시지가 리턴될 때까지 더 기다려야 한다.
- 결과적으로 종단 지연 역시 그만큼 길어지게 된다.
컨슈머는 아직 완료되지 않은 트랜잭션에 속하는 메시지들을 버퍼링할 필요가 없다.
- 브로커는 컨슈머가 보낸 읽기 요청을 받는다고 해서 이 메시지들을 리턴하지 않는다.
- 트랜잭션 데이터를 읽을 때 컨슈머 쪽에 추가적인 작업은 없어, 자연히 처리량이 줄어들지도 않는다.
'책 > 카프카 핵심 가이드' 카테고리의 다른 글
신뢰성 있는 데이터 전달(Kafka) (0) | 2025.01.14 |
---|---|
카프카 내부 메커니즘 (1) | 2024.12.22 |
프로그램 내에서 코드로 카프카 관리하기 (2) | 2024.12.08 |
카프카 컨슈머 : 카프카에서 데이터 읽기 (1) | 2024.11.24 |
카프카 프로듀서: 카프카에 메시지 쓰기 (1) | 2024.11.09 |