kafka集群部署

2021/03/17

kafka集群部署

参考资料

  • 官网地址:https://kafka.apache.org/
  • 下载地址:https://kafka.apache.org/downloads
  • 文档地址:

1 Linux环境集群安装

【组件部署】kafka + zookeeper集群安装
## 一、概述:
安装方式:编译安装 
环境信息:3 zookeeper(192.168.0.5、192.168.0.6、192.168.0.7) + 3 kafka(192.168.0.109、192.168.0.110、192.168.0.111)

## 二、zookeeper安装(在每台zookeeper设备上操作)
1、环境准备
安装jdk,此处省略

# 创建数据、日志目录
mkdir -p /data/zookeeper/data
mkdir -p /data/zookeeper/log

2、下载解压
下载地址:http://mirrors.hust.edu.cn/apache/zookeeper
cd /data/package
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
tar -zxf zookeeper-3.4.10.tar.gz
mv zookeeper-3.4.10 /data/package/zookeeper //统一安装目录于/data

3、修改配置文件
cd /data/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
-- 配置以下内容
clientPort=2181
tickTime=2000
initLimit=30000
syncLimit=10
maxClientCnxns=2000
autopurge.snapRetainCount=10
autopurge.purgeInterval=1
preAllocSize=131072
snapCount=3000000
 
maxSessionTimeout=60000000
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/log
server.1=192.168.0.5:2888:3888
server.2=192.168.0.6:2888:3888
server.3=192.168.0.7:2888:3888

4、添加环境变量
vim /etc/profile && vim /etc/bashrc
export ZK_HOME=/data/zookeeper
export PATH=$ZK_HOME/bin:$PATH
-- 注释
/etc/profile //交互式全局环境变量配置文件
/etc/bashrc //非交互式全局环境变量配置文件,自动化远程执行脚本用到此配置文件
echo 2 >/data/zookeeper/data/myid //每个节点的myid不一样

5、修改log4j日志配置
vim /data/zookeeper/conf/log4j.properties

zookeeper.root.logger=INFO,ROLLINGFILE
zookeeper.log.file=zookeeper.log
log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.ROLLINGFILE.MaxFileSize=10MB
vim /data/zookeeper/bin/zkEnv.sh

if [ "x${ZOO_LOG_DIR}" = "x" ]
then
    ZOO_LOG_DIR="/data/zookeeper/service-logs"
fi
 
if [ "x${ZOO_LOG4J_PROP}" = "x" ]
then
    ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
fi

6、启动服务
zkServer.sh start

7、查看集群状态
zkServer.sh status

## 三、kafka安装(每台kafka设备上操作)
1、下载解压
cd /data/package
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz
tar -xf kafka_2.12-1.1.0.tgz
mv kafka_2.12-1.1.0 /data/kafka

2、修改配置文件
cd /data/kafka/config
vim server.properties
-- 配置以下内容
broker.id=0 //各kafka节点编号各不通
listeners=PLAINTEXT://192.168.0.109:9092 //kafka各节点ip
log.dirs=/data/kafka/kafka-logs
zookeeper.connect=192.168.0.109:2181,192.168.0.110:2181,192.168.0.111:2181 //zookeeper各节点IP:PORT

3、添加环境变量
export KAFKA_HOME=/data/kafka
export PATH=$KAFKA_HOME/bin:$PATH

4、启动kafka各节点
kafka-server-start.sh -daemon /data/kafka/config/server.properties

5、集群状态检查
#创建test topic
kafka-topics.sh --create --zookeeper 192.168.0.109:2181,192.168.0.110:2181,192.168.0.111:2181 --replication-factor 3 --partitions 3 --topic test
#打开2个窗口
#窗口1:
kafka-console-producer.sh --broker-list 192.168.0.109:9092,192.168.0.110:9092,192.168.0.111:9092 --topic test
#交互界面中输入 hello
#窗口2:
kafka-console-consumer.sh --bootstrap-server 192.168.0.109:9092,192.168.0.110:9092,192.168.0.111:9092 --topic test --from-beginning
#接收到 hello ,说明集群搭建成功

2.常用命令

查topic列表:
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092,192.168.1.61:9092 --list

创建topic:
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092,192.168.1.61:9092,192.168.1.62:9092 --create --topic kaoqinTopic --partitions 4 --replication-factor 3

删除topic
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --delete --topic first

查看某个Topic的详情
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092--describe --topic kaoqinTopic

查topic消费组详情:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.60:9092 --group kaoqinConsumerGroup --describe 

查看Kafka的分区数
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --describe --topic  kaoqinTopic

修改分区数
bin/kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --topic kaoqinTopic --partitions 6

查看topic的偏移量offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list  192.168.1.60:9092  --topic  kaoqinTopic

重置topic的偏移量offset:https://blog.csdn.net/qq_42764468/article/details/128307313
重置需要先停止consumer:https://www.pianshen.com/article/57081563519/
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.60:9092 --group kaoqinConsumerGroup --reset-offsets --topic topicName --to-offset 1000  --execute
bin/kafka-consumer-groups.sh --bootstrap-server  192.168.1.60:9092 --group kaoqinConsumerGroup --reset-offsets --topic kaoqinTopic --to-earliest --execute

  • Producer 消息的生产者
  • Consumer 消息的消费者
  • ConsumerGroup 消费者组,实现单播和广播的手段
  • Broker kafak服务集群节点,Kafka集群中的一台或多台服务器统称broker.
  • Topic Kafka处理资源的消息源(feeds of messages)的不同分类
  • Partition Topic 物理上的分组,一个topic可以分为多个partion,每个partion是一个有序 的队列。partion中每条消息都会被分配一个有序的Id(offset)
  • Message 消息,是通信的基本单位,每个producer可以向一个topic(主题)发布 一些消息
  • Producers 消息和数据生成者,向Kafka的一个topic发布消息的 过程叫做producers

  • 生产者: 生产者生产消息不仅必须指定Topic,还可按照需求指定发往特定的分区
  • 消费者: Kafak 消费消息后不会删除消息 消费者是通过offset 偏移量来控制消费消息,offset 持久化在消费者一方==
  • 一个Topic 可被一个或多个消费者消费
  • 一个消费者可消费不同的多个topic
  • 消费者不仅可以指定要消费的Topic, 还可指定消费的分区
  • 同一个Group 可以定义一个或多个消费者
  • 同一个Group 中的多个消费者只会有一个收到消息
  • 不同Group 相同Topic 的消费者都会收到消息(fanout)
  • Kafka 只保证分区内的记录是有序的,而不保证主题中不同分区的顺序
  • Kafka 作为一个集群,运行在一台或者多台服务器上.
  • Kafka 通过topic 对存储的流数据进行分类。
  • 每条记录中包含一个key ,一个value 和一个timestamp (时间戳)。
  • Kafak 争抢模式实现 多个消费者,同一个Topic 同一个Group
  • Kafak 广播模式实现 多个消费者,同一个topic, 不同Group

问题记录

Post Directory

扫码关注公众号:暂无公众号
发送 290992
即可立即永久解锁本站全部文章