2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一 。
优点:由于 erlang 语言的高并发特性 ,性能较好;吞吐量到万级 ,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高 https://www.rabbitmq.com/news.html
缺点:商业版需要收费,学习成本较高
结合erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高 ,管理界面用起来十分方便,如果你的数据量没有那么大 ,中小型公司优先选择功能比较完备的 RabbitMQ。
主要概念 生产者
产生数据发送消息的程序是生产者
交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
6种工作模式 和原理
1、 消息产生着§将消息放入队列;
2、 消息的消费者(consumer)监听(while)消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端);
1、 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样)保证一条消息只能被一个消费者使用);
2、 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢);
publish/subscribe发布订阅(共享资源)
1、 X代表交换机rabbitMQ内部组件,erlang消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费;
2、 相关场景:邮件群发,群聊天,广播(广告);
1、 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info)当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;;
2、 根据业务功能定义路由字符串;
3、 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;;
1、 星号井号代表通配符;2、 星号代表多个单词,井号代表一个单词;3、 路由功能添加模糊匹配;4、 消息产生者产生消息,把消息交给交换机;5、 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费;
1、 客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列;
2、 服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果;
3、 服务端将RPC方法的结果发送到RPC响应队列;
4、 客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果
Broker :接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Virtual host :出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection :publisher/consumer 和 broker 之间的 TCP 连接
Channel :如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange :message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)
Queue :消息最终被送到这里等待 consumer 取走
Binding :exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
简单工作模式
在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
1、导入相关依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 14 <dependencies > <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.9.0</version > </dependency > <dependency > <groupId > commons-io</groupId > <artifactId > commons-io</artifactId > <version > 2.6</version > </dependency > </dependencies >
2、编写生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package com.zww.spring.rabbitmq.one;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.137.4" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("123" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); String message = "hello world" ; channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); System.out.println("消息发送完毕" ); } }
3、运行生产者查看rabbitMQ管理界面
4、编写消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package com.zww.spring.rabbitmq.one;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.137.4" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("123" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println(new String (message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断" ); }; channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } }
5、消费者消费生产者的消息
work queues工作模式 Work queues ,也被称为(Task queues ),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息 。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
轮询分发消息
一个生产者发送消息,由多个工作线程(消费者)轮询接收
在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。
编写工具类,提取重复代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package com.zww.spriong.rabbitmq.utils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMQUtils { public static Channel getChannel () throws Exception{ ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.137.4" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("123" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
编写消息发送线程,在控制台输入发送的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package com.zww.spring.rabbitmq.two;import com.rabbitmq.client.Channel;import com.zww.spring.rabbitmq.utils.RabbitMQUtils;import java.util.Scanner;public class Task01 { public static final String QUEUE_NAME = "hello02" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); Scanner scanner = new Scanner (System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); System.out.println("发送消息完成" +message); } } }
编写两个接收消息的工作线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 package com.zww.spring.rabbitmq.two;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.zww.spring.rabbitmq.utils.RabbitMQUtils;public class Worker01 { public static final String QUEUE_NAME = "hello02" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag,message) ->{ System.out.println("接收到的消息" +new String (message.getBody())); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag+":消息取消消费接口回调逻辑" ); }; System.out.println("C1等待接收消息" ); channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } } package com.zww.spring.rabbitmq.two;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.zww.spring.rabbitmq.utils.RabbitMQUtils;public class Worker02 { public static final String QUEUE_NAME = "hello02" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) ->{ System.out.println("接收到的消息" +new String (message.getBody())); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag+":消息取消消费接口回调逻辑" ); }; System.out.println("C2等待接收消息" ); channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } }
先启动消息发送线程创建hello02信道,再启动两个接收消息的工作线程
在消息发送线程控制台输入以下内容
查看两个工作线程分别接收到的消息
哪个线程先启动,哪个最先接收消息!
消息应答 概念:消费者消费完一条消息可能需要等待一段时间,但如果这段时间内消费者在未完成消费信息的情况下时就挂掉了,这时候会怎么样?RabbitMQ一旦向消费者传递一条消息,该消息就会被标记为删除,这种情况下消费者挂掉了正在处理的消息就会丢失,为了保证消息在发送的过程中不会丢失,RabbitMQ引入了应答机制,即在消费者接收并处理了该条消息后告诉RabbitMQ它已经把该条消息处理了,RabbitMQ可以把这条消息删除了。
1、 自动应答;
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,这种模式下万一消费者的连接或信道关闭,消息就丢失了,不过这种模式对传递的消息数量没有限制,但如果消息太多太大,消费者来不及消费,也可能出现消息的堆积导致内存耗尽,最终消费者程序被操作系统杀死的情况,所以这种模式只能在消费者可以高效的、高速率的处理消息的前提下使用。
2、 手动应答;
以下方法用于手动应答
(1)channel.basicAck()(用于肯定确认,即向RabbitMQ表示该消息已经发送并处理成功了,可以将其丢弃)
(2)channel.basicNack()(用于否定确认,即不处理该信息直接丢弃)
(3)channel.basicReject()(用于否定确认,即不处理该信息直接丢弃,比basicNack方法少一个Multiple参数)
3、 Multiple参数解释;
channel.basicNack(deliveryTag,true)(第二个参数就是Multiple参数)
multiple的true和false的区别:
(1)true表示批量应答channel上未应答的消息,比如channel上有传送tag为5,6,7,8的消息,当前tag是8,那么此时5-8还未应答的消息就会被确认收到消息应答,但如果处理6或7消息失败了,5也会被应答,导致5消息丢失,所以一般情况下multiple为false。
(2)false表示只会应答tag=8的消息,5,6,7这三个消息依然不会被确认收到消息应答
4、 消息重新入队;
如果消费者由于某些原因失去连接,导致消费者未成功发送ACK确认应答,RabbitMQ将会对未完全处理完的消息重新入队,如果其他消费者可以处理,则该消息将被分配到另一个消费者,从而保证消息未丢失。
5、 在utils包下新建一个名为SleepUtils的类,该类的方法能让线程睡眠指定的时间,用于模拟业务的处理时间,代码如下;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package com.ken.utils;public class SleepUtils { public static void sleep (int second) { try { Thread.sleep(1000 * second); } catch (InterruptedException e) { e.printStackTrace(); } } }
效果图:
6、 使用代码实现消息手动应答,为此先新建一个名为ack的包,用于装消息手动应答的代码;
效果图:
7、 新建一个名为Task02的类,用作充当生产者,代码如下;
注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考
https://blog.csdn.net/m0_64284147/article/details/129465871
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package com.ken.ack;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.Scanner;public class Task02 { public static final String QUEUE_NAME = "ack_queue" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); Scanner scanner = new Scanner (System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); System.out.println("消息发送成功:" + message); } } }
效果图:
8、 新建一个名为Worker03的类,用作充当消费者一号,代码如下;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package com.ken.ack;import com.ken.utils.RabbitMqUtils;import com.ken.utils.SleepUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class Worker03 { public static final String QUEUE_NAME = "ack_queue" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { SleepUtils.sleep(1 ); System.out.println("接收的消息:" + new String (message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(),false ); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("取消消费消息:" + consumerTag); }; System.out.println("Work03等待接收消息..." ); channel.basicConsume(QUEUE_NAME,false ,deliverCallback,cancelCallback); } }
效果图:
9、 新建一个名为Worker04的类,用作充当消费者二号,代码如下;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package com.ken.ack;import com.ken.utils.RabbitMqUtils;import com.ken.utils.SleepUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class Worker04 { public static final String QUEUE_NAME = "ack_queue" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { SleepUtils.sleep(30 ); System.out.println("接收的消息:" + new String (message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(),false ); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("取消消费消息:" + consumerTag); }; System.out.println("Work04等待接收消息..." ); channel.basicConsume(QUEUE_NAME,false ,deliverCallback,cancelCallback); } }
效果图:
10、 分别先后启动Task02、Worker03、Worker04;
例:
11、 正常的在Task02输入消息,观察消息的被消费情况;
(1)在Task02分别输入第一条和第二条消息
(2)等待1秒后第一条消息被Work03消费
(3)等待30秒后第二条消息被Work04消费
12、 再次在Task02输入消息,然后手动暂停Worker04用以模拟Worker04消费者宕机的情况,观察消息的被消费情况;
(1)在Task02分别输入第三条和第四条消息
(2)手动停掉Worker04,模拟Worker04宕机的情况
(3)Worker04宕机后没有成功消费掉第四条消息,然后没有对消息进行应答,导致第四条消息重新入队,然后被Worker03消费掉
持久化 概念:在上一章文章中我们演示了消费者宕机的情况下消息没有被消费成功后会重新入队,然后再被消费,但如何保障RabbitMQ服务停掉的情况下,生产者发过来的消息不会丢失,这时候我们为了消息不会丢失就需要将队列和消息都标记为持久化。
1、 实现RabbitMQ队列持久化;
只需要把queueDeclare方法的第二个参数改为true即可对Queue进行持久化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 package com.ken;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer { public static final String QUEUE_NAME = "durable_queue" ; public static void main (String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.194.150" ); factory.setUsername("admin" ); factory.setPassword("123456" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true ,false ,false ,null ); String message = "Hello World" ; channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); System.out.println("消息发送成功!" ); } }
效果图:
只要Features这个属性的值为D,则证明队列持久化成功
2、 实现RabbitMQ消息持久化;
只需要往basicPublish方法的第三个参数传MessageProperties.PERSISTENT_TEXT_PLAIN,即可对消息进行持久化这个参数能告诉RabbitMQ将消息保存到磁盘里进行持久化处理,但值得注意的是将消息标记为持久化不能完全保证消息不会丢失,因为存在消息刚准备存储到磁盘里,但未完全存储完的时间间隔,这时候如果宕机了就不能保证消息真正的写入磁盘重从而实现持久化,但对于简单任务队列而言,这种持久化策略已经够用了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package com.ken;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;public class Producer { public static final String QUEUE_NAME = "durable_queue" ; public static void main (String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.194.150" ); factory.setUsername("admin" ); factory.setPassword("123456" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); String message = "Hello World" ; channel.basicPublish("" ,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); System.out.println("消息发送成功!" ); } }
其他:
1、 出现报错信息:;
1 Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method:method<channel.close>(reply-code=406 , reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'durable_queue' in vhost '/' : received 'true' but current is 'false' , class-id=50 , method-id=10 )
原因:
当前队列是未持久化的,需要删除队列然后改成持久化才能重新生效
删除队列:
(1)点击要删除的队列
(2)找到Delete Queue的按钮
(3)点击确认删除
发布确认模式 概念:虽然我们可以设置队列和队列中的消息持久化,但任然存在消息在持久化的过程中,即在写入磁盘的过程中,消息未完全写入,然后服务器宕机导致消息丢失的情况,发布确认就是为了解决这种情况的概念,在消息完全写入磁盘后才确认消息完全持久化了
1、 发布确认模式:;
(1)单个确认发布模式(简单,但吞吐量有限)
(2)批量确认发布模式(简单,吞吐量合理,但出现问题很难找出是那条消息出现的问题)
(3)异步确认发布模式(最佳性能和资源使用,在出现错误的情况下能很好的控制,推荐使用)
2、 实现开启发布确认;
在生产者的代码中在channel调用confirmSelect方法,即channel.confirmSelect()
注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考
https://blog.csdn.net/m0_64284147/article/details/129465871
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.ken.ack;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.Scanner;public class Task02 { public static final String QUEUE_NAME = "my_queue" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.confirmSelect(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); Scanner scanner = new Scanner (System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); System.out.println("消息发送成功:" + message); } } }
3、 新建包,用于装实现确认发布模式的代码;
(1)新建一个名为confirm的包,用于装发布确认的代码
效果图:
(2)新建一个名为ConfirmMessage的类
4、 单个确认发布模式;
单个确认发布是一种同步确认发布的方式,在发布一个消息后并且该条消息被确认发布了,后续的消息才能继续发布,不过这种确认方式的最大缺点就是发布速度特别慢
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package com.ken.confirm;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;public class ConfirmMessage { public static final String QUEUE_NAME = "my_queue" ; public static final int MESSAGE_COUNT = 1000 ; public static void main (String[] args) throws Exception{ ConfirmMessage.publishMessageIndividually(); } public static void publishMessageIndividually () throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); channel.confirmSelect(); long begin = System.currentTimeMillis(); for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = i + "" ; channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); channel.waitForConfirms(); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个单个确认消息,耗时" + (end - begin) + "ms" ); } }
运行代码,查看单个确认发布模式消耗的时间
5、 批量确认发布模式;
批量确认发布是一种能极大的提高吞吐量的发布模式,在发布一批消息后一起确认,不过这种确认方式的缺点是当发送故障导致发布出现问题时,不知道是哪个消息出现的问题
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 package com.ken.confirm;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;public class ConfirmMessage { public static final String QUEUE_NAME = "my_queue" ; public static final int MESSAGE_COUNT = 1000 ; public static void main (String[] args) throws Exception{ ConfirmMessage.publishMessageBatch(); } public static void publishMessageBatch () throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); channel.confirmSelect(); long begin = System.currentTimeMillis(); int batchSize = 100 ; for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = i + "" ; channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); if (i % batchSize == 0 ) { channel.waitForConfirms(); } } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms" ); } }
运行代码,查看批量确认发布模式消耗的时间
6、 异步确认发布模式;
(1)代码实现
异步确认发布实现逻辑比上面两种要复杂,但性价比高,无论是可靠性还是效率都非常突出,异步确认发布通过回调函数来达到消息可靠性传递,消息的结构类似于map,都是key-value的结构,当相应的消息被消费了或消费失败了,都可以通过对应的key值来确认消费或消费失败的是哪一条消息,所以可靠性很高
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 package com.ken.confirm;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmCallback;public class ConfirmMessage { public static final String QUEUE_NAME = "my_queue" ; public static final int MESSAGE_COUNT = 1000 ; public static void main (String[] args) throws Exception{ ConfirmMessage.publishMessageAsync(); } public static void publishMessageAsync () throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); channel.confirmSelect(); long begin = System.currentTimeMillis(); ConfirmCallback ackCallback = (deliveryTag, multiple) -> { System.out.println("确认的消息:" + deliveryTag); }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.out.println("未确认的消息:" + deliveryTag); }; channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = "消息" + i; channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个异步发布确认消息,耗时" + (end - begin) + "ms" ); } }
(2)运行代码,查看异步确认发布模式消耗的时间
这里因为是异步的,即代码执行完并输出耗时时间了,但消息监听器还在运行,所以还在时间输出后还在输出确认的消息
(3)处理异步未确认消息
已确认的消息没必要处理,而未确认的消息需要进行重新入队的处理,但由上述步骤(2)的效果图可看出程序在执行完后监听器还在监听消息是否确认成功,而要怎么做才能在程序执行完后再处理监听器监听出来未确认的消息呢?最好的解决方案便是把未确认的消息放在一个基于内存的能被发布线程访问到的队列里,例如就用ConcurrentSkipListMap这个集合在confirm(发布确认)、 callbacks(回调)与发布线程之间进行消息的传递;实现的思路是先用ConcurrentSkipListMap记录发送的所有消息,然后监听器监听消息,确认消息成功后会执行消息确认成功的回调函数,而回调函数执行删除ConcurrentSkipListMap集合里当前被确认的消息的操作,最后ConcurrentSkipListMap里剩下的就是未确认成功的消息
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 package com.ken.confirm;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmCallback;import java.util.concurrent.ConcurrentNavigableMap;import java.util.concurrent.ConcurrentSkipListMap;public class ConfirmMessage { public static final String QUEUE_NAME = "my_queue" ; public static final int MESSAGE_COUNT = 1000 ; public static void main (String[] args) throws Exception{ ConfirmMessage.publishMessageAsync(); } public static void publishMessageAsync () throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); channel.confirmSelect(); ConcurrentSkipListMap<Long,String> outStandingConfirms = new ConcurrentSkipListMap <>(); long begin = System.currentTimeMillis(); ConfirmCallback ackCallback = (deliveryTag, multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> confirmed = outStandingConfirms.headMap(deliveryTag); System.out.println("确认的消息:" + deliveryTag); confirmed.clear(); }else { outStandingConfirms.remove(deliveryTag); } }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { String message = outStandingConfirms.get(deliveryTag); System.out.println("未确认的消息:" + message + "未确认消息的tag:" + deliveryTag); }; channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = "消息" + i; channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); outStandingConfirms.put(channel.getNextPublishSeqNo(),message); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个异步发布确认消息,耗时" + (end - begin) + "ms" ); } }
效果图:
交换机(exchange) 1、 交换机概念;
生产者生产的消息从不会直接发送到队列,生产者只能把消息发送到交换机(Exchange),交换机接收来着生产者的消息,另一方面把消息推入队列,交换机必须知道如何处理收到的消息,是应该把这些消息放进特定的队列里还是放到多个队列里还是把消息丢弃,这些操作都由交换机的类型来决定
例:
交换机能把一个消息分别发送到多个个队列里,从而实现多个消费者对一个消息进行多次消费
2、 交换机的类型;
直接(direct)、主题(topic)、标题(headers)、扇出(fanout)、无名
注意:channel.basicPublish方法的第一个参数就是交换机的名称,如果交换机的名称是空字符串,例如channel.basicPublish(“”,队列名称,null,message.getBytes());则表示默认交换机是无名交换机,消息能路由发送到队列是由routingkey(bindingkey)绑定key实现的
3、 临时队列;
临时队列是指创建队列后队列的名称是随机的,一旦断开消费者的连接,队列将被自动删除,这种便是临时队列
创建临时队列的方式:
1 String queue = channel.queueDeclare().getQueue();
4、 绑定(bindings);
下面我们来使用RabbitMQ的页面来进行绑定
(1)新建一个队列
效果图:
(2)新建一个交换机
效果图:
(3)绑定routingkey
效果图:
以上步骤就是用temp_exchange交换机绑定了temp_queue队列,temp_exchange交换机通过a这个routingkey来绑定了temp_queue队列,当我们发消息到temp_exchange交换机,然后temp_exchange交换机会通过路由规则来把消息发送到指定的队列里,往后我们只需要用不同的routingkey绑定不同的队列,然后我们就能根据指定的routingkey来往指定的队列发消息了
Fanout交换机 1、 Fanout交换机的介绍;
接收所有的消息广播到它知道的队列中,类似于发布订阅模式,只要Fanout禁用RoutingKey,绑定同一交换机的队列都可同时收到消息;若Fanout启动了routingkey,则绑定同一交换机且routingkeyKey相同的队列才能收到同一消息
2、 Fanout交换机的实现;
(1)新建一个名为fanout的包,用于装发布确认的代码
效果图:
(2)新建一个名为Receive01的类用于编写消费者的代码
代码如下:
注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考
https://blog.csdn.net/m0_64284147/article/details/129465871
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package com.ken.fanout;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class Receive01 { public static final String EXCHANGE_NAME = "fanout_exchange" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout" ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Receive01接收到的消息:" + new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(queueName,true ,deliverCallback,consumerTag -> {}); } }
(3)复制Receive01类并粘贴重命名为Receive02
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package com.ken.fanout;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class Receive02 { public static final String EXCHANGE_NAME = "fanout_exchange" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout" ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Receive02接收到的消息:" + new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(queueName,true ,deliverCallback,consumerTag -> {}); } }
(4)新建一个名为Emit的类用于编写生产者的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package com.ken.fanout;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.Scanner;public class Emit { public static final String EXCHANGE_NAME = "fanout_exchange" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); Scanner scanner = new Scanner (System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME,"" ,null ,message.getBytes("UTF-8" )); System.out.println("生产者发送的消息:" + message); } } }
(5)分别先运行Receive01、Receive02和Emit
(6)在Emit里输入消息,然后查看Receive01和Receive02接收消息的情况,若两个消费者都分别消费了一样的消息,证明我们成功实现了Fanout交换机
例:
Emit
Receive01
Receive02
Direct 交换机 1、 Direct交换机的介绍;
Direct交换机能让消息只发送往绑定了指定routingkey的队列中去,值得注意的是当绑定多个队列的routingkey都相同,则这种情况下的表现与Fanout交换机的类似
2、 Direct交换机的实现;
(1)新建一个名为fanout的包,用于装发布确认的代码
效果图:
(2)新建一个名为Receive01的类用于编写消费者的代码
代码如下:
注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考
RabbitMQ系列(6)–RabbitMQ模式之工作队列(Work queues)的简介及实现_Ken_1115的博客-CSDN博客
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 package com.ken.direct;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class Receive01 { public static final String EXCHANGE_NAME = "direct_exchange" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("console" ,false ,false ,false ,null ); channel.queueBind("console" ,EXCHANGE_NAME,"info" ); channel.queueBind("console" ,EXCHANGE_NAME,"warning" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Receive01接收到的消息:" + new String (message.getBody(),"UTF-8" )); }; channel.basicConsume("console" ,true ,deliverCallback,consumerTag -> {}); } }
(3)复制Receive01类并粘贴重命名为Receive02
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package com.ken.direct;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class Receive02 { public static final String EXCHANGE_NAME = "direct_exchange" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("disk" ,false ,false ,false ,null ); channel.queueBind("disk" ,EXCHANGE_NAME,"error" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Receive01接收到的消息:" + new String (message.getBody(),"UTF-8" )); }; channel.basicConsume("disk" ,true ,deliverCallback,consumerTag -> {}); } }
(4)新建一个名为Direct的类用于编写生产者的代码
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.ken.direct;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.Scanner;public class Direct { public static final String EXCHANGE_NAME = "direct_exchange" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); Scanner scanner = new Scanner (System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME,"info" ,null ,message.getBytes("UTF-8" )); System.out.println("生产者发送的消息:" + message); } } }
(5)分别先运行Receive01、Receive02
(6)先把Direct类里的routingkey设置为info,然后启动Direct类
例:
(7)在Direct里输入消息,然后查看Receive01和Receive02接收消息的情况,能看出Receive01接收到消息,而Receive02没有接收到消息
(8)把Direct类里的routingkey设置为warning,然后重新启动Direct类
(9)在Direct里输入消息,然后查看Receive01和Receive02接收消息的情况,能看出Receive01接收到消息,而Receive02没有接收到消息
(10)把Direct类里的routingkey设置为error,然后重新启动Direct类
(11)在Direct里输入消息,然后查看Receive01和Receive02接收消息的情况,能看出Receive01没有接收到消息,而Receive02接收到了消息
从上述众多结果可看出direct交换机实现成功
Topics 交换机 1、 Topics交换机的介绍;
Topics交换机能让消息只发送往绑定了指定routingkey的队列中去,不同于Direct交换机的是,Topics能把一个消息往多个不同的队列发送;Topics交换机的routingkey不能随意写,必须是一个单词列表,并以点号分隔开,例如“one.two.three”,除此外还有两个替换符,(星号)能代替一个单词,#(井号)可以代替零个或多个单词,例如“ .one.”是中间是one的3个单词,“ ..one”是最后一个是one的3个单词,“one.#”是第一个单词是one的多个单词,若队列绑定键是#,这个队列将接收所有数据,这时候类似fanout交换机,若队列绑定键中没有#和 出现,这时候就类似direct交换机
2、 Topics交换机的实现;
(1)新建一个名为topics的包,用于装发布确认的代码
效果图:
(2)新建一个名为Receive01的类用于编写消费者的代码
代码如下:
注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考
RabbitMQ系列(6)–RabbitMQ模式之工作队列(Work queues)的简介及实现_Ken_1115的博客-CSDN博客
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package com.ken.topics;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;public class Receive01 { public static final String EXCHANGE_NAME = "topic_exchange" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic" ); String queueName = "Q1" ; channel.queueDeclare(queueName,false ,false ,false ,null ); channel.queueBind(queueName,EXCHANGE_NAME,"*.one.*" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String (message.getBody(),"UTF-8" )); System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName,true ,deliverCallback,consumerTag ->{}); } }
(3)复制Receive01类并粘贴重命名为Receive02
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 package com.ken.topics;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class Receive02 { public static final String EXCHANGE_NAME = "topic_exchange" ; public static void main (String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic" ); String queueName = "Q1" ; channel.queueDeclare(queueName,false ,false ,false ,null ); channel.queueBind(queueName,EXCHANGE_NAME,"*.*.two" ); channel.queueBind(queueName,EXCHANGE_NAME,"three.#" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String (message.getBody(),"UTF-8" )); System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName,true ,deliverCallback,consumerTag ->{}); } }
(4)新建一个名为Emit的类用于编写生产者的代码
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.ken.topics;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.HashMap;import java.util.Map;public class Emit { public static final String EXCHANGE_NAME = "topic_exchange" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); Map<String,String> bindingKeyMap = new HashMap <>(); bindingKeyMap.put("four.one.two" ,"被队列Q1Q2接收" ); bindingKeyMap.put("three.one.five" ,"被队列Q1Q2接收" ); bindingKeyMap.put("four.one.six" ,"被队列Q1接收" ); bindingKeyMap.put("three.seven.six" ,"被队列Q2接收" ); bindingKeyMap.put("three.eight.two" ,"虽然满足两个绑定,但只被队列Q2接收一次" ); bindingKeyMap.put("three.seven.six" ,"不匹配任何绑定,不会被任何队列接收到,会被丢弃" ); bindingKeyMap.put("four.one.nine.two" ,"四个单词,不匹配任何绑定,会被丢弃" ); bindingKeyMap.put("three.one.nine.two" ,"四个单词,但匹配Q2" ); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String routingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,routingKey,null ,message.getBytes("UTF-8" )); System.out.println("生产者发出消息:" + message); } } }
(5)分别先运行Receive01、Receive02、Emit
(6)查看Receive01和Receive02接收消息的情况
从上述结果可看出topic交换机实现成功
死信机制 1、 死信的概念;
死信,顾名思义就是无法被消费的消息,一般来说producer(生产者)将消息投递到broker或直接放到queue(队列)中,consumer(消费者)从queue(队列)取出消息进行消费,但某些时候由于特定的原因导致queue(队列)中的消息无法被消费,若这些消息没有后续的处理,则这些消息就变成了死信,有死信自然就有了死信队列
2、 死信的应用场景;
为保证订单业务的消息数据不丢失,需要使用RabbitMQ的死信队列机制,当消息发生异常时,将消息投入死信队列中
3、 死信的来源;
(1)消息TTL(存活时间)过期
(2)队列达到最大长度(队列满了,无法再添加数据到mq中)
(3)消息被拒绝(basic.reject或basic.nack)并且requeue=false
4、 死信队列的实现;
(1)我们将根据这张死信队列的代码架构图来实现死信队列
(2)新建一个名为dead的包,用于装实现死信队列的代码
效果图:
(3)新建一个名为Consumer01的类用于编写消费者的代码
代码如下:
注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考
RabbitMQ系列(6)–RabbitMQ模式之工作队列(Work queues)的简介及实现_Ken_1115的博客-CSDN博客
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 package com.ken.dead;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;import java.util.HashMap;import java.util.Map;public class Consumer01 { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static final String DEAD_EXCHANGE = "dead_exchange" ; public static final String NORMAL_QUEUE = "normal_queue" ; public static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); Map<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" ,DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"dead" ); channel.queueDeclare(NORMAL_QUEUE,false ,false ,false ,arguments); channel.queueDeclare(DEAD_QUEUE,false ,false ,false ,null ); channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal" ); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer01接收的消息是:" + new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(NORMAL_QUEUE,true ,deliverCallback,consumerTag -> {}); } }
(4)复制Consumer01类并粘贴重命名为Consumer02
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.ken.dead;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;import java.util.Map;public class Consumer02 { public static final String NORMAL_QUEUE = "normal_queue" ; public static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接收的消息是:" + new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(DEAD_QUEUE,true ,deliverCallback,consumerTag -> {}); } }
(5)新建一个名为Producer的类用于编写生产者的代码
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.ken.dead;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); AMQP.BasicProperties properties= new AMQP .BasicProperties().builder().expiration("10000" ).build(); for (int i = 1 ; i < 11 ; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE,"normal" ,properties,message.getBytes()); } } }
(6)先运行Consumer01,生成队列和交换机
(7)然后停止Consumer01,模拟消费者宕机
(8)运行Producer
(9)观察normal_queue队列和dead_queue队列消息数量的变化,一开始normal_queue队列里有10条消息,过了10s后消息都到了dead_queue队列里,证明消费者消费消息失败,消息从normal_queue队列移到了dead_queue队列里,由此可见当消息TTL过期后,死信队列成功运行
(10)启动Consumer02,可以看到Consumer02消费了dead_queue队列里的消息
(10)删除normal_queue队列
(11) 修改Produces的代码,把设置TTL时间的代码注释掉,basicPublish方法的第3个参数设置为null
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.ken.dead;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); for (int i = 1 ; i < 11 ; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE,"normal" ,null ,message.getBytes()); } } }
(12)修改Consumer01的代码,把之前注释的用于限制队列长度的代码放出来
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 package com.ken.dead;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;import java.util.HashMap;import java.util.Map;public class Consumer01 { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static final String DEAD_EXCHANGE = "dead_exchange" ; public static final String NORMAL_QUEUE = "normal_queue" ; public static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); Map<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" ,DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"dead" ); arguments.put("x-max-length" ,6 ); channel.queueDeclare(NORMAL_QUEUE,false ,false ,false ,arguments); channel.queueDeclare(DEAD_QUEUE,false ,false ,false ,null ); channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal" ); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer01接收的消息是:" + new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(NORMAL_QUEUE,true ,deliverCallback,consumerTag -> {}); } }
(13)停掉所有程序,然后重新运行Consumer01,生成队列和交换机
(14)然后停止Consumer01,模拟消费者宕机
(15)重新运行Producer
(16)观察normal_queue队列和dead_queue队列消息数量的变化,可以看到normal_queue队列只能堆积最多6条消息,而剩余的4条消息都移到了dead_queue队列里,由此可见当队列达到最大长度6条后,死信队列成功运行
(17)删除normal_queue队列和dead_queue队列
(18)修改Consumer01的代码,注释掉限制队列长度的代码,修改消息应答的部分代码
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 package com.ken.dead;import com.ken.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;import java.util.HashMap;import java.util.Map;public class Consumer01 { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static final String DEAD_EXCHANGE = "dead_exchange" ; public static final String NORMAL_QUEUE = "normal_queue" ; public static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); Map<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" ,DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"dead" ); channel.queueDeclare(NORMAL_QUEUE,false ,false ,false ,arguments); channel.queueDeclare(DEAD_QUEUE,false ,false ,false ,null ); channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal" ); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead" ); DeliverCallback deliverCallback = (consumerTag, message) -> { String mes = new String (message.getBody(),"UTF-8" ); if ("info5" .equals(mes)) { System.out.println("Consumer01接收的消息是:" + mes + ",此消息被拒绝" ); channel.basicReject(message.getEnvelope().getDeliveryTag(),false ); }else { System.out.println("Consumer01接收的消息是:" + mes); channel.basicAck(message.getEnvelope().getDeliveryTag(),false ); } }; channel.basicConsume(NORMAL_QUEUE,false ,deliverCallback,consumerTag -> {}); } }
(19)停掉所有程序,然后重新运行Consumer01,生成队列和交换机
(20)重新运行Producer
(21)查看Consumer01控制台的输出,观察dead_queue队列消息数量的变化,因为没有开启Consumer02消费dead_queue队列,可以看到dead_queue队列堆积了1条消息,查看这条消息,可以看出就是我们拒绝掉的info5,这证明消费者拒绝消费消息info5后,消息info5从normal_queue队列移到了dead_queue队列里,由此可见当消息被拒绝消费后,死信队列成功运行
(22)启动Consumer02,可以看到Consumer02消费了dead_queue队列里的消息
高级特性 延迟队列1 1、 延迟队列的概念;
延迟队列内部是有序的,重要的特性体现在它的延迟属性上,延迟队列中的元素希望在指定时间到了之后或之前取出处理,简单的说延迟队列就是用来存放需要在指定时间被处理的元素的队列。
2、 延迟队列的应用场景;
(1)订单指定时间内未支付则自动取消
(2)用户发起退款,指定时间内未处理则通知相关运营人员
3、 定时任务和延迟队列的取舍;
以上场景都有一个特点,那就是都需要在某个事件发生前或发生后执行一项任务,如生成订单后,在十分钟后检查订单状态,未支付的订单将关闭,这种场景也可以用定时任务来处理,但数据量比价少的话确实可以用定时任务来处理,但在活动期间,订单的数据量可能会变得很庞大,对于庞大的数据,定时任务很难在1秒内检查完订单,从而不能及时的关闭未支付的订单,而且用定时任务来检查订单会给数据库带来很大的压力,所以在数据量大的情况下,定时任务无法满足业务需求且性能低下
4、 延迟队列架构图(后面我们就根据这个架构图进行代码的设计与实现);
5、 延迟队列的实现;
(1)新建一个名为config的包,用于装实现特定配置的代码
效果图:
(2)在config包里新建一个名为TtlQueueConfig的类用于编写配置队列延迟的代码
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 package com.ken.springbootrqbbitmq.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration public class TtlQueueConfig { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static final String DEAD_EXCHANGE = "dead_exchange" ; public static final String NORMAL_QUEUE01 = "normal_queue01" ; public static final String NORMAL_QUEUE02 = "normal_queue02" ; public static final String DEAD_QUEUE = "dead_queue" ; @Bean("normalExchange") public DirectExchange normalExchange () { return new DirectExchange (NORMAL_EXCHANGE); } @Bean("deadExchange") public DirectExchange deadExchange () { return new DirectExchange (DEAD_EXCHANGE); } @Bean("normalQueue01") public Queue normalQueue01 () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" ,DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"dead" ); arguments.put("x-message-ttl" ,10000 ); return QueueBuilder.durable(NORMAL_QUEUE01).withArguments(arguments).build(); } @Bean("normalQueue02") public Queue normalQueue02 () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" ,DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"dead" ); arguments.put("x-message-ttl" ,40000 ); return QueueBuilder.durable(NORMAL_QUEUE02).withArguments(arguments).build(); } @Bean("deadQueue") public Queue deadQueue () { return QueueBuilder.durable(DEAD_QUEUE).build(); } @Bean public Binding queue01BindNormalExchange (@Qualifier("normalQueue01") Queue normalQueue01, @Qualifier("normalExchange") DirectExchange normalExchange) { return BindingBuilder.bind(normalQueue01).to(normalExchange).with("normal01" ); } @Bean public Binding queue02BindNormalExchange (@Qualifier("normalQueue02") Queue normalQueue02, @Qualifier("normalExchange") DirectExchange normalExchange) { return BindingBuilder.bind(normalQueue02).to(normalExchange).with("normal02" ); } @Bean public Binding deadQueueBindDeadExchange (@Qualifier("deadQueue") Queue deadQueue, @Qualifier("deadExchange") DirectExchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead" ); } }
(3)新建一个名为controller的包,用于装控制层的代码
效果图:
(4)新建一个名为SendMsgController的类用于充当生产者用于发送消息
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.ken.springbootrqbbitmq.controller;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j @RequestMapping("ttl") @RestController public class SendMsgController { @Autowired(required = false) private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{message}") public void sendMsg (@PathVariable String message) { log.info("当前时间:{},发送一条消息给两个TTL队列:{}" ,new Date ().toString(),message); rabbitTemplate.convertAndSend("normal_exchange" ,"normal01" ,"消息来着ttl为10s的队列:" + message); rabbitTemplate.convertAndSend("normal_exchange" ,"normal02" ,"消息来着ttl为40s的队列:" + message); } }
(5)新建一个名为consumer的包,用于装消费者的代码
效果图:
(6)新建一个名为DeadQueueConsumer的类用于消费死信队列里的消息
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package com.ken.springbootrqbbitmq.consumer;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Date;@Slf4j @Component public class DeadQueueConsumer { @RabbitListener(queues = "dead_queue") public void receiveMsg (Message message, Channel channel) throws Exception { String msg = new String (message.getBody()); log.info("当前时间:{},收到死信队列的消息:{}" ,new Date ().toString(),msg); } }
(7)进入项目的启动类启动项目
(8)启动完毕后在浏览器地址栏输入http://localhost:8080/ttl/sendMsg/参数 往队列发送消息
(9)查看控制台的输出,发现分别在10s和40s后进行输出,这证明我们的延迟队列成功运行
延迟队列2-使用个插件实现 1、 前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件;
RabbitMQ官网下载插件的网址:https://www.rabbitmq.com/community-plugins.html
2、 下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明);
3、 把这个插件传输到服务器上;
4、 根据官网的指示把插件放到RabbitMQ指定的文件夹下;
RabbitMQ官网指示安装插件步骤的网址:https://www.rabbitmq.com/installing-plugins.html
我这里安装RabbitMQ的系统是CentOS,所以放在
5、 拷贝插件到指定的目录下;
例:
1 cp rabbitmq_delayed_message_exchange-3.10 .0 .ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10 .0 /plugins/
效果图:
6、 安装延迟队列插件;
输入以下命令安装延迟队列插件
1 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
效果图:
7、 重启RabbitMQ;
输入以下命令重启RabbitMQ
1 systemctl restart rabbitmq-server.service
效果图:
8、 查看插件是否安装成功;
进入RabbitMQ的管理页面,进入Exchange的管理页面,新增Exchange,在Type里面可以看到x-delayed-message的选项,证明延迟队列插件安装成功
9、 基于插件实现延迟队列的原理示意图;
原先我们没下插件之前实现延迟队列是基于图下这种方式实现的
但我们下载插件后就能通过交换机延迟消息的方式来实现消息的延迟了(由步骤8可见,我们验证插件是否安装成功是从Exchange进去的,而不是从Queues进去的)
10、 基于插件延迟队列的代码实现;
(1)在config包里新建一个名为DelayedQueueConfig的类用于编写配置队列延迟的代码
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package com.ken.springbootrqbbitmq.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed_queue" ; public static final String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE" ; public static final String DELAYED_ROUTING_KEY = "delayed" ; @Bean public Queue delayedQueue () { return new Queue (DELAYED_QUEUE_NAME); } @Bean public CustomExchange delayedExchange () { Map<String, Object> arguments = new HashMap <>(3 ); arguments.put("x-delayed-type" ,"direct" ); return new CustomExchange (DELAYED_QUEUE_NAME,"x-delayed-message" ,true ,false ,arguments); } @Bean public Binding delayedQueueBindingDelayedExchange (@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") Exchange delayedExchange) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
(2)在SendMsgController类里写一个接口,让其能往延迟队列里发送消息
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package com.ken.springbootrqbbitmq.controller;import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j @RequestMapping("ttl") @RestController public class SendMsgController { @Autowired(required = false) private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{message}") public void sendMsg (@PathVariable String message) { log.info("当前时间:{},发送一条消息给两个TTL队列:{}" ,new Date (),message); rabbitTemplate.convertAndSend("normal_exchange" ,"normal01" ,"消息来着ttl为10s的队列:" + message); rabbitTemplate.convertAndSend("normal_exchange" ,"normal02" ,"消息来着ttl为40s的队列:" + message); } @GetMapping("/sendExpirationMsg/{message}/{ttlTime}") public void sendMsg (@PathVariable String message,@PathVariable String ttlTime) { log.info("当前时间:{},发送一条时长{}毫秒的TTL消息给normal03队列:{}" , new Date (),ttlTime,message); rabbitTemplate.convertAndSend("normal_exchange" ,"normal03" ,message,msg -> { msg.getMessageProperties().setExpiration(ttlTime); return msg; }); } @GetMapping("/sendDelayMsg/{message}/{delayTime}") public void sendMsg (@PathVariable String message,@PathVariable Integer delayTime) { log.info("当前时间:{},发送一条时长{}毫秒的消息给延迟队列:{}" , new Date (),delayTime,message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_QUEUE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> { msg.getMessageProperties().setDelay(delayTime); return msg; }); } }
(3)在consumer包里新建一个名为DelayQueueConsumer的类用于编写消费延迟队列的消费者代码
效果图:
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package com.ken.springbootrqbbitmq.consumer;import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Date;@Slf4j @Component public class DelayQueueConsumer { @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME) private void receiveDelayQueue (Message message) { String msg = new String (message.getBody()); log.info("当前时间{},收到延迟队列的消息" ,new Date (),msg); } }
(4)启动项目,往浏览器输入接口地址和参数,从而调用接口
[1]第一条消息
[2]第二条消息
效果图:
结论:基于测试发现在使用延迟插件的情况下,延迟时间短的消息会被先消费,这证明基于插件的延迟消息达到预期效果
rabbitmq宕机后生产者如何保证消息 前言:在生产环境中由于一些不明原因,导致RabbitMQ重启的情况下,在RabbitMQ重启期间生产者投递消息失败,生产者发送的消息会丢失,那这时候就需要去想在极端的情况下,RabbitMQ集群不可用的时候,如果去处理投递失败的消息。
1、 在config包里新建一个名为ConfirmConfig的类用于编写配置交换机、队列、routingkey的代码;
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.ken.springbootrqbbitmq.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class ConfirmConfig { public static final String EXCHANGE_NAME = "confirm_exchange" ; public static final String QUEUE_NAME = "confirm_queue" ; public static final String ROUTING_KEY = "confirm" ; @Bean("confirmExchange") public DirectExchange confirmExchange () { return new DirectExchange (EXCHANGE_NAME); } @Bean("confirmQueue") public Queue confirmQueue () { return QueueBuilder.durable(QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange (@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY); } }
2、 在controller包里新建一个名为ProducerController的类用于编写充当生产者发送消息的代码;
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.ken.springbootrqbbitmq.controller;import com.ken.springbootrqbbitmq.config.ConfirmConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@Slf4j @RestController @RequestMapping("/confirm") public class ProducerController { @Autowired(required = false) private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage (@PathVariable String message) { rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY,message); log.info("发送消息内容:{}" ,message); } }
3、 在consumer包里新建一个名为Consumer的类用于编写充当消费者消费消息的代码;
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package com.ken.springbootrqbbitmq.consumer;import com.ken.springbootrqbbitmq.config.ConfirmConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Slf4j @Component public class Consumer { @RabbitListener(queues = ConfirmConfig.QUEUE_NAME) public void receiveConfirmMessage (Message message) { String msg = new String (message.getBody()); log.info("接收到队列的消息为:{}" ,msg); } }
4、 启动项目,在浏览器地址栏调用发送消息的接口,查看生产者是否运行成功并能发送消息http://localhost:8080/confirm/sendMessage/我是消息;
例:
效果图:
5、 前言里我们说过,怎么在RabbitMQ宕机的情况下,保证生产者发送的消息不丢失呢,这时候就需要用到回调函数了,交换机本身收到消息后会确认消息,如果交换机没有确认或者确认消息失败,都视为发送消息失败,然后触发回调接口,告诉生产者消息发送失败,这样,消息接收成功与否我们都能通过回调方法返回的消息知道了;
(1)在config包里新建一个名为MyCallBack的类用于编写交换机的确认回调方法
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package com.ken.springbootrqbbitmq.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Slf4j @Component public class MyCallBack implements RabbitTemplate .ConfirmCallback { @Autowired(required = false) private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(this ); } @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : "" ; if (ack) { log.info("交换机已经收到id为{}的消息" ,id); }else { log.info("交换机还未收到id为{}的消息,原因为{}" ,id,cause); } } }
6、 在上述步骤可得知confirm方法有一个类型为CorrelationData的参数correlationData,这个参数实际上是空的,并没有值,需要生产者发送,correlationData参数才会有值(connfirm方法的其余两个参数ack和cause默认有值)所以我们需要修改生产者的代码;
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package com.ken.springbootrqbbitmq.controller;import com.ken.springbootrqbbitmq.config.ConfirmConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@Slf4j @RestController @RequestMapping("/confirm") public class ProducerController { @Autowired(required = false) private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage (@PathVariable String message) { CorrelationData correlationData = new CorrelationData ("1" ); rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY,message,correlationData); log.info("发送消息内容:{}" ,message); } }
7、 在配置文件加上以下配置开启交换机确认发布模式;
1 spring.rabbitmq.publisher-confirm-type=correlated
配置文件完整内容如下:
1 2 3 4 5 6 7 8 spring.rabbitmq.host=192.168 .194 .150 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 #none(禁用发布确认模式,默认值) #correlated(发布消息成功到交换机后会触发回调方法) #simple(和correlated一样会触发回调方法,消息发布成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法,等待broker节点返回发送结果) spring.rabbitmq.publisher-confirm-type=correlated
效果图:
8、 启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口,消费者成功消费消息;
http://localhost:8080/confirm/sendMessage/我是消息
例:
效果图:
9、 把生产者要发送到的交换机改成不存在的,用以模拟交换机出问题的情景;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package com.ken.springbootrqbbitmq.controller;import com.ken.springbootrqbbitmq.config.ConfirmConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@Slf4j @RestController @RequestMapping("/confirm") public class ProducerController { @Autowired(required = false) private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage (@PathVariable String message) { CorrelationData correlationData = new CorrelationData ("1" ); rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME + "1" , ConfirmConfig.ROUTING_KEY,message,correlationData); log.info("发送消息内容:{}" ,message); } }
效果图:
10、 重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口并打印出了交换机接收消息失败的原因;
http://localhost:8080/confirm/sendMessage/我是消息
例:
效果图:
11、 把RoutingKey改成不存在的,用以模拟队列出问题的情景;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package com.ken.springbootrqbbitmq.controller;import com.ken.springbootrqbbitmq.config.ConfirmConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@Slf4j @RestController @RequestMapping("/confirm") public class ProducerController { @Autowired(required = false) private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage (@PathVariable String message) { CorrelationData correlationData = new CorrelationData ("2" ); rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY + "2" ,message,correlationData); log.info("发送消息内容:{}" ,message); } }
效果图:
12、 重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口并打印出交换机接收消息成功,但消费者没有消费成功的日志输出,因为RoutingKey错了,交换机没有把消息发送到队列里,队列里没消息,自然消费者也就没有消费到消息了,但这个结果不符合我们的预期,因为这次丢失了消息,丢失消息却没有回馈消息丢失,实际上应该调用回调接口反馈消息丢失,所以我们需要继续往下改进代码;
http://localhost:8080/confirm/sendMessage/我是消息
例:
效果图:
13、 给配置文件加上以下配置,用以回退消息;
1 spring.rabbitmq.publisher-returns=true
配置文件完整内容如下:
1 2 3 4 5 6 7 8 9 10 spring.rabbitmq.host=192.168 .194 .150 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 #none(禁用发布确认模式,默认值) #correlated(发布消息成功到交换机后会触发回调方法) #simple(和correlated一样会触发回调方法,消息发布成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法,等待broker节点返回发送结果) spring.rabbitmq.publisher-confirm-type=correlated #一旦投递消息失败或者路由失败,是否回退消息给生产者 spring.rabbitmq.publisher-returns=true
14、 使用RabbitTemplate的内置接口回退消息;
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package com.ken.springbootrqbbitmq.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Slf4j @Component public class MyCallBack implements RabbitTemplate .ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired(required = false) private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(this ); rabbitTemplate.setReturnCallback(this ); } @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : "" ; if (ack) { log.info("交换机已经收到id为{}的消息" ,id); }else { log.info("交换机还未收到id为{}的消息,原因为{}" ,id,cause); } } @Override public void returnedMessage (Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息{},被交换机{}退回,退回原因:{},路由routingkey:{}" , new String (message.getBody()),exchange,replyText,routingKey); } }
15、 重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机收到消息发不过去队列后把消息回退了,保证了消息不丢失;
http://localhost:8080/confirm/sendMessage/我是消息
例:
效果图:
备份交换机 前言:上一篇文章我们提到当交换机确认消息失败或者交换机发送消息到队列失败,都可以通过回调方法让生产者重新发送消息,除此之外另一种方法就是通过备份交换机的方式保证消息的不丢失,当生产者无法把消息投递给交换机,就通过交换机把消息发送到备份交换机,再让备份交换机通过自己的路由以及自己的队列发送消息给消费者,从而把发送失败的消息保存下来。
1、 架构图如下;
2、 修改ConfirmConfig类的代码;
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 package com.ken.springbootrqbbitmq.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class ConfirmConfig { public static final String EXCHANGE_NAME = "confirm_exchange" ; public static final String QUEUE_NAME = "confirm_queue" ; public static final String ROUTING_KEY = "confirm" ; public static final String BACKUP_EXCHANGE_NAME = "backup_exchange" ; public static final String BACKUP_QUEUE_NAME = "backup_queue" ; public static final String WARNING_QUEUE_NAME = "warning_queue" ; @Bean("confirmExchange") public DirectExchange confirmExchange () { return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true ) .withArgument("alternate-exchange" ,BACKUP_EXCHANGE_NAME).build(); } @Bean("confirmQueue") public Queue confirmQueue () { return QueueBuilder.durable(QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange (@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY); } @Bean("backupExchange") public FanoutExchange backupExchange () { return new FanoutExchange (BACKUP_EXCHANGE_NAME); } @Bean("backupQueue") public Queue backupQueue () { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } @Bean("warningQueue") public Queue warningQueue () { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } @Bean public Binding backupQueueBindingBackupExchange (@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding warningQueueBindingBackupExchange (@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(warningQueue).to(backupExchange); } }
3、 在consumer包里新建一个名为WarningConsumer的类用于编写充当消费者消费消息的代码;
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.ken.springbootrqbbitmq.consumer;import com.ken.springbootrqbbitmq.config.ConfirmConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @Slf4j public class WarningConsumer { @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME) public void receiveWarningMsg (Message message) { String msg = new String (message.getBody()); log.error("警告!发现不可路由的消息{}" ,msg); } }
4、 因为我们修改了交换机的代码,所以我们需要删除旧的confirm_exchange交换机,然后再启动项目重新生成confirm_exchange交换机;
5、 重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机路由消息失败后把消息发送给了备份交换机,使得warning_queue队列收到消息,从而让warning_consumer成功消费到不可路由的消息;
http://localhost:8080/confirm/sendMessage/我是消息
例:
效果图:
6、 结果分析;
我们在配置文件开启了消息回退
也编写了和注入了消息回退的代码
同时我们也编写了备份交换机的代码
但由上述步骤5的结果可以看出,在同时使用mandatory参数和备份交换机的时候,消息优先走了备份交换机,打印了”警告!发现不可路由的消息“的日志,而不是走交换机的退回重发打印”被交换机退回“的日志,这证明备份交换机的优先级高于mandatory参数
关于消息幂等性的告知 1、 概念;
幂等性是指用户对同一操作发起的一次或多次请求的结果都是一致的,不会因为多次点击而产生副作用。举个例子,例如用户购买商品后会进行支付,支付时扣费成功了,在返回支付成功的结果的时候网络异常了,本来应该显示已付款的,但现在显示了未付款,用户再次支付后第二次扣款成功了,返回支付成功,但这时查看流水记录会发现有两条,用户买同一件商品花费了两份的钱,这是不允许的,无论是交易系统自身的bug还是交易系统的网络问题导致重复发送,必须只能扣用户一次钱,多次发送付款请求,扣费还只是扣一次,这就是幂等性。
2、 RabbitMQ幂等性问题;
RabbitMQ把消息发送给消费者消费,消费者消费成功并返回ack消息,但这时候网络中断了,RabbitMQ没有收到ack消息,这就让RabbitMQ误以为消息消费失败了,然后RabbitMQ会重新把该条消息发送给其他的消费者,或者等网络重连后再次发送给该候消费者,这时候就会造成重复消费的问题。
3、 解决思路;
MQ消费者的幂等性的解决一般使用全局ID或者写个唯一标识比如时间戳或者UUID或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。
4、 消费端幂等性保障思路;
在海量订单生成的业务高峰期,生产端有可能就会重复发送了多条消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:
(1)唯一ID+指纹码机制,利用数据库主键去重
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就但是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但分库分表也不是我们最推荐的方式,最佳方式是利用redis的原子性去实现
(2)Redis的原子性
利用Redis执行setnx命令,天然具有幂等性,从而实现不重复消费。
优先队列 前言:在购物系统中有一个订单催付的场景,如果客户在购物系统下单后在设定的时间内未付款那么就会给客户推送一条短信提醒,这是一个比较简单的功能,但是,商家对我们来说,肯定是要区分大客户和小客户的,比如像苹果、华为、小米这样的大商家一年能给我们创造很大的利润,在业务高峰时期,订单堆积,来不及处理,而为了创造最大的利润,他们的订单必须得到优先处理,而曾经的后端系统是使用redis来存放短信提醒的,并通过定时轮询实现短信发送,但大家都知道redis只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以后来需要采用RabbitMQ对系统进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级,而实现优先级就用到了RabbitMQ的优先级队列。
1、 在config包里新建一个名为PriorityQueueConfig的类用于编写配置交换机、队列、routingkey的代码;
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.ken.springbootrqbbitmq.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class PriorityQueueConfig { public static final String EXCHANGE_NAME = "priority_exchange" ; public static final String QUEUE_NAME = "priority_queue" ; public static final String ROUTING_KEY = "priority" ; @Bean("directExchange") public DirectExchange priorityExchange () { return new DirectExchange (EXCHANGE_NAME); } @Bean("priorityQueue") public Queue priorityQueue () { return QueueBuilder.durable().withArgument("x-max-priority" ,10 ).build(); } @Bean public Binding warningQueueBindingBackupExchange (@Qualifier("priorityQueue") Queue priorityQueue, @Qualifier("directExchange") DirectExchange directExchange) { return BindingBuilder.bind(priorityQueue).to(directExchange).with(ROUTING_KEY); } }
2、 在controller包里新建一个名为SendPriorityMsgController的类用于编写充当生产者发送消息的代码;
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package com.ken.springbootrqbbitmq.controller;import com.ken.springbootrqbbitmq.config.PriorityQueueConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@Slf4j @RestController @RequestMapping("/priority") public class SendPriorityMsgController { @Autowired(required = false) private RabbitTemplate rabbitTemplate; @GetMapping("/sendPriorityMessage/{message}") public void sendMessage (@PathVariable String message) { for (int i = 1 ; i <= 10 ; i++) { String msg = message + i; if (i == 5 ) { rabbitTemplate.convertAndSend(PriorityQueueConfig.EXCHANGE_NAME, PriorityQueueConfig.ROUTING_KEY,msg,correlationData -> { correlationData.getMessageProperties().setPriority(5 ); return correlationData; }); }else { rabbitTemplate.convertAndSend(PriorityQueueConfig.EXCHANGE_NAME, PriorityQueueConfig.ROUTING_KEY,msg); } log.info("发送消息内容:{}" ,msg); } } }
3、 在consumer包里新建一个名为PriorityQueueConsumer的类用于编写充当消费者消费消息的代码;
代码如下:``
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package com.ken.springbootrqbbitmq.consumer;import com.ken.springbootrqbbitmq.config.PriorityQueueConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Slf4j @Component public class PriorityQueueConsumer { @RabbitListener(queues = PriorityQueueConfig.QUEUE_NAME) public void receivePriorityMsg (Message message) { String msg = new String (message.getBody()); log.info("接收到的消息为:{}" ,msg); } }
4、 先注释消费者的代码,然后启动项目,在浏览器地址栏调用发送消息的接口;
http://localhost:8080/priority/sendPriorityMessage/我是消息
生产者发送消息后,没有消费者消费消息,消息就会堆积在队列中,可以用于模拟在业务高峰时期,订单堆积,来不及处理的场景。
效果图:
5、 去掉消费者代码里的注释,然后重新启动项目,可以得见消息被消费者消费了,且第5条消息由于优先级是5,在所有的消息里优先级最高,被优先消费了,这证明我们的优先队列成功实现了;
效果图:
惰性队列 1、 概念:RabbitMQ从3.6.0版本开始引入了惰性队列的概念惰性队列会尽可能的将消息存入磁盘中(持久化队列若想持久化消息还需要看消息设置了持久化没),而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储;
2、 使用场景:当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了;
3、 惰性队列的两种模式;
队列具备两种模式: default和lazy。默认的为default模式,在3.6.0之前的版本无需做任何变更,lazy模式即为惰性队列的模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置,如果一个队列同时使用这两种方式设置的话,那么Policy的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。在队列声明的时候可以通过”x-queue-mode”参数来设置队列的模式,取值为”default”和”lazy”。
下面示例中演示了一个惰性队列的声明细节:
1 2 3 Map<String,Object> args = new HashMap <String, Object>(); args.put("x-queue-mode" , "lazy" ); channel.queueDeclare("myqueue" , false , false , false , args);
4、 内存开销对比; 在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB,但惰性队列消费消息的速度比较慢,因为惰性队列需要先从磁盘读取消息到内存,然后才被消费,所以速度会慢点。
高可用 集群搭建 前言:当RabbitMQ服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台RabbitMQ服务器可以满足每秒1000条消息的吞吐量,那如果应用需要RabbitMQ服务满足每秒10万条消息的吞吐量呢?购买昂贵的服务器来增强单机RabbitMQ服务的性能不太现实,再强的服务器也会有宕机的风险,所以搭建一个RabbitMQ集群才是解决实际问题的关键。
集群示意图
前期准备:
(1)准备3台服务器或者虚拟机
(2)3台服务器或者虚拟机都安装了RabbitMQ
安装步骤可以看我这篇文章:RabbitMQ系列(2)–Linux安装RabbitMQ_Ken_1115的博客-CSDN博客
我这里准备了3台虚拟机,ip分别为192.168.194.128、192.168.194.129、192.168.194.130
1、 分别在3台服务器输入以下命令,然后分别更改3台服务器的主机名称;
2、 更改主机名称后输入以下命令重启,使变更生效;
3、 配置每个节点的hosts文件,让各个节点间能互相访问对方;
使用以下命令编辑每个节点的hosts文件,并给各个节点的ip映射成对应的节点名称,然后保存并退出hosts文件
例:
node1
node2
node3
4、 因为搭建RabbitMQ集群要求erlang的cookie必须一模一样,所以得确保各个节点的cookie文件使用同一个值;
在node1节点执行以下2条命令,用以把node1节点的cookie复制到node2、node3节点上去,保证3个节点的cookie都一致
1 2 scp /var /lib/rabbitmq/.erlang.cookie root@node2 :/var /lib/rabbitmq/.erlang.cookie scp /var /lib/rabbitmq/.erlang.cookie root@node3 :/var /lib/rabbitmq/.erlang.cookie
效果图:
5、 分别重启3个节点上的RabbitMQ服务和Erlang虚拟机;
在3台节点上分别执行以下命令重启
1 rabbitmq-server -detached
效果图:
6、 以node1节点为集群,分别将node2和node3节点加入集群;
(1)将node2节点加入集群
[1]把RabbitMQ服务关闭
1 2 #注:rabbitmqctl stop命令会将Erlang虚拟机关闭,而rabbitmqctl stop_app命令只会关闭RabbitMQ服务 rabbitmqctl stop_app
效果图:
[2]重置RabbitMQ服务
效果图:
[3]将当前节点加入到node1节点当中
1 rabbitmqctl join_cluster rabbit@node1
效果图:
[4]启动RabbitMQ服务
1 2 #注:这个命令只启动应用服务 rabbitmqctl start_app
效果图:
(2)将node3节点加入集群
[1]把RabbitMQ服务关闭
1 2 #注:rabbitmqctl stop命令会将Erlang虚拟机关闭,而rabbitmqctl stop_app命令只会关闭RabbitMQ服务 rabbitmqctl stop_app
效果图:
[2]重置RabbitMQ服务
效果图:
[3]将当前节点加入到node2节点当中(因为node2已经加入到了node1,node2处在集群中,所以把node3加入node2也能把node3加入到集群中)
1 rabbitmqctl join_cluster rabbit@node2
效果图:
[4]启动RabbitMQ服务
1 2 #注:这个命令只启动应用服务 rabbitmqctl start_app
效果图:
7、 查看集群的状态;
1 rabbitmqctl cluster_status
8、 为集群创建一个账号;
1 rabbitmqctl add_user admin 123456
效果图:
9、 为账号设置角色;
1 rabbitmqctl set_user_tags admin administrator
效果图:
10、 设置用户权限;
1 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
效果图:
11、 登录其中一个节点;
从图中我们可以看到有3个节点,这证明node1、node2、node3是一个集群,即搭建RabbitMQ集群成功
12、 查看节点信息(由于我们在第6步里重置了RabbitMQ,所以所有的交换机、队列除了默认的之外,其他的都被清掉了);
其他:
1、 解除集群节点;
[1]把RabbitMQ服务关闭
1 2 #注:rabbitmqctl stop命令会将Erlang虚拟机关闭,而rabbitmqctl stop_app命令只会关闭RabbitMQ服务 rabbitmqctl stop_app
[2]重置RabbitMQ服务
[3]将当前节点加入到node1节点当中
1 rabbitmqctl join_cluster rabbit@node1
[4]启动RabbitMQ服务
1 2 #注:这个命令只启动应用服务 rabbitmqctl start_app
[5]脱离集群(在node1节点上执行)
1 2 #node2为需要脱离集群的节点 rabbitmqctl forget_cluster_node rabbit@node2
使用镜像队列 前言:如果RabbitMQ集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失,虽然可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,这样可以保证消息不丢失,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过 publisherconfirm机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用,而通过引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。
验证过程如下(没开启消息持久化,有兴趣的同学可以看看):
我这里准备了3台虚拟机来跑RabbitMQ服务分别为
node1:192.168.194.128
node2:192.168.194.129
node3:192.168.194.130
(1)执行以下代码,在node1节点里生成队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package com.ken;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;public class Producer { public static final String QUEUE_NAME = "my_queue" ; public static void main (String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.194.128" ); factory.setUsername("admin" ); factory.setPassword("123456" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); String message = "Hello World" ; channel.basicPublish("" ,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); System.out.println("消息发送成功!" ); } }
效果图:
(2)进页面查看效果,可以得知my_queue这个队列只在node1节点创建了,没在其他节点同步创建
(3)在node1节点执行关闭RabbitMQ服务的命令来模拟节点宕机
效果图:
(4)用其他节点的可视化页面来查看集群信息,由图可知node1节点没在运行
(5)查看队列,可以看到my_queue队列的状态为停止
(6)执行以下代码,在node2节点里生成消费者,尝试消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package com.ken;import com.rabbitmq.client.*;public class Consumer { public static final String QUEUE_NAME = "my_queue" ; public static void main (String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.194.129" ); factory.setUsername("admin" ); factory.setPassword("123456" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println(new String (message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("取消消费消息" ); }; channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } }
效果图:(报错信息提示node1节点上的my_queue队列已经关闭了)
(7)把node1节点上的RabbitMQ服务重新启动起来
效果图:
(8)查看消费者的日志,可以发现消费者并没有消费消息
(9)查看队列里消息的情况,可以看到消息丢失了(在未设置消息持久化的情况下)
搭建镜像队列 1、 启动node1、node2、node3三台集群节点;
2、 随便找一个节点添加policy(策略);
(1)进入node1节点的可视化界面
(2)进入添加策略的界面
(3)给策略取一个名字
(4)给策略加上匹配规则,通过正则表达式匹配队列,若交换机或者队列的名字满足以mirror开头这个条件,则那条队列使用该策略
例:
(5)为策略选择模式为ha-mode(ha-mode表示是备机模式)点击HA mode即可
(6)为ha-mode指定获取参数方式为exactly(exactly表示指定参数)
(7)点击HA params,就会往自定义参数里填入ha-params,这里用于指定策略作用的节点的数量
(8)为ha-params指定策略作用的节点的数量为2(包含被镜像的队列,镜像和被镜像的队列数总共为2)
(9)点击HA sync mode,就会往自定义参数里填入ha-sync-mode,这里用于指定同步的模式
(10)为ha-sync-mode指定同步模式为自动同步模式
(11)最后点击Add/update policy添加策略即可
(12)往上滑动查看策略添加情况
(13)执行代码在node1节点上创建名字以mirror开头的队列
代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package com.ken;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;public class Producer { public static final String QUEUE_NAME = "mirror_queue" ; public static void main (String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.194.128" ); factory.setUsername("admin" ); factory.setPassword("123456" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); String message = "Hello World" ; channel.basicPublish("" ,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); System.out.println("消息发送成功!" ); } }
效果图:
(14)进入Queues查看队列的情况,可以发现刚刚创建的mirror_queue队列上有+1,这证明镜像队列创建成功
(15)进入mirror_queue队列查看详情,可以发现镜像队列在node2节点上
3、 测试镜像队列是否正常运行;
(1)关闭node1节点,模拟node1节点宕机
效果图:
从node2的可视化页面可以看到node1节点停机了
注意:
从Queues进入mirror_queue队列查看详情,可以发现当node1节点停掉后node2自动替代了node1节点的位置,node3作为镜像队列的节点,由此可见我们策略里写的ha-params:2这一参数是生效的,使得节点的个数总是保持2个,这样就算我们整个集群只剩下一台机器,在节点不断替代的情况下,消费者始终能消费队列里面的消息
(2)执行以下代码,在node2节点里生成消费者,尝试消费mirror_queue队列的消息,发现node2可以消费mirror_queue队列的消息并且消费成功,这证明mirror_queue队列成功镜像到node2节点上
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package com.ken;import com.rabbitmq.client.*;public class Consumer { public static final String QUEUE_NAME = "mirror_queue" ; public static void main (String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.194.129" ); factory.setUsername("admin" ); factory.setPassword("123456" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println(new String (message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("取消消费消息" ); }; channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } }
效果图:
haproxy + keepalive 实现高可用 前言:我们以往只能连接一个指定的队列,不能自由地连接其他的队列,当我们连接的那个指定队列宕机了,生产者和消费者都没办法往队列发送消息和消费消息,而且生产者和消费者也不能自动的连接到其他正常运行的队列,因为生产者和消费者要连接的队列都写死在了代码里(如图下所示)所以我们需要借助其他工具来解决不能变更所连接队列的问题。
1、 HAProxy的概念;
HAProxy提供高可用性、负载均衡及基于TCP/HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter、Reddit、StackOverflow、GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。
2、 使用HAProxy+keepalive实现负载均衡及高可用架构图;
平时客户端的请求走主机的Haproxy,然后主机的Haproxy转发请求到多台部署了RabbitMQ的服务器上,如果主机的Haproxy宕机了,keepalive会识别到并且把客户端的请求转移到备机上,然后备机再转发请求到多台部署了RabbitMQ的服务器上,这种方案可以通过Haproxy转发多台RabbitMQ来解决负载均衡和高并发问题,并且因为有备机,所以也具备高可用
异地跨机房的broker保障 前言:
(broker北京)、(broker深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client北京)需要连接(broker北京),向其中的交换器exchangeA发送消息,此时的网络延迟很小,(Client北京)可以迅速将消息发送至exchangeA 中,就算在开启了publisherconfirm机制或者事务机制的情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client深圳)需要向exchangeA发送消息,那么(Client 深圳)(broker北京)之间有很大的网络延迟,(Client 深圳)发送消息至exchangeA会经历一定的延迟,尤其是在开启了publisherconfirm机制或者事务机制的情况下,(Client深圳)会等待很长的延迟时间来接收(broker北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的阻塞。将业务(Client深圳)部署到北京的机房可以解决这个问题,但是如果(Client深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现?这里使用Federation 插件就可以很好地解决这个问题.
1、 FederationExchange工作原理图;
2、 在每台机器上开启federation相关插件;
(1)安装rabbitmq_federation插件
1 rabbitmq-plugins enable rabbitmq_federation
node1效果图 :
node2效果图:
node3效果图:
(2)安装rabbitmq_federation_management插件
1 rabbitmq-plugins enable rabbitmq_federation_management
node1效果图:
node2效果图:
node3效果图:
(3)进入安装了federation插件的节点的可视化页面,进入Admin可以看到多出了两个与federation相关的管理菜单
3、 在node2节点上创建fed_exchange交换机和node2_queue队列;
(1)编写以下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.ken;import com.rabbitmq.client.*;public class Producer { public static final String FED_EXCHANGE = "fed_exchange" ; public static void main (String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.194.128" ); factory.setUsername("admin" ); factory.setPassword("123456" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(FED_EXCHANGE, BuiltinExchangeType.DIRECT); channel.queueDeclare("node2_queue" ,true ,false ,false ,null ); channel.queueBind("node2_queue" ,FED_EXCHANGE,"routeKey" ); } }
(2)启动程序:
效果图:
交换机:
交换机和队列的绑定情况:
队列:
4、 在downstream(node2)上配置upstream(node1);
(1)进入配置页面
(2)给上游起名称
(3)设置上游链接
(4)点击添加上游即可
效果图:
5、 添加策略;
(1)进入添加策略的页面
(2)给策略取一个名字,我这里取exchange-policy
(3)给策略加上匹配规则,通过正则表达式匹配队列,若交换机或者队列的名字满足以fed开头后面任意这个条件,则那条队列使用该策略
例:
(4)选择交换机为策略的应用
(5)选择上游策略federation-upstream,点击Federation upstream即可
(6)填写上游的名称为node1-as-upstream(在步骤4(2)里我们填写的上游名称就是node1-as-upstream)
(7)添加策略即可
效果图:
(8)进入Admin里的Federation Status查看联邦状态,如果是running证明运行成功
Shovel 前言:
Federation具备的数据转发功能类似,Shovel能够可靠、持续地从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker中的交换器(作为目的端,即destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个Broker,也可以位于不同的Broker 上。Shovel可以翻译为”铲子”,是一种比较形象的比喻,这个”铲子”可以将消息从一方”铲子”另一方。Shovel行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。
1、 Shovel工作原理图;
分别往Q1和Q2各自发送一条消息,Q1的消息也会通过Shovel发送给Q2,所以Q1收到1条消息,Q2收到2条消息
2、 搭建shovel;
(1)分别在3个RabbitMQ节点上安装shovel插件
1 rabbitmq-plugins enable rabbitmq_shovel
效果图:
node1:
node2:
node3:
(2)分别在3个RabbitMQ节点上安装shovel管理插件
1 rabbitmq-plugins enable rabbitmq_shovel_managemen
效果图:
node1:
node2:
node3:
(3)进入Admin页面,可以发现多出了2个与shovel有关的菜单
(4)新增shovel
(5)填写shovel的相关信息
(6)新增shovel
效果图:
(7)点击Shovel Status菜单查看Shovel的运行状态,若状态显示running,则证明Shovel正在正常运行,若这时候往Q1队列发送消息,消息会自动同步到Q2