ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 10. 카프카 명령어(Command)
    BackEnd/kafka 2021. 11. 14. 08:55
    반응형

      윈도우(windows) 환경에서 카프카 명령어 실행은 설치폴더\bin\windows 하위의 bat 파일을 사용하고, 맥(Mac) 환경에서 카프카 명령어 실행은 설치폴더\bin 하위의 sh 파일을 사용합니다.

    windows
    Mac

     

    kafka-topics

      토픽이란 카프카에서 데이터를 구분하는 가장 기본적인 개념입니다. 마치 RDBMS(relational database management system)에서 사용하는 테이블과 유사하다고 볼 수 있습니다. 토픽에는 파티션(partition)이 존재하는데 파티션의 개수는 최소 1개부터 시작합니다. 토픽을 생성하는 상황은 크게 2가지가 있습니다. 첫 번째는 카프카 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 데이터를 요청할 때, 그리고 두 번째는 커맨드 라인 툴로 명시적으로 토픽을 생성하는 것입니다.

     

    1. 토픽 생성

    # hello.kafka 생성 for Windows
    # bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --topic hello.kafka
    # powershell에서 개행을 위해 (`) 사용
    
    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-topics.bat `
    >>  --create `
    >>  --bootstrap-server localhost:9092 `
    >>  --topic hello.kafka
    WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
    Created topic hello.kafka.
    
    # hello.kafka 생성 for Mac
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic hello.kafka

      클러스터 정보와 토픽 이름은 토픽을 만들기 위한 필수 값입니다. 파티션 개수, 복제 개수, 토픽 데이터 유지 기간 옵션들을 지정하여 토픽을 생성하고 싶다면 다음과 같이 명령을 실행합니다.

    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-topics.bat `
    >>  --create `
    >>  --bootstrap-server localhost:9092 `
    >>  --partitions 3 `
    >>  --replication-factor 1 `
    >>  --config retention.ms=172800000 `
    >>  --topic hello.kafka

      --partitions 옵션을 사용하지 않으면 카프카 브로커 설정파일(config/server.properties)에 있는 num.partitions 옵션값에 따라 생성됩니다.

      --replication-factor를 1로 설정하면 복제를 하지 않고 사용한다는 의미입니다. 2이면 1개의 복제본을 사용하겠다는 의미입니다. 설정하지 않으면 카프카 브로커 설정에 있는 default.replication.factor 옵션값에 따라 생성됩니다.

      --config를 통해 kafka-topics 명령에 포함되지 않은 추가적인 설정을 할 수 있습니다. retention.ms는 토픽의 데이터를 유지하는 기간을 뜻하며, 172800000ms는 2일을 ms단위로 나태낸 것입니다.

     

    (참고) 토픽 생성 시 --zookeeper가 아니라 --bootstrap-server 옵션을 사용하는 이유는 카프카 2.1을 포함한 이전 버전에서는 일부 카프카 커맨드 라인 툴이 주키퍼와 직접 통신하여 명령을 실행했으나, 카프카 2.2 버전 이후로는 주키퍼와 통신하는 대신 카프카를 통해 토픽과 관련된 명령을 실행할 수 있게 되었기 때문입니다.

     

    2. 토픽 리스트 조회(--list)

    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
    hello.kafka

      카프카에 내부 관리를 위한 인터널 토픽(internal topic)이 존재하는데, --exclude-internal 옵션을 추가하여 조회 시 목록에서 제외할 수 있습니다.

     

    3. 토픽 상세 조회(--describe)

    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic hello.kafka
    Topic: hello.kafka      TopicId: bFEIsxS9TsCItoslitsUQw PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
            Topic: hello.kafka      Partition: 0    Leader: 0       Replicas: 0     Isr: 0

      여러 대의 브로커로 카프카 클러스터를 운영할 때는 토픽의 리더 파티션(Leader: 0)을 확인하여 토픽의 리더 파티션 쏠림 현상을 확인합니다.

     

    4. 토픽 옵션 수정

      토픽에 설정된 옵션을 변경하기 위해서는 kafka-topics 또는 kafka-configs를 사용합니다.

    # --alter 옵션과 --partitions 옵션을 사용하여 파티션 개수 변경
    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 `
    >>  --topic hello.kafka `
    >>  --alter `
    >>  --partitions 2
    
    # 확인
    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 `
    >>  --topic hello.kafka `
    >>  --describe
    Topic: hello.kafka      TopicId: bFEIsxS9TsCItoslitsUQw PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
            Topic: hello.kafka      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
            Topic: hello.kafka      Partition: 1    Leader: 0       Replicas: 0     Isr: 0

      토픽의 파티션을 늘릴 수는 있지만 줄일 수는 없습니다. 그러므로 파티션 개수를 늘릴 때는 반드시 늘려야 하는 상황인지 판단하는 것이 중요합니다.

    # kafka-configs와 --alter, --add-config 옵션을 사용하여 retention.ms 수정
    # --add-config: 이미 존재하는 설정값은 변경, 존재하지 않으면 신규 추가
    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-configs.bat --bootstrap-server localhost:9092 `
    >>  --entity-type topics `
    >>  --entity-name hello.kafka `
    >>  --alter --add-config retention.ms=86400000
    Completed updating config for topic hello.kafka.
    
    # retention.ms=86400000(1일) 확인
    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-configs.bat --bootstrap-server localhost:9092 `
    >>  --entity-type topics `
    >>  --entity-name hello.kafka `
    >>  --describe
    Dynamic configs for topic hello.kafka are:
      retention.ms=86400000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=86400000}

     

    kafka-console-producer

      생성된 토픽에 데이터를 넣을 수 있는 명령어입니다. 토픽에 넣는 데이터는 '레코드(record)'라 부르며 메시지 키(key)와 메시지 값(value)로 이루어져 있습니다. 메시지 키 없이 메시지 값만 보낼 경우, 메시지 키는 자바의 null로 기본 설정되어 브로커로 전송됩니다.

    # 메시지 값만 가지는 레코드 전송
    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 `
    >>  --topic hello.kafka
    >hello, kafka !
    >nice to meet you.
    
    # 메시지 키를 가지는 레코드 전송
    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 `
    >>  --topic hello.kafka `
    >>  --property "parse.key=true" `
    >>  --property "key.separator=:"
    >key1:value1
    >key2:value2
    >key3:value3

      kafka-console-producer로 전송되는 레코드 값은 UTF-8 기반으로 Byte로 변환되고 ByteArraySerializer로만 직렬화됩니다. 즉, String이 아닌 타입으로는 직렬화하여 전송할 수 없습니다. 다른 타입으로 직렬화하여 데이터를 브로커로 전송하고 싶다면 카프카 프로듀서 애플리케이션을 직접 개발해야 합니다.

      메시지 키를 가지는 레코드를 전송하기 위해서는 parse.key를 true로 설정해야 합니다. key.separator를 선언하지 않으면 기본 설정은 Tab delimiter(\t)입니다.

     

    kafka-console-consumer

      kafka-console-producer로 보낸 메시지 값을 확인할 수 있습니다. --from-beginning 옵션을 주면 토픽에 저장된 가장 처음 데이터부터 출력할 수 있으며, 데이터의 메시지 키와 메시지 값을 확인하고 싶으면 --property 옵션을 사용하면 됩니다.

    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 `
    >>  --topic hello.kafka `
    >>  --property print.key=true `
    >>  --property key.separator="-" `
    >>  --group hello-group `
    >>  --from-beginning
    null-hello, kafka !
    key2-value2
    null-nice to meet you.
    key1-value1
    key3-value3

      --group 옵션을 통해 신규 컨슈머 그룹(consumer group)을 생성합니다. 컨슈머 그룹은 1개 이상의 컨슈머로 이루어져 있으며, 가져간 메시지에 대해 커밋(commit)을 합니다. 커밋이란 컨슈머가 특정 레코드까지 처리를 완료했다고 레코드의 오프셋 번호를 카프카 브로커에 저장하는 것입니다. 커밋 정보는 __consumer_offsets 이름의 내부 토픽에 저장됩니다.

     

    kafka-consumer-groups

      컨슈머 그룹은 따로 생성하는 명령 없이 컨슈머를 동작할 때 컨슈머 그룹 이름을 지정하면 새로 생성됩니다.

    # 생성된 컨슈머 그룹의 리스트 확인(--list)
    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list
    hello-group

      컨슈머 그룹에 대한 상세 내용은 --group 옵션과 --describe 옵션을 통해 확인할 수 있습니다.

    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 `
    >>  --group hello-group `
    >>  --describe
    
    Consumer group 'hello-group' has no active members.
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    hello-group     hello.kafka     1          6               6               0               -               -               -
    hello-group     hello.kafka     0          7               8               1               -               -               -

      CURRENT-OFFSET은 컨슈머 그룹이 가져간 토픽의 파티션에 가장 최신 오프셋(offset)이 몇 번인지 나타내며, LOG-END-OFFSET은 해당 컨슈머 그룹의 컨슈머가 어느 오프셋까지 커밋했는지 알 수 있습니다. CURRENT-OFFSET은 LOG-END-OFFSET과 같거나 작은 값일 수 있습니다. LAG은 컨슈머 그룹이 토픽의 파티션에 있는 데이터를 가져가는 데 얼마나 지연이 발생하는지 나타내는 지표입니다. 랙은 컨슈머 그룹이 커밋한 오프셋과 해당 파티션의 가장 최신 오프셋 간의 차이입니다.

      CUNSUMER-ID는 컨슈머의 토픽(파티션) 할당을 카프카 내부적으로 구분하기 위해 사용하는 ID이며, HOST는 컨슈머가 동작하는 host명을 출력합니다. CLIENT-ID는 컨슈머에 할당된 ID로 사용자가 지정할 수 있으며 지정하지 않으면 자동 생성됩니다.

     

    kafka-verifiable-producer, consumer

      카프카 클러스터 설치가 완료된 이후 토픽에 데이터를 전송하여 간단한 네트워크 통신 테스트를 할 때 유용합니다. 윈도우는 지원하지 않습니다.

    # kafka-verifiable-producer
    bin/kafka-verifiable-producer.sh --bootstrap-server localhost:9092 \
    --max-messages 10 \
    --topic verify-test
    
    # kafka-verifiable-consumer
    bin/kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 \
    --topic verify-test \
    --group-id test-group

     

    kafka-delete-records

      이미 적재된 토픽의 데이터를 지울 수 있습니다. 단, 카프카에서는 토픽의 파티션에 저장된 특정 데이터만 삭제할 수 없습니다. kafka-delete-records는 이미 적재된 토픽의 데이터 중 가장 오래된 데이터(가장 낮은 숫자의 오프셋)부터 특정 시점의 오프셋까지 삭제할 수 있습니다.

      삭제하고자 하는 데이터에 대한 정보를 파일로 저장해서 사용해야 합니다. delete-topic.json 파일을 설치 폴더에 생성합니다.

      delete-topic.json 파일 내용은 아래와 같습니다. 삭제하고자 하는 토픽, 파티션, 오프셋 정보가 들어갑니다.

    {"partitions": [{"topic": "hello.kafka", "partition": 0, "offset": 1}], "version":1}

      --offset-json-file 옵션값으로 입력하면 파일을 읽어서 데이터 삭제를 진행합니다. 삭제가 완료되면 각 파티션에서 삭제된 오프셋 정보를 출력합니다.

    PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-delete-records.bat --bootstrap-server localhost:9092 `
    >>  --offset-json-file delete-topic.json
    Executing records delete operation
    Records delete operation completed:
    partition: hello.kafka-0        low_watermark: 1
    반응형

    댓글

Designed by Tistory.