通过 docker stack 部署kafka集群 和 kafka-ui 可视化面板

单节点配置

version: "3.7"

networks:
  default:
    driver: bridge

x-image: &x-image
  image: bitnami/kafka:3.2.3

x-common: &x-common
  <<: *x-image
  user: root
  networks:
    - default
  logging:
    driver: json-file #仅在 json-file 驱动程序下,可以使用以下参数,限制日志得数量和大小。
    options:
      max-size: "200m" # 单个文件大小为200m
      max-file: "3" # 最多1个文件
      
services:
  kafka1:
    <<: *x-common
    container_name: kafka1
    ports:
      - 9192:9092
      - 9193:9093
      - 9194:9094
    environment:
      ## 通用配置
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定义kafka服务端socket监听端口
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,LOCAL://:9094
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,LOCAL:PLAINTEXT
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
      - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M 
      # 不允许自动创建主题
      #- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false

      ## broker配置
      # 定义外网访问地址(宿主机ip地址和端口)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.27.0.9:39304,LOCAL://10.10.10.200:9194
      #- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # broker.id,必须唯一
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
    volumes:
      - /opt/kafka/data/kafka1:/bitnami/kafka
    #extra_hosts:
      #- "kafka1:云服务器IP"

  kafka-ui:
    <<: *x-common
    image: provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    restart: always
    ports:
      - 9091:8080
    volumes:
      - /etc/localtime:/etc/localtime:ro
    environment:
      # 集群名称
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=10.10.10.200:9194

集群配置

version: "3.7"

networks:
  default:
    driver: bridge

x-image: &x-image
  image: bitnami/kafka:3.2.3

x-common: &x-common
  <<: *x-image
  user: root
  networks:
    - default
  logging:
    driver: json-file #仅在 json-file 驱动程序下,可以使用以下参数,限制日志得数量和大小。
    options:
      max-size: "200m" # 单个文件大小为200m
      max-file: "3" # 最多1个文件
      
services:
  kafka1:
    <<: *x-common
    container_name: kafka1
    ports:
      - 9192:9092
      - 9193:9093
      - 9194:9094
    environment:
      ## 通用配置
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定义kafka服务端socket监听端口
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,LOCAL://:9094
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,LOCAL:PLAINTEXT
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
      - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M 
      # 不允许自动创建主题
      #- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false

      ## broker配置
      # 定义外网访问地址(宿主机ip地址和端口)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.27.0.9:39304,LOCAL://10.10.10.200:9194
      #- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # broker.id,必须唯一
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
    volumes:
      - /opt/kafka/data/kafka1:/bitnami/kafka
    #extra_hosts:
      #- "kafka1:云服务器IP"
      #- "kafka2:云服务器IP"
      #- "kafka3:云服务器IP"

  kafka2:
    <<: *x-common
    container_name: kafka2
    ports:
      - 9292:9092
      - 9293:9093
      - 9294:9094
    environment:
      ## 通用配置
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定义kafka服务端socket监听端口
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,LOCAL://:9094
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,LOCAL:PLAINTEXT
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
      - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M 
      # 不允许自动创建主题
      #- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      
      ## broker配置
      # 定义外网访问地址(宿主机ip地址和端口)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.27.0.9:39305,LOCAL://10.10.10.200:9294
      # broker.id,必须唯一
      - KAFKA_BROKER_ID=2
      - KAFKA_CFG_NODE_ID=2
    volumes:
      - /opt/kafka/data/kafka2:/bitnami/kafka
    #extra_hosts:
      #- "kafka1:云服务器IP"
      #- "kafka2:云服务器IP"
      #- "kafka3:云服务器IP"

  kafka3:
    <<: *x-common
    container_name: kafka3
    ports:
      - 9392:9092
      - 9393:9093
      - 9394:9094
    environment:
      ## 通用配置
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定义kafka服务端socket监听端口
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,LOCAL://:9094
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,LOCAL:PLAINTEXT
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
      - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M 
      # 不允许自动创建主题
      #- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      
      ## broker配置
      # 定义外网访问地址(宿主机ip地址和端口)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.27.0.9:39306,LOCAL://10.10.10.200:9394
      # broker.id,必须唯一
      - KAFKA_BROKER_ID=3
      - KAFKA_CFG_NODE_ID=3
    volumes:
      - /opt/kafka/data/kafka3:/bitnami/kafka
    #extra_hosts:
      #- "kafka1:云服务器IP"
      #- "kafka2:云服务器IP"
      #- "kafka3:云服务器IP"

  kafka-ui:
    <<: *x-common
    image: provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    restart: always
    ports:
      - 9091:8080
    volumes:
      - /etc/localtime:/etc/localtime:ro
    environment:
      # 集群名称
      - KAFKA_CLUSTERS_0_NAME=local
      # 集群地址
      # - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092,kafka3:9092
      # - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=10.27.0.9:39304,10.27.0.9:39305,10.27.0.9:39306
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=10.10.10.200:9194,10.10.10.200:9294,10.10.10.200:9394
    #   # 集群名称
    #   - KAFKA_CLUSTERS_1_NAME=ssh-tunnel
    #   # 集群地址
    #   - KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS=127.0.0.1:9192,127.0.0.1:9292,127.0.0.1:9392

命令行

通过 portainer 面板进入容器内执行 或者 通过 docker exec -it kafka1 /bin/bash 进入容器内执行

查看主题列表

kafka-topics.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --list

创建主题

kafka-topics.sh --create --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --topic test --partitions 3 --replication-factor 1

查看主题详情

kafka-topics.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --topic test --describe

分区扩容

分区只能增加 不能减少

kafka-topics.sh --alter --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --topic test --partitions 15

删除主题

kafka-topics.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --delete --topic test

备份数据

全量备份

kafka-console-consumer.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --topic test --from-beginning > /bitnami/kafka/backup/test.log

运行一段时间无数据增长后退出

全量恢复

kafka-console-producer.sh --broker-list kafka1:9094,kafka2:9094,kafka3:9094 --topic terminal-status-history < /bitnami/kafka/backup/terminal-status-history.log

Topic 添加/修改动态配置

kafka-configs.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --alter --entity-type topics --entity-name terminal-status-history --add-config file.delete.delay.ms=60000,delete.retention.ms=604800000

Topic删除动态配置

kafka-configs.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --alter --entity-type topics --entity-name terminal-status-history --delete-config file.delete.delay.ms,retention.ms

主题配置

  1. cleanup.policy
    字符串要么是“delete”,要么是“compact”,或者两者都是。此配置指定在旧日志段上使用的保留策略。默认策略(“delete”)将在达到保留时间或大小限制时丢弃旧段。“compact”设置将启用topic上的日志压缩。服务器提供的默认配置是log.cleanup.policy。

  2. compression.type
    指定给定topic的最终压缩类型。该配置接受标准的压缩编解码器('gzip'、'snappy'、'lz4'、'zstd')。它另外接受“uncompressed”,这相当于没有压缩;和“producer”,即保留由生产者设置的原始压缩编解码器。服务器提供的默认配置是compression.type。

  3. delete.retention.ms
    为日志压缩主题保留删除tombstone标记的时间,默认86400000毫秒,即24小时。这个设置还提供了一个时间的限制如果消费者从偏移量0开始,他们必须在这个时间内完成读取,以确保他们获得最后阶段的有效快照(否则删除tombstones可能会在他们完成扫描之前被收集)。服务器提供的默认配置是log.cleaner.delete.retention.ms。

  4. file.delete.delay.ms
    从文件系统中删除文件之前等待的时间,默认60000毫秒。服务器提供的默认配置是log.segment.delete.delay.ms。

  5. flush.messages
    此设置允许指定一个时间间隔,在该时间间隔,我们将强制将fsync数据写入日志,默认9223372036854775807。例如,如果这个被设为1我们会在每条消息后进行fsync;如果是5,我们会在每5条消息后进行fsync。一般来说,我们建议您不要设置这个值,而是使用复制来保证持久性,并允许操作系统的后台刷新功能,因为这样更有效。可以根据每个topic重写此设置。服务器提供的默认配置是log.flush.interval.messages。

  6. flush.ms
    此设置允许指定一个时间间隔,在这个时间间隔,我们将强制将fsync数据写入日志,默认9223372036854775807。例如,如果这个值设置为1000,我们将在1000毫秒之后进行fsync。一般来说,我们建议您不要设置这个值,而是使用复制来保证持久性,并允许操作系统的后台刷新功能,因为这样更有效。服务器提供的默认配置是log.flush.interval.ms。

  7. follower.replication.throttled.replicas
    应该在follower端对其日志复制进行限制的副本列表。该列表应该以[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:…或者也可以使用通配符'*'来限制此主题的所有副本。服务器提供的默认配置是follower.replication.throttled.replicas。

  8. index.interval.bytes
    此设置控制Kafka向其偏移索引添加索引条目的频率。默认设置确保我们大约每4096bytes索引一条消息。更多的索引允许读取更接近日志中的确切位置,但使索引更大。一般情况下我们不需要改变这个默认配置。服务器提供的默认配置是log.index.interval.bytes。

  9. leader.replication.throttled.replicas
    一个副本列表,它的日志复制应该在leader端被限制。该列表应该以[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:…或者,也可以使用通配符'*'来限制此主题的所有副本。服务器提供的默认配置是leader.replication.throttled.replicas。

  10. max.compaction.lag.ms
    消息在日志中不符合压缩条件的最长时间,默认9223372036854775807。仅适用于正在压缩的日志。服务器提供的默认配置是log.cleaner.max.compaction.lag.ms。

  11. max.message.bytes
    Kafka允许的最大记录批大小(如果启用压缩,则在压缩之后),默认1048588bytes,即1M。如果增加这个值,并且有版本小于0.10.2的consumer,那么consumer的获取大小也必须增加,以便他们能够获取这么大的记录批。在最新的消息格式版本中,为了提高效率,记录总是分组成批。在以前的消息格式版本中,未压缩的记录不会分组成批,在这种情况下,此限制仅适用于单个记录。服务器提供的默认配置是message.max.bytes。

  12. message.format.version
    指定broker将用于向日志追加消息的消息格式版本,该值应该是一个有效的ApiVersion,默认2.5-IV0。通过设置特定的消息格式版本,用户可以确认磁盘上现有的所有消息都小于或等于指定的版本。不正确地设置此值将导致使用旧版本的用户崩溃,因为他们将接收到他们不理解的格式的消息。服务器提供的默认配置是log.message.format.version。

  13. message.timestamp.difference.max.ms
    broker接收消息时的时间戳与消息中指定的时间戳之间允许的最大差异,默认9223372036854775807。如果message.timestamp.type=CreateTime,那么如果时间戳的差异超过此阈值,则消息将被拒绝。如果message.timestamp.type=LogAppendTime,则忽略此配置。服务器提供的默认配置是log.message.timestamp.difference.max.ms。

  14. message.timestamp.type
    定义消息中的时间戳是消息创建时间还是日志追加时间,默认是CreateTime。可选值是“CreateTime”或“LogAppendTime”。服务器提供的默认配置是log.message.timestamp.type。

  15. min.cleanable.dirty.ratio
    此配置控制日志压缩器尝试清理日志的频率(假设启用了日志压缩)。默认情况下,我们将避免清理压缩超过50%的日志,这个比率限制了重复日志所浪费的最大空间(最多50%的日志是重复的)。较高的比率意味着更少、更有效的清理,但也意味着日志中浪费的空间更多。如果还指定了max.compaction.lag.ms或min.compaction.lag.ms配置,那么日志压缩程序将认为日志符合压缩条件:

    1)、脏比率阈值已经满足,并且日志至少在min.compaction.lag.ms期间拥有脏(未压缩)记录;

    2)、如果日志最多在max.compaction.lag.ms期间拥有脏(未压缩)记录。

服务器提供的默认配置是log.cleaner.min.cleanable.ratio。

  1. min.compaction.lag.ms
    消息在日志中保持未压缩状态的最小时间,默认值为0。仅适用于正在压缩的日志。服务器提供的默认配置是log.cleaner.min.compaction.lag.ms。
  2. min.insync.replicas
    当生产者将acks设置为“all”(或“-1”)时,此配置指定必须确认写入操作的最小副本数量,以便认为写入操作成功。如果不能满足这个最小值,那么生成器将抛出一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend),默认值为1。当一起使用时,min.insync.replicas和acks允许强制执行更大的持久性保证。典型的场景是创建复制因子为3的主题,设置min.insync.replicas为2,并且设置acks为“all”,这将确保在大多数副本没有收到写操作时,生成程序会引发异常。服务器提供的默认配置是min.insync.replicas。
  3. preallocate
    如果在创建新的日志段时应该在磁盘上预先分配文件,则设置为true,默认为false。服务器提供的默认配置是log.preallocate。
  4. retention.bytes
    如果使用“delete”保留策略,在我们丢弃旧的日志段以释放空间之前,这个配置控制一个分区(由日志段组成)可以增长到的最大大小。默认情况下没有大小限制,只有时间限制,默认值为-1。由于这个限制是在分区级别强制执行的,所以用它乘以分区数就可以计算以字节为单位的主题保留量。服务器提供的默认配置是log.retention.bytes。
  5. retention.ms
    如果使用“delete”保留策略,在丢弃旧日志段以释放空间之前,此配置控制保留日志的最大时间,默认为604800000毫秒,即7天。这表示关于用户必须多长时间读取数据的SLA。如果设置为-1,则不应用时间限制。服务器提供的默认配置是log.retention.ms。
  6. segment.bytes
    此配置控制日志的段文件大小,默认1073741824bytes,即1G。保留和清理总是一次对一个文件进行,因此更大的段大小意味着更少的文件,但对保留的粒度控制更少。服务器提供的默认配置是log.segment.bytes。
  7. segment.index.bytes
    此配置控制将偏移量映射到文件位置的索引的大小,默认10485760bytes,即10M。我们预先分配这个索引文件,只有在日志滚动之后才收缩它。通常不需要更改此设置。服务器提供的默认配置是log.index.size.max.bytes。
  8. segment.jitter.ms
    从预定分段滚动时间中减去的最大随机时基误差,以避免成群的分段滚动,默认为0。服务器提供的默认配置是log.roll.jitter.ms。
  9. segment.ms
    这个配置控制了Kafka强制日志滚动的时间,即使段文件没有满,以确保保留可以删除或压缩旧数据,默认604800000毫秒,即7天。服务器提供的默认配置是log.roll.ms。
  10. unclean.leader.election.enable
    指示是否将不位于ISR集中的副本作为leader最后选择,即使这样做可能导致数据丢失。默认值为false。服务器提供的默认配置是unclean.leader.election.enable。
  11. message.downconversion.enable
    此配置控制是否启用消息格式的向下转换以满足消费请求,默认为true。当设置为false时,broker将不会为希望使用旧消息格式的用户执行向下转换。broker将会以UNSUPPORTED_VERSION错误响应来自这样旧的客户端的消费请求。此配置不适用将可能需要的任何消息格式向下转换复制到followers。服务器提供的默认配置是log.message.downconversion.enable