关于rabbitmq,你想知道的都在这里(下)
上一篇我们讲解了队列的三种形式以及各自的应用。这节我们讲消息持久化和消息回调。
持久化
我们将消息发送到mq消息队列了,如果mq服务器挂掉了,在mq中未消费的消息是否就丢失了,重启mq服务器未消费的是否还在?我们可以将comsumer关掉,然后只保留producer发送消息,然后看看mq的管理端,发现消息并没人消费。此时,将mq服务重启,重启后看消息是否还在?
奇迹般的消息都还在。上节课我们也没配置什么东西,怎么消息默认就持久化了吗?
@Bean
public Queue DirectQueue() {
return new Queue(DirectRabbitConfig.directQueue);
}
那是因为我们在定义队列的时候使用的构造方法底层实现的时候durable
参数设置为true(持久化),因此消息当mq挂掉时,未消费的消息重启后还会在。
大家可以设置为false试试,但是需要注意我们未设置virtual-host
,默认的使用的是/
,而这个host
不允许设置为false
。我们可以在rabbitmq
管理控制台增加一个host
如下操作:
点击增加一个host名称为myVirtualHost,并在yml文件中增加一个配置:
这样就可以设置queue的durable为false,将消费者关闭(模拟消费者异常),然后生产消息,在mq的控制台看到有未消费的消息,重启mq,消息丢失~~~
消息确认
发送端确认
增加配置
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm");
System.out.println("correlationData:"+correlationData);
System.out.println("ack:"+ack);
System.out.println("cause:"+cause);
System.out.println();
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("returnedMessage");
System.out.println("returnedMessage.getMessage()-->"+returnedMessage.getMessage());
System.out.println("returnedMessage.getReplyCode()-->"+returnedMessage.getReplyCode());
System.out.println("returnedMessage.getReplyText()-->"+returnedMessage.getReplyText());
System.out.println("returnedMessage.getExchange()-->"+returnedMessage.getExchange());
System.out.println("returnedMessage.getRoutingKey()-->"+returnedMessage.getRoutingKey());
System.out.println();
}
});
return rabbitTemplate;
}
yml开始配置:
exchange | routing | |
---|---|---|
√ | √ | 触发confirm,ack为true |
√ | × | 触发returnsMessage,replyCode为312 |
× | √ | 触发confirm,ack为false |
× | × | 触发confirm,ack为false |
注:√代表存在,×代表不存在
消费端确认
basicAck
,表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
- deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
- multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
void basicAck(long deliveryTag, boolean multiple)
basicNack
,表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
- deliveryTag:表示消息投递序号。
- multiple:是否批量确认。
- requeue:值为 true 消息将重新入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
注:消息重新投放,将投放到队列的头部,也就是马上会再被消费,如果业务不能处理成功,将陷入死循环。能瞬间将cpu撑满。良好实践是,先将这条消息设置为basicAck,然后再投递另外一条同样的消息,这样消息就进入了消息队列尾部,但是也需要处理消息一直消费不成功,当消费多少次都是失败后进入特有的失败队列。
basicReject
,拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
- deliveryTag:表示消息投递序号。
- requeue:值为 true 消息将重新入队列。
void basicReject(long deliveryTag, boolean requeue)
@Slf4j
@Component
@RabbitListener(queues = "directQueue",group = "DirectReceiver")//监听的队列名称 directQueue,不需要管路由和交换机,因为这些是生产者管理的事情。消费者只需要关心队列即可
public class DirectReceiver {
// @RabbitHandler
// public void handler(Map testMessage) {
// System.out.println("directReceiver消费者收到消息 : " + testMessage.toString());
// for (Object item : testMessage.keySet()) {
// log.info("item:{}-->value:{}",item,testMessage.get(item));
// }
// log.info("");
// }
@RabbitHandler
public void processHandler(Map testMessage,Message message,Channel channel) throws IOException {
try{
System.out.println("directReceiver消费者收到消息 : " + testMessage.toString());
log.info("channel:{}",channel);
log.info("message:{}",message);
int i=1/0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
//是否是重新投递的消息
if(message.getMessageProperties().getRedelivered()){
log.error("消息已重复处理,不再消费");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}else{
log.error("消息即将再次返回队列处理");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}
}
}
一个消息两个订阅
如果一个消息队列被两个消息消费者订阅,那两个订阅者都可以收到消息吗?还是抢夺式的?我们copy一份directReceiver如下:
@Component
@RabbitListener(queues = "directQueue")//监听的队列名称 directQueue,不需要管路由和交换机,因为这些是生产者管理的事情。消费者只需要关心队列即可
public class DirectAnotherReceiver {
@RabbitHandler
public void handler(Map testMessage) {
System.out.println("directAnotherReceiver消费者收到消息 : " + testMessage.toString());
}
}
然后用postman发消息,发现每次消息只有一个listener消费。且是轮流消费的。即使你启动多个服务也是只有一个消费者消费。这样可以在部署多个实例的时候保证消息不会被重复消费。
directAnotherReceiver消费者收到消息 : {datetime=1624238518016, data=hello,i am direct msg!, id=eb99a77f-0657-4f98-bee8-4815f88436d9}
directReceiver消费者收到消息 : {datetime=1624238539087, data=hello,i am direct msg!, id=2da0d4d3-1707-43f3-96f9-1790aa411be9}
directAnotherReceiver消费者收到消息 : {datetime=1624238540193, data=hello,i am direct msg!, id=5cfbfba5-d577-4992-82ec-29e3504f40b9}
directReceiver消费者收到消息 : {datetime=1624238541016, data=hello,i am direct msg!, id=9d687827-29f9-459d-9da8-f79f775becf9}
directAnotherReceiver消费者收到消息 : {datetime=1624238541879, data=hello,i am direct msg!, id=cddee0cc-0a4e-4448-ae1a-be02fa8530a3}
directReceiver消费者收到消息 : {datetime=1624238542683, data=hello,i am direct msg!, id=f1bb5c39-ffc4-4bdd-a34a-c0c394bebfe0}
directAnotherReceiver消费者收到消息 : {datetime=1624238543150, data=hello,i am direct msg!, id=48b6449f-0720-4321-92b0-e954728d258d}
directReceiver消费者收到消息 : {datetime=1624238545120, data=hello,i am direct msg!, id=120ef701-b762-46a1-a36b-39537b801a4a}
directAnotherReceiver消费者收到消息 : {datetime=1624238545586, data=hello,i am direct msg!, id=6a3df339-53d8-440d-ae76-8e3c8b131897}
directReceiver消费者收到消息 : {datetime=1624238545911, data=hello,i am direct msg!, id=7894f42e-357e-4996-84ed-88a51d283c66}
directAnotherReceiver消费者收到消息 : {datetime=1624238546260, data=hello,i am direct msg!, id=b6bb6399-7c59-42eb-a3da-aea72f75541c}
directReceiver消费者收到消息 : {datetime=1624238546605, data=hello,i am direct msg!, id=7c93063b-c195-467c-9ad9-d244c68d422d}
directAnotherReceiver消费者收到消息 : {datetime=1624238546930, data=hello,i am direct msg!, id=90405b7c-ae15-4456-8b07-5ec4ed78a84c}
directReceiver消费者收到消息 : {datetime=1624238548702, data=hello,i am direct msg!, id=63583d09-ed93-474c-9a69-264a152fca03}
directAnotherReceiver消费者收到消息 : {datetime=1624238549108, data=hello,i am direct msg!, id=55328371-eb0f-4341-9659-fd5256a3adc7}
directReceiver消费者收到消息 : {datetime=1624238549423, data=hello,i am direct msg!, id=c50faa16-885d-4855-9e92-9463cd21da2f}
directAnotherReceiver消费者收到消息 : {datetime=1624238570665, data=hello,i am direct msg!, id=1dad00f5-6bb8-4add-be9e-340e2f77a008}
directReceiver消费者收到消息 : {datetime=1624238571162, data=hello,i am direct msg!, id=5d604580-5f2d-430f-a11d-540aa3a9f263}
directAnotherReceiver消费者收到消息 : {datetime=1624238571909, data=hello,i am direct msg!, id=b0ae512e-153a-4e3a-a3c4-0581ac4d9c08}
directReceiver消费者收到消息 : {datetime=1624238572399, data=hello,i am direct msg!, id=cee8755d-80ad-4185-87b6-c015cb70126c}
directAnotherReceiver消费者收到消息 : {datetime=1624238573032, data=hello,i am direct msg!, id=3d0153c5-a7ac-4820-8884-6e53ad3a80e4}
directReceiver消费者收到消息 : {datetime=1624238573615, data=hello,i am direct msg!, id=fc6f4f8e-522c-4d47-9557-5ca7622e2567}
directAnotherReceiver消费者收到消息 : {datetime=1624238574171, data=hello,i am direct msg!, id=7dc9355f-e6fa-4a72-8f11-0b7ed5f3820f}
延时消费
延时队列需要在rabbitmq安装一个插件
https://www.rabbitmq.com/community-plugins.html
否则会报错误:没有找到对应x-delayed-message的exchange type
下载后放到rabbitmq安装目录的plugins目录下,并执行命令如下:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安装成功后如下图,然后需要重启rabbitmq
声明延时队列
@Configuration
public class DelayRabbitConfig {
/**
* 延时队列交换机
* 注意这里的交换机类型:CustomExchange
* @return
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");//直连型
//属性参数 交换机名称 交换机类型 是否持久化 是否自动删除 配置参数
return new CustomExchange("delay_exchange11", "x-delayed-message", true, false, args);
}
/**
* 延时队列
*
* @return
*/
@Bean
public Queue delayQueue() {
//属性参数 队列名称 是否持久化
return new Queue("delay_queue11", true);
}
/**
* 绑定交换机
*
* @return
*/
@Bean
public Binding bindDelay() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("abc").noargs();
}
}
发送消息
//延迟队列
@GetMapping("/sendDelayMsg")
public String sendDelayMsg() {
Map<String,Object> map=new HashMap<String,Object>();
map.put("id",UUID.randomUUID().toString());
map.put("data","hello,i am sendDelayMsg msg!");
map.put("datetime",System.currentTimeMillis());
//交换机 路由 消息(发送消息的时候不需要管队列,因为队列已经在DirectRabbitConfig中配置了,队列应该是消费者关心的事情)
rabbitTemplate.convertAndSend("delay_exchange11", "abc", map,message -> {
//配置消息的过期时间
message.getMessageProperties().setDelay(5000);//延迟5秒
return message;
});
return "ok";
}
接收消息
@Slf4j
@Component
@RabbitListener(queues = "delay_queue11")//监听的队列名称 delay_queue,不需要管路由和交换机,因为这些是生产者管理的事情。消费者只需要关心队列即可
public class DelayReceiver {
@RabbitHandler
public void handler(Map testMessage) {
System.out.println("DelayReceiver消费者收到消息 : " + testMessage.toString());
for (Object item : testMessage.keySet()) {
log.info("item:{}-->value:{}",item,testMessage.get(item));
}
log.info("");
}
}
注意:需要将Mandatory=false,否则将报错路由找不到