kafka集群部署

2021/03/17

kafka集群部署

参考资料

  • 官网地址:https://kafka.apache.org/
  • 下载地址:https://kafka.apache.org/downloads
  • 文档地址:

1 Linux环境集群安装(zookeeper模式)

【组件部署】kafka + zookeeper集群安装
## 一、概述:
安装方式:编译安装 
环境信息:3 zookeeper(192.168.0.5、192.168.0.6、192.168.0.7) + 3 kafka(192.168.0.109、192.168.0.110、192.168.0.111)

## 二、zookeeper安装(在每台zookeeper设备上操作)
1、环境准备
安装jdk,此处省略

# 创建数据、日志目录
mkdir -p /data/zookeeper/data
mkdir -p /data/zookeeper/log
mkdir -p /data/package

2、下载解压
下载地址:http://mirrors.hust.edu.cn/apache/zookeeper
cd /data/package
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
tar -zxf zookeeper-3.4.10.tar.gz
mv zookeeper-3.4.10/* /data/zookeeper //统一安装目录于/data

3、修改配置文件
cd /data/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
-- 配置以下内容
clientPort=2181
tickTime=2000
initLimit=30000
syncLimit=10
maxClientCnxns=2000
autopurge.snapRetainCount=10
autopurge.purgeInterval=1
preAllocSize=131072
snapCount=3000000
 
maxSessionTimeout=60000000
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/log
server.1=192.168.0.5:2888:3888
server.2=192.168.0.6:2888:3888
server.3=192.168.0.7:2888:3888

4、添加环境变量
vim /etc/profile && vim /etc/bashrc
export ZK_HOME=/data/zookeeper
export PATH=$ZK_HOME/bin:$PATH
-- 注释
/etc/profile //交互式全局环境变量配置文件
/etc/bashrc //非交互式全局环境变量配置文件,自动化远程执行脚本用到此配置文件
echo 2 >/data/zookeeper/data/myid //每个节点的myid不一样

5、修改log4j日志配置
vim /data/zookeeper/conf/log4j.properties

zookeeper.root.logger=INFO,ROLLINGFILE
zookeeper.log.file=zookeeper.log
log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.ROLLINGFILE.MaxFileSize=10MB
vim /data/zookeeper/bin/zkEnv.sh

if [ "x${ZOO_LOG_DIR}" = "x" ]
then
    ZOO_LOG_DIR="/data/zookeeper/service-logs"
fi
 
if [ "x${ZOO_LOG4J_PROP}" = "x" ]
then
    ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
fi

6、启动服务
zkServer.sh start

7、查看集群状态
zkServer.sh status

## 三、kafka安装(每台kafka设备上操作)
1、下载解压
cd /data/package
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz
tar -xf kafka_2.12-1.1.0.tgz
mv kafka_2.12-1.1.0/* /data/kafka

2、修改配置文件
cd /data/kafka/config
vim server.properties
-- 配置以下内容
broker.id=0 //各kafka节点编号各不通
listeners=PLAINTEXT://192.168.0.109:9092 //kafka各节点ip
log.dirs=/data/kafka/kafka-logs
zookeeper.connect=192.168.0.109:2181,192.168.0.110:2181,192.168.0.111:2181 //zookeeper各节点IP:PORT

3、添加环境变量
export KAFKA_HOME=/data/kafka
export PATH=$KAFKA_HOME/bin:$PATH

4、启动kafka各节点
kafka-server-start.sh -daemon /data/kafka/config/server.properties

5、集群状态检查
#创建test topic
kafka-topics.sh --create --zookeeper 192.168.0.109:2181,192.168.0.110:2181,192.168.0.111:2181 --replication-factor 3 --partitions 3 --topic test
#打开2个窗口
#窗口1:
kafka-console-producer.sh --broker-list 192.168.0.109:9092,192.168.0.110:9092,192.168.0.111:9092 --topic test
#交互界面中输入 hello
#窗口2:
kafka-console-consumer.sh --bootstrap-server 192.168.0.109:9092,192.168.0.110:9092,192.168.0.111:9092 --topic test --from-beginning
#接收到 hello ,说明集群搭建成功

2. Linux环境集群安装(Kraft模式)

  • KRaft 模式:这是 Kafka 的新架构,不再依赖外部的 ZooKeeper 集群。元数据由 Kafka 自身通过 Raft 协议管理,简化了部署和运维。
  • SASL 认证:负责身份认证,确保只有合法用户能访问。常见的机制有:
    • PLAIN:使用简单的用户名和密码,配置简单,但密码以明文形式存储,适合测试或内部安全网络。
    • SCRAM(如 SCRAM-SHA-256/512):更安全的机制,密码在服务器端不以明文存储,支持动态管理用户,推荐用于生产环境。
以下配置以三节点 KRaft 集群为例,节点 IP 假设为 10.0.0.101, 10.0.0.102, 10.0.0.103。
mkdir -p /data/kafka/
tar  -zxvf  kafka_2.13-3.8.1.tar.gz
mv kafka_2.13-3.8.1/* /data/kafka/

1. /data/kafka/config/kraft下,新建配置 JAAS 文件
创建一个 JAAS 文件,如 server_jaas.conf,为 Broker 提供认证信息。这里以 SCRAM 机制为例。 
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};

username和 password是 Broker 自身用于身份验证的凭据。
若使用 PLAIN 机制,格式类似,但密码管理方式不同。
通过环境变量指定该文件: 
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/config/kraft/server_jaas.conf"

2. 配置 server.properties 每个节点的 config/kraft/server.properties文件是核心。以下是节点1的配置示例,节点2和3需调整 node.id和网络地址。
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@172.17.1.2:9094,2@172.17.1.3:9094,3@172.17.1.4:9094
listeners=INNER://172.17.1.2:9092,PUBLIC://172.17.1.2:9093,CONTROLLER://172.17.1.2:9094

inter.broker.listener.name=INNER
advertised.listeners=INNER://172.17.1.2:9092,PUBLIC://36.123.123.123:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,INNER:SASL_PLAINTEXT,PUBLIC:SASL_PLAINTEXT

sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
super.users=User:admin;User:ANONYMOUS
allow.everyone.if.no.acl.found=false
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kraft-combined-logs

num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

default.replication.factor=2
log.cleaner.enable=true
log.cleanup.policy=delete
auto.create.topics.enable=true
delete.topic.enable=true

# 节点2和节点3的配置差异:
节点2: node.id=2, advertised.listeners,listeners
节点3: node.id=3, advertised.listeners,listeners
所有节点的 controller.quorum.voters配置必须完全相同。

3. 集群初始化与启动
生成集群 ID:bin/kafka-storage.sh random-uuid

# 在kafka-node-1节点上执行,格式化存储目录(关键步骤):在初始化时直接创建管理员用户,这是 KRaft 模式下推荐的做法。 
bin/kafka-storage.sh format -t <生成的集群ID> -c ./config/kraft/server.properties --add-scram 'SCRAM-SHA-256=[name="admin",password="admin-secret"]'
# 在 kafka-node-2 和 kafka-node-3 上执行
bin/kafka-storage.sh format -t <生成的集群ID> -c ./config/kraft/server.properties

重要提示:--add-scram参数将用户信息直接写入集群元数据。

# 启动集群:在每个节点上执行,确保已设置 KAFKA_OPTS环境变量指向 JAAS 文件。 
bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

4.命令行工具验证
创建一个配置文件,如 client.properties。 
# 安全协议与机制
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
# SASL JAAS 配置(普通用户)
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";

使用示例: 
# 创建主题
bin/kafka-topics.sh --bootstrap-server 10.0.0.101:9092 --create --topic test-topic --partitions 3 --replication-factor 3 --command-config client.properties

# 控制台生产者
bin/kafka-console-producer.sh --bootstrap-server 10.0.0.101:9092 --topic test-topic --producer.config client.properties

# 控制台消费者
bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092 --topic test-topic --from-beginning --consumer.config client.properties

3.常用命令

查topic列表:
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092,192.168.1.61:9092 --list

查看某个Topic的详情
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092--describe --topic kaoqinTopic

创建topic:
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092,192.168.1.61:9092,192.168.1.62:9092 --create --topic kaoqinTopic --partitions 3 --replication-factor 3

删除topic
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --delete --topic first

-------
查看group list
kafka-consumer-groups.sh --bootstrap-server 192.168.1.60:9092--list

查topic消费组group详情:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.60:9092 --group kaoqinConsumerGroup --describe 

#删除消费组
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.60:9092 --delete --group kaoqinConsumerGroup

-------
查看Kafka的分区数
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --describe --topic  kaoqinTopic

修改分区数
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --topic kaoqinTopic --partitions 6

查看topic的偏移量offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list  192.168.1.60:9092  --topic  kaoqinTopic

重置topic的偏移量offset:https://blog.csdn.net/qq_42764468/article/details/128307313
重置需要先停止consumer:https://www.pianshen.com/article/57081563519/
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.60:9092 --group kaoqinConsumerGroup --reset-offsets --topic topicName --to-offset 1000  --execute
bin/kafka-consumer-groups.sh --bootstrap-server  192.168.1.60:9092 --group kaoqinConsumerGroup --reset-offsets --topic kaoqinTopic --to-earliest --execute

  • Producer 消息的生产者
  • Consumer 消息的消费者
  • ConsumerGroup 消费者组,实现单播和广播的手段
  • Broker kafak服务集群节点,Kafka集群中的一台或多台服务器统称broker.
  • Topic Kafka处理资源的消息源(feeds of messages)的不同分类
  • Partition Topic 物理上的分组,一个topic可以分为多个partion,每个partion是一个有序 的队列。partion中每条消息都会被分配一个有序的Id(offset)
  • Message 消息,是通信的基本单位,每个producer可以向一个topic(主题)发布 一些消息
  • Producers 消息和数据生成者,向Kafka的一个topic发布消息的 过程叫做producers

  • 生产者: 生产者生产消息不仅必须指定Topic,还可按照需求指定发往特定的分区
  • 消费者: Kafka 消费消息后不会删除消息 消费者是通过offset 偏移量来控制消费消息,offset 持久化在消费者一方==
  • 一个Topic 可被一个或多个消费者消费
  • 一个消费者可消费不同的多个topic
  • 消费者不仅可以指定要消费的Topic, 还可指定消费的分区
  • 同一个Group 可以定义一个或多个消费者
  • 同一个Group 中的多个消费者只会有一个收到消息
  • 不同Group 相同Topic 的消费者都会收到消息(fanout)
  • Kafka 只保证分区内的记录是有序的,而不保证主题中不同分区的顺序
  • Kafka 作为一个集群,运行在一台或者多台服务器上.
  • Kafka 通过topic 对存储的流数据进行分类。
  • 每条记录中包含一个key ,一个value 和一个timestamp (时间戳)。
  • Kafak 争抢模式实现 多个消费者,同一个Topic 同一个Group
  • Kafak 广播模式实现 多个消费者,同一个topic, 不同Group

问题记录

Post Directory

扫码关注公众号:暂无公众号
发送 290992
即可立即永久解锁本站全部文章