
消息队列初探(一)
前序知识:
数据结构之队列
微服务和分布式相关基础知识 http://zxlmdonnie.cn/archives/1705313187073
MQ 是什么?MQ从哪里发展而来?为什么在微服务系统中使用MQ?
本文
从操作系统的进程间通信(IPC)开始
到同步的 RPC
再到异步的 MQ
最后介绍一种常用的MQ:RocketMQ
1. 同计算机进程间通信 IPC
我们在上一篇文章介绍了进程,并介绍如何使用C语言开启一个进程。
不过,有些复杂程序或者是系统需要多个进程或者线程共同完成某个具体的任务,那么也就需要进程之间通信和数据访问。整个系统以进程粒度运行可以进一步提高系统整体并行性能和内存访问安全,每个进程可以有各自的分工。所以多个进程共同完成一个大的系统是比单个进程多线程要有很大的优势。
进程间通信(IPC,Interprocess communication)是一组编程接口,让程序员能够协调不同的进程,使之能在一个操作系统里同时运行,并相互传递、交换信息。IPC方法包括管道(PIPE)、消息队列、信号、共享存储以及套接字(Socket)。
1.1 管道
我们最常见的一种方式就是(匿名)管道,比如以下命令就可以查看所有带有关键字 "java" 的进程:
ps -ef | grep "java"
显然,管道这种方式是同步的,实际上是一个固定大小的缓冲区;并且它是半双工数据传输。
半双工数据传输: 允许数据在两个方向上传输,但是,在某一时刻,只允许数据在一个方向上传输,它实际上是一种切换方向的单工通信;在同一时间只可以有一方接受或发送信息,可以实现双向通信。举例:对讲机。
1.2 消息队列
是的,消息队列这个词最早出自于这里。
管道通信属于一股脑的输入,能不能稍微温柔点有规矩点的发送消息?消息队列在发送数据的时候,按照一个个独立单元(消息体)进行发送,其中每个消息体规定大小块,同时发送方和接收方约定好消息类型或者正文的格式。
直接通信方式
在消息传递系统中,进程间的数据交换是以格式化的消息(Message)为单位的。若通信的进程之间不存在可直接访问的共享空间,则必须利用操作系统提供的消息传递方法实现进程通信。
直接通信方式:发送进程直接把消息发送给接收进程,并将它挂在接收进程的消息缓冲队列上,接收进程从消息缓冲队列中取得消息。
间接通信方式:发送进程把消息发送到某个中间实体中,接收进程从中间实体中取得消息。这种中间实体一般称为信箱,这种通信方式又称为信箱通信方式。该通信方式广泛引用于计算机网络中,相应的通信系统称为电子邮件系统。
简单理解就是,A要告诉B某些事情,就要写信,然后通过邮差送给B。直接通信就是,邮差把信直接送到B手上,间接通信就是,B家门口有一个邮箱,邮差把信放到邮箱里。
我们在 Ubuntu 22.04(已安装中文语言包) 上输入以下命令
# ipcs -q
--------- 消息队列 -----------
键 msqid 拥有者 权限 已用字节数 消息
可以使用C语言创建一个消息队列:
#include <stdio.h>
#include <sys/types.h> /*包含系统类型定义,是使用某些系统调用时所必需的。*/
#include <sys/ipc.h> /*包含进程间通信相关的头文件,提供了消息队列、信号量等IPC机制的定义。*/
#include <sys/msg.h> /*包含消息队列相关的头文件,定义了操作消息队列所需的数据结构和函数。*/
int main()
{
/*使用ftok函数创建一个唯一的键(key),这个键用于标识消息队列。*/
/*这里使用的是/home/xiaoming路径和项目标识符777。*/
key_t key = ftok("/home/xiaoming",777);
/*使用msgget函数获取或创建一个消息队列。*/
/*第一个参数是之前创建的键,第二个参数是操作标志。*/
/*IPC_CREAT表示如果消息队列不存在,则创建它;*/
/*0666是一个八进制数,表示消息队列的权限设置,相当于读写权限对所有人开放。*/
int msgid = msgget(key, IPC_CREAT | 0666);
return 0;
}
需要注意的是,这段代码并没有实际发送或接收消息,它只是创建了一个消息队列。要使用消息队列进行通信,还需要编写发送和接收消息的代码,并且这段代码中没有错误处理,实际使用时应该检查
msgget
和ftok
的返回值,以确保操作成功。
root@LAPTOP-M3IADGND:/home/dogitry/learning# vim msgget.c
root@LAPTOP-M3IADGND:/home/dogitry/learning# gcc -o msgget msgget.c
root@LAPTOP-M3IADGND:/home/dogitry/learning# ./msgget
root@LAPTOP-M3IADGND:/home/dogitry/learning# ipcs -q
--------- 消息队列 -----------
键 msqid 拥有者 权限 已用字节数 消息
0xffffffff 0 root 666 0 0
2. 远程过程调用 RPC
进程间通信 概念泛指进程之间任何形式的通信行为,除了本地的通讯,还可以包括计算机之间的进程间通讯。
不过 IPC 这个概念通常指的是同一台计算机的进程间通信。
RPC是其中一种。
2.1 进程间通信分类
为了理清本文各个通信模型的关系,进程间通信可分类如下:
进程间通信IPC
消息队列 MQ
远程过程调用 RPC
HTTP是一种应用层协议,主要用于客户端和服务器之间的通信,但也可以在进程间通信中使用,尤其是在需要网络通信的场景。也就是说HTTP不属于IPC但是可以用于进程间通信。
RESTful 是一种设计哲学,它使用HTTP协议的特点来构建可伸缩的网络服务,通常使用HTTP方法和URI来访问和操作资源。
一个RESTful API 可以作为RPC机制暴露服务,同时使用MQ来异步处理一些操作。它们之间是独立的关系。
2.2 RPC 简要介绍
RPC 特指一种隐藏了过程调用时实际通信细节的IPC方法。客户端将调用一个本地方法,而这个本地方法则是负责透明的与远程服务端进行过程间通信。这个本地方法会讲相关参数顺序打包到一个消息中,然后把这个消息发送给服务端提供的方法,服务端的方法会从消息中解出序列化发出来的参数,然后执行,最后仍以同样的方式将方法的返回值发送给客户端。
RPC通常建立在网络协议之上,可以跨网络进行通信。传输方式可以是 Socket,或者用 Asio,ZeroMQ,activemq,rabbitmq,Netty 。传输协议可以TCP,UDP,HTTP。
主流的RPC服务框架包括但不限于以下几种:
Dubbo:由阿里巴巴开源的Java RPC框架,专注于解决分布式服务治理问题,支持多种协议和序列化方式,具有高性能和良好的服务治理能力。
Spring Cloud:Spring Cloud是基于Spring Boot的微服务解决方案,提供了一整套的微服务管理和协调工具,如服务发现、配置管理、消息总线等。
gRPC:由Google开发的开源RPC框架,使用HTTP/2协议和Protocol Buffers序列化,支持多种编程语言,以高性能和通用性著称。
Thrift:由Facebook开源的跨语言服务开发框架,支持多种编程语言,提供完整的客户端/服务端堆栈,支持同步和异步通信。
2.3 RPC 通信模型
来源于RocketMQ官网
服务提供者 provider(或者 生产者 producer):右边那一列都是服务生产者。提供具体的调用方法的系统,根据上图也就是服务端,服务端才是方法真正的提供者
服务消费者 consumer :左边那一列都是服务消费者。调用服务的系统,根据模型图,也就是客户端
也就是说,消费者调用生产者提供的方法 / 接口 / 数据!
2.4 RPC 特点
面向动作
请求响应模式:客户端必须明确要调用哪个服务器
同步:调用需要立即得到返回结果
对象级/函数级通信
如果服务的调用需要在短期内返回结果时,并且同一个请求的关注者只有一个,这个时候就应该使用RPC。
3 作为中间件的MQ
前文提到了 RPC 的特点:同步、请求响应、同一个请求关注者唯一。那么如果我们想要一个异步、同一个请求可以有多个消费者,又需要用到什么模型呢?
于是有人便想到了操作系统的消息队列这一个概念:
IPC通信的消息队列
这种消息队列解决了异步消息的问题,不过没有解决同一个请求可以有多个消费者的模型。因此为了解耦,再结合 RPC 的提供者——消费者模型,我们就抽取出来一个模型:作为中间件的MQ。
作为中间件的MQ
后文所称的MQ都是作为中间件的MQ。
3.1 原始模型的进化
点对点模式(PTP):一个生产者发送的每一个消息,都只有一个消费者消费,看起来就像消息从一个点被传递到另一个点,也就是单播。
消费匿名:消息上下游沟通的唯一的身份就是队列,下游消费者从队列获取消息无法申明独立身份。
一对一通信:基于消费匿名特点,下游消费者即使有多个,但都没有自己独立的身份,因此共享队列中的消息,每一条消息都只会被唯一一个消费者处理。因此点对点模型只能实现一对一通信。
发布订阅模式(PubSub):一个生产者发送的每一个消息,都会发送到所有订阅了此队列的消费者,也就是广播。
消费独立:相比队列模型的匿名消费方式,发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。
一对多通信:基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。
4 消息队列的应用场景
4.1 MQ 特点
MQ一般都有以下特点:
面向数据
生产者与消费者不直接交互:
发送者并不明确谁是消费者,也不关心谁来消费信息
各个消费者可以从不同的角度入手处理消息,处理结果也不返回给发送者
消息的关注者不止一个
有缓冲节点
异步
消息的发送和处理是异步的
系统级/模块级通信
因此消息队列在实际应用中的常用使用场景有:
异步处理
应用解耦
流量削峰
消息通讯
4.2 异步处理
很简单,直接上两张图一看就懂:
引入MQ前,同步方式,先后进行,速度慢
引入MQ后,异步方式,同时进行,速度提升
除了异步处理,这个MQ还可以提供流量削峰作用:队列转储了消息,做了一个中间站,对于超出系统承载能力的场景,"MQ"作为漏斗进行限流保护,就是说把最高的峰值削一部分给峰谷,就好像把波浪线修成直线一样,均衡一下
4.3 日志处理
对于大量的日志数据,可以用Kafka接着:
日志采集客户端:负责日志数据采集、定时写入Kafka队列
Kafka消息队列:负责日志数据的接收,存储和转发
日志处理应用:订阅并削峰kafka队列中的日志数据
4.4 消息通讯
分为点对点通讯(也就是单向私聊)和聊天室(也就是群聊或双向私聊)
点对点通讯: 两个客户端之间公用一个队列
聊天室通讯:多个客户端订阅同一主题(注意这里是订阅不是消费,说明一个消息可以被多次消费)
4.5 MQ 会带来的问题
系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
系统复杂度提高:MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
一致性问题:A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理完成、D处理失败、如何保证消息数据处理的一致性。
消息没有被重复消费、怎么处理消息丢失情况、如何保证消息传递的顺序性。
5 各个MQ对比
简单看一下就好,看看目前业界都有哪些MQ。
6 RocketMQ 介绍
RocketMQ 是一个 队列模型 的消息中间件,具有高性能、高可靠、高实时、分布式 的特点。
它是一个采用 Java 语言开发的分布式的消息系统。
RocketMQ 领域模型:发布订阅模式
RocketMQ 4.x 系统架构图
看起来有点复杂,其实使用起来很简单,我们只需要提取出我们需要了解的东西就可以了。
6.1 使用RocketMQ需要了解的基本概念
来自 RocketMQ官方文档:https://rocketmq.apache.org/zh/docs/introduction/02concepts
一般的发布订阅(pub-sub)领域模型
Topic:Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。5.0 以后每个Topic下面的消息类型必须相同。
也就是说,每个 message 消息都必须有一个 Topic,还可以有更细化的 Tag 。
每个 Topic 和一个业务逻辑(如下订单)对应。
可以在上图中理解成对应一个业务逻辑。
消息类型:Apache RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。
rcmq的领域模型,相比上图在架构上也只是多了SubScription Relationship
订阅关系:是Apache RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。
没了。就这么简单。
其余的一些概念之前就已经介绍过了,这里只是搬一下官方文档:
生产者:是Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。
消费者:是Apache RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
6.2 部署RocketMQ需要了解的基本概念
不过有个问题,如果我们想在Java客户端使用RCMQ,我们需要先学会部署RCMQ服务,而这又要让我们学一些它的架构知识:
RCMQ服务端:NameServer 和 Brocker
Broker:主要负责消息的存储、查询消费。
NameServer:各个Broker的注册中心。提供两个功能
Broker 管理
Topic 与 Broker 之间的关联,即路由信息管理。Broker 和 Topic 是多对多关系。可以让生产者 / 消费者根据Topic找到Broker。
Nameserver可以管理broker和路由信息,让broker和生产者消费组解耦,生产者和消费组不需要关注broker的添加删除变动,只需要与nameserver交流就能根据topic找到对应的broker进行交互,这样broker就可以比较灵活的进行变动了。
注册中心可以理解为 SpringCloud微服务体系的Nacos那样的注册中心。
NameServer有注册中心所具有的很多功能,如Broker 会将自己的信息注册到 NameServer 中,消费者和生产者就从 NameServer 中根据要查找 的topic查询broker-topic路由表,和查找到的 Broker 进行通信(生产者和消费者定期会向 NameServer去查询相关的 Broker 的信息)
也就是说 RCMQ 其实就是一个小微服务系统,有一堆核心服务集群和它们的注册中心。
6.3 快速开始
此部分为参照官方文档,参考价值不大:https://rocketmq.apache.org/zh/docs/quickStart/03quickstartWithDockercompose
docker-compose.yml
version: '3.8'
services:
namesrv:
image: apache/rocketmq:5.2.0
container_name: rmqnamesrv
ports:
- 9876:9876
networks:
- rocketmq
command: sh mqnamesrv
broker:
image: apache/rocketmq:5.2.0
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
depends_on:
- namesrv
networks:
- rocketmq
command: sh mqbroker
proxy:
image: apache/rocketmq:5.2.0
container_name: rmqproxy
networks:
- rocketmq
depends_on:
- broker
- namesrv
ports:
- 8080:8080
- 8081:8081
restart: on-failure
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
command: sh mqproxy
networks:
rocketmq:
driver: bridge
运行:
PS D:\Users\Donnie\Desktop\RCMQ> docker-compose -p rockermq_project up -d
[+] Running 11/11
✔ proxy Pulled 198.8s
✔ broker Pulled 198.8s
✔ namesrv 8 layers [⣿⣿⣿⣿⣿⣿⣿⣿] 0B/0B Pulled 198.8s
✔ 2d473b07cdd5 Pull complete 109.3s
✔ 9e946c336cea Pull complete 19.1s
✔ 8b01ee688ae6 Pull complete 75.6s
✔ 033e219267ec Pull complete 22s
✔ 79debc7b0c0c Pull complete 108.6s
✔ c33017b514b2 Pull complete 78.2s
✔ 7686c4289dd9 Pull complete 174.3s
✔ ef67f0a22d72 Pull complete 186.8s
[+] Running 4/4
✔ Network rockermq_project_rocketmq Created 0.1s
✔ Container rmqnamesrv S... 0.3s
✔ Container rmqbroker St... 0.1s
✔ Container rmqproxy Sta... 0.1s
运行了三个服务
C:\Users\Donnie>docker exec -it rmqbroker bash
[rocketmq@e4b475f4221d bin]$ sh mqadmin updatetopic -t TestTopic -c DefaultCluster
create topic to 172.20.3:10911 success.
TopicConfig [topicName=TestTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
进入broker容器,通过mqadmin创建 Topic。
Java 客户端:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>
pom.xml
package com.werun.werunjuly.common.rcmq;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerExample {
private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
public static void main(String[] args) throws ClientException {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081
String endpoint = "localhost:8081";
// 消息发送的目标Topic名称,需要提前创建。
String topic = "TestTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
// 设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
// 消息体。
.setBody("messageBody".getBytes())
.build();
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
// producer.close();
}
}
生产者
package com.werun.werunjuly.common.rcmq;
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PushConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081
String endpoints = "localhost:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = "YourConsumerGroup";
// 指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = "TestTopic";
// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组。
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
}
消费者
最终呈现效果,多次重启生产者方法,部署成功
6.4 配置 RocketMQ 控制台(待整理)
docker 部署方式:
docker run -d --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:8081" -p 8091:8080 -t apacherocketmq/rocketmq-dashboard:latest
参考
【操作系统】进程通信的几种方式 https://blog.csdn.net/dl962454/article/details/117374916
消息队列MQ快速入门(概念、RPC、MQ实质思路、队列介绍、队列对比、应用场景)https://blog.csdn.net/Dean_xiu/article/details/119942872
王梓主席23年12月的RocketMQ讲稿
RCMQ 官方文档