关于rabbitmq,你想知道的都在这里(下)

上一篇我们讲解了队列的三种形式以及各自的应用。这节我们讲消息持久化和消息回调。

持久化

我们将消息发送到mq消息队列了,如果mq服务器挂掉了,在mq中未消费的消息是否就丢失了,重启mq服务器未消费的是否还在?我们可以将comsumer关掉,然后只保留producer发送消息,然后看看mq的管理端,发现消息并没人消费。此时,将mq服务重启,重启后看消息是否还在?

关于rabbitmq,你想知道的都在这里(下)

奇迹般的消息都还在。上节课我们也没配置什么东西,怎么消息默认就持久化了吗?

@Bean
public Queue DirectQueue() {
    return new Queue(DirectRabbitConfig.directQueue);
}

那是因为我们在定义队列的时候使用的构造方法底层实现的时候durable参数设置为true(持久化),因此消息当mq挂掉时,未消费的消息重启后还会在。

大家可以设置为false试试,但是需要注意我们未设置virtual-host,默认的使用的是/,而这个host不允许设置为false。我们可以在rabbitmq管理控制台增加一个host如下操作:

关于rabbitmq,你想知道的都在这里(下)

点击增加一个host名称为myVirtualHost,并在yml文件中增加一个配置:

关于rabbitmq,你想知道的都在这里(下)

这样就可以设置queue的durable为false,将消费者关闭(模拟消费者异常),然后生产消息,在mq的控制台看到有未消费的消息,重启mq,消息丢失~~~

消息确认

发送端确认

增加配置
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    rabbitTemplate.setMandatory(true);

    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("confirm");
            System.out.println("correlationData:"+correlationData);
            System.out.println("ack:"+ack);
            System.out.println("cause:"+cause);
            System.out.println();
        }
    });

    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println("returnedMessage");
            System.out.println("returnedMessage.getMessage()-->"+returnedMessage.getMessage());
            System.out.println("returnedMessage.getReplyCode()-->"+returnedMessage.getReplyCode());
            System.out.println("returnedMessage.getReplyText()-->"+returnedMessage.getReplyText());
            System.out.println("returnedMessage.getExchange()-->"+returnedMessage.getExchange());
            System.out.println("returnedMessage.getRoutingKey()-->"+returnedMessage.getRoutingKey());
            System.out.println();
        }
    });

    return rabbitTemplate;
}
yml开始配置:

关于rabbitmq,你想知道的都在这里(下)

exchange routing
触发confirm,ack为true
× 触发returnsMessage,replyCode为312
× 触发confirm,ack为false
× × 触发confirm,ack为false

注:√代表存在,×代表不存在

消费端确认

basicAck,表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

  • deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
  • multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
void basicAck(long deliveryTag, boolean multiple) 

basicNack,表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

  • deliveryTag:表示消息投递序号。
  • multiple:是否批量确认。
  • requeue:值为 true 消息将重新入队列。
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)

消息重新投放,将投放到队列的头部,也就是马上会再被消费,如果业务不能处理成功,将陷入死循环。能瞬间将cpu撑满。良好实践是,先将这条消息设置为basicAck,然后再投递另外一条同样的消息,这样消息就进入了消息队列尾部,但是也需要处理消息一直消费不成功,当消费多少次都是失败后进入特有的失败队列。

basicReject,拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

  • deliveryTag:表示消息投递序号。
  • requeue:值为 true 消息将重新入队列。
    void basicReject(long deliveryTag, boolean requeue)
@Slf4j
@Component
@RabbitListener(queues = "directQueue",group = "DirectReceiver")//监听的队列名称 directQueue,不需要管路由和交换机,因为这些是生产者管理的事情。消费者只需要关心队列即可
public class DirectReceiver {
//    @RabbitHandler
//    public void handler(Map testMessage) {
//        System.out.println("directReceiver消费者收到消息  : " + testMessage.toString());
//        for (Object item : testMessage.keySet()) {
//            log.info("item:{}-->value:{}",item,testMessage.get(item));
//        }
//        log.info("");
//    }

    @RabbitHandler
    public void processHandler(Map testMessage,Message message,Channel channel) throws IOException {
        try{
            System.out.println("directReceiver消费者收到消息  : " + testMessage.toString());
            log.info("channel:{}",channel);
            log.info("message:{}",message);
            int i=1/0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            //是否是重新投递的消息
            if(message.getMessageProperties().getRedelivered()){
                log.error("消息已重复处理,不再消费");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            }else{
                log.error("消息即将再次返回队列处理");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
            }
        }
    }
}

一个消息两个订阅

如果一个消息队列被两个消息消费者订阅,那两个订阅者都可以收到消息吗?还是抢夺式的?我们copy一份directReceiver如下:

@Component
@RabbitListener(queues = "directQueue")//监听的队列名称 directQueue,不需要管路由和交换机,因为这些是生产者管理的事情。消费者只需要关心队列即可
public class DirectAnotherReceiver {
    @RabbitHandler
    public void handler(Map testMessage) {
        System.out.println("directAnotherReceiver消费者收到消息  : " + testMessage.toString());
    }
}

然后用postman发消息,发现每次消息只有一个listener消费。且是轮流消费的。即使你启动多个服务也是只有一个消费者消费。这样可以在部署多个实例的时候保证消息不会被重复消费。

directAnotherReceiver消费者收到消息  : {datetime=1624238518016, data=hello,i am direct msg!, id=eb99a77f-0657-4f98-bee8-4815f88436d9}
directReceiver消费者收到消息  : {datetime=1624238539087, data=hello,i am direct msg!, id=2da0d4d3-1707-43f3-96f9-1790aa411be9}
directAnotherReceiver消费者收到消息  : {datetime=1624238540193, data=hello,i am direct msg!, id=5cfbfba5-d577-4992-82ec-29e3504f40b9}
directReceiver消费者收到消息  : {datetime=1624238541016, data=hello,i am direct msg!, id=9d687827-29f9-459d-9da8-f79f775becf9}
directAnotherReceiver消费者收到消息  : {datetime=1624238541879, data=hello,i am direct msg!, id=cddee0cc-0a4e-4448-ae1a-be02fa8530a3}
directReceiver消费者收到消息  : {datetime=1624238542683, data=hello,i am direct msg!, id=f1bb5c39-ffc4-4bdd-a34a-c0c394bebfe0}
directAnotherReceiver消费者收到消息  : {datetime=1624238543150, data=hello,i am direct msg!, id=48b6449f-0720-4321-92b0-e954728d258d}
directReceiver消费者收到消息  : {datetime=1624238545120, data=hello,i am direct msg!, id=120ef701-b762-46a1-a36b-39537b801a4a}
directAnotherReceiver消费者收到消息  : {datetime=1624238545586, data=hello,i am direct msg!, id=6a3df339-53d8-440d-ae76-8e3c8b131897}
directReceiver消费者收到消息  : {datetime=1624238545911, data=hello,i am direct msg!, id=7894f42e-357e-4996-84ed-88a51d283c66}
directAnotherReceiver消费者收到消息  : {datetime=1624238546260, data=hello,i am direct msg!, id=b6bb6399-7c59-42eb-a3da-aea72f75541c}
directReceiver消费者收到消息  : {datetime=1624238546605, data=hello,i am direct msg!, id=7c93063b-c195-467c-9ad9-d244c68d422d}
directAnotherReceiver消费者收到消息  : {datetime=1624238546930, data=hello,i am direct msg!, id=90405b7c-ae15-4456-8b07-5ec4ed78a84c}
directReceiver消费者收到消息  : {datetime=1624238548702, data=hello,i am direct msg!, id=63583d09-ed93-474c-9a69-264a152fca03}
directAnotherReceiver消费者收到消息  : {datetime=1624238549108, data=hello,i am direct msg!, id=55328371-eb0f-4341-9659-fd5256a3adc7}
directReceiver消费者收到消息  : {datetime=1624238549423, data=hello,i am direct msg!, id=c50faa16-885d-4855-9e92-9463cd21da2f}
directAnotherReceiver消费者收到消息  : {datetime=1624238570665, data=hello,i am direct msg!, id=1dad00f5-6bb8-4add-be9e-340e2f77a008}
directReceiver消费者收到消息  : {datetime=1624238571162, data=hello,i am direct msg!, id=5d604580-5f2d-430f-a11d-540aa3a9f263}
directAnotherReceiver消费者收到消息  : {datetime=1624238571909, data=hello,i am direct msg!, id=b0ae512e-153a-4e3a-a3c4-0581ac4d9c08}
directReceiver消费者收到消息  : {datetime=1624238572399, data=hello,i am direct msg!, id=cee8755d-80ad-4185-87b6-c015cb70126c}
directAnotherReceiver消费者收到消息  : {datetime=1624238573032, data=hello,i am direct msg!, id=3d0153c5-a7ac-4820-8884-6e53ad3a80e4}
directReceiver消费者收到消息  : {datetime=1624238573615, data=hello,i am direct msg!, id=fc6f4f8e-522c-4d47-9557-5ca7622e2567}
directAnotherReceiver消费者收到消息  : {datetime=1624238574171, data=hello,i am direct msg!, id=7dc9355f-e6fa-4a72-8f11-0b7ed5f3820f}

延时消费

延时队列需要在rabbitmq安装一个插件

https://www.rabbitmq.com/community-plugins.html 

否则会报错误:没有找到对应x-delayed-message的exchange type

关于rabbitmq,你想知道的都在这里(下)
关于rabbitmq,你想知道的都在这里(下)

下载后放到rabbitmq安装目录的plugins目录下,并执行命令如下:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

安装成功后如下图,然后需要重启rabbitmq

关于rabbitmq,你想知道的都在这里(下)

声明延时队列

@Configuration
public class DelayRabbitConfig {
    /**
     * 延时队列交换机
     * 注意这里的交换机类型:CustomExchange
     * @return
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");//直连型
        //属性参数 交换机名称 交换机类型 是否持久化 是否自动删除 配置参数
        return new CustomExchange("delay_exchange11", "x-delayed-message", true, false, args);
    }

    /**
     * 延时队列
     *
     * @return
     */
    @Bean
    public Queue delayQueue() {
        //属性参数 队列名称 是否持久化
        return new Queue("delay_queue11", true);
    }

    /**
     * 绑定交换机
     *
     * @return
     */
    @Bean
    public Binding bindDelay() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("abc").noargs();
    }
}
发送消息
//延迟队列
@GetMapping("/sendDelayMsg")
public String sendDelayMsg() {
    Map<String,Object> map=new HashMap<String,Object>();
    map.put("id",UUID.randomUUID().toString());
    map.put("data","hello,i am sendDelayMsg msg!");
    map.put("datetime",System.currentTimeMillis());
    //交换机 路由 消息(发送消息的时候不需要管队列,因为队列已经在DirectRabbitConfig中配置了,队列应该是消费者关心的事情)
    rabbitTemplate.convertAndSend("delay_exchange11", "abc", map,message -> {
        //配置消息的过期时间
        message.getMessageProperties().setDelay(5000);//延迟5秒
        return message;
    });

    return "ok";
}
接收消息
@Slf4j
@Component
@RabbitListener(queues = "delay_queue11")//监听的队列名称 delay_queue,不需要管路由和交换机,因为这些是生产者管理的事情。消费者只需要关心队列即可
public class DelayReceiver {
    @RabbitHandler
    public void handler(Map testMessage) {
        System.out.println("DelayReceiver消费者收到消息  : " + testMessage.toString());
        for (Object item : testMessage.keySet()) {
            log.info("item:{}-->value:{}",item,testMessage.get(item));
        }
        log.info("");
    }
}

注意:需要将Mandatory=false,否则将报错路由找不到

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注

关注我们