分布式消息队列-Kafka

2021/06/18

分布式消息队列-Kafka

参考资料

一、JMS & 消息中间件

1.1 JMS(JAVA Message Service,java消息服务):

是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

消息中间件(MOM:Message Orient middleware)

JMS用途和优点:

  • 1.将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;
  • 2.负责建立网络通信的通道,进行数据的可靠传送。
  • 3.保证数据不重发,不丢失
  • 4.能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务

1.2 MQ:Message Queue,消息队列

是一个消息的接受和转发的容器,可用于消息推送。

顾名思义,它就是一个队列,简单来说就是一个应用程序A将数据丢到一个队列中,由另一个应用程序B从队列中拿到这个数据,再去做一些其他的业务操作。我们把应用程序A叫做生产者,应用程序B叫做消费者,它们之间传输的数据称作消息。

常见MQ组件:

  • ActiveMQ: 一款开源的JMS具体实现。由Apache出品的消息中间件,完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,非常快速,支持多种语言的客户端和协议。
  • RabbitMQ: https://blog.csdn.net/yuanlong122716/category_9754690.html
  • RocketMQ:
  • Kafka: 是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等
  • ZeroMQ: 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。

1.3 MQ的使用场景

  • 应用解耦
    假设系统A通过调用接口推送数据给B、C、D,如果后续系统E也需要被推送、或者B不再需要被推送呢?
    那我们就需要修改系统A的代码,加上给E推送数据的逻辑,去掉给B推送数据的逻辑。显然A系统和其他系统严重耦合.
    在这个场景中,如果使用 MQ,通过发布订阅模型,就可以实现A和其他系统的解耦。A 产生一条数据,发送到 MQ ,哪个系统需要就去订阅消费,如果某个系统不再需要,取消对消息的订阅即可。
  • 异步通信
    将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
  • 削峰限流
    商城秒杀活动,对于商城系统,可能在0点左右会有个短暂的高峰期,其余时间的并发量也没那么高,假如我们的后台系统直接操作数据库,平时可能没什么问题,但如果突然有很高的的并发量进来,就会因为MySQL并发量过大导致系统瘫痪。
    如果使用MQ,请求会短期积压在MQ中,后台系统从MQ中分批拉取消息,从而保证数据库不会被压垮。等高峰期一过,系统就会将MQ中积压的消息慢慢解决掉。这就是MQ的”削峰限流”作用。

1.4 消息传递模式

  • 点对点 消息传递模式: 生产者发送一条消息到queue,只有一个消费者能收到。
    在点对点消息系统中,消息持久化到一个队列中。一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序

  • 发布-订阅 消息传递模式
    在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者

二、Kafka

2.0 Kafka介绍

Kafka是Apache下的一个子项目,是一个高性能、跨语言、分布式发布、发布-订阅 消息队列系统

Kafka相对于ActiveMQ是一个非常轻量级的消息系统.

主要应用场景是:日志收集系统 和 消息系统。

上图中一个topic配置了3个partition。Partition1有两个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。

如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。

2.1.Kafka的优点

  • 应用解耦
  • 异步处理
  • 消息通信:
  • 流量削峰: 秒杀场景
  • 冗余(副本):消息队列把数据进行持久化直到它们已经被完全处理
  • 可恢复性:加入队列中的消息仍然可以在系统恢复后被处理
  • 顺序保证:Kafka保证一个Partition内的消息的有序性,主题topic会有多个分区Partition,所以在整个主题的范围内,是无法保证消息的顺序的,单个分区则可以保证

2.2.Kafka中的术语解释

broker

  • Kafka 集群包含一个或多个服务器,服务器节点称为broker
  • broker存储topic的数据。
    • 如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
    • 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据
    • 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。

Topic

  • 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic
  • 物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处
  • topic会有多个分区Partition,所以在整个主题的范围内,是无法保证消息的顺序的

Partition

  • topic中的数据分割为一个或多个partition。每个topic至少有一个partition
  • 每个partition中的数据使用多个segment文件存储
  • partition中的数据是有序的,同一topic不同partition间的数据丢失了数据的顺序
  • 在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1

Producer

  • 生产者即数据的发布者,该角色将消息发布到Kafka的topic中
  • broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。
  • 生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。

Consumer

  • 消费者可以从broker中读取数据。
  • 消费者可以消费多个topic中的数据

Consumer Group

  • 每个Consumer属于一个特定的Consumer Group
  • 可为每个Consumer指定group name,若不指定group name则属于默认的group

Leader

  • 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition

Follower

  • Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。
  • 当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“同步中副本”“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

2.3.kafka安装

2.3.1 windows环境单点安装

2.3.2 Linux环境单点安装

2.3.3 Linux环境集群安装

2.4.Springboot集成使用kafka

//1.添加依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

//2.添加配置
spring:   
  kafka:
    bootstrap-servers: 127.0.0.1:9092 #指定kafka server的地址,集群配多个,中间,逗号隔开
    producer:
      # Kafka提供的序列化和反序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default_consumer_group #群组ID
      enable-auto-commit: true
      # 提交offset延时(接收到消息后多久提交offset)
      auto-commit-interval: 1000
      # Kafka提供的序列化和反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 消费端监听的topic不存在时,项目启动会报错(关掉)
      missing-topics-fatal: false

//3.简单生产者Producer
@RestController
public class ProducerController {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate1;

    @RequestMapping("message/send")
    public String send(){
        //使用kafka模板发送信息,newTopic:topic不存在时会自行创建
        kafkaTemplate.send("newTopic", "This is msg");
        kafkaTemplate1.send("testDemo","This is msg too");
        return "success";
    }
}

//4.简单消费者Consumer 
@Component
public class Consumer {
    /**
     * 定义此消费者接收topics = "testDemo"的消息,与producerController中的topic对应上即可
     * @param record 变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息
     */
    @KafkaListener(topics = "newTopic")
    public void listen (ConsumerRecord<?, ?> record){
        System.out.printf("topic is %s, offset is %d, value is %s \n", record.topic(), record.offset(), record.value());
    }

    @KafkaListener(topics = "testDemo")
    public void listen2 (ConsumerRecord<?, ?> record){
        System.out.printf("topic is %s, offset is %d, value is %s \n", record.topic(), record.offset(), record.value());
    }
}

2.5 spring-cloud-stream使用kafka

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架


三、RocketMQ

3.0 RocketMQ介绍

3.1 RocketMQ集群安装部署

四、RabbitMQ

4.0 RabbitMQ介绍

4.1 RabbitMQ集群安装部署

五、问题整理

5.1 消息可靠性

  • 消息不丢失
  • 消息不会重复消费
  • 消息有序消费

5.2 消息防丢失

由于网络问题,我们难保证生产者发送的消息能100%到达消息队列服务器,也就是说有消息丢失的可能性
因此,生产者就必项具有【消息丢失检测】和【重发】机制,也就是我们说的消息队列的亊物机制

同步的事务——停止等待

同步的事务——连续ARQ

异步的事务——回调机制

5.3 消息防重复消费

5.4 消息按序消费

附:常用命令

## 创建topic
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092 --create --topic newTopic --partitions 4 --replication-factor 3

## 查看topic-list
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

## 查看消费group详情
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group new-consumer-group --describe

# 查询消费组
kafka-consumer-groups.sh --bootstrap-server {kafka连接地址} --list 
 
# 删除消费组
kafka-consumer-groups.sh --bootstrap-server {kafka连接地址} --delete --group {消费组}

Post Directory

扫码关注公众号:暂无公众号
发送 290992
即可立即永久解锁本站全部文章