消息队列和其作用

消息队列是一种业务处理方案,目的是减轻同步业务的压力,缓解突发流量和提高系统可利用率的一种常见处理方式。消息队列中间件是该方案的具体实现和具体产品。

作用: (解耦、消峰、异步)

  • 抵御洪峰流量,(吞吐量)达到保护主业务的目的,消息都给中间件(消峰)
  • 消息给中间件后,等通知,不用一直等着(异步)
  • 消息给了中间件,与消息多少没关系,也就是新模块进来,代码改动最小(解耦

目前比较常见和可靠的MQ的产品具体有:kafka、RabbitMQ、RocketMQ、ActiveMQ

基于产品要求的特点,上面的产品必须要解决以下问题:

技术维度 专业名词
消息中间件 有api发送和接收
不能宕机 高可用性
需要多而不是单机版 集群和容错配置
不能断不能丢 持久化
取消撤回 延时发送/定时投递
有无收到 签收机制

如果没有引入MQ,生产者和消费者互相调用,在大型分布式应用中,系统间的RPC交互繁杂,即每增加一个消费者,生产者都要修改(系统之间的接口耦合比较严重);等待同步消息性存在问题(RPC接口基本上是同步调用,类似“木桶理论”);面对消息多容易冲垮

  • Kafka:最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
  • RabbitMQ:实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
  • RocketMQ:RocketMQ是一款分布式消息中间件,最初是由阿里巴巴消息中间件团队研发并大规模应用于生产系统,满足线上海量消息堆积的需求, 在2016年底捐赠给Apache开源基金会成为孵化项目,经过不到一年时间正式成为了Apache顶级项目;早期阿里曾经基于ActiveMQ研发消息系统, 随着业务消息的规模增大,瓶颈逐渐显现,后来也考虑过Kafka,但因为在低延迟和高可靠性方面没有选择,最后才自主研发了RocketMQ, 各方面的性能都比目前已有的消息队列要好,RocketMQ和Kafka在概念和原理上都非常相似,所以也经常被拿来对比;RocketMQ默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。
  • ActiveMQ:Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。

下面分别介绍几款mq产品,分别描述各自的优缺点和涉及简单原理、以及springboot的简单集成;

ActiveMQ

Apache ActiveMQ是Apache软件基金会的一个开源项目,是一个基于消息的通信中间件。ActiveMQ是JMS的一个具体实现,支持JMS的两种消息模型。ActiveMQ使用AMQP协议集成多平台应用,使用STOMP协议通过websockets在Web应用程序之间交换消息,使用MQTT协议管理物联网设备。(参考ActiveMQ官网)

概念理解

涉及的几个概念:

点到点、发布/订阅

点对点消息传递域的特点如下:

1、每个消息只能有一个消费者。不是每一个队列只能有一个消费者
2、消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。类似于发短信。

发布/订阅消息传递域的特点如下:

1、每个消息可以有多个消费者,可以想象成关注微博中的一个话题。的确和微博比较像,哈哈
2、生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息,订阅之前的消息是收不到的。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。

Broker

一个java实现的mq实例应用; 队列和订阅都是发布到broker上的。 B和C也是与此通信实现mq的所有作用;

Quene

队列,表面消息要投递到的地方。 是一个虚拟的概念,用于消息的分裂投递;

ACK

应答,消息的可靠性保证,一般需要回复broker消息被消费的 确定性;

集成和代码实现

环境准备

  • 启动的ActiveMQ服务
  • JDK1.8+
  • IDEA或Eclipse
  • Maven环境
  • SpringBoot和ActiveMQ整合的依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<!--activemq启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--boot启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--测试启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

环境启动配置和代码

  • application.yml配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
server:
port: 7777
spring:
activemq:
你的activemq连接地址
broker-url: tcp://192.168.64.129:61616
账号
user: admin
密码
password: admin
jms:
指定连接的是队列(Queue)还是主题(Topic),false代表队列,true代表主题
pub-sub-domain: false

queue:
name: boot-queue-test

topic:
name: boot-topic-test

ActiveMQ配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.huazai.activemq.springboot.config;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;

/**
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date 2022/1/16 19:13
*/
@Configuration
public class ActiveConfig {


@Value("${queue.name}")
private String queueName;

@Value("${topic.name}")
private String topicName;

@Bean
public Queue activeQueue() {


return new ActiveMQQueue(queueName);
}

@Bean
public Topic activeTopic() {


return new ActiveMQTopic(topicName);
}
}
  • 启动类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.huazai.activemq.springboot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

/**
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date 2022/1/16 18:51
*/
@SpringBootApplication
// 开启JMS服务
@EnableJms
public class BootQueueProviderMain {


public static void main(String[] args) {


SpringApplication.run(BootQueueProviderMain.class, args);
}
}

队列生产者代码

  • 消息生产者服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.huazai.activemq.springboot.queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.Queue;
import java.util.UUID;

@Service
public class QueueProviderService {


/**
* 相当于 {@link JmsTemplate}
*/
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

@Autowired
private Queue queue;

/**
* 生产消息
*
* @throws JMSException
*/
@JmsListener(destination = "${queue.name}")
public void productMessage() throws JMSException {


// 生产者生产并发送消息,此方法是send方法的加强版
jmsMessagingTemplate.convertAndSend(queue, "消费者发送消息:" + UUID.randomUUID());
}
}
  • 队列消息发送测试类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import com.huazai.activemq.springboot.BootQueueProviderMain;
import com.huazai.activemq.springboot.queue.QueueProviderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.jms.JMSException;

/**
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date 2022/1/16 19:25
*/
@SpringBootTest(classes = BootQueueProviderMain.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class QueueProviderTest {


@Autowired
private QueueProviderService queueProviderService;

@Test
public void testSend() throws JMSException {


queueProviderService.productMessage();
}
}

启动测试类,将消息发送到名称为boot-queue-test队列,结果如下:

队列消费者代码

  • 消息监听消费服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.huazai.activemq.springboot.queue;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.TextMessage;

@Service
public class QueueConsumerService {


/**
* 监听接收的方法,监听的目的地名称为${queue.name}配置
*/
@JmsListener(destination = "${queue.name}")
public void receive(TextMessage textMessage) throws JMSException {


String text = textMessage.getText();
System.out.println("***消费者收到的消息: " + text);
}
}

启动消费者服务,接受到了之前生产者生产的消息,测试结果如下:

主题代码

  • 主题订阅者服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.huazai.activemq.springboot.topic;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.jms.Topic;
import java.util.UUID;

/**
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date 2022/1/17 15:14
*/
@Service
public class TopicProviderService {


@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

@Autowired
private Topic topic;

/**
* 每隔3秒定时发布主题消息
*/
@Scheduled(fixedDelay = 3000)
public void productTopic() {


jmsMessagingTemplate.convertAndSend(topic, "发布者发布主题消息:" + UUID.randomUUID());
}
}
  • 主题发布者服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.huazai.activemq.springboot.topic;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date 2022/1/17 15:15
*/
@Service
public class TopicConsumerService {



/**
* 开启监听器监听主题消息
*
* @param textMessage
* @throws JMSException
*/
@JmsListener(destination = "${topic.name}")
public void receive(TextMessage textMessage) throws JMSException {


String text = textMessage.getText();
System.out.println("订阅者订阅到的消息:" + text);
}
}

先启动主题订阅者,再启动主题发布者,主题订阅者会间隔3秒接收到主题发布者的消息,结果如下:

高可用

持久化方案

为了避免服务器意外宕机后导致消息队列数据丢失,需要引入持久化机制保证服务器重启后恢复消息队列数据使ActiveMQ达到高可用性。

ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。

就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。

消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

持久化方式

  • AMQ(了解)
    AMQ 消息存储是一种基于文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本
  • KahaDB(重点)
  • KahaDB消息存储是基于日志文件的存储方式,它是5.4版本之后默认存储方式。

在ActiveMQ安装目录的conf/activemq.xml文件配置了ActiveMQ的默认持久化方式。

其中,directory属性值配置了KahaDB持久化方式日志所在目录,即data/kahadb。官方参考地址:http://activemq.apache.org/kahadb

  • KahaDB可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
    事务日志用于保存持久化数据,相当于新华字典内容;索引文件作为索引指向事务日志,相当于新华字典目录。
  • KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。

数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。

  • db-number.log(事务日志)
    KahaDB存储消息到预定大小的数据纪录文件中,文件名为db-number.log。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如db-1.log,db-2.log······。当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。
  • db.data(索引文件)
    该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-number.log里面存储消息。
  • db.free
    记录当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的ID,方便下次记录数据时,从空闲的页面开始建立索引,保证索引的连续型且没有碎片。
  • db.redo
    用来进行消息恢复,如果KahaDB消息存储再强制退出后启动,用于恢复BTree索引。
  • lock
    文件锁,表示当前kahadb独写权限的broker。
  • LevelDB(了解)
    LevelDB文件系统是从ActiveMQ5.8之后引进的,它和KahaDB很相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性,但它不再使用自定义B-Tree实现来索引预写日志,而是使用基于LevelDB的索引。相对于KahaDB它具有如下更好的优点:
  • 快速更新(无需进行随机磁盘更新)
  • 并发读取
  • 使用硬链接快速索引快照

为什么LevelDB比KahaDB有着更好的性能,为什么现在使用KahaDB使用广泛呢?按照官网的说法是:LevelDB存储已被弃用,不再支持或推荐使用,Replicated LevelDB(可复制的LevelDB)有望取代LevelDB。推荐的存储方式是KahaDB。参考官网地址:http://activemq.apache.org/leveldb-store

如何配置LevelDB?在ActiveMQ安装目录的conf/activemq.xml找到persistenceAdapter地方将原来默认的kahaDB进行如下修改:

1
2
3
4
<persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/> -->
<levelDB directory="${activemq.data}/leveldb"/>
</persistenceAdapter>
  • JDBC(重点)
    JDBC持久化方式顾名思义就是将数据持久化到数据库中(如mysql),实现步骤如下:

1、 添加mysql驱动到ActiveMQ安装目录的lib文件夹中;
2、 配置ActiveMQ.xml;
在ActiveMQ安装目录的conf/activemq.xml中找到persistenceAdapter标签替换有以下内容:

1
2
3
4
5
<persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/> -->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" />
</persistenceAdapter>
dataSource属性值指定将要引用的持久化数据库的bean名称(后面会配置一个名为mysql-ds的bean);createTablesOnStartup属性值是否在启动的时候创建数据库表,默认是true,这样每次启动都会去创建表了,一般是第一次启动的时候设置为true,然后再去改成false

1、 数据库连接池配置;
配置连接池之前先在数据库中建立一个数据库,比如我建立数据库名称为:activemq_test

1
2
3
4
5
6
7
8
9
10
11
12
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<!-- 数据库驱动名称,此处是mysql驱动名称-->
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<!-- 连接数据库url,ip换成自己数据库所在的机器ip,数据库名为新建立数据库的名称-->
<property name="url" value="jdbc:mysql://your ip:3306/your db's name?relaxAutoCommit=true&serverTimezone=GMT"/>
<!-- 连接数据库用户名-->
<property name="username" value="your username"/>
<!-- 连接数据库密码-->
<property name="password" value="your password"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
ActiveMQ默认使用DBCP连接池,并且自带了DBCP连接池相关jar包,如果想要换成C3P0等连接池,需要自行引入相关jar包。其中bean的id属性值一定要和上面的dataSource属性值一样。

1、 启动ActiveMQ;
运行命令./activemq start启动服务,不出意外的话,启动成功后,将会在配置的数据库中生成相应的三张表。

表创建后记得修改activemq.xml的jdbcPersistenceAdapter标签的createTablesOnStartup属性值改为false。

ACTIVEMQ_MSGS:消息表,Queue和Topic都存在里面。

列名 类型 字段描述
ID bigint(20) 主键
CONTAINER varchar(250) 消息的Destination
MSGID_PROD varchar(250) 消息发送者的主键
MSGID_SEQ bigint(20) 发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION bigint(20) 消息的过期时间,存储的是从1970-01-01到现在的毫秒数,0表示用不过期
MSG blob(0) 消息本体的Java序列化对象的二进制数据
PRIORITY bigint(20) 优先级,从0-9,数值越大优先级越高,默认0
XID varchar(250) -

ACTIVEMQ_ACKS:订阅关系表,如果是持久化Topic,订阅者和服务器的订阅关系保存在这个表。

列名 类型 字段描述
CONTAINER varchar(250) 消息的Destination
SUB_DEST varchar(250) 如果使用的是Static集群(静态集群),将保存集群其他系统的信息
CLIENT_ID varchar(250) 每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME varchar(250) 订阅者名称
SELECTOR varchar(250) 选择器,可以选择只消费满足条件的消息,条件可以自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID bigint(20) 记录消费过消息的ID
PRIORITY bigint(20) 优先级,从0-9,数值越大优先级越高,默认5
XID varchar(250) -

ACTIVEMQ_LOCK:在集群环境下才有用,只有一个Broker可以获取消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。

列名 类型 字段描述
ID bigint(20) 主键
TIME bigint(20) timestamp
BROKER_NAME varchar(250) 当前的 master broker名称

(意外启动失败则看这里)ActiveMQ启动日志记录在安装目录的data/activemq.log日志文件中,启动失败的常见原因拒绝连接数据库,因为mysql默认没有开启连接远程连接的权限,比如我启动失败的错误信息如下(启动错误请根据自身情况处理):

解决方案:开启允许远程连接mysql服务,执行脚本如下:

1
2
3
4
5
use mysql;
select host, user, authentication_string, plugin from user;
update user set host='%' where user='root';
flush privileges;
此方式是开放所有IP的root访问权限,如果是正式环境还是建议用 开放指定IP的方式。

1、 java代码连接测试queue;
代码参考:4.Java编码实现ActiveMQ通讯(Queue)
先启动队列生产者,由于ActiveMQ默认是开启持久化方式,启动成功后,表ACTIVEMQ_MSGS表数据如下:

再启动队列消费者,消息消费成功后,数据表的数据被清空了,结论如下:

  • 当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中。
  • 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。
  • 点对点类型中消息一旦被Consumer消费,就从数据中删除。
  • 消费前的队列消息,会被存放到数据库。
    如果开启非持久化方式,则消息不会持久化到数据表。 2、 java代码连接测试topic;
    代码参考:5.Java编码实现ActiveMQ通讯(Topic)
    先启动topic订阅者,再启动主题发布者,数据表数据如下:
    一定先启动订阅者,表示该topic存在订阅者,否则发布的topic是不会持久化到数据库中,换句话说不存在订阅者的topic就是废消息,没必要持久化到JDBC中。 结论如下:
    topic与queue不同,被消费的消息不会在数据表中删除。 3、 jdbc持久化方式+ActiveMQJournal(日志);
    jdbc持久化方式将消息持久化到数据库中虽好,但是JDBC每次消息过来,都需要去写库读库。引入
    ActiveMQ Journal,使每次消息过来之后在ActiveMQ和JDBC之间加一层高速缓存,使用高速缓存写入技术,大大提高了性能。

如:当有消息过来后,消息不会立马持久化到数据库中,而是先保存到缓存中,被消费的消息也是先从缓存中读取,经过了指定的时间之后,才把缓存中的数据持久化到数据库中。如果是queue,则只持久化未被消费的消息。

用法:在activemq.xml文件中,将persistenceFactory替换掉persistenceAdapter内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!-- <persistenceAdapter> -->
<!--<kahaDB directory="${activemq.data}/kahadb"/> -->
<!--<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" /> -->
<!-- </persistenceAdapter> -->

<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="5"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="../activemq-data" />
</persistenceFactory>

dataSource属性值指定将要引用的持久化数据库的bean名称,dataDirectory属性指定了Journal相关的日志保存位置。成功启动后则会发现多出一个activemq-data文件夹。
配置成功后,重启ActiveMQ服务。启动队列生产生产消息,消息不会立即同步到数据库中,如果过了一段时间后队列的消息还没被消费,则会自动持久化到数据库中。

  • 从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。
  • ActiveMQ消息持久化机制有:
  • AMQ 基于日志文件
  • KahaDB 基于日志文件,从ActiveMQ5.4开始默认使用
  • JDBC 基于第三方数据库
  • Replicated LevelDB Store 从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案。

集群部署

5.9前的版本,采用的是 主从的方式,基于静态地址做双向的数据备份。

后面的新版本加入zk实现了更强大的集群可靠性;

ActiveMQ集群有以下三种方式:

1、 基于shareFileSystem共享文件系统(KahaDB);
2、 基于JDBC;
3、 基于Zookeeper和LevelDB搭建的集群;

本章只重点讲解基于Zookeeper和LevelDB的集群方式

其他集群方式参考官网:http://activemq.apache.org/masterslave

基于zk的集群搭建

Zookeeper集群介绍

从ActiveMQ5.9开始,ActiveMQ的集群实现方式取消了传统的Masster-Slave方式,增加了基于Zookeeper+LevelDB的Master-Slave实现方式,从5.9版本后也是官网的推荐。

集群原理

使用Zookeeper集群注册所有的ActiveMQ Broker但只有其中一个Broker可以提供服务,它将被视为Master,其他的Broker处于待机状态被视为Slave。

如果Master因故障而不能提供服务,Zookeeper会从Slave中选举出一个Broker充当Master。Slave连接Master并同步他们的存储状态,Slave不接受client连接,所有的client都只会连接到Master。Master所有的存储操作都将被复制到连接至Maste的Slaves。

如果Master宕机了,则最新更新的Slave会变成Master。如果原本Master故障节点恢复,则会重新加入集群并连接新的Master进入Slave模式。

所有需要同步的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。 所以,如给你配置了replicas=3,name法定大小是(3/2)+1 = 2。Master将会存储更新然后等待(2-1)=1个Slave存储和更新完成,才汇报success。

在Zookeeper中,有一个node要作为观察者存在。当一个新的Master被选中,你需要至少保障一个法定node在线以能够找到拥有最新状态的node,这个node才可以成为新的Master。因此,推荐运行至少3个replica nodes以防止一个node失败后服务中断。

集群操作

集群环境准备:linux系统 + jdk + zookeeper + apache-activemq。
在linux操作系统上创建mq_cluster文件夹,以下所有下载安装的文件都放到这里。创建命令:mkdir /opt/mq_cluster

zookeeper安装和配置

详细配置启动看这篇文章,手把手教学
那么这里就默认zookeeper集群搭建完毕

ActiveMQ安装和配置

1、 ActiveMQ下载;
此处重点讲ActiveMQ集群配置,ActiveMQ下载和安装参考:ActiveMQ下载和安装(Linux版) 2、 将下载好的ActiveMQ解压3份并分别重命名为:mq_node01、mq_node02、mq_node03;
3、 修改访问控制台端口;
除了mq_node01使用默认的控制台访问端口(8161)不用修改外,mq_node02和mq_node03控制台访问端口分别改为8162和8163。
分别在mq_node02和mq_node03路径的conf/jetty.xml配置文件中找到bean id为jettyPort,并将port属性值分别改为8162和8163。将host改为0.0.0.0方便外部机器远程连接访问控制台,示例如下:
4、 修改brokerName;
mq_node01、mq_node02和mq_node03的activemq.xml的brokerName改成一样。示例如下:
5、 持久化配置;
在mq_node01、mq_node02、mq_node03目录下的conf/activemq.xm将默认的persistenceAdapter替换成以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<!-- mq_node01的bind属性值端口为63631,mq_node02的bind属性值端口为63632,mq_node03的bind属性值端口为63633 -->
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:63631"
zkAddress="localhost:2181,localhost:2182,localhost:2183"
hostname="localhost"
zkPath="/activemq/leveldb-stores"
/>
</persistenceAdapter>

directory:持久化数据路径
replicas:当前主从模型中的节点数,根据实际配置
bind:主从实例间的通讯端口。
zkAddress:zookeeper应用的安装位置
hostname:ActiveMQ实例安装的实际linux主机名
zkPath:ActiveMQ的主从信息保存在zookeeper中的什么目录

1、 修改消息端口;
除了mq_node01使用默认端口(61616),同样修改activemq.xml,mq_node02和mq_node03端口分别改为61617和616168。示例如下:

测试集群

1、 编写mq_batch_start.sh批量开启activeMQ:;

1
2
3
4
5
6
7
8
9
10
#!/bin/sh

cd /opt/mq_cluster/mq_node01/bin
./activemq start

cd /opt/mq_cluster/mq_node02/bin
./activemq start

cd /opt/mq_cluster/mq_node03/bin
./activemq start

1、 编写mq_batch_stop.sh批量关闭activeMQ:;

1
2
3
4
5
6
7
8
9
10
#!/bin/sh

cd /opt/mq_cluster/mq_node01/bin
./activemq stop

cd /opt/mq_cluster/mq_node02/bin
./activemq stop

cd /opt/mq_cluster/mq_node03/bin
./activemq stop

分别给两个批处理文件赋予执行权限,命令:chomod 777 <文件名称>
3、 启动zk集群,若不知道zk集群,看上面推荐文章;
4、 执行./mq_batch_start.sh分别启动activeMQ服务,ps-ef|grepactivemq|grep-vgrep查看MQ启动情况;

5、 使用客户端连接其中一台zookeeper,./zkCli.sh-server127.0.0.1:2181;

6、 查看三台activeMQ是否成功注册到了zookeeper,ls/activemq/leveldb-stores;

7、 可以看到三台activeMQ节点分别为:00000000016,00000000017,00000000018;
分别查看这三个节点的主从状态,get /activemq/leveldb-stores/00000000016、get /activemq/leveldb-stores/00000000017、get /activemq/leveldb-stores/00000000018。
可以看到仅仅00000000018节点的elected属性值不为null,则表示00000000018为Master,其他两个节点为Slave。

集群可用性测试

Slave不接受client连接,client只与Master连接,所以客户端连接的Broker应该使用failover协议(失败转移)。

由以上连接测试可知,Master为00000000016节点,该节点访问消息端口为61616,访问控制台(client)端口为8161。所以通过浏览器打开控制台只能使用8161,同样的,使用lsof -i:<端口>命令查看ActiveMQ服务启动情况,只能查看到61616端口的服务在监听着。

java代码连通测试

集群测试消息生产者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.huazai.activemq.cluster;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date 2020/12/23 23:21
*/
public class ClusterJMSProducer {


// 集群地址需要换成failover协议(失败转移)
public static final String ACTIVEMQ_URL = "failover:(tcp://192.168.64.129:61616,tcp://192.168.64.129:61617,tcp://192.168.64.129:61618)";
// 消息队列名称
public static final String QUEUE_NAME = "queue-cluster";

public static void main(String[] args) throws Exception {


// 1.创建给定ActiveMQ服务连接工厂,使用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,创建连接对象并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话,第一个参数为是否开启事务,第二个参数为签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(队列或者主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 可以用父接口Destination接受
// Destination queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的生产者
MessageProducer producer = session.createProducer(queue);
// 6.通过消息生产者生产6条消息发送MQ队列
for (int i = 0; i < 3; i++) {


// 7.创建消息
TextMessage textMessage = session.createTextMessage("msg" + i + ":hello world");
// 8.将消息发送到MQ
producer.send(textMessage);
}
// 9.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("finish");
}

}

集群测试消息消费者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.huazai.activemq.cluster;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date 2020/12/24 22:09
*/
public class ClusterJMSConsumer {


// 集群地址需要换成failover协议(失败转移)
public static final String ACTIVEMQ_URL = "failover:(tcp://192.168.64.129:61616,tcp://192.168.64.129:61617,tcp://192.168.64.129:61618)";
// 消息队列名称,取消息必须和存消息的队列名称一致
public static final String QUEUE_NAME = "queue-cluster";

public static void main(String[] args) throws Exception {


// 1.创建给定ActiveMQ服务连接工厂,使用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,创建连接对象并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话,第一个参数为是否开启事务,第二个参数为签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 4.创建目的地(队列或者主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 5.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
while (true) {


// 接受消息根据生产者发送消息类型强类型转换
TextMessage message = (TextMessage) consumer.receive();
if (message != null) {


String text = message.getText();
System.out.println(text);
message.acknowledge();
// session.commit();
} else {


break;
}
}
consumer.close();
session.close();
connection.close();
}
}

先启动生产者,再启动消费者代码,控制台输出如下:

测试Master选举

手动将Master的broker服务关闭(演示宕机)后,是否会从其他两台Slave中选举出一个Master呢?

由此可知,当Master宕机后,查看00000000016节点已经不存在,通过选举00000000017为新的Master。

当旧的Master服务重新连接后,是继续为Master,还是为新的Master成为Slave?

上图可知,当旧的Master重新连接后,生成了一个新的节点,并为Slave,现Master仍然是通过选举的00000000017。

高级特性

ActiveMQ高级特性:

1、 异步投递;
2、 延迟投递和定时投递;
3、 分发策略;
4、 消息重试机制;
5、 死信队列;

异步投递

ActiveMQ支持同步,异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎么样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著提高发送的性能。

ActiveMQ默认使用异步发送的模式,除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。

如果你没有使用事务且发送的是持久化的消息,每一次发送都是默认同步发送的且会阻塞producer直到broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。

很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。

异步发送它可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升producer性能。不过这也带来了额外的问题,就是需要消耗更多的Client端内存同时也会导致broker端性能消耗增加,此外它不能有效的确保消息的发送成功。在异步投递的情况下客户端需要容忍消息丢失的可能。

设置异步投递的三种方式:

1、 连接ActiveMQ的url添加jms.useAsyncSend=true参数;

1
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

1、 ActiveMQConnectionFactory方法设置;

1
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

1、 ActiveMQConnection方法设置;

1
((ActiveMQConnection)connection).setUseAsyncSend(true);

参考官网:Async Sends

异步消息如何确定发送成功?
当设置异步投递后,使用producer.send(msg)持续发送消息,如果消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。

答案是:正确的异步发送方法是需要接收回调的。同步发送等send方法不阻塞了就表示一定发送成功了;异步发送需要客户端回执并由客户端再判断一次是否发送成功。

案例演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package com.huazai.activemq.advanced;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;

import javax.jms.*;
import java.util.UUID;

/**
* ActiveMQ高级特性之异步投递
*
* @author DDKK.COM 弟弟快看,程序员编程资料站
*/
public class JmsProduceAsyncSend {


// ActiveMQ服务地址
public static final String ACTIVEMQ_URL = "tcp://192.168.64.129:61616";
// 消息队列名称
public static final String QUEUE_NAME = "queue-async";

public static void main(String[] args) throws Exception {


// 1.创建给定ActiveMQ服务连接工厂,使用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 开启异步投递
activeMQConnectionFactory.setUseAsyncSend(true);
// 2.通过连接工厂,创建连接对象并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话,第一个参数为是否开启事务,第二个参数为签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(队列或者主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 可以用父接口Destination接受
// Destination queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的生产者,一定要向上转型为ActiveMQMessageProducer类型才有异步投递功能
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
// 6.通过消息生产者生产6条消息发送MQ队列
for (int i = 0; i < 3; i++) {


// 7.创建消息
TextMessage textMessage = session.createTextMessage("异步投递消息" + i + ":hello world");
// 给消息设置一个唯一的id可以知道哪条消息发送成功,哪条消息发送失败
textMessage.setJMSMessageID(UUID.randomUUID().toString());
String msgId = textMessage.getJMSMessageID();
// 8.将消息发送到MQ,并回调判断消息是否发送成功
activeMQMessageProducer.send(textMessage, new AsyncCallback() {


/**
* 消息发送成功回调方法
*/
@Override
public void onSuccess() {


System.out.println("消息成功发送回调方法,成功消息标识:" + msgId);
}

/**
* 消息发送失败回调方法
*
* @param exception
*/
@Override
public void onException(JMSException exception) {


System.err.println("消息发送失败回调方法,失败消息标识:" + msgId);
}
});
}
// 9.关闭资源
activeMQMessageProducer.close();
session.close();
connection.close();
System.out.println("finish");
}
}

运行结果:

延迟投递和定时投递

延迟投递:延迟多长时间开始投递消息
定时投递:间隔多长时间开始投递消息

开启定时投递和间隔投递功能

activemq.xml文件中配置broker的schedulerSupport属性为true,并重启服务。

四大属性

Property Name Type Description
AMQ_SCHEDULED_DELAY long 延迟投递的时间
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON String Cron表达式,可以用这表达式配置以上三个属性值,类似Spring的Schedule的cron

案例演示

消费者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.huazai.activemq.advanced;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.time.LocalDateTime;

/**
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date 2022/1/30 20:54
*/
public class JmsConsumerDelayAndScheduleRecieve {


// ActiveMQ服务地址
public static final String ACTIVEMQ_URL = "tcp://192.168.64.129:61616";
// 消息队列名称,取消息必须和存消息的队列名称一致
public static final String QUEUE_NAME = "queue-delay-schedule";

public static void main(String[] args) throws Exception {


// 1.创建给定ActiveMQ服务连接工厂,使用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,创建连接对象并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话,第一个参数为是否开启事务,第二个参数为签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(队列或者主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 5.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
int i = 0;
while (true) {


// 接受消息根据生产者发送消息类型强类型转换
TextMessage message = (TextMessage) consumer.receive();
if (message != null) {


String text = message.getText();
System.out.print("第" + ++i + "次接受延迟消息:");
System.out.println(text);
int endSecond = LocalDateTime.now().getSecond();
} else {


break;
}
}
consumer.close();
session.close();
connection.close();
}
}

生产者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.huazai.activemq.advanced;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import org.apache.activemq.ScheduledMessage;

import javax.jms.*;
import java.util.UUID;

/**
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date 2022/1/30 20:25
*/
public class JmsProduceDelayAndScheduleSend {


// ActiveMQ服务地址
public static final String ACTIVEMQ_URL = "tcp://192.168.64.129:61616";
// 消息队列名称
public static final String QUEUE_NAME = "queue-delay-schedule";
/**
* 延迟投递的时间
*/
private static final long DELAY = 3 * 1000;
/**
* 每次投递的时间间隔
*/
private static final long PERIOD = 4 * 1000;
/**
* 投递的次数
*/
private static final int REPEAT = 5;

public static void main(String[] args) throws Exception {


// 1.创建给定ActiveMQ服务连接工厂,使用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,创建连接对象并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话,第一个参数为是否开启事务,第二个参数为签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(队列或者主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 可以用父接口Destination接受
// Destination queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的生产者,一向上转型为ActiveMQMessageProducer
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);

// 6.通过消息生产者生产6条消息发送MQ队列
for (int i = 0; i < 3; i++) {


// 7.创建消息
TextMessage textMessage = session.createTextMessage("延迟投递和定时投递消息" + i + ":hello world\n");
// 给消息设置一个唯一的id可以知道哪条消息发送成功,哪条消息发送失败
textMessage.setJMSMessageID(UUID.randomUUID().toString());
String msgId = textMessage.getJMSMessageID();
// 延迟投递的时间
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY);
// 每次投递的时间间隔
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, PERIOD);
// 投递的次数
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, REPEAT);
// 8.将消息发送到MQ,并回调判断消息是否发送成功
activeMQMessageProducer.send(textMessage);
}
// 9.关闭资源
activeMQMessageProducer.close();
session.close();
connection.close();
System.out.println("finish");
}
}

先启动消费者,再启动生产者,然后观察消费者控制台,结果如下:

消息重试机制

以下情况会引发消息重发:

activeMQ中的消息重发,指的是消息可以被broker重新分派给消费者,不一定的之前的消费者。重发消息之后,消费者可以重新消费。消息重发的情况有以下几种:

1、 Client用了transactions且在session中调用了rollback;
2、 Client用了transactions且在调用commit之前关闭或者没有commit;
3、 Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover;
4、 Client连接超时(可能正在执行的代码花费的时间比配置的超时时间长);

1
消息默认重发时间间隔为1秒,默认重发次数为6。一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费端会给MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。

案例演示(演示第2种没有提交事务引发的重试机制)

生产者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.huazai.activemq.advanced;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;

import javax.jms.Connection;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
* ActiveMQ高级特性之重试机制生产者
*
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date 2022/2/1 23:24
*/
public class JmsProduceRedeliveryPolicy {


// ActiveMQ服务地址
public static final String ACTIVEMQ_URL = "tcp://192.168.64.129:61616";
// 消息队列名称
public static final String QUEUE_NAME = "queue-redelivery-policy";

public static void main(String[] args) throws Exception {


// 1.创建给定ActiveMQ服务连接工厂,使用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,创建连接对象并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话,第一个参数为是否开启事务,第二个参数为签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(队列或者主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 可以用父接口Destination接受
// Destination queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的生产者,一定要向上转型为ActiveMQMessageProducer类型才有异步投递功能
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
// 6.通过消息生产者生产消息发送MQ队列
// 7.创建消息
TextMessage textMessage = session.createTextMessage("投递消息:" + ":hello world");
// 8.将消息发送到MQ,并回调判断消息是否发送成功
activeMQMessageProducer.send(textMessage);
// 9.关闭资源
activeMQMessageProducer.close();
session.close();
connection.close();
System.out.println("finish");
}
}

消费者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.huazai.activemq.advanced;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* ActiveMQ高级特性之重试机制消费者
*
* @author DDKK.COM 弟弟快看,程序员编程资料站
* @date 2022/2/1 23:29
*/
public class JmsConsumerRedeliveryPolicy {


// ActiveMQ服务地址
public static final String ACTIVEMQ_URL = "tcp://192.168.64.129:61616";
// 消息队列名称,取消息必须和存消息的队列名称一致
public static final String QUEUE_NAME = "queue-redelivery-policy";

public static void main(String[] args) throws Exception {


// 1.创建给定ActiveMQ服务连接工厂,使用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,创建连接对象并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话,true开启事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(队列或者主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 5.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
while (true) {


// 接受消息根据生产者发送消息类型强类型转换
TextMessage message = (TextMessage) consumer.receive();
if (message != null) {


String text = message.getText();
System.out.println(text);
// 注意此处故意不提交事务
// session.commit();
} else {


break;
}
}
consumer.close();
session.close();
connection.close();
}
}
注意消费者代码已开启事务,但故意不提交事务。

启动生产者服务,再启动消费者服务,消费者能成功接收到队列的消息,但是控制台的消息并未出队。

再重新启动消费者服务,发现之前消费的消息还能重复消费。ActiveMQ默认重发次数为6次,当消费者第七次启动消费消息时,发现消费不到消息,查看控制台发现,未消费的消息已出队列,并且多出了一个名为ActiveMQ.DLQ的队列。

ActiveMQ.DLQ即为死信队列,超过重复消费上限次数(6次)的消息被放到了死信队列里。

此时如果我们把代码里的队列名称改成ActiveMQ.DLQ,同样会拿到之前重复消费的消息。

1
public static final String QUEUE_NAME = "ActiveMQ.DLQ";
死信队列

ActiveMQ中引入了“死信队列”(Dead Letter Queue)的概念。即一条消息再被重发了多次后(默认为重发6次,即redeliveryCounter==6) 。将会被ActiveMQ移入“死信队列”。开发人员可以在这个Queue中查看处理出错的消息,进行人工干预。

死信队列应用案例

DLQ-死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息。

一般生产环境中在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列。

核心业务队列,就是比如上图专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况的。

假如第三方物流系统故障了,此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送都会遇到对方的接口报错。此时仓储系统就可以把这条消息拒绝访问或者标记为处理失败。一旦标记这条消息处理失败了之后,MQ就会把这条消息转入提前设置好的一个死信队列中。然后你会看到的就是,在第三方物流系统故障期间,所有的订单消息全部处理失败,全部都会转入到死信队列。然后你的仓储系统得专门找一个后台线程,监控第三方物流系统是否正常,是否能请求,不停地监视。一旦发现对方恢复正常,这个后台线程就从死信队列消费出来处理失败的订单,重新执行发货和配送通知。

1
2
缺省持久消息过期,会被送到死信队列,非持久消息不会送到DLQ。`
`缺省的死信队列是ActiveMQ.DLQ,如果没有特别指定,死信都会被发送到这个队列中。

死信队列策略

1、 共享队列策略(默认);

1
2
3
4
5
6
7
<!-- “>”表示对所有队列生效,如果需要设置指定队列,则直接写队 列名称-->
<policyEntry queue=">">
<deadLetterStrategy>
<sharedDeadLetterStrategy processNonPersistent="false" processExpired="true" deadLetterQueue="" />
</deadLetterStrategy>
</policyEntry>
processNonPersistent:是否将非持久化消息发送到死信队列,默认false;processExpired:是否将过期消息放入到死信队列中,默认为true

1、 独立死信队列策略;

1
2
3
4
5
6
7
8
9
10
11
<!-- “>”表示对所有队列生效,如果需要设置指定队列,则直接写队 列名称-->
<policyEntry queue=">">
<deadLetterStrategy>
<!--
Use the prefix 'DLQ.' for the destination name, and make
the DLQ a queue rather than a topic
-->
<individualDeadLetterStrategy queuePrefix="DLQ."/>
</deadLetterStrategy>
</policyEntry>
queuePrefix:设置死信队列前缀

更多死信队列参考官网:Message Redelivery and DLQ Handling