通过 docker stack 部署kafka集群 和 kafka-ui 可视化面板
单节点配置
version: "3.7"
networks:
default:
driver: bridge
x-image: &x-image
image: bitnami/kafka:3.2.3
x-common: &x-common
<<: *x-image
user: root
networks:
- default
logging:
driver: json-file #仅在 json-file 驱动程序下,可以使用以下参数,限制日志得数量和大小。
options:
max-size: "200m" # 单个文件大小为200m
max-file: "3" # 最多1个文件
services:
kafka1:
<<: *x-common
container_name: kafka1
ports:
- 9192:9092
- 9193:9093
- 9194:9094
environment:
## 通用配置
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定义kafka服务端socket监听端口
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,LOCAL://:9094
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,LOCAL:PLAINTEXT
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
- KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 不允许自动创建主题
#- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
## broker配置
# 定义外网访问地址(宿主机ip地址和端口)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.27.0.9:39304,LOCAL://10.10.10.200:9194
#- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# broker.id,必须唯一
- KAFKA_BROKER_ID=1
- KAFKA_CFG_NODE_ID=1
volumes:
- /opt/kafka/data/kafka1:/bitnami/kafka
#extra_hosts:
#- "kafka1:云服务器IP"
kafka-ui:
<<: *x-common
image: provectuslabs/kafka-ui:v0.7.2
container_name: kafka-ui
restart: always
ports:
- 9091:8080
volumes:
- /etc/localtime:/etc/localtime:ro
environment:
# 集群名称
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=10.10.10.200:9194
集群配置
version: "3.7"
networks:
default:
driver: bridge
x-image: &x-image
image: bitnami/kafka:3.2.3
x-common: &x-common
<<: *x-image
user: root
networks:
- default
logging:
driver: json-file #仅在 json-file 驱动程序下,可以使用以下参数,限制日志得数量和大小。
options:
max-size: "200m" # 单个文件大小为200m
max-file: "3" # 最多1个文件
services:
kafka1:
<<: *x-common
container_name: kafka1
ports:
- 9192:9092
- 9193:9093
- 9194:9094
environment:
## 通用配置
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定义kafka服务端socket监听端口
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,LOCAL://:9094
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,LOCAL:PLAINTEXT
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
- KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 不允许自动创建主题
#- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
## broker配置
# 定义外网访问地址(宿主机ip地址和端口)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.27.0.9:39304,LOCAL://10.10.10.200:9194
#- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# broker.id,必须唯一
- KAFKA_BROKER_ID=1
- KAFKA_CFG_NODE_ID=1
volumes:
- /opt/kafka/data/kafka1:/bitnami/kafka
#extra_hosts:
#- "kafka1:云服务器IP"
#- "kafka2:云服务器IP"
#- "kafka3:云服务器IP"
kafka2:
<<: *x-common
container_name: kafka2
ports:
- 9292:9092
- 9293:9093
- 9294:9094
environment:
## 通用配置
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定义kafka服务端socket监听端口
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,LOCAL://:9094
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,LOCAL:PLAINTEXT
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
- KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 不允许自动创建主题
#- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
## broker配置
# 定义外网访问地址(宿主机ip地址和端口)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.27.0.9:39305,LOCAL://10.10.10.200:9294
# broker.id,必须唯一
- KAFKA_BROKER_ID=2
- KAFKA_CFG_NODE_ID=2
volumes:
- /opt/kafka/data/kafka2:/bitnami/kafka
#extra_hosts:
#- "kafka1:云服务器IP"
#- "kafka2:云服务器IP"
#- "kafka3:云服务器IP"
kafka3:
<<: *x-common
container_name: kafka3
ports:
- 9392:9092
- 9393:9093
- 9394:9094
environment:
## 通用配置
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定义kafka服务端socket监听端口
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,LOCAL://:9094
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,LOCAL:PLAINTEXT
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可
- KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 不允许自动创建主题
#- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
## broker配置
# 定义外网访问地址(宿主机ip地址和端口)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.27.0.9:39306,LOCAL://10.10.10.200:9394
# broker.id,必须唯一
- KAFKA_BROKER_ID=3
- KAFKA_CFG_NODE_ID=3
volumes:
- /opt/kafka/data/kafka3:/bitnami/kafka
#extra_hosts:
#- "kafka1:云服务器IP"
#- "kafka2:云服务器IP"
#- "kafka3:云服务器IP"
kafka-ui:
<<: *x-common
image: provectuslabs/kafka-ui:v0.7.2
container_name: kafka-ui
restart: always
ports:
- 9091:8080
volumes:
- /etc/localtime:/etc/localtime:ro
environment:
# 集群名称
- KAFKA_CLUSTERS_0_NAME=local
# 集群地址
# - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092,kafka3:9092
# - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=10.27.0.9:39304,10.27.0.9:39305,10.27.0.9:39306
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=10.10.10.200:9194,10.10.10.200:9294,10.10.10.200:9394
# # 集群名称
# - KAFKA_CLUSTERS_1_NAME=ssh-tunnel
# # 集群地址
# - KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS=127.0.0.1:9192,127.0.0.1:9292,127.0.0.1:9392
命令行
通过 portainer 面板进入容器内执行 或者 通过 docker exec -it kafka1 /bin/bash 进入容器内执行
查看主题列表
kafka-topics.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --list
创建主题
kafka-topics.sh --create --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --topic test --partitions 3 --replication-factor 1
查看主题详情
kafka-topics.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --topic test --describe
分区扩容
分区只能增加 不能减少
kafka-topics.sh --alter --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --topic test --partitions 15
删除主题
kafka-topics.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --delete --topic test
备份数据
全量备份
kafka-console-consumer.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --topic test --from-beginning > /bitnami/kafka/backup/test.log
运行一段时间无数据增长后退出
全量恢复
kafka-console-producer.sh --broker-list kafka1:9094,kafka2:9094,kafka3:9094 --topic terminal-status-history < /bitnami/kafka/backup/terminal-status-history.log
Topic 添加/修改动态配置
kafka-configs.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --alter --entity-type topics --entity-name terminal-status-history --add-config file.delete.delay.ms=60000,delete.retention.ms=604800000
Topic删除动态配置
kafka-configs.sh --bootstrap-server kafka1:9094,kafka2:9094,kafka3:9094 --alter --entity-type topics --entity-name terminal-status-history --delete-config file.delete.delay.ms,retention.ms
主题配置
-
cleanup.policy
字符串要么是“delete”,要么是“compact”,或者两者都是。此配置指定在旧日志段上使用的保留策略。默认策略(“delete”)将在达到保留时间或大小限制时丢弃旧段。“compact”设置将启用topic上的日志压缩。服务器提供的默认配置是log.cleanup.policy。 -
compression.type
指定给定topic的最终压缩类型。该配置接受标准的压缩编解码器('gzip'、'snappy'、'lz4'、'zstd')。它另外接受“uncompressed”,这相当于没有压缩;和“producer”,即保留由生产者设置的原始压缩编解码器。服务器提供的默认配置是compression.type。 -
delete.retention.ms
为日志压缩主题保留删除tombstone标记的时间,默认86400000毫秒,即24小时。这个设置还提供了一个时间的限制如果消费者从偏移量0开始,他们必须在这个时间内完成读取,以确保他们获得最后阶段的有效快照(否则删除tombstones可能会在他们完成扫描之前被收集)。服务器提供的默认配置是log.cleaner.delete.retention.ms。 -
file.delete.delay.ms
从文件系统中删除文件之前等待的时间,默认60000毫秒。服务器提供的默认配置是log.segment.delete.delay.ms。 -
flush.messages
此设置允许指定一个时间间隔,在该时间间隔,我们将强制将fsync数据写入日志,默认9223372036854775807。例如,如果这个被设为1我们会在每条消息后进行fsync;如果是5,我们会在每5条消息后进行fsync。一般来说,我们建议您不要设置这个值,而是使用复制来保证持久性,并允许操作系统的后台刷新功能,因为这样更有效。可以根据每个topic重写此设置。服务器提供的默认配置是log.flush.interval.messages。 -
flush.ms
此设置允许指定一个时间间隔,在这个时间间隔,我们将强制将fsync数据写入日志,默认9223372036854775807。例如,如果这个值设置为1000,我们将在1000毫秒之后进行fsync。一般来说,我们建议您不要设置这个值,而是使用复制来保证持久性,并允许操作系统的后台刷新功能,因为这样更有效。服务器提供的默认配置是log.flush.interval.ms。 -
follower.replication.throttled.replicas
应该在follower端对其日志复制进行限制的副本列表。该列表应该以[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:…或者也可以使用通配符'*'来限制此主题的所有副本。服务器提供的默认配置是follower.replication.throttled.replicas。 -
index.interval.bytes
此设置控制Kafka向其偏移索引添加索引条目的频率。默认设置确保我们大约每4096bytes索引一条消息。更多的索引允许读取更接近日志中的确切位置,但使索引更大。一般情况下我们不需要改变这个默认配置。服务器提供的默认配置是log.index.interval.bytes。 -
leader.replication.throttled.replicas
一个副本列表,它的日志复制应该在leader端被限制。该列表应该以[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:…或者,也可以使用通配符'*'来限制此主题的所有副本。服务器提供的默认配置是leader.replication.throttled.replicas。 -
max.compaction.lag.ms
消息在日志中不符合压缩条件的最长时间,默认9223372036854775807。仅适用于正在压缩的日志。服务器提供的默认配置是log.cleaner.max.compaction.lag.ms。 -
max.message.bytes
Kafka允许的最大记录批大小(如果启用压缩,则在压缩之后),默认1048588bytes,即1M。如果增加这个值,并且有版本小于0.10.2的consumer,那么consumer的获取大小也必须增加,以便他们能够获取这么大的记录批。在最新的消息格式版本中,为了提高效率,记录总是分组成批。在以前的消息格式版本中,未压缩的记录不会分组成批,在这种情况下,此限制仅适用于单个记录。服务器提供的默认配置是message.max.bytes。 -
message.format.version
指定broker将用于向日志追加消息的消息格式版本,该值应该是一个有效的ApiVersion,默认2.5-IV0。通过设置特定的消息格式版本,用户可以确认磁盘上现有的所有消息都小于或等于指定的版本。不正确地设置此值将导致使用旧版本的用户崩溃,因为他们将接收到他们不理解的格式的消息。服务器提供的默认配置是log.message.format.version。 -
message.timestamp.difference.max.ms
broker接收消息时的时间戳与消息中指定的时间戳之间允许的最大差异,默认9223372036854775807。如果message.timestamp.type=CreateTime,那么如果时间戳的差异超过此阈值,则消息将被拒绝。如果message.timestamp.type=LogAppendTime,则忽略此配置。服务器提供的默认配置是log.message.timestamp.difference.max.ms。 -
message.timestamp.type
定义消息中的时间戳是消息创建时间还是日志追加时间,默认是CreateTime。可选值是“CreateTime”或“LogAppendTime”。服务器提供的默认配置是log.message.timestamp.type。 -
min.cleanable.dirty.ratio
此配置控制日志压缩器尝试清理日志的频率(假设启用了日志压缩)。默认情况下,我们将避免清理压缩超过50%的日志,这个比率限制了重复日志所浪费的最大空间(最多50%的日志是重复的)。较高的比率意味着更少、更有效的清理,但也意味着日志中浪费的空间更多。如果还指定了max.compaction.lag.ms或min.compaction.lag.ms配置,那么日志压缩程序将认为日志符合压缩条件:1)、脏比率阈值已经满足,并且日志至少在min.compaction.lag.ms期间拥有脏(未压缩)记录;
2)、如果日志最多在max.compaction.lag.ms期间拥有脏(未压缩)记录。
服务器提供的默认配置是log.cleaner.min.cleanable.ratio。
- min.compaction.lag.ms
消息在日志中保持未压缩状态的最小时间,默认值为0。仅适用于正在压缩的日志。服务器提供的默认配置是log.cleaner.min.compaction.lag.ms。 - min.insync.replicas
当生产者将acks设置为“all”(或“-1”)时,此配置指定必须确认写入操作的最小副本数量,以便认为写入操作成功。如果不能满足这个最小值,那么生成器将抛出一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend),默认值为1。当一起使用时,min.insync.replicas和acks允许强制执行更大的持久性保证。典型的场景是创建复制因子为3的主题,设置min.insync.replicas为2,并且设置acks为“all”,这将确保在大多数副本没有收到写操作时,生成程序会引发异常。服务器提供的默认配置是min.insync.replicas。 - preallocate
如果在创建新的日志段时应该在磁盘上预先分配文件,则设置为true,默认为false。服务器提供的默认配置是log.preallocate。 - retention.bytes
如果使用“delete”保留策略,在我们丢弃旧的日志段以释放空间之前,这个配置控制一个分区(由日志段组成)可以增长到的最大大小。默认情况下没有大小限制,只有时间限制,默认值为-1。由于这个限制是在分区级别强制执行的,所以用它乘以分区数就可以计算以字节为单位的主题保留量。服务器提供的默认配置是log.retention.bytes。 - retention.ms
如果使用“delete”保留策略,在丢弃旧日志段以释放空间之前,此配置控制保留日志的最大时间,默认为604800000毫秒,即7天。这表示关于用户必须多长时间读取数据的SLA。如果设置为-1,则不应用时间限制。服务器提供的默认配置是log.retention.ms。 - segment.bytes
此配置控制日志的段文件大小,默认1073741824bytes,即1G。保留和清理总是一次对一个文件进行,因此更大的段大小意味着更少的文件,但对保留的粒度控制更少。服务器提供的默认配置是log.segment.bytes。 - segment.index.bytes
此配置控制将偏移量映射到文件位置的索引的大小,默认10485760bytes,即10M。我们预先分配这个索引文件,只有在日志滚动之后才收缩它。通常不需要更改此设置。服务器提供的默认配置是log.index.size.max.bytes。 - segment.jitter.ms
从预定分段滚动时间中减去的最大随机时基误差,以避免成群的分段滚动,默认为0。服务器提供的默认配置是log.roll.jitter.ms。 - segment.ms
这个配置控制了Kafka强制日志滚动的时间,即使段文件没有满,以确保保留可以删除或压缩旧数据,默认604800000毫秒,即7天。服务器提供的默认配置是log.roll.ms。 - unclean.leader.election.enable
指示是否将不位于ISR集中的副本作为leader最后选择,即使这样做可能导致数据丢失。默认值为false。服务器提供的默认配置是unclean.leader.election.enable。 - message.downconversion.enable
此配置控制是否启用消息格式的向下转换以满足消费请求,默认为true。当设置为false时,broker将不会为希望使用旧消息格式的用户执行向下转换。broker将会以UNSUPPORTED_VERSION错误响应来自这样旧的客户端的消费请求。此配置不适用将可能需要的任何消息格式向下转换复制到followers。服务器提供的默认配置是log.message.downconversion.enable