分布式消息队列-Kafka
参考资料
- 消息队列MQ
- 消息中间件(一)MQ详解及四大MQ比较
- MQ消息队列详解、四大MQ的优缺点分析
- Kafka学习之路 (一)
- Kafka学习之路 (二)Kafka的架构
- 秒懂 kafka HA(高可用)
- 大白话 kafka 架构原理
一、JMS & 消息中间件
1.1 JMS(JAVA Message Service,java消息服务):
是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
消息中间件(MOM:Message Orient middleware)
JMS用途和优点:
- 1.将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;
- 2.负责建立网络通信的通道,进行数据的可靠传送。
- 3.保证数据不重发,不丢失
- 4.能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务
1.2 MQ:Message Queue,消息队列
是一个消息的接受和转发的容器,可用于消息推送。
顾名思义,它就是一个队列,简单来说就是一个应用程序A将数据丢到一个队列中,由另一个应用程序B从队列中拿到这个数据,再去做一些其他的业务操作。我们把应用程序A叫做生产者,应用程序B叫做消费者,它们之间传输的数据称作消息。
常见MQ组件:
- ActiveMQ: 一款开源的JMS具体实现。由Apache出品的消息中间件,完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,非常快速,支持多种语言的客户端和协议。
- RabbitMQ: https://blog.csdn.net/yuanlong122716/category_9754690.html
- RocketMQ:
- Kafka: 是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等
- ZeroMQ: 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。
1.3 MQ的使用场景
- 应用解耦
假设系统A通过调用接口推送数据给B、C、D,如果后续系统E也需要被推送、或者B不再需要被推送呢?
那我们就需要修改系统A的代码,加上给E推送数据的逻辑,去掉给B推送数据的逻辑。显然A系统和其他系统严重耦合.
在这个场景中,如果使用 MQ,通过发布订阅模型,就可以实现A和其他系统的解耦。A 产生一条数据,发送到 MQ ,哪个系统需要就去订阅消费,如果某个系统不再需要,取消对消息的订阅即可。 - 异步通信
将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。 - 削峰限流
商城秒杀活动,对于商城系统,可能在0点左右会有个短暂的高峰期,其余时间的并发量也没那么高,假如我们的后台系统直接操作数据库,平时可能没什么问题,但如果突然有很高的的并发量进来,就会因为MySQL并发量过大导致系统瘫痪。
如果使用MQ,请求会短期积压在MQ中,后台系统从MQ中分批拉取消息,从而保证数据库不会被压垮。等高峰期一过,系统就会将MQ中积压的消息慢慢解决掉。这就是MQ的”削峰限流”作用。
1.4 消息传递模式
-
点对点 消息传递模式: 生产者发送一条消息到queue,只有一个消费者能收到。
在点对点消息系统中,消息持久化到一个队列中。一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序
-
发布-订阅 消息传递模式
在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者
二、Kafka
2.0 Kafka介绍
Kafka是Apache下的一个子项目,是一个高性能、跨语言、分布式发布、发布-订阅 消息队列系统
Kafka相对于ActiveMQ是一个非常轻量级的消息系统.
主要应用场景是:日志收集系统 和 消息系统。
上图中一个topic配置了3个partition。Partition1有两个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。
如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。
2.1.Kafka的优点
- 应用解耦
- 异步处理
- 消息通信:
- 流量削峰: 秒杀场景
- 冗余(副本):消息队列把数据进行持久化直到它们已经被完全处理
- 可恢复性:加入队列中的消息仍然可以在系统恢复后被处理
- 顺序保证:Kafka保证一个Partition内的消息的有序性,主题topic会有多个分区Partition,所以在整个主题的范围内,是无法保证消息的顺序的,单个分区则可以保证
2.2.Kafka中的术语解释
broker
- Kafka 集群包含一个或多个服务器,服务器节点称为broker
- broker存储topic的数据。
- 如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
- 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据
- 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
Topic
- 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic
- 物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处
- topic会有多个分区Partition,所以在整个主题的范围内,是无法保证消息的顺序的
Partition
- topic中的数据分割为一个或多个partition。每个topic至少有一个partition
- 每个partition中的数据使用多个segment文件存储
- partition中的数据是有序的,同一topic不同partition间的数据丢失了数据的顺序
- 在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1
Producer
- 生产者即数据的发布者,该角色将消息发布到Kafka的topic中
- broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。
- 生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
Consumer
- 消费者可以从broker中读取数据。
- 消费者可以消费多个topic中的数据
Consumer Group
- 每个Consumer属于一个特定的Consumer Group
- 可为每个Consumer指定group name,若不指定group name则属于默认的group
Leader
- 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition
Follower
- Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。
- 当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“同步中副本”“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
2.3.kafka安装
2.3.1 windows环境单点安装
2.3.2 Linux环境单点安装
2.3.3 Linux环境集群安装
2.4.Springboot集成使用kafka
//1.添加依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
//2.添加配置
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092 #指定kafka server的地址,集群配多个,中间,逗号隔开
producer:
# Kafka提供的序列化和反序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default_consumer_group #群组ID
enable-auto-commit: true
# 提交offset延时(接收到消息后多久提交offset)
auto-commit-interval: 1000
# Kafka提供的序列化和反序列化类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 消费端监听的topic不存在时,项目启动会报错(关掉)
missing-topics-fatal: false
//3.简单生产者Producer
@RestController
public class ProducerController {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate1;
@RequestMapping("message/send")
public String send(){
//使用kafka模板发送信息,newTopic:topic不存在时会自行创建
kafkaTemplate.send("newTopic", "This is msg");
kafkaTemplate1.send("testDemo","This is msg too");
return "success";
}
}
//4.简单消费者Consumer
@Component
public class Consumer {
/**
* 定义此消费者接收topics = "testDemo"的消息,与producerController中的topic对应上即可
* @param record 变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息
*/
@KafkaListener(topics = "newTopic")
public void listen (ConsumerRecord<?, ?> record){
System.out.printf("topic is %s, offset is %d, value is %s \n", record.topic(), record.offset(), record.value());
}
@KafkaListener(topics = "testDemo")
public void listen2 (ConsumerRecord<?, ?> record){
System.out.printf("topic is %s, offset is %d, value is %s \n", record.topic(), record.offset(), record.value());
}
}
2.5 spring-cloud-stream使用kafka
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架
三、RocketMQ
3.0 RocketMQ介绍
3.1 RocketMQ集群安装部署
四、RabbitMQ
4.0 RabbitMQ介绍
4.1 RabbitMQ集群安装部署
五、问题整理
5.1 消息可靠性
- 消息不丢失
- 消息不会重复消费
- 消息有序消费
5.2 消息防丢失
由于网络问题,我们难保证生产者发送的消息能100%到达消息队列服务器,也就是说有消息丢失的可能性
因此,生产者就必项具有【消息丢失检测】和【重发】机制,也就是我们说的消息队列的亊物机制
同步的事务——停止等待
同步的事务——连续ARQ
异步的事务——回调机制
5.3 消息防重复消费
5.4 消息按序消费
附:常用命令
## 创建topic
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092 --create --topic newTopic --partitions 4 --replication-factor 3
## 查看topic-list
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
## 查看消费group详情
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group new-consumer-group --describe
# 查询消费组
kafka-consumer-groups.sh --bootstrap-server {kafka连接地址} --list
# 删除消费组
kafka-consumer-groups.sh --bootstrap-server {kafka连接地址} --delete --group {消费组}