아파치 주키퍼를 사용하는 이유
카프카는 현재 클러스터의 멤버인 브로커들의 목록을 유지하기 위해서다.
- 각 브로커는 브로커 설정 파일에 정의되었거나 아니면 자동으로 생성된 고유한 식별자를 가진다.
브로커 프로세스는 시작될 때마다 주키퍼 Ephemeral 노드의 형태로 ID를 등록한다.
- 컨트롤러를 포함한 카프카 브로커들과 몇몇 생태계 툴들은 브로커가 등록되는 주키퍼의 /brokers/ids 경로를 구독함으로써 브로커가 추가/제거 될 때마다 알림을 받는다.
- ID는 중복되지 않으며, 중복된 값이 추가된다면 동일한 브로커 ID를 갖는 ZNode가 있기 때문에 실패하며 에러가 발생한다.
ZNode의 종류는 영속 종류에 따라 다음과 같이 구분된다. (참조)
- Persistent Nodes(영구 노드) : 명시적으로 삭제되기 전까지 존재함
- Ephemeral Nodes(임시 노드) : 세션이 유지되는 동안 활성(세션이 종료되면 삭제됨), 자식 노드를 가질 수 없음
- Sequence Nodes(순차 노드) : 경로의 끝에 일정하게 증가하는 카운터 추가됨, 영구 및 임시 노드 모두에 적용 가능
브로커와 주키퍼 간의 연결이 끊어질 경우(브로커 정지, 네트워크 단절, 가비지 수집이 오래걸리는 경우), 브로커가 시작될 때 생성한 Ephemeral 노드는 자동으로 주키퍼에서 삭제된다.
- watch를 설정한 카프카 컴포넌트들은 이 브로커 목록을 지켜보고 해당 브로커가 내려갔음을 알아차린다.
브로커가 정지하면 브로커를 나타내는 ZNode 역시 삭제되지만, 브로커의 ID는 다른 자료구조에 남아 있게 된다.
- 각 토픽의 레플리카 목록에는 해당 레플리카를 저장하는 브로커의 ID가 포함되기 때문이다.
- 특정 브로커가 완전히 유실되어 동일한 ID를 가진 새로운 브로커를 투입할 경우, 곧바로 클러스터에서 유실된 브로커의 자리를 대신해서 이전 브로커의 토픽과 파티션들을 할당받는다.
컨트롤러
컨트롤러는 일반적인 카프카 브로커의 기능을 더해서 파티션 리더를 선출하는 역할을 추가로 맡는다.
- 클러스터에서 가장 먼저 시작되는 브로커는 주키퍼의 /controller에 Ephemeral 노드를 생성함으로써 컨트롤러가 된다.
클러스터 안에 단 한 개의 컨트롤러만 있도록 보장하기 위해서 브로커들은 /controller에 Ephemeral 노드를 watch를 설정한다.
- 주키퍼의 컨트롤러 노드에 변동이 생겼을 때 알아차릴 수 있다.
- 다른 브로커들도 시작할 때 해당 위치에 노드를 생성하려고 시도하지만 노드가 이미 존재한다는 예외를 받으므로 클러스터에 이미 컨트롤러 노드가 존재한다는 것을 알 수 있다.
컨트롤러 브로커가 멈추거나 주키퍼와의 연결이 끊어질 경우
- 주키퍼 클라이언트가 zookeeper.session.timeout.ms에 설정된 값보다 더 오랫동안 주키퍼에 하트비트를 전송하지 않는 경우
- Ephemeral 노드는 삭제되는데 클러스터 안의 다른 브로커들은 주키퍼에 설정된 와치를 통해 컨트롤러가 없어졌음을 알게되고 주키퍼에 컨트롤러 노드를 생성하려고 시도한다.
- 노드가 이미 존재한다는 예외를 받으면 새 컨트롤러 노드에 대한 와치를 다시 생성한다.
브로커는 새로운 컨트롤러가 선출될 때마다 주키퍼의 조건적 증가 연산에 의해 증가된 epoch 값(세대)을 전달받게 된다.
- 브로커는 현재 컨트롤러의 epoch 값을 알고 있기 때문에, 만약 더 낮은 epoch 값을 가진 컨트롤러부터 메시지를 받을 경우 무시한다.
- 새로운 컨트롤러가 선출된 것을 알지 못한 채 브로커에 메시지를 보내는 컨트롤러를 좀비라고 부르는데 이런 현상을 방지하는 방법이기도 하다.
브로커가 컨트롤러가 되면, 클러스터 메타데이터 관리와 리더 선출을 시작하기 전에 먼저 주키퍼로부터 최신 레플리카 상태 맵을 읽어온다.
- 이 적재 작업은 비동기 API를 사용하며, 지연을 줄이기 위해 읽기 요청을 여러 단계로 나눠서 주키퍼로 보낸다.
- 파티션 수가 매우 많은 클러스터에서는 적재 작업이 몇 초씩 걸릴 수도 있다.
브로커가 클러스터를 나갔다는 사실을 컨트롤러가 알게된 경우
- 컨트롤러는 해당 브로커가 리더를 맡고 있었던 모든 파티션에 대해 새로운 브로커를 할당해준다.
- 새로운 리더가 필요한 모든 파티션을 순회해 가면서 새로운 리더가 될 브로커를 결정한다.
- 단순히 해당 파티션의 레플리카 목록에서 바로 다음 레플리카가 새 브로커가 된다.
새로운 리더가 될 브로커를 결정 했다면?
- 결정된 새로운 상태를 주키퍼에 쓴 뒤 새로 리더가 할당된 파티션의 레플리카를 포함하는 모든 브로커에 LeaderAndISR 요청을 보낸다.
- 지연을 줄이기 위해 요청을 여러 개로 나눠서 비동기 방식으로 주키퍼로 보낸다.
- 해당 파티션들에 대한 새로운 리더와 팔로워 정보를 포함한다.
- 효율성을 위해 이러한 요청들은 배치 단위로 묶여서 전송된다.
- 즉, 각각의 요청은 같은 브로커에 레플리카가 있는 다수의 파티션에 대한 새 리더십 정보를 포함하게 된다.
새로 리더가 된 브로커 각각은 클라이언트로부터의 쓰기 혹은 읽기 요청을 처리하고 팔로워들은 새 리더로부터 메시지를 복제하기 시작한다.
- 클러스터 안의 모든 브로커는 클러스터 내 전체 브로커와 레플리카의 맵을 포함하는 MetadataCache를 가지고 있다.
- 컨트롤러는 모든 브로커에 리더십 변경 정보를 포함하는 UpdateMetadata 요청을 보내서 각각의 캐시를 업데이트 하도록 한다.
- 브로커가 백업을 시작할 때도 비슷한 과정이 반복되며 차이점은 브로커에 속한 모든 레플리카들은 팔로워로 시작하며, 리더로 선출될 자격을 얻기 위해서는 그 전에 리더에 쓰여진 메시지를 따라잡아야 한다.
요약
- 컨트롤러는 브로커가 클러스터에 추가되거나 제거될 때 파티션과 레플리카 중에서 리더를 선출할 책임을 가진다.
- 컨트롤러는 서로 다른 2개의 브로커가 자신이 현재 컨트롤러라 생각하는 split brain 현상을 방지하기 위해 epoch 번호를 사용한다.
복제
복제가 중요하나 이유는 개별적인 노드에 필연적으로 장애가 발생할 수 밖에 없는 상황에서 카프카가 신뢰성과 지속성을 보장하는 방식이기 때문이다.
- 각각의 레플리카는 브로커에 저장되는데, 대개 하나의 브로커는 수백 개에서 수천 개의 레플리카를 저장한다.
리더 역할 레플리카는 어느 팔로워 레플리카가 최신 상태를 유지하고 있는지 확인한다.
- 팔로워 레플리카는 메시지가 도착하는 즉시 리더 레플리카로부터 모든 메시지를 복제해 옴으로써 최신 상태를 유지하지만, 다양한 원인으로 동기화가 깨질 수 있다.
- 팔로워 레플리카들은 리더 레플리카에 읽기 요청을 보내는데, 컨슈머가 메시지를 읽어오기 위해 사용하는 요청과 동일하다.
- 다음번에 받아야 할 메시지 오프셋을 포함할 뿐만 아니라 언제나 메시지를 순서대로 돌려 준다.
- 리더 레플리카는 각 팔로워 레플리카가 마지막으로 요청한 오프셋을 확인해 뒤쳐지는 정도를 알 수 있다.
팔로워 레플리카가 일정 시간 이상 읽기 요청을 보내지 않거나, 읽기 요청을 보냈는데 가장 최근에 추가된 메시지를 따라잡지 못하는 경우에 해당 레플리카는 동기화가 풀린 것으로 간주한다. (out-of-sync replica)
- 해당 레플리카가 모든 메시지를 가지고 있지 않기 때문에 이 경우 장애 상황에서 리더가 될 수 없다.
- 즉, 지속적으로 최신 메시지를 요청하는 레플리카(in-sync replica)만 리더로 선출될 수 있다.
- 여기서 말하는 일정 시간은 replica.lag.time.max.ms 설정 매개변수에 의해 결정된다.
각 파티션은 리더 레플리카 뿐만 아니라 토픽이 처음 생성 되었을 때 리더 레플리카였던 레플리카(선호 리더)를 갖는다.
- 파티션이 처음 생성되던 시점에는 리더 레플리카가 모든 브로커에 걸쳐 균등하게 분포되기 때문에 선호라는 표현이 붙었다.
- 클러스터 내의 모든 파티션에 대해 선호 리더가 실제 리더가 될 경우 부하가 브로커 사이에 균등하게 분배될 것이라고 예상할 수 있다.
- 카프카에는 auto.leader.rebalance.enable=true 설정이 기본값이기 때문에 선호 리더가 현재 리더가 아니지만, 현재 리더와 동기화가 되고 있을 경우 리더로 선출한다.
선호 리더는 kafka-topics.sh 툴이 출력하는 레플리카 상세 정보를 보고 찾을 수 있다.
- 현재 리더가 어느 브로커에 있는지, 레플리카 재할당 툴을 사용해서 다른 브로커로 재할당 되었는지의 여부와 상관없이 목록에 표시된 첫 번째 레플리카가 선호 리더다.
- 수동으로 레플리카를 재할당 한다면 첫 번째로 지정하는 레플리카가 선호 레플리카가 된다.
- 선호 레플리카를 서로 다른 브로커들로 분산함으로써 부하가 한쪽으로 몰리지 않도록 하자.
요청 처리
카프카 브로커가 하는 일의 대부분은 클라이언트, 파티션 레플리카, 컨트롤러가 파티션 리더에게 보내는 요청을 처리하는 것이다.
- 카프카는 TCP로 전달되는 이진 프로토콜을 가지며 요청에 대해서 브로커가 응답하는 방식을 정의한다.
언제나 클라이언트가 연결을 시작하고 요청을 전송하며, 브로커는 요청을 처리하고 클라이언트로 응답을 보낸다.
- 특정 클라이언트가 브로커로 전송한 모든 요청은 받은 순서대로 처리되기 때문에 저장하는 메시지 순서가 보장되며, 카프카를 메시지 큐로 사용할 수도 있게 된다.
모든 요청에는 아래와 같은 내용을 포함하는 표준 헤더를 갖는다.
- 요청 유형: API 키라고도 불린다.
- 요청 버전: 브로커는 서로 다른 버전의 클라이언트로부터 요청을 받아 각각의 버전에 맞는 응답을 한다.
- Correlation ID: 각각의 요청에 붙는 고유한 식별자로 응답이나 에러 로그에도 포함되기 때문에 트러블 슈팅에 사용할 수 있다.
- 클라이언트 ID: 요청을 보낸 애플리케이션을 식별하기 위해 사용한다.
브로커가 요청을 처리하는 방식
브로커는 연결을 받는 각 포트별로 acceptor 스레드를 하나씩 실행시킨다.
- acceptor 스레드는 연결을 생성하고 들어온 요청을 processor 스레드에 넘겨 처리하도록 한다.
- processor 스레드는 네트워크 스레드라고도 불리며 수는 설정이 가능하다.
- processor 스레드는 클라이언트 연결로부터 들어온 요청들을 받아서 요청 큐에 넣고, 응답 큐에서 응답을 가져다 클라이언트로 보낸다.
- 요청 큐에 저장된 요청은 I/O 스레드가 처리해서 응답 큐에 저장한다.
- 지연된 응답들은 완료될 때까지 퍼거토리에 저장된다.
가장 일반적인 형태의 클라이언트 요청 유형
쓰기 요청
- 카프카 브로커로 메시지를 쓰고 있는 프로듀서가 보낸 요청
읽기 요청
- 카프카 브로커로부터 메시지를 읽어오고 있는 컨슈머나 팔로워 레플리카가 보낸 요청
- 컨슈머의 경우 브로커 쪽에 데이터가 준비되었을 때에만 응답을 보낼 수 있다.
어드민 요청
- 토픽 생성이나 삭제와 같이 메타데이터 작업을 수행중인 어드민 클라이언트가 보낸 요청
쓰기 요청과 읽기 요청 모두 파티션의 리더 레플리카로 전송되어야 한다.
- 만약 브로커가 다른 브로커가 리더를 맡고 있는 파티션에 대한 쓰기 요청을 받을 경우, 쓰기 요청을 보낸 클라이언트는 Not a Leader for Partition 에러를 응답으로 받을 것이다.
- 읽기 요청을 받은 경우에도 동일한 에러가 발생한다.
카프카의 클라이언트는 요청에 맞는 파티션의 리더를 맡고 있는 브로커에 쓰기나 읽기 요청을 전송할 책임을 진다.
- 메타데이터 요청이라 불리는 또 다른 유형의 요청을 통해서 어디로 보낼지 알 수 있다.
- 서버는 클라이언트가 다루고자 하는 토픽들에 파티션들과 각 파티션의 레플리카 정보 그리고 어떤 레플리카가 리더인지를 명시하는 응답을 리턴한다.
- 클라이언트는 응답값을 캐시해 두었다가 각 파티션의 리더 역할을 맡고 있는 브로커에 바로 쓰거나 읽는다.
- 모든 브로커들은 메타데이터 캐시를 가지고 있기 때문에 아무 브로커에나 보내도 상관없다.
클라이언트는 토픽 메타데이터가 변경될 경우에도 최신값을 유지할 수 있도록 새로고침 해야 한다.
- 새 브로커가 추가되거나 일부 레플리카가 새 브로커로 이동한 경우와 Not a Leader 에러를 리턴받을 경우 요청을 재시도하기 전에 먼저 메타데이터를 새로고침한다.
- 새로고침 간격은 metadata.max.age.ms 설정 매개변수로 조절 가능하다.
쓰기 요청
acks 설정 매개변수는 쓰기 작업이 성공한 것으로 간주되기 전 메시지에 대한 응답을 보내야 하는 브로커의 수를 가리킨다.
- acks=1 : 리더만이 메시지를 받았을 때
- acks=all : 모든 인-싱크 레플리카들이 메시지를 받았을 때
- acks=0 : 메시지가 보내졌을 때 (브로커의 응답을 기다리지 않는다.)
파티션의 리더 레플리카를 가지고 있는 브로커가 해당 파티션에 대한 쓰기 요청을 받게되면 몇가지 유효성 검증 부터 한다.
- 데이터를 보내고 있는 사용자가 토픽에 대한 쓰기 권한이 있는지
- 요청에 지정되어 있는 acks 설정값이 올바른지 (0, 1, 아니면 all)
- acks 설정값이 all인 경우 메시지를 안전하게 쓸 수 있을 만큼 충분한 in-sync replica가 있는지
- 현재 in-sync replica 수가 설정된 값 아래로 내려가면 새로운 메시지를 받지 않도록 브로커 설정 가능
검증이 완료되면 브로커는 새 메시지들을 로컬 디스크에 쓴다.
- 리눅스의 경우 메시지는 파일시스템 캐시에 쓰여지는데, 언제 디스크에 반영될지에는 보장이 없다.
- 카프카는 데이터가 디스크에 저장될 때까지 기다리지 않는다.
- 즉, 메시지의 지속성을 위해 복제에 의존한다.
메시지가 파티션 리더에 쓰여지면 브로커는 acks 설정에 따라 응답을 내려보낸다.
- acks=0 or 1: 바로 응답을 보낸다.
- acks=all: 일단 요청을 퍼거토리 버퍼에 저장하고 팔로워 레플리카들이 메시지를 복제한 것을 확인한 다음에야 클라이언트에 응답을 보낸다.
읽기 요청
어떠한 경우에도 브로커는 적절한 에러를 클라이언트에게 리턴하고, 컨슈머는 사용자가 개입하지 않아도 모든 파티션 메타데이터를 포함하는 읽기 요청을 보낸다.
읽기 요청도 쓰기 요청과 유사하게 파티션 리더를 맡고 있는 브로커에게 전송된다.
- 메타데이터에 대한 요청을 보낸다.
- 요청을 받은 파티션 리더는 요청이 유효한지 확인한다. (지정된 오프셋이 해당 파티션에 존재하는지)
- 너무 오래되어 파티션에서 삭제된 메시지나 존재하지 않으면 브로커는 에러를 응답으로 보낸다.
- 오프셋이 존재하면 브로커는 파티션으로 부터 클라이언트가 요청에 지정한 크기 한도만큼의 메시지를 읽어서 클라이언트에게 응답한다.
클라이언트는 각 파티션에 대해 브로커가 리턴할 수 있는 최대 데이터의 양을 지정할 수 있다.
- 브로커가 되돌려준 응답을 담을 수 있을 정도로 충분히 큰 메모리를 할당해야 하기 때문이다.
- 추가적으로 보내는 데이터가 최소 어느정도 용량까지 쌓이면 결과를 리턴하도록 설정도 가능하다.
- 전체적으로는 같은 양의 데이터를 읽지만, 데이터를 주고받는 횟수(네트워크 통신)는 훨씬 적어 오버헤드를 줄일 수 있다.
- 너무 높게 잡으면 계속 기다리기 때문에 타임아웃을 지정할 수도 있다.
카프카는 클라이언트에게 보내는 메시지에 zero-copy 최적화를 적용하는 것으로 유명하다.
- 리눅스의 파일시스템 캐시에서 읽어 온 메시지들을 중간 버퍼를 거치지 않고 바로 네트워크 채널로 보낸다.
- 클라이언트에게 데이터를 보내기 전에 로컬 캐시에 저장하는 대부분의 DB와의 차이점으로 데이터를 복사하고 메모리 상에 버퍼를 관리하기 위한 오버헤드가 사라져 성능이 향상된다.
파티션 리더에 존재하는 모든 데이터를 클라이언트가 읽을 수 있는건 아니다.
- 모든 in-sync replica에 쓰여진 메시지들만 읽을 수 있다.
- 팔로워 레플리카들은 컨슈머이긴 하지만, 복제 기능을 위해서 이 룰에서는 예외다.
- 만약 읽을려고 할 경우 예외가 발생하는 것이 아니라 빈 응답이 리턴된다.
- 리더에만 존재하는 모든 메시지들을 읽을 수 있다면 크래시 상황에서 일관성이 결여될 수 있다.
브로커 사이의 복제가 늦어지면, 새 메시지가 컨슈머에 도달하는 데 걸리는 시간도 길어진다.
- 지연되는 시간은 replica.lag.time.max.ms 설정값에 따라 제한된다.
- in-sync 상태로 판정되는 레플리카가 새 메시지를 복제하는 과정에서 지연될 수 있는 최대 시간
- 시간 이상으로 지연되면 out-of-sync replica가 된다.
컨슈머가 매우 많은 수의 파티션들로부터 이벤트를 읽어오는 경우
읽고자 하는 파티션의 전체 목록을 요청할 때마다 브로커에게 메타데이터를 요청하는건 비효율적이다.
- 파티션의 집합이나 연관된 메타데이터는 잘 바뀌지 않고 많지도 않기 때문에 카프카는 읽기 세션 캐시를 사용한다.
컨슈머는 읽고 있는 파티션의 목록과 그 메타데이터를 캐시하는 세션을 생성할 수 있다.
- 브로커는 변경 사항이 있는 경우에만 응답에 메타데이터를 포함하면 된다.
- 세션 캐시의 크기에도 한도가 있는 만큼 카프카는 팔로워 레플리카나 읽고 있는 파티션의 수가 더 많은 컨슈머를 우선시할 수 밖에 없다.
- 캐시된 세션이 아예 생성되지 않거나 생성되었던 것이 해제될 수도 있다.
물리적 저장소
카프카의 기본 저장 단위는 파티션 레플리카이다.
- 파티션은 서로 다른 브로커들 사이에 분리될 수 없으며, 같은 브로커의 서로 다른 디스크에 분할 저장되는 것조차 불가능하다.
- 파티션의 크기는 특정 마운트 지점에 사용 가능한 공간에 제한을 받는다.
- 파티션들이 저장될 디렉토리 목록을 정의할 때 log.dirs 매개변수에 지정된다.
- log4j.properties 파일에 정의되는, 에러 로그등을 저장하는 디렉토리와 혼동하지 말자
- 카프카가 사용할 각 마운트 지점별로 하나의 디렉토리를 포함하도록 설정하는 것이 일반적이다.
계층화된 저장소
계층화된 저장소 기능은 무한한 저장 공간, 더 낮은 비용, 탄력성뿐만 아니라 오래 된 데이터와 실시간 데이터를 읽는 작업을 분리시키는 기능이 있다.
- 카프카 3.0부터 파티션 데이터를 저장할 때 대량의 데이터를 저장하기 위해 계층화된 저장소를 사용한다.
계층화 된 저장소가 필요했던 이유
파티션별로 저장 가능한 데이터에 한도가 있다.
- 최대 보존 기한과 파티션 수는 물리적인 디스크 크기에 제한을 받는다.
디스크와 클러스터 크기는 저장소 요구 조건에 의해 결정된다.
- 지연과 처리량이 주 고려사항일 경우 클러스터는 필요한 것 이상으로 커지는 경우가 많다.
클러스터의 크기를 키우거나 줄일 때, 파티션의 위치를 다른 브로커로 옮기는 데 걸리는 시간은 파티션의 크기에 따라 결정된다.
- 파티션의 크기가 클수록 클러스터의 탄력성은 줄어든다.
- 클라우드 환경의 유연한 옵션을 활용할 수 있도록 최대한 탄력성을 가지는 것이 아키텍처 설계의 추세다.
계층화된 저장소 기능에서 카프카 클러스터의 저장소를 두 계층으로 나눈다.
로컬 계층과, 새로운 원격 계층으로 분리된다.
- 로컬 계층: 현재의 카프카 저장소 계층과 똑같이 로컬 세그먼트를 저장하기 위해 카프카 브로커의 로컬 디스크를 사용한다.
- 원격 계층: 완료된 로그 세그먼트를 저장하기 위해 HDFS나 S3와 같은 전용 저장소 시스템을 사용한다.
계층별로 서로 다른 보존 정책을 설정할 수 있다.
로컬 저장소가 리모트 계층 저장소에 비해 훨씬 비싼 것이 보통이므로 로컬의 보존기한은 대개 몇 시간 이하로 설정하고, 원격 계층은 며칠이나 몇 달로 설정할 수 있다.
로컬 저장소는 원격 저장소에 비해 지연이 훨씬 짧다.
- 로컬 계층에 저장되어 있는 최신 레코드를 읽어오는 만큼, 데이터를 전달하기 위해 페이지 캐시를 효율적으로 활용하는 카프카의 메커니즘에 의해 문제없이 동작한다.
- 빠진 처리 결과를 메꾸는 작업이나 장애에서 복구되고 있는 애플리케이션은 오래된 데이터를 필요로 하는 만큼 원격 계층에 있는 데이터가 전달된다.
구조를 분리함으로 카프카 클러스터의 메모리와 CPU에 상관없이 저장소를 확장할 수 있다.
- 카프카는 장기간용 저장 솔루션으로서의 역할을 할 수 있게 되었다.
- 카프카 브로커의 로컬에 저장되는 데이터 양 역시 줄어들며, 복구와 리벨런싱 과정에서 복사되어야 할 데이터 양 역시 줄어든다.
- 원격 계층에 저장되는 로그 세그먼트들은 굳이 브로커로 복원될 필요 없이 원격 계층에서 바로 클라이언트로 전달된다.
- 모든 데이터가 브로커에 저장되는 것은 아닌 만큼, 보존 기한을 늘려잡아 주더라도 더 이상 카프카 클러스터 저장소를 확장하거나 새로운 노드를 추가해 줄 필요가 없다.
- 동시에 전체 데이터 보존 기한 역시 더 길게 잡아줄 수 있다.
- 카프카의 데이터를 외부 저장소로 복사하는 별도의 데이터 파이프라인을 구축할 필요가 없다.
파티션 할당
사용자가 토픽을 생성하면, 카프카는 우선 이 파티션을 브로커 중 하나에 할당한다.
브로커가 6개있을 때 파티션이 10개, 복제 팩터가 3인 토픽을 생성한다고 가정해보자
카프카는 30개의 파티션 레플리카를 브로커 6개에 할당해야 한다.
- 레플리카들을 가능한 한 브로커 간에 고르게 분산시킨다. (브로커 별로 5개의 레플리카 할당)
- 각 파티션에 대해 각각의 레플리카는 서로 다른 브로커에 배치되도록 한다.
- 같은 파티션(리더나 팔로워) 여러개가 하나의 브로커에 배치될 수 없다.
- 브로커에 랙 정보가 설정되어 있다면 가능한 각 파티션의 레플리카들을 서로 다른 랙에 할당해야 한다.
- 하나의 랙 전체가 작동 불능에 빠지더라도 파티션 전체가 사용 불능에 빠지는 사태를 방지
위에 규칙을 지키기 위해 임의의 브로커부터 시작해서 각 브로커에 RR 방식으로 파티션을 할당함으로써 리더를 결정한다.
랙 인식 기능을 고려할 경우
RR 방식 대신 서로 다른 랙의 브로커가 번갈아 선택되도록 순서를 정해야 한다.
브로커 0,1이 같은 랙에 있고 2,3은 서로 다른 랙에 있다고 가정
각 브로커는 다른 랙에 있는 브로커 다음에 온다. (0, 2, 1, 3과 같은 순서로 선택)
- 파티션 0의 리더가 브로커 2에 할당 된다면 첫 번째 팔로워는 완전히 다른 랙에 위치한 브로커 1에 할당 된다.
첫 번째 랙이 오프라인이 되더라도 여전히 작동 가능한 레플리카가 있는 만큼 파티션은 여전히 사용 가능하다.
- 모든 레플리카에도 마찬가지며, 랙에 장애가 발생할 경우에도 가용성을 보장할 수 있다.
각 파티션과 레플리카에 올바른 브로커를 선택했다면?
새 파티션을 저장할 디렉토리를 결정해야 한다.
- 파티션별로 독립적으로 수행된다.
- 각 디렉토리에 저장되어 있는 파티션의 수를 센 뒤, 가장 적은 파티션이 저장된 디렉토리에 새 파티션을 저장한다.
- 새로운 디스크를 추가할 경우, 모든 새 파티션들은 이 디스크에 생성된다.
- 균형이 잡힐 때까지는 새 디스크가 항상 적은 수의 파티션을 보유하기 때문이다.
⚠️ 디스크 공간에 주의할 점
어떤 브로커들이 다른 것들에 비해 더 공간이 많이 남았다면 몇몇 파티션들이 비정상적으로 크거나, 같은 브로커에 서로 다른 크기의 디스크들이 장착되어 있을 수 있다.
- 파티션을 브로커에 할당해 줄 때 사용 가능한 공간이나 현재 부하 같은 것은 고려되지 않는다.
- 파티션을 디스크에 할당해 줄 때 디스크에 저장된 파티션의 수만이 고려될 뿐 크기는 고려되지 않는다.
파일 관리
카프카는 영구히 데이터를 저장하지도, 데이터를 지우기 전에 모든 컨슈머들이 메시지를 읽어갈 수 있도록 기다리지도 않는다.
- 각각의 토픽에 대한 보존 기한(retention period)은 설정할 수 있다.
- 기간과, 용량을 지정해 메시지를 관리한다.
큰 파일에서 삭제할 메시지를 찾아서 지우는 작업은 시간도 오래걸리고 에러의 가능성도 높다.
- 하나의 파티션을 여러 개의 세그먼트로 분할되며, 각 세그먼트는 1GB의 데이터 혹은 최근 1주일치의 데이터 중 적은 쪽 만큼을 저장한다.
- 카프카가 파티션 단위로 메시지를 쓰는 만큼 각 세그먼트 한도가 다 차면 세그먼트를 닫고 새 세그먼트를 생성한다.
- 현재 쓰여지고 있는 세그먼트를 엑티브 세그먼트라 부르며 세그먼트가 닫히기 전까지는 데이터를 삭제할 수도 없다.
카프카 브로커는 각 파티션의 모든 세그먼트에 대해 파일 핸들을 연다.
- 액티브 세그먼트가 아닌 세그먼트들도 예외는 아니다.
- 사용중인 파일 핸들 수가 매우 높게 유지될 수 있는 만큼 운영체제 역시 여기에 맞춰서 튜닝해 줄 필요가 있다.
파일 형식
각 세그먼트는 하나의 데이터 파일 형태로 저장되며, 파일 안에는 카프카 메시지와 오프셋이 저장된다.
- 디스크에 저장되는 데이터의 형식은 사용자가 프로듀서를 통해서 브로커로, 브로커로부터 컨슈머로 보내지는 메시지 형식과 동일하다.
네트워크를 통해 전달되는 형식과 디스크에 저장되는 형식을 통일했다.
- 카프카는 컨슈머에 메시지를 전송할 때 제로카피 최적화를 달성할 수 있었다.
- 프로듀서가 이미 압축한 메시지를 압축 해제해서 형식에 맞춘뒤 다시 압축하는 수고 역시 덜 수 있다.
메시지 형식을 변경한다면 네트워크 프로토콜과, 디스크 저장 형식이 모두 변경되어야 한다.
- 카프카 메시지는 사용자 payload와 시스템 header 두 부분으로 나누어 진다.
- 사용자 payload는 키값(선태사항)과 벨류값, header 모음(선택사항)을 포함한다.
- 각각의 header는 자체적인 key/value 순서쌍이다.
카프카 프로듀서는 언제나 메시지를 배치 단위로 전송한다. (버전 0.11, v2 메시지 형식 부터)
- 메시지를 배치 단위로 묶음으로써 공간을 절약하게 되는 만큼 네트워크 대역폭과 디스크 공간을 덜 사용한다.
- linger.ms 설정을 잡아서 딜레이를 추가함으로써 더 많은 메시지가 같은 배치로 묶일 확률을 올릴 수 있다.
카프카가 파티션 별로 별도의 배치를 생성하는 만큼, 더 적은 수의 파티션에 쓰는 프로듀서가 더 효율적이다.
- 카프카 프로듀서가 같은 쓰기 요청에 여러 개의 배치를 포함할 수 있다.
메시지와 레코드 헤더 정보 아래 접은글
메시지 배치 헤더는 아래 같은 것들이 포함된다.
- 메시지 형식의 현재 버전을 가리키는 매직 넘버(여기서는 v2를 기준으로 설명)
- 배치에 포함된 첫 번째 메시지의 오프셋과 마지막 오프셋의 차이
- 이 값들은 나중에 배치가 압착되어 일부 메시지가 삭제되더라도 보존된다.
- 이 배치를 처음으로 저장하는 브로커(파티션 리더)가 이 값을 실제 오프셋으로 교체한다.
- 첫 번째 메시지의 타임스탬프와 배치에서 가장 큰 타임스탬프
- 타임스탬프 유형이 생성 시각이 아닌 추가 시각으로 잡혀있을 경우, 브로커가 타임스탬프를 잡아준다.
- 바이트 단위로 표시된 배치의 크기
- 해당 배치를 받은 리더의 epoch 값
- 이 값은 새 리더가 선출된 뒤 메시지를 절삭할 때 사용된다.
- 배치가 오염되지 않았음을 확인하는 체크섬
- 서로 다른 속성을 표시하기 위한 16bit
- 압축 유형, 타임스탬프 유형, 배치가 트랜잭션의 일부 혹은 컨트롤 배치인지의 여부
- 프로듀서 ID, 프로듀서 epoch, 배치의 첫 번째 시퀀스 넘버
- 이 값은 “정확히 한 번” 보장을 위해 사용된다.
- 배치에 포함된 메시지들의 집합
각각의 레코드 역시 자체적인 시스템 헤더를 가지고 있다. (사용자가 지정해 줄 수 있는 헤더와 다르다.)
- 바이트 단위로 표시된 레코드 크기
- 속성: 현재로서 레코드 단위 속성은 없기 때문에 사용되지 않는다.
- 현재 레코드의 오프셋과 배치 내 첫 번째 레코드의 오프셋과의 차이
- 현재 레코드의 타임스탬프와 배치 내 첫 번째 레코드의 타임스탬프와의 차이(ms)
- 사용자 페이로드: key, value 그리고 header
대부분 시스템 정보는 배치 단위에서 저장되어 있으며, 사용자 데이터를 저장하는 메시지 배치 외에도, 카프카에는 컨트롤 배치가 있다.
- 컨트롤 배치는 트랜잭션 커밋 등을 가리킨다.
- 컨슈머가 받아서 처리하기 때문에 사용자 애플리케이션 관점에서는 보이지 않으며, 현재로서는 버전과 타입 정보만을 포함한다.
- 0은 중단된 트랜잭션이고, 1은 커밋이다.
이 모든 내용물들을 직접 보고 싶다면, 카프카 브로커에 포함되어 있는 DumpLogSegment 툴을 사용하면 된다.
- 파일시스템에 저장된 파티션 세그먼트를 읽고 내용물을 확인할 수 있게 해준다.
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments
- —deep-iteration 매개변수를 선택해 준다면, 래퍼 메시지 안에 압축되어 있는 메시지들에 대한 정보도 보여준다.
메시지 형식 다운 컨버젼(Down Conversion)
메시지는 v2 형식으로 저장되어 있는데 v2 형식을 지원하지 않는 구버전 컨슈머가 읽으려고 하는 경우 문제가 될 수 있다.
- 브로커는 v2 형식 메시지를 v1 형식으로 변환해서 컨슈머가 파싱할 수 있도록 해 줘야 된다.
- 변환 작업은 일반적인 읽기 작업에 비해 훨씬 많은 CPU와 메모리 자원을 사용하므로 가능하면 피하는 것이 좋다.
인덱스
카프카는 컨슈머가 임의의 사용 가능한 오프셋에서부터 메시지를 읽어오기 시작할 수 있도록 한다.
- 컨슈머의 요청에 브로커는 오프셋에 해당하는 메시지가 저장된 위치를 빠르게 찾아서 해당 오프셋부터 메시지를 읽기 시작할 수 있어야 한다.
- 브로커가 주어진 오프셋의 메시지를 빠르게 찾을 수 있도록 카프카는 각 파티션에 대해 오프셋을 유지한다.
- 이 인덱스는 오프셋과 세그먼트 파일 및 그안에서의 위치를 매핑한다.
카프카 스트림즈는 타임스탬프 기준 메시지 검색을 광범위하게 사용하며, 몇몇 장애 복구 상황에서도 유용하게 사용할 수 있다.
- 카프카는 타임스탬프와 메시지 오프셋을 매핑하는 또 다른 인덱스를 가지고 있다.
- 이 인덱스는 타임스탬프를 기준으로 메시지를 찾을 때 사용된다. (스트림즈에서 자주 사용)
인덱스들 역시 세그먼트 단위로 분할된다.
- 메시지를 삭제할 때 오래 된 인덱스 항목 역시 삭제할 수 있다.
카프카는 인덱스에 체크섬을 유지하거나 하지 않는다.
- 인덱스가 오염될 경우, 해당하는 로그 세그먼트에 포함된 메시지들을 다시 읽어서 오프셋과 위치를 기록하는 방식으로 재생성된다.
- 인덱스는 어차피 자동으로 다시 생성되기 때문에 인덱스 세그먼트를 임의로 삭제해도 복구 시간이 길어질 수 있지만 완벽하게 안전하다.
압착
사용자에 변경 내역 중 가장 최근것만 저장하는 것이 모든 이력을 저장하는 것보다 효율적일 수 있다.
- 만약 크래시가 발생한 뒤 복구할 경우 변경 내역은 크래시가 발생하기 직전에 최신 정보다.
카프카는 두 가지 보존 정책을 허용함으로써 위에 상황을 지원한다.
- 삭제 보존 정책: 지정된 보존 기한보다 더 오래 된 이벤트들을 삭제한다.
- 압착 보존 정책: 토픽에서 각 키의 가장 최근값만 저장하도록 한다.
애플리케이션이 key, value를 모두 포함하는 이벤트를 생성하는 토픽의 경우 압착 설정을 잡아주는 것이 합리적이다.
- 토픽에 키값이 null인 메시지가 있을 경우 압착은 실패한다.
보존 기한과 압착 설정을 동시에 적용하도록 delete, compact 값을 잡아줄 수도 있다.
- 지정된 보존 기한보다 오래 된 메시지들은 key에 대해서 가장 최근값인 경우에도 삭제된다.
- 압착된 토픽이 지나치게 크게 자라나는 것을 방지해주고, 일정 기한이 지나간 레코드들을 삭제해야 하는 경우에 활용할 수 있다.
압착의 작동 원리
각 로그는 두 영역으로 나누어진다.
클린
이전에 압착된 적이 있었던 메시지들이 저장된다.
- 이 영역은 하나의 키마다 하나의 값만을 포함한다.
- 이 값은 이전 압착 작업 시점에서의 최신값이기도 하다.
더티
마지막 압착 작업 이후 쓰여진 메시지들이 저장된다.
카프카가 시작되었는데 log.cleaner.enable 설정으로 압착 기능이 활성화 되어 있는 경우
- 각 브로커는 압착 매니저 스레드와 함께 다사의 압착 스레드를 시작시킨다.
- 각 스레드는 전체 파티션 크기 대비 더티 메시지의 비율이 가장 높은 파티션을 골라서 압착한 뒤 클린 상태로 만든다.
파티션을 압착하기 위해서, 클리너 스레드는 파티션의 더티 영역을 읽어서 in-memory 맵을 생성한다.
- 맵의 각 항목은 메시지 키의 16바이트 해시와 같은 키값을 갖는 이진 메시지의 오프셋(8바이트)으로 이루어 진다.
(맵의 각 항목은 16 + 8 = 24 바이트만을 사용한다.) - 개별 메시지의 크기가 1KB인 1GB 크기의 세그먼트를 압착한다면, 세그먼트 안에는 백만 개의 메시지가 있을 것이므로 압착에 필요한 맵의 크기는 24M가 된다.
- 실제로는 키값이 반복되는 만큼 동일한 해시 엔트리를 재사용하므로 메모리도 그만큼 적게 사용된다.
카프카를 설정할 때는, 압착 스레드가 오프셋 맵을 저장하기 위해 사용할 수 있는 메모리의 양을 잡아줄 수 있다.
- 스레드마다 다른 맵을 생성하게 되지만, 전체 스레드가 사용할 수 있는 메모리의 총량을 정의한다.
- 압착 오프셋 맵 크기로 1GB를 지정했는데 클리너 스레드가 5개가 떴다면, 각 스레드는 오프셋 맵을 생성하는데 200MB를 사용할 수 있다.
- 이 맵에는 최소한 하나의 세그먼트 전체는 들어갈 수 있어야 한다.
- 그렇지 않을 경우 카프카는 에러 메시지를 로깅하게 되므로 운영자는 오프셋 멥 메모리를 늘리거나 클리너 스레드의 수를 줄여야 한다.
클리너 스레드가 오프셋 맵을 생성한 다음부터는, 클린 세그먼트들을 오래된 것부터 읽어 들이면서 대조한다.
- 각각의 메시지에 대해 키값이 현재 오프셋 맵에 저장되어 있는지 확인한다.
- 저장되어 있지 않다면 메시지 값은 최신값이라는 의미로 교체용 세그먼트로 메시지를 복사한다.
- 저장되어 있다면 파티션 내에 같은 키값을 가졌지만 오래된 것부터 읽어 들이기 때문에 새로운 값을 갖는 메시지가 있다는 의미로 건너뛰게 된다.
키값에 대한 최신 값을 갖는 모든 메시지들이 복사되고 나면, 압착 스레드는 교체용 세그먼트와 원본 세그먼트를 바꾼 뒤 다음 세그먼트로 계속 진행한다.
- 작업이 완료되면 키별로 최신 값을 포함하는 하나의 메시지만 남게된다.
삭제된 이벤트
특정 키를 갖는 모든 메시지를 삭제하고 싶을 때 어떻게 해야할까?
- 사용자가 서비스를 탈퇴해서 해당 사용자의 모든 흔적을 시스템에서 지워야할 법적인 의무가 생기는 경우
해당 key값과 null value를 갖는 메시지를 써주면 삭제할 수 있다.
- 클리너 스레드가 이 메시지를 발견하면 평소대로 압착 작업을 한 뒤 null value를 갖는 메시지만 보존한다.
- 카프카는 사전에 설정된 기간동안 이 특별한 메시지(tombstone 이라고 불린다.)를 보존한다.
컨슈머가 카프카에서 읽어 온 데이터를 RDB로 복사하는 경우
- 톰스톤 메시지를 보고 해당 사용자를 DB에서 지워야 함을 알 수 있다.
- 사전에 설정된 시간이 지나면 클리너 스레드는 톰스톤 메시지를 삭제한다.
- 해당 키 역시 카프카 파티션에서 완전히 삭제된다.
- 컨슈머가 톰스톤 메시지를 볼 수 있도록 충분한 시간을 줘야 한다.
- 컨슈머가 장애로 인해 확인을 못하면 카프카에서 삭제되었는지 DB에서 지워줘야 하는지 확인할 수 없다.
카프카의 어드민 클라이언트에 지정된 오프셋 이전의 모든 레코드를 삭제하는 deleteRecords는 위에와 다르게 동작한다.
- 카프카는 파티션의 첫 번째 레코드를 가리키는 low-water mark를 해당 오프셋으로 이동시킨다.
- 컨슈머는 업데이트된 low-water mark 이전 메시지들을 읽을 수 없게 되므로, 사실상 접근 불가능하게 된다.
- 이 레코드들은 나중에 클리너 스레드에 의해 실제로 삭제된다.
- 이 메서드는 보존 기한이 설정되어 있거나 압착 설정이 되어있는 토픽에 사용 가능하다. (클리너 스레드 때문인듯?)
토픽은 언제 압착되는가?
삭제 정책이 현재의 액티브 세그먼트를 절대로 삭제하지 않는 것과 마찬가지로, 압착 정책 역시 현재의 액티브 세그먼트를 절대로 압착하지 않는다.
- 액티브 세그먼트가 아닌 세그먼트에 저장되어 있는 메시지만 압착의 대상이 된다.
기본적으로 카프카는 아래에 의견에 트레이드 오프해서 토픽 내용물의 50% 이상이 더티 레코드인 경우에만 압착을 시작한다.
- 압착은 토픽의 읽기/쓰기 성능에 영향을 줄 수 있어서, 토픽을 지나치게 자주 압착하지 않아야 한다.
- 디스크 공간을 많이 잡아먹기 때문에 너무 많은 더티 레코드가 존재하지 않도록 해야 한다.
운영자에 의해 압착이 시작되는 시점을 조절해서 튜닝할 수 있다.
min.compaction.lag.ms: 메시지가 쓰여진 뒤 압착될 때까지 지나가야 하는 최소 시간
max.compaction.lag.ms: 메시지가 쓰여진 뒤 압착이 가능해질 때까지 딜레이 될 수 있는 최대 시간
- 특정 기한 안에 압착이 반드시 실행된다는 것을 보장해야 하는 상황에 자주 사용된다.
- 유럽 연합의 개인정보 보호법인 일반 데이터 보호 규정(GDRP)은 특정한 정보가 삭제가 요청된 지 30일 안에 실제로 삭제될 것을 요구한다.
'책 > 카프카 핵심 가이드' 카테고리의 다른 글
신뢰성 있는 데이터 전달(Kafka) (0) | 2025.01.14 |
---|---|
프로그램 내에서 코드로 카프카 관리하기 (2) | 2024.12.08 |
카프카 컨슈머 : 카프카에서 데이터 읽기 (1) | 2024.11.24 |
카프카 프로듀서: 카프카에 메시지 쓰기 (1) | 2024.11.09 |