前言
对于分布式消息中间件,首先要理解两个根底的概念,即什么是分布式系统,什么又是中间件。
分布式系统
“A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messasges.”——《Distributed Systems Concepts and Design》
从下面这个解释能够失去分布式系统的两个特点:组件散布在网络计算机上组件之间通过音讯来协调口头
中间件
Middleware is computer software that provides services to software applications beyond those available from the operating system. It can be described as “software glue”. Middleware makes it easier for software developers to implement communication and input/output, so they can focus on the specific purpose of their application.——维基百科
中间件被形容为为应用程序提供操作系统所提供的服务之外的服务,简化应用程序的通信、输入输出的开发,使他们专一于本人的业务逻辑。从维基百科上对中间件的解释感觉有点绕,其实能够从“空间”的角度去了解中间件,即中间件是处于“中间层”的组件,是下层的应用程序和底层的服务之间的桥梁(比方DB中间件的下层是应用程序,底层是DB服务),也是利用与利用之间的桥梁(比方分布式服务组件)。
分布式消息中间件
“Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messages between distributed systems.”——维基百科
维基百科给出的消息中间件的定义是反对在分布式系统中发送和承受音讯的硬件或软件基础设施(对咱们这里探讨的范畴来说必定就是软件了)。
那么分布式消息中间件其实就是指消息中间件自身也是一个分布式系统。
消息中间件能做什么?
任何中间件必然都是要去解决特定畛域的某个问题,消息中间件解决的就是分布式系统之间消息传递的问题。消息传递是分布式系统必然要面对的一个问题。
简略概括一下消息中间件的利用场景大抵如下:
- 业务解耦:交易系统不须要晓得短信告诉服务的存在,只须要公布音讯
- 削峰填谷:比方上游零碎的吞吐能力高于上游零碎,在流量洪峰时可能会冲垮上游零碎,消息中间件能够在峰值时沉积音讯,而在峰值过来后上游零碎缓缓生产音讯解决流量洪峰的问题
- 事件驱动:零碎与零碎之间能够通过消息传递的模式驱动业务,以流式的模型解决
分布式消息中间件长什么样?
一个形象的对分布式消息中间件的认知大略是这样:
- 有一个SDK,提供给业务零碎发送、生产音讯的接口
- 有一批Server节点用于承受和存储音讯,并在适合的时候发送给上游的零碎进行生产
别嫌啰嗦,大抵介绍一下,不便上面的了解,本系列次要讲三个罕用的消息中间件,也就是Rabbitmq、RocketMq和Kafka,当然篇幅所限必定讲不完,只能挑比拟重要的货色写,但也能让不会的同学初步把握怎么去应用。
完整版的消息中间件学习材料和我集体整顿的笔记
能够间接点击蓝字支付
好了,话不多说,发车喽!
RabbitMQ除了像兔子一样跑的很快以外,还有这些特点:
- 开源、性能优良,稳定性保障
- 提供可靠性音讯投递模式、返回模式
- 与Spring AMQP完满整合,API丰盛
- 集群模式丰盛,表达式配置,HA模式,镜像队列模型
- 保证数据不失落的前提做到高可靠性、可用性
一、Rabbitmq音讯队列利用
1、RabbitMQ介绍
RabbitMQ是一款基于AMQP(音讯队列协定),由Erlang开发的开源音讯队列组件。是一款优良的音讯队列组件,他由两局部组成:服务端和客户端,客户端反对多种语言的驱动,如:.Net、JAVA、Erlang等。RabbitMQ与其余音讯队列组件性能比拟,在此不作介绍,网上有大把的材料。
2、RabbitMQ原理简介
RabbitMQ中间件分为服务端(RabbitMQ Server)和客户端(RabbitMQ Client),服务端能够了解为是一个音讯的代理消费者,客户端又分为音讯生产者(Producer)和音讯消费者(Consumer)。
2.1 音讯生产者(Producer):次要生产音讯并将音讯基于TCP协定,通过建设Connection和Channel,将音讯传输给RabbitMQ Server,对于Producer而言根本就实现了工作。
2.2 服务端(RabbitMQ Server):次要负责解决音讯路由、散发、入队列、缓存和入列。次要由三局部组成:Exchange、RoutingKey、Queue。
(1)Exchange:用于接管音讯生产者发送的音讯,有三种类型的exchange:direct, fanout,topic,不同类型实现了不同的路由算法;
A. direct exchange:将与routing key 比配的音讯,间接推入绝对应的队列,创立队列时,默认就创立同名的routing key。
B. fanout exchange:是一种播送模式,疏忽routingkey的规定。
C. topic exchange:利用主题,依据key进行模式匹配路由,例如:若为abc则推入到所有abc绝对应的queue;若为abc.#则推入到abc.xx.one ,abc.yy.two对应的queue。
(2)RoutingKey:是RabbitMQ实现路由散发到各个队列的规定,并联合Binging提供于Exchange应用将音讯推送入队列;
(3)Queue:是音讯队列,能够依据须要定义多个队列,设置队列的属性,比方:音讯移除、音讯缓存、回调机制等设置,实现与Consumer通信;
2.3 音讯消费者(Consumer):次要负责生产Queue的音讯,同样基于TCP协定,通过建设Connection和Channel与Queue传输音讯,一个音讯能够给多个Consumer生产;
2.4 要害名词阐明:Connection、Channel、Binging等;
(1)Connection:是建设客户端与服务端的连贯。
(2)Channel:是基于Connection之上建设通信通道,因为每次Connection建设TCP协定通信开销及性能耗费较大,所以一次建设Connection后,应用多个Channel通道通信缩小开销和进步性能。
(3)Binging:是一个捆绑定义,将exchange和queue捆绑,定义routingkey相干策略。
3、RabbitMQ装置部署
以上对RabbitMQ简介,接下来咱们通过理论搭建音讯队列服务实际。RabbitMQ服务端能运行于Window、Linux和Mac平台,客户端也反对多种技术的实现。本次咱们将在Linux之CentOS7平台搭建。
3.1 装置Erlang运行环境
因为RabbitMQ应用Erlang技术开发,所以须要先装置Erlang运行环境后,能力装置音讯队列服务。
(1)配置零碎能失常拜访公网,设置默认网关
`route add ``default` `gw 192.168.1.1`
(2)装置erlang
`su -c ``'rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-5.noarch.rpm'` `sudo yum install erlang`
(3)查看erlang是否装置胜利
`erl`
(4)装置胜利
3.2 装置RabbitMQ服务端
(1)下载安装包
`wget http:``//www.rabbitmq.com/releases/rabbitmq-server/v3.6.0/rabbitmq-server-3.6.0-1.noarch.rpm`
(2)装置和配置RabbitMQ服务端,3.6.0版本:
`rpm --import https:``//www.rabbitmq.com/rabbitmq-signing-key-public.asc` `yum install rabbitmq-server-3.6.0-1.noarch.rpm`
(3)启用web治理插件
`rabbitmq-plugins enable rabbitmq_management`
(4)启动RabbitMQ
`chkconfig rabbitmq-server ``on` `/sbin/service rabbitmq-server start`
(5)防火墙开明端口
`# firewall-cmd --permanent --zone=public --add-port=5672/tcp` `# firewall-cmd --permanent --zone=public --add-port=15672/tcp` `# firewall-cmd --reload`
(6)rabbitmq默认会创立guest账号,只能用于localhost登录页面管理员,本机拜访地址:
`rabbitmqctl add_user test test` `rabbitmqctl set_user_tags test administrator<br>rabbitmqctl set_permissions -p / test ``".*"` `".*"` `".*"`
RabbitMQ 管理员页面。
4、RabbitMQ利用
本章节形容,web利用生产的日志,通过rabbitmq传输,而后日志服务接管音讯队列的音讯。
本零碎采纳官网的Client,通过nuget援用。
4.1 Web利用生产业务日志
[HttpPost] public ActionResult Create() { this.HttpContext.Session["mysession"] = DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"); var txt = Request.Form["txtSite"].ToString(); RabbitMQHelper helper = new RabbitMQHelper(); helper.SendMsg(txt + ",操作日志,工夫:" + DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")); return RedirectToAction("Index"); } `}`
页面效果图。
4.2 日志服务接管日志音讯
基于window form开发一个日志解决服务,并将接管的音讯打印进去。
private void btnReceive_Click(object sender, EventArgs e) { isConnected = true; using (var channel = connection.CreateModel()) { channel.QueueDeclare("MyLog", false, false, false, null); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume("MyLog", true, consumer); while (isConnected) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var message = Encoding.UTF8.GetString(body); txtMsg.Text += message + "\r\n"; } } }
4.3 RabbitMQ页面监控状况
RabbitMQ自带页面监控工具,通过此工具能够监控MQ的状况:
完整版的消息中间件学习材料和我集体整顿的笔记能够间接点击蓝字支付
二、Rabbitmq音讯确认机制
1、生产端 Confirm 音讯确认机制
音讯的确认,是指生产者投递音讯后,如果 Broker 收到音讯,则会给咱们生产者一个应答。生产者进行接管应答,用来确定这条音讯是否失常的发送到 Broker ,这种形式也是音讯的可靠性投递的外围保障!
Confirm 确认机制流程图
2、如何实现Confirm确认音讯?
- 第一步:在 channel 上开启确认模式:
channel.confirmSelect()
- 第二步:在 channel 上增加监听:
channel.addConfirmListener(ConfirmListener listener);
, 监听胜利和失败的返回后果,依据具体的后果对音讯进行从新发送、或记录日志等后续解决!
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class ConfirmProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_confirm_exchange"; String routingKey = "item.update"; //指定音讯的投递模式:confirm 确认模式 channel.confirmSelect(); //发送 final long start = System.currentTimeMillis(); for (int i = 0; i < 5 ; i++) { String msg = "this is confirm msg "; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); System.out.println("Send message : " + msg); } //增加一个确认监听, 这里就不敞开连贯了,为了能保障能收到监听音讯 channel.addConfirmListener(new ConfirmListener() { /** * 返回胜利的回调函数 */ public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("succuss ack"); System.out.println(multiple); System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms"); } /** * 返回失败的回调函数 */ public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.printf("defeat ack"); System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms"); } }); } }
`import com.rabbitmq.client.*; import java.io.IOException; public class ConfirmConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_confirm_exchange"; String queueName = "test_confirm_queue"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, false, false, false, null); //个别不必代码绑定,在治理界面手动绑定 channel.queueBind(queueName, exchangeName, routingKey); //创立消费者并接管音讯 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //设置 Channel 消费者绑定队列 channel.basicConsume(queueName, true, consumer); } }
咱们此处只关注生产端输入音讯
Send message : this is confirm msg Send message : this is confirm msg Send message : this is confirm msg Send message : this is confirm msg Send message : this is confirm msg succuss ack true 耗时:3ms succuss ack true 耗时:4ms
注意事项
- 咱们采纳的是异步 confirm 模式:提供一个回调办法,服务端 confirm 了一条或者多条音讯后 Client 端会回调这个办法。除此之外还有单条同步 confirm 模式、批量同步 confirm 模式,因为事实场景中很少应用咱们在此不做介绍,如有趣味间接参考官网文档。
-
咱们运行生产端会发现每次运行后果都不一样,会有多种状况呈现,因为 Broker 会进行优化,有时会批量一次性 confirm ,有时会离开几条 confirm。
`succuss ack true 耗时:3ms succuss ack false 耗时:4ms 或者 succuss ack true 耗时:3ms`
3、Return 音讯机制
- Return Listener 用于解决一-些不可路 由的音讯!
- 音讯生产者,通过指定一个
Exchange
和Routingkey
,把音讯送达到某一个队列中去,而后咱们的消费者监听队列,进行生产解决操作! - 然而在某些状况下,如果咱们在发送音讯的时候,以后的 exchange 不存在或者指定的路由 key 路由不到,这个时候如果咱们须要监听这种不可达的音讯,就要应用
Return Listener !
- 在根底API中有一个要害的配置项:
Mandatory
:如果为true
,则监听器会接管到路由不可达的音讯,而后进行后续解决,如果为false
,那么 broker 端主动删除该音讯!
Return 音讯机制流程图
Return 音讯示例
- 首先咱们须要发送三条音讯,并且成心将第 0 条音讯的
routing Key
设置为谬误的,让他无奈失常路由到生产端。 mandatory
设置为true
路由不可达的音讯会被监听到,不会被主动删除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
-
最初增加监听即可监听到不可路由到生产端的音讯
channel.addReturnListener(ReturnListener r))
`import com.rabbitmq.client.*; import java.io.IOException; public class ReturnListeningProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "item.update"; String errRoutingKey = "error.update"; //指定音讯的投递模式:confirm 确认模式 channel.confirmSelect(); //发送 for (int i = 0; i < 3 ; i++) { String msg = "this is return——listening msg "; //@param mandatory 设置为 true 路由不可达的音讯会被监听到,不会被主动删除 if (i == 0) { channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes()); } else { channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes()); } System.out.println("Send message : " + msg); } //增加一个确认监听, 这里就不敞开连贯了,为了能保障能收到监听音讯 channel.addConfirmListener(new ConfirmListener() { /** * 返回胜利的回调函数 */ public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("succuss ack"); } /** * 返回失败的回调函数 */ public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.printf("defeat ack"); } }); //增加一个 return 监听 channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("return relyCode: " + replyCode); System.out.println("return replyText: " + replyText); System.out.println("return exchange: " + exchange); System.out.println("return routingKey: " + routingKey); System.out.println("return properties: " + properties); System.out.println("return body: " + new String(body)); } }); } }
`import com.rabbitmq.client.*; import java.io.IOException; public class ReturnListeningConsumer { public static void main(String[] args) throws Exception { //1\. 创立一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2\. 通过连贯工厂来创立连贯 Connection connection = factory.newConnection(); //3\. 通过 Connection 来创立 Channel Channel channel = connection.createChannel(); //4\. 申明 String exchangeName = "test_return_exchange"; String queueName = "test_return_queue"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, false, false, false, null); //个别不必代码绑定,在治理界面手动绑定 channel.queueBind(queueName, exchangeName, routingKey); //5\. 创立消费者并接管音讯 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //6\. 设置 Channel 消费者绑定队列 channel.basicConsume(queueName, true, consumer); } }
咱们只关注生产端后果,生产端只收到两条音讯。
`Send message : this is return——listening msg Send message : this is return——listening msg Send message : this is return——listening msg return relyCode: 312 return replyText: NO_ROUTE return exchange: test_return_exchange return routingKey: error.update return properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) return body: this is return——listening msg succuss ack succuss ack succuss ack
4、生产端 Ack 和 Nack 机制
生产端进行生产的时候,如果因为业务异样咱们能够进行日志的记录,而后进行弥补!如果因为服务器宕机等重大问题,那咱们就须要手工进行ACK保障生产端生产胜利!生产端重回队列是为了对没有解决胜利的音讯,把音讯从新会递给Broker!个别咱们在理论利用中,都会敞开重回队列,也就是设置为False。
参考 api
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
void basicAck(long deliveryTag, boolean multiple) throws IOException;
如何设置手动 Ack 、Nack 以及重回队列
- 首先咱们发送五条音讯,将每条音讯对应的循环下标 i 放入音讯的
properties
中作为标记,以便于咱们在前面的回调办法中辨认。 - 其次, 咱们将生产端的 ·
channel.basicConsume(queueName, false, consumer);
中的autoAck
属性设置为false
,如果设置为true
的话 将会失常输入五条音讯。 - 咱们通过
Thread.sleep(2000)
来延时一秒,用以看清后果。咱们获取到properties
中的num
之后,通过channel.basicNack(envelope.getDeliveryTag(), false, true);
将num
为0的音讯设置为 nack,即生产失败,并且将requeue
属性设置为true
,即生产失败的音讯重回队列末端。
`import com.rabbitmq.client.*; import java.util.HashMap; import java.util.Map; public class AckAndNackProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_ack_exchange"; String routingKey = "item.update"; String msg = "this is ack msg"; for (int i = 0; i < 5; i++) { Map<String, Object> headers = new HashMap<String, Object>(); headers.put("num" ,i); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .headers(headers) .build(); String tem = msg + ":" + i; channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes()); System.out.println("Send message : " + msg); } channel.close(); connection.close(); } }
import com.rabbitmq.client.*; import java.io.IOException; public class AckAndNackConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); String exchangeName = "test_ack_exchange"; String queueName = "test_ack_queue"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, false, false, false, null); //个别不必代码绑定,在治理界面手动绑定 channel.queueBind(queueName, exchangeName, routingKey); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if ((Integer) properties.getHeaders().get("num") == 0) { channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }; //6\. 设置 Channel 消费者绑定队列 channel.basicConsume(queueName, false, consumer); } }
咱们此处只关怀生产端输入,能够看到第 0 条生产失败从新回到队列尾部生产。
[x] Received 'this is ack msg:1' [x] Received 'this is ack msg:2' [x] Received 'this is ack msg:3' [x] Received 'this is ack msg:4' [x] Received 'this is ack msg:0' [x] Received 'this is ack msg:0' [x] Received 'this is ack msg:0' [x] Received 'this is ack msg:0' [x] Received 'this is ack msg:0'
三、Rabbitmq镜像队列
1、 镜像队列的设置
镜像队列的配置通过增加policy实现,policy增加的命令为:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition: 镜像定义,包含三个局部 ha-mode,ha-params,ha-sync-mode
ha-mode: 指明镜像队列的模式,有效值为 all/exactly/nodes
-
all示意在集群所有的节点上进行镜像
- exactly示意在指定个数的节点上进行镜像,节点的个数由ha-params指定
- nodes示意在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params: ha-mode模式须要用到的参数
ha-sync-mode: 镜像队列中音讯的同步形式,有效值为automatic,manually
Priority: 可选参数, policy的优先级
例如,对队列名称以hello结尾的所有队列进行镜像,并在集群的两个节点上实现镜像,policy的设置命令为:
rabbitmqctl set_policy hello-ha "^hello" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
2、 镜像队列的大略实现
2.1 整体介绍
通常队列由两局部组成:一部分是amqqueue_process,负责协定相干的音讯解决,即接管生产者公布的音讯、向消费者投递音讯、解决音讯confirm、acknowledge等等;另一部分是backing_queue,它提供了相干的接口供amqqueue_process调用,实现音讯的存储以及可能的长久化工作等。
镜像队列同样由这两局部组成,
- amqqueue_process仍旧进行协定相干的音讯解决
- backing_queue则是由master节点和slave节点组成的一个非凡的backing_queue
- master节点和slave节点都由一组过程组成,一个负责音讯播送的gm,一个负责对gm收到的播送音讯进行回调解决。
- master节点上回调解决是coordinator
- slave节点上则是mirror_queue_slave。mirror_queue_slave中蕴含了一般的backing_queue进行音讯的存储
- master节点中backing_queue蕴含在mirror_queue_master中由amqqueue_process进行调用。
留神:音讯的公布与生产都是通过master节点实现。master节点对音讯进行解决的同时将音讯的解决动作通过gm播送给所有的slave节点,slave节点的gm收到音讯后,通过回调交由mirror_queue_slave进行理论的解决。
2.2 gm(Guaranteed Multicast)
传统的主从复制形式:由master节点负责向所有slave节点发送须要复制的音讯,在复制过程中,如果有slave节点出现异常,master节点须要作出相应的解决;如果是master节点自身呈现问题,那么slave节点间可能会进行通信决定本次复制是否持续。当然为了解决各种异常情况,整个过程中的日志记录是免不了的。
然而rabbitmq中并没有采纳这种形式,而是将所有的节点造成一个循环链表,每个节点都会监控位于本人左右两边的节点,当有节点新增时,相邻的节点保障以后播送的音讯会复制到新的节点上;当有节点生效时,相邻的节点会接管保障本次播送的音讯会复制到所有节点。
在master节点和slave节点上的这些gm造成一个group,group的信息会记录在mnesia中。不同的镜像队列造成不同的group。
音讯从master节点对应的gm收回后,顺着链表顺次传送到所有节点,因为所有节点组成一个循环链表,master节点对应的gm最终会收到本人发送的音讯,这个时候master节点就晓得音讯曾经复制到所有slave节点了。
2.3 重要的表构造
rabbit_queue表记录队列的相干信息:
-record(amqqueue, { name, %%队列的名称 durable, %%标识队列是否长久化 auto_delete, %%标识队列是否主动删除 exclusive_owner, %%标识是否独占模式 arguments, %%队列创立时的参数 pid, %%amqqueue_process过程PID slave_pids, %%mirror_queue_slave过程PID汇合 sync_slave_pids, %%已同步的slave过程PID汇合 po<i style="color:transparent">来源gaodai$ma#com搞$$代**码网</i>licy, %%与队列无关的policy %%通过set_policy设置,没有则为undefined gm_pids, %%{gm,mirror_queue_coordinator},{gm,mirror_queue_slave}过程PID汇合 decorator %% }).
留神:slave_pids的存储是依照slave退出的工夫来排序的,以便master节点生效时,晋升”资格最老”的slave节点为新的master。
gm_group表记录gm造成的group的相干信息:
-record(gm_group, { name, %%group的名称,与queue的名称统一 version, %%group的版本号, 新增节点/节点生效时会递增 members, %%group的成员列表, 依照节点组成的链表程序进行排序 }).
3、镜像队列的一些细节
3.1 新增节点
slave节点先从gm_group中获取对应group的所有成员信息,而后随机抉择一个节点并向这个节点发送申请,这个节点收到申请后,更新gm_group对应的信息,同时告诉左右节点更新街坊信息(调整对左右节点的监控)及以后正在播送的音讯,而后回复告诉申请节点胜利退出group。申请退出group的节点收到回复后再更新rabbit_queue中的相干信息,并依据须要进行音讯的同步。
3.2 音讯的播送
音讯从master节点收回,顺着节点链表发送。在这期间,所有的slave节点都会对音讯进行缓存,当master节点收到本人发送的音讯后,会再次播送ack音讯,同样ack音讯会顺着节点链表通过所有的slave节点,其作用是告诉slave节点能够革除缓存的音讯,当ack音讯回到master节点时对应播送音讯的生命周期完结。
下图为一个简略的示意图,A节点为master节点,播送一条内容为”test”的音讯。”1″示意音讯为播送的第一条音讯;”id=A”示意音讯的发送者为节点A。左边是slave节点记录的状态信息。
为什么所有的节点都须要缓存一份公布的音讯呢?
master公布的音讯是顺次通过所有slave节点,在这期间的任何时刻,有可能有节点生效,那么相邻的节点可能须要从新发送给新的节点。例如,A->B->C->D->A造成的循环链表,A为master节点,播送音讯发送给节点B,B再发送给C,如果节点C收到B发送的音讯还未发送给D时异样完结了,那么节点B感知后节点C生效后须要从新将音讯发送给D。同样,如果B节点将音讯发送给C后,B,C节点中新增了E节点,那么B节点须要再将音讯发送给新增的E节点。
gm的状态记录:
-record(state, { self, %%gm自身的ID left, %%该节点右边的节点 right, %%该节点左边的节点 group_name, %%group名称 与队列名统一 module, %%回调模块 rabbit_mirror_queue_slave或者 %%rabbit_mirror_queue_coordinator view, %%group成员列表视图信息 %%记录了成员的ID及每个成员的左右街坊节点 pub_count, %%以后已公布的音讯计数 members_state, %%group成员状态列表 记录了播送状态:[#member{}] callback_args, %%回调函数的参数信息 %%rabbit_mirror_queue_slave/rabbit_mirror_queue_coordinator过程PID confirms, %%confirm列表 broadcast_buffer, %%缓存待播送的音讯 broadcast_timer, %%播送音讯定时器 txn_executor }). -record(member, { pending_ack, %%待确认的音讯,也就是已公布的音讯缓存的中央 last_pub, %%最初一次公布的音讯计数 last_ack %%最初一次确认的音讯计数 }).
3.3 节点的生效
当slave节点生效时,仅仅是相邻节点感知,而后从新调整街坊节点信息、更新rabbit_queue、gm_group的记录等。如果是master节点生效,”资格最老”的slave节点被晋升为master节点,slave节点会创立出新的coordinator,并告知gm批改回调解决为coordinator,原来的mirror_queue_slave充当amqqueue_process解决生产者公布的音讯,向消费者投递音讯等。
下面提到如果是slave节点生效,只有相邻的节点能感知到,那么master节点生效是不是也是只有相邻的节点能感知到?如果是这样的话,如果相邻的节点不是”资格最老”的节点,怎么告诉”资格最老”的节点晋升为新的master节点呢?
实际上,所有的slave节点在退出group时,mirror_queue_slave过程会对master节点的amqqueue_process过程(也可能是mirror_queue_slave过程)进行监控,如果master节点生效的话,mirror_queue_slave会感知,而后再通过gm进行播送,这样所有的节点最终都会晓得master节点生效。当然,只有”资格最老”的节点会晋升本人为新的master。
另外,在slave晋升为master时,mirror_queue_slave
外部来了一次”移花接木”,即本来须要回调mirror_queue_slave
的handle_call/handle_info/handle_cast
等接口进行解决的音讯,全副改为调用amqqueue_process
的handle_call/handle_info/handle_cast
等接口,从而能够解释下面说的,mirror_queue_slave
过程充当了amqqueue_process
实现协定相干的音讯的解决。
rabbit_mirror_queue_slave.erl handle_call({gm_deaths,LiveGMPids},From, State = #state{q = Q = #amqqueue{name=QName,pid=MPid}})-> Self = self(), case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of {ok,Pid,DeadPids} -> case Pid of MPid -> %% master hasn't changed gen_server2:reply(From, ok), noreply(State); Self -> %% we've become master QueueState = promote_me(From,State), {become, %% 改由rabbit_amqqueue_process模块解决音讯 rabbit_amqqueue_process, QueueState, hibernate}; ... gen_server2.erl handle_common_reply(Reply,Msg,GS2State = #gs2_state{name=Name, debug=Debug})-> case Reply of ... {become, Mod, NState, Time1} -> Debug1=common_become(Name,Mod,NState,Debug), loop(find_prioritisers( GS2State#gs2_state{mod=Mod, state=NState, time=Time1, debug=Debug1})); ... handle_msg({'gen_call',From,Msg}, GS2State=#gs2_state{mod=Mod, state=State, name=Name, debug=Debug}) -> case catch Mod:handle_call(Msg, From, State) of ... handle_msg(Msg,GS2State=#gs2_state{mod=Mod,state=State})-> Reply = (catch dispatch(Msg,Mod,State)), handle_common_reply(Reply, Msg, GS2State). dispatch({'$gen_cast',Msg},Mod,State)-> Mod:handle_cast(Msg, State); dispatch(Info, Mod, State)-> Mod:handle_info(Info,State).
4、音讯的同步
配置镜像队列的时候有个ha-sync-mode属性,这个有什么用呢?
新节点退出到group后,最多能从右边节点获取到以后正在播送的音讯内容,退出group之前曾经播送的音讯则无奈获取到。如果此时master节点可怜生效,而新节点有恰好成为了新的master,那么退出group之前曾经播送的音讯则会全副失落。
留神:这里的音讯具体是指新节点退出前曾经公布并复制到所有slave节点的音讯,并且这些音讯还未被消费者生产或者未被消费者确认。如果新节点退出前,所有播送的音讯被消费者生产并确认了,master节点删除音讯的同时会告诉slave节点实现相应动作。这种状况等同于新节点退出前没有公布任何音讯。
防止这种问题的解决办法就是对新的slave节点进行音讯同步。当ha-sync-mode配置为主动同步(automatic)时,新节点退出group时会主动进行音讯的同步;如果配置为manually则须要手动操作实现同步
就先写到这把,原本是想一篇文把三个中间件都写了的,没想到人不知;鬼不觉写了这么多我都感觉Rabbitmq还有很多货色还没写到,前面会再写两篇专门讲一下 RocketMq和kafka,感兴趣的敌人能够给我点个关注。
完整版的消息中间件学习材料和我集体整顿的笔记间接点击蓝字支付
如果能够点个赞就更好了,你说呢
end