후니의 IT인프라 사전

3주차 - Kafka & Strimzi 오퍼레이터 본문

프로젝트&&스터디/DOIK 스터디

3주차 - Kafka & Strimzi 오퍼레이터

james_janghun 2022. 6. 25. 10:35

1. 카프카

소개

   비동기 방식의 스트리밍 플랫폼이다. 분산 데이터 스트리밍 플랫폼

1.1 특징

- 높은 처리량 : 묶음 단위 배치 처리로 대용량 실시간 로그 데이터 처리에 적합

- 높은 확장성 : 브로커을 통해 scale in/out이 편리

- 높은 영속성 : 데이터를 캐싱 메모리를 통해 일시적으로 저장하고 빠르게 불러올 수 있음

- 높은 가용성 : 데이터 복제를 통해 장애 상황에 대비

 

1.2 기본 용어정리

 

kafka cluster (출처 : https://d2.naver.com/helloworld/0853669)

 

- Producer : 데이터 제공자. 데이터(메시지, 이벤트)를 생성하여 카프카로 전달함.

- Consumer : 데이터 소비자. 카프카로 부터 데이터를 받아 필요한 상태로 가공하여 사용함.

- Brokers : Kafka 클러스터를 이루는 구성 단위이며, 쿠버네티스의 워커노드와 비슷한 느낌이다.

- Zookeeper: 분산 애플리케이션에서 메타데이터를 관리한다.

💡 메시지 브로커(RabbitMQ)와 이벤트 브로커(Kafka)의 차이는? (설명 잘된 블로그)
  - 브로커라는 말처럼, 중개인이라는 의미와 걸맞게 데이터가 공급자와 소비자 사이에 바로 전달되지 않고, 중개서비스를 통해 전달되는 점에서 동일하다.
  - 메시지 브로커의 경우 MQ(Message Queue)를 둔다는 점에서 비동기 방식으로 실행될 수 있다는 장점이 있지만, queue의 특성상 한 번 처리되면 더 이상 저장되지 않으며, 별다른 분류가 없어 대용량 데이터 등에는 적합하지 않을 수 있다.
  - 이벤트 브로커의 경우, 특히 kafka 역시 브로커를 두어 비동기 방식으로 실행되고, 브로커 내부에서 topic으로 데이터를 분류하고 이를 partitioning 하여 분산 저장 및 분류 작업을 수행함으로 써 대용량 데이터에 적합한 것으로 볼 수 있다.
💡 스트림 데이터와 기존 데이터의 차이는?
  - 기존의 데이터는 어느 정도 정형적이고, 데이터 제공의 빈도가 많지 않은 경우가 많았다.

예를들어 쇼핑몰에서 회원가입을 하면 그 때 발생되는 데이터가 회원정보 DB table에 저장될 것이다.
장바구니에 의류 몇 개를 담고 주문을 넣게 되면 그 때 주문 DB table에 주문 정보가 기록 될 것이다.

이렇게 이벤트 발생 횟 수가 많지 않다면 서버나 DB에 큰 무리없이 데이터가 전달되고 사용될 것이다.

그런데 스트림 데이터의 경우, 기존의 데이터들이 지속적으로 실시간으로 계속 생산되고 조회(소비)되는 경우 발생한다.

만약 쇼핑몰에서 재고 정보를 실시간으로 확인하거나 실시간으로 주식이 거래된다면, 그에 따른 지속적인 요청이 들어오고, 이것을 많은 사람들이 한다면 순간적으로 많은 부하가 서버 및 DB에 전달되고 데이터에 대한 정확성도 떨어질 수 있다.
💡 이벤트 처리 vs 이벤트 스트림 처리의 차이는?
- 이벤트 처리는 1건 1건의 이벤트를 한 번에 하나씩 처리하는 것을 말하며, 이벤트 스트림 처리는 한 꺼번에 발생하는 수 많은 이벤트를 한 꺼번에 처리하는 것입니다.

2. Strimzi 소개

  -  쿠버네티스 환경에서 Kafka의 운영 관리에 도움을 주는 Operator. 링크 

 

2.1 Strimzi 기능

  - kafka 클러스터의 매니징

  - Kafka를 쉽고 빠르게 연결하도록 함.

  - Prometheus로 모니터링 가능

  - Topic 매니징 가능

 

 

3. [실습] Strimzi 설치 & 카프카 클러스터 생성

Strimzi 클러스터 구성 (출처 : https://strimzi.io/docs/operators/latest/quickstart.html#key-features-operators_str)

 

3.1 Kafka Namespace 생성 및 repo 추가

(🚴|DOIK-Lab:default) root@k8s-m:~# kubectl create namespace kafka
namespace/kafka created
(🚴|DOIK-Lab:default) root@k8s-m:~# helm repo add strimzi https://strimzi.io/charts/
"strimzi" has been added to your repositories

3.2 Master Node에 Strimzi-kafka-operator를 설치합니다.

  - operator의 장점 중 하나는 원하는 버전의 카프카로 오퍼레이터를 통해서 손쉽게 업그레이드가 가능합니다.

(🚴|DOIK-Lab:default) root@k8s-m:~# printf 'tolerations: [{key: node-role.kubernetes.io/master, operator: Exists, effect: NoSchedule}]\n' | \
helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.29.0 --namespace kafka \
  --set nodeSelector."kubernetes\.io/hostname"=k8s-m --values /dev/stdin
NAME: kafka-operator
LAST DEPLOYED: Sat Jun 11 12:00:49 2022
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Thank you for installing strimzi-kafka-operator-0.29.0

To create a Kafka cluster refer to the following documentation.

https://strimzi.io/docs/operators/latest/deploying.html#deploying-cluster-operator-helm-chart-str

(🚴|DOIK-Lab:default) root@k8s-m:~# kubectl describe deploy -n kafka | grep KAFKA_IMAGES: -A4
      STRIMZI_KAFKA_IMAGES:                               3.0.0=quay.io/strimzi/kafka:0.29.0-kafka-3.0.0
                                                          3.0.1=quay.io/strimzi/kafka:0.29.0-kafka-3.0.1
                                                          3.1.0=quay.io/strimzi/kafka:0.29.0-kafka-3.1.0
                                                          3.1.1=quay.io/strimzi/kafka:0.29.0-kafka-3.1.1
                                                          3.2.0=quay.io/strimzi/kafka:0.29.0-kafka-3.2.0

3.3 kafka 클러스터를 yaml 로 배포해 보겠습니다.

  - zookeeper가 먼저 생성되고, kafka이름의 브로커가 설치됩니다.

(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# cat kafka-1.yaml
───────┬──────────────────────────────────────────────────────────────────────
       │ File: kafka-1.yaml
───────┼──────────────────────────────────────────────────────────────────────
   1   │ apiVersion: kafka.strimzi.io/v1beta2
   2   │ kind: Kafka
   3   │ metadata:
   4   │   name: my-cluster
   5   │ spec:
   6   │   kafka:
   7   │     #version: 3.1.1     # 설치 버전 규정
   8   │     replicas: 3         # 브로커 파드가 3개로 배포
   9   │     listeners:
  10   │       - name: plain     # 평문
  11   │         port: 9092
  12   │         type: internal
  13   │         tls: false
  14   │       - name: tls       # TLS 암호화
  15   │         port: 9093
  16   │         type: internal
  17   │         tls: false
  18   │       - name: external  # 외부에서 브로커 접근 (nodeport로 구성)
  19   │         port: 9094
  20   │         type: nodeport
  21   │         tls: false
  22   │     storage:            # storage localhost 사용
  23   │       type: jbod
  24   │       volumes:
  25   │       - id: 0
  26   │         type: persistent-claim
  27   │         size: 10Gi
  28   │         deleteClaim: true
  29   │     config:
  30   │       offsets.topic.replication.factor: 3
  31   │       transaction.state.log.replication.factor: 3
  32   │       transaction.state.log.min.isr: 2
  33   │       default.replication.factor: 3
  34   │       min.insync.replicas: 2
  35   │       #inter.broker.protocol.version: "3.1.1"
  36   │     template:
  37   │       pod:
  38   │         affinity:
  39   │           podAntiAffinity:        # pod 균등 분포
  40   │             requiredDuringSchedulingIgnoredDuringExecution:
  41   │               - labelSelector:
  42   │                   matchExpressions:
  43   │                     - key: app.kubernetes.io/name
  44   │                       operator: In
  45   │                       values:
  46   │                         - kafka
  47   │                 topologyKey: "kubernetes.io/hostname"
  48   │   zookeeper:      # 주키퍼 3개 설치
  49   │     replicas: 3
  50   │     storage:
  51   │       type: persistent-claim
  52   │       size: 10Gi
  53   │       deleteClaim: true
  54   │     template:
  55   │       pod:
  56   │         affinity:
  57   │           podAntiAffinity:
  58   │             requiredDuringSchedulingIgnoredDuringExecution:
  59   │               - labelSelector:
  60   │                   matchExpressions:
  61   │                     - key: app.kubernetes.io/name
  62   │                       operator: In
  63   │                       values:
  64   │                         - zookeeper
  65   │                 topologyKey: "kubernetes.io/hostname"
  66   │   entityOperator:
  67   │     topicOperator: {}
  68   │     userOperator: {}
───────┴────────────────────────

 

3.4 설치 확인 

  - statefulset으로 배포되어 확인할 수 있습니다.

(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# kubectl get sts -n kafka -owide
NAME                   READY   AGE    CONTAINERS   IMAGES
my-cluster-kafka       3/3     93s    kafka        quay.io/strimzi/kafka:0.29.0-kafka-3.2.0
my-cluster-zookeeper   3/3     3m6s   zookeeper    quay.io/strimzi/kafka:0.29.0-kafka-3.2.0

 - 서비스 구성

    - (my-cluster-kafka-bootstrap) 9091 포트 통해서 내부에서 kafka 클러스터 접근이 가능

    - (my-cluster-kafka-brokers) 9090 포트로 kafka 개별 pod 접속 가능

    - (my-cluster-kafka-external-bootstrap) 9094 포트로 외부에서 kafka 클러스터로 접근 가능

(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# k get svc -n kafka
NAME                                  TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)                               AGE
my-cluster-kafka-0                    NodePort    10.200.1.167   <none>        9094:32377/TCP                        2m34s
my-cluster-kafka-1                    NodePort    10.200.1.73    <none>        9094:30746/TCP                        2m34s
my-cluster-kafka-2                    NodePort    10.200.1.166   <none>        9094:32755/TCP                        2m34s
my-cluster-kafka-bootstrap            ClusterIP   10.200.1.117   <none>        9091/TCP,9092/TCP,9093/TCP            2m34s
my-cluster-kafka-brokers              ClusterIP   None           <none>        9090/TCP,9091/TCP,9092/TCP,9093/TCP   2m34s
my-cluster-kafka-external-bootstrap   NodePort    10.200.1.37    <none>        9094:30460/TCP                        2m34s
my-cluster-zookeeper-client           ClusterIP   10.200.1.122   <none>        2181/TCP                              4m7s
my-cluster-zookeeper-nodes            ClusterIP   None           <none>        2181/TCP,2888/TCP,3888/TCP            4m7s

  - 오퍼레이터에 의해 확장된 kafka object를 직접 조회할 수 있음

     - kafka pod 3개, zookeeper pod 3개 모두 준비 완료 됨을 표시해줍니다.

(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# k get kafka -n kafka
NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
my-cluster   3                        3                     True

 

3.5 myclient를 통해서 테스트 파드 생성

  - bitnami/kafka:3.2 이미지 파드에서 테스트용 카프카 yaml들을 제공한다.

(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# cat myclient.yaml
───────┬────────────────────────────────────────────────────────────────────────────────────
       │ File: myclient.yaml
───────┼────────────────────────────────────────────────────────────────────────────────────
   1   │ apiVersion: v1
   2   │ kind: Pod
   3   │ metadata:
   4   │   name: ${PODNAME}
   5   │   labels:
   6   │     app: myclient
   7   │ spec:
   8   │   nodeName: k8s-m
   9   │   containers:
  10   │   - name: ${PODNAME}
  11   │     image: bitnami/kafka:3.2
  12   │     command: ["tail"]
  13   │     args: ["-f", "/dev/null"]
  14   │   terminationGracePeriodSeconds: 0
───────┴────────────────────────────────────────────────────────────────────────────────────

(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# for ((i=1; i<=3; i++)); do PODNAME=myclient$i envsubst < ~/DOIK/3/myclient.yaml | kubectl apply -f - ; done
pod/myclient1 created
pod/myclient2 created
pod/myclient3 created

 

4. 토픽 생성 및 메시지 주고 받기

 

4.1 카프카 브로커 및 토픽 사전정보 획득

# 카프카 파드의 SVC 도메인이름을 변수에 지정
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092

# 브로커 정보
kubectl exec -it myclient1 -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS

# 기본 topic 정보
(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --list
__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic

# kafkatopics object 도 조회가 가능하다. 오퍼레이터의 기능이라고 볼 수 있다.
(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# kubectl get kafkatopics -n kafka
NAME                                                                                               CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                                        my-cluster   50           3                    True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                                     my-cluster   1            3                    True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b   my-cluster   1            3                    True

4.2 토픽 생성

 

(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# cat mytopic.yaml
───────┬───────────────────────────────────────────────────────────────────────────────────────
       │ File: mytopic.yaml
───────┼───────────────────────────────────────────────────────────────────────────────────────
   1   │ apiVersion: kafka.strimzi.io/v1beta2
   2   │ kind: KafkaTopic
   3   │ metadata:
   4   │   name: ${TOPICNAME}
   5   │   labels:
   6   │     strimzi.io/cluster: "my-cluster"
   7   │ spec:
   8   │   partitions: 1
   9   │   replicas: 3
  10   │   config:
  11   │     retention.ms: 7200000     # 메시지 저장 기간
  12   │     segment.bytes: 1073741824   # 저장 데이터 사이즈 단위
  13   │     min.insync.replicas: 2
───────┴───────────────────────────────────────────────────────────────────────────────────────

(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# TOPICNAME=mytopic1 envsubst < ~/DOIK/3/mytopic.yaml | kubectl apply -f - -n kafka
kafkatopic.kafka.strimzi.io/mytopic1 created

  - mytopic 1이라는 토픽이 생성되었다.

(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# kubectl get kafkatopics -n kafka  

NAME                                                                                               CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                                        my-cluster   50           3                    True
mytopic1                                                                                           my-cluster   1            3                    True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                                     my-cluster   1            3                    True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b   my-cluster   1            3                    True

  - 토픽 상세 정보 조회

(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic1 --describe
Topic: mytopic1 TopicId: xRr2WFAfQ-OTW70wMpNgpw PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
        Topic: mytopic1 Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0

  - 토픽2를 생성하면서 커스텀

   - 처음에 배포할 때 설정정보를 구체적으로 기입하여 overwrite 가능함을 확인

   - 두번째로 alter를 통해서 기존에 설정된 토픽의 partition을 2개로 늘림

      ㄴ 파티션 0의 경우, 브로커 파드1을 리더로 두고 있으며, 1-2-0의 싱크를 갖춘다. (2, 0은 팔로워 파티션으로 본다)

      ㄴ 파티션 1의 경우, 브로커 파드2를 리더로 두고 있으며, 2-1-0의 싱크를 갖춘다. (1, 0은 팔로워 파티션으로 본다)

   - 파티션을 늘리는 것은 가능하지만 줄이는 것은 불가능하며, 편법을 쓰더라도 위험성을 감수해야한다.

# topic2 생성
(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# kubectl exec -it myclient1 -- kafka-topics.sh --create --bootstrap-server $SVCDNS --topic mytopic2 --partitions 1 --replication-factor 3 --config retention.ms=172800000
Created topic mytopic2.

# 파티셔닝 2개
(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 2



(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
Topic: mytopic2 TopicId: r71n2ePRThi04biLiNzdIA PartitionCount: 2       ReplicationFactor: 3    Configs: min.insync.replicas=2,retention.ms=172800000,message.format.version=3.0-IV1
        Topic: mytopic2 Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
        Topic: mytopic2 Partition: 1    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0

 

4.3 메시지 주고 받기

# 카프카 파드의 SVC 도메인이름을 변수에 지정
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092

# 토픽에 데이터 넣어보기
kubectl exec -it myclient1 -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1

# 토픽 데이터 확인
kubectl exec -it myclient2 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning

왼쪽 데이터 입력 / 오른쪽 데이터 조회 (성공)

4.4 메시지 key:value 형태로 주고 받기

  - property 옵션을 통해서 해당 양식을 지정해줄 수 있다.

  - key 값이 없으면 null로 표현된다. (이전에 입력했던 데이터들이 null- XX로 표시된다)

  - parse.key=true 옵션에서는 key를 없이 입력하면 에러가 발생하면서 이탈된다. 옵션을 false로 변경하면 키가 없으면 null로 표시된다.

# 토픽에 데이터(메시지키+메시지값) 넣어보기
kubectl exec -it myclient1 -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1 --property "parse.key=true" --property "key.separator=:"

# 토픽에 데이터(메시지키+메시지값) 확인
kubectl exec -it myclient2 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --property print.key=true --property key.separator="-" --from-beginning

 

4.5 토픽 삭제

(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# kubectl delete kafkatopics mytopic2 -n kafka
kafkatopic.kafka.strimzi.io "mytopic2" deleted
(🚴|DOIK-Lab:default) root@k8s-m:~/DOIK/3# kubectl delete kafkatopics mytopic1 -n kafka
kafkatopic.kafka.strimzi.io "mytopic1" deleted

 

 

이상으로 kafka 토픽 생성 및 실습 마치겠습니다 :)

 

항상 가시다님 감사합니다.