当前位置: 首页 > news >正文

【RabbitMQ】高级篇,学习纪录+笔记

目录

一.高级特性

1.1消息的可靠投递

2.1Consumer Ack

3.1消费端限流

4.1TTL

5.1死信队列

6.1延迟队列

7.1日志与监控

7.1.1日志

7.1.2监控

8.1消息追踪

8.1.1Firehose

8.1.2rabbitmq_tracing

9.1消息可靠性保障(思路)

9.2消息幂等性保障(思路)


一.高级特性

1.1消息的可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提 供了两种方式用来控制消息的投递可靠性模式。

①confirm确认模式

②return退回模式

rabbitmq 整个消息投递的路径为: producer--->rabbitmqbroker--->exchange--->queue--->consumer 

消息从 producer 到 exchange 则会返回一个 confirmCallback 。 

消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

接基础篇,在生产者中RabttitMQConfig中配置

    public static final String CONFIRM_EXCHANGE_NAME = "test_exchange_confirm";
    public static final String CONFIRM_QUEUE_NAME = "test_queue_confirm";
    @Bean("confirmExchange")
    public Exchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).build();
    }

    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding bindConfirmExchangeQueue(@Qualifier("confirmExchange") Exchange exchange,
                                            @Qualifier("confirmQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();
    }

在yml中开启

#配置RabbitMQ的基本信息
spring:
  rabbitmq:
    host: ip
    username: admin
    password: admin
    port: 5672
    virtual-host: /
    publisher-confirms: true //开启确认模式,这个是老版本的写法,已经废弃了,新版本在后面加type
    publisher-returns: true //开启退回模式

测试类

/*
    *
    * 确认模式步骤
    * 1.确认模式开启,在yml中配置
    * 2.在rabbitTemplate定义ConfirmCallback回调函数
    *
    * */
@Test
    public void testConfirm(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 相关的配置信息
             * @param ack exchange交换机 是否成功收到了消息,true成功,false失败
             * @param cause 如果ack为false,这是失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    //接收成功
                    System.out.println("接收成功消息:"+cause);
                }else {
                    //接收失败
                    System.out.println("接收失败消息:"+cause);
                    //做一些处理,再次发送

                }
                System.out.println("Confirm方法被执行……");
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME,"confirm","message..confirm...");
    }

以上配置方法也可以写在配置类中

回退方式测试类

/**
     * 回退模式:当消息发送给Exchange后,Exchange路由到Queue失败时才会执行ReturnCallback
     * 1.开启回退模式
     * 2.设置ReturnCallback
     * 3.设置Exchange处理消息的模式
     *   消息没有路由到Queue,丢弃消息
     *   如果消息没有路由到Queue,返回给消息发送方
     */
    @Test
    public void testCallback() throws InterruptedException {
        //设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replycode, String replyText, String exchange, String routingKey) {
                System.out.println("return执行了……");
                System.out.println(message);
                System.out.println(replycode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME,"confirm","message..confirm...");
        Thread.sleep(3000);
    }

消费者确认再继续看下面

2.1Consumer Ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式: 自动确认:acknowledge="none" •

手动确认:acknowledge="manual" •

根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的 消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。

如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则 调用channel.basicNack()方法,让其自动重新发送消息

在消费者工程中,新建监听者

/**
 * Counsumer ACK
 * 1.设置手动签收 在yml
 * 2.让监听器类实现ChannelAwareMessageListener
 * 3.如果消息成功处理,调用channel的basicAck进行签收
 * 4.basicNack拒收 broker重新发送给counsumer
 */
@Component
public class ACKListener  {

    @RabbitListener(queues = "test_queue_confirm")
    public void ListenerQueue2(Message message,Channel channel) throws IOException {
        System.out.println("test_queue_confirm----->"+message);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("处理业务逻辑");
            Thread.sleep(3000);
            int i = 10/0;
            /**
             * long deliveryTag,收到消息的标识
             * boolean multiple,是否签收多条消息
             */
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //拒绝签收 第三个参数设置为true,消息重新回到queue,broker重新发送该消息给消费端
            // channel.basicNack(deliveryTag,true,true);

            // 如果真得出现了异常,我们采用消息重投,获取redelivered,判断是否为重投: false没有重投,true重投
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            System.out.println("redelivered = " + redelivered);
            try {
                // (已重投)拒绝确认
                if (redelivered) {
                    /**
                     * 拒绝确认,从队列中删除该消息,防止队列阻塞(消息堆积)
                     * boolean requeue: false不重新入队列(丢弃消息)
                     */
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                    System.out.println();
                } else { // (没有重投) 消息重投
                    /**
                     * 消息重投,重新把消息放回队列中
                     * boolean multiple: 单条或批量
                     * boolean requeue: true重回队列
                     */
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    System.out.println("=========消息重投了=======");
                }
            } catch (Exception ee) {
                ee.printStackTrace();
            }

        }
    }


}

配置yml

#配置RabbitMQ的基本信息
spring:
  rabbitmq:
    host: ip
    username: admin
    password: admin
    port: 5672
    virtual-host: /
    listener:
      simple:
        # 并发消费:每个侦听器线程的最小数量,具体数值根据系统性能配置(一般为系统cpu核数)
        concurrency: 2
        # 并发消费:每个侦听器线程的最大数量,具体数值根据系统性能配置(一般为系统cpu核数*2)
        max-concurrency: 4
        # 每次只能获取一条消息,处理完成才能获取下一个消息,避免照成消息堆积在一个消费线程上
        prefetch: 1
        #acknowledge-mode: manual         # 消费者开启手动ack消息确认,需要测试请看示例请AckConsumer,所有队列都会生效
        #default-requeue-rejected: false  # 设置为false,会重发消息到死信队列(防止手动ack确认失败的消息堆积),需要测试请示例AckConsumer,所有队列都会生效
        retry:
          enabled: true                   # 解决消息死循环问题-启用重试
          max-attempts: 3                 # 最大重试3次(默认),超过就丢失(或放到死信队列中,防止消息堆积)
          multiplier: 2                   # 乘子
          initial-interval: 3000          # 第一次和第二次之间的重试间隔,后面的用乘子计算 3s 6s 12s
          max-interval: 16000             # 最大重试时间间隔16s
      direct:
        acknowledge-mode: manual
        default-requeue-rejected: false

小结

在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认 

如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息 

如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

持久化 exchange要持久化

queue要持久化

message要持久化

生产方确认Confirm

消费方确认Ack

Broker高可用

3.1消费端限流

在yml中配置 prefetch属性设置消费端一次拉取多少消息 

prefetch: 1

消费端的确认模式一定为手动确认。

acknowledge-mode: manual

详细配置见上面消费者的yml

4.1TTL

TTL 全称 Time To Live(存活时间/过期时间)。 

当消息到达存活时间后,还没有被消费,会被自动清除。 

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

可以直接在RabbitMQ的web端直接对队列进行设置

也可以通过java代码对其进行设置,具体代码见后面死信队列与延迟队列

小结

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。 

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断 这一消息是否过期。 

如果两者都进行了设置,以时间短的为准。

5.1死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以 被重新发送到另一个交换机,这个交换机就是DLX

 

消息成为死信的三种情况:

1. 队列消息长度到达限制;

2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标列,requeue=false;

3. 原队列存在消息过期设置,消息到达超时时间未被消费 

 

正常队列绑定死信交换机: 给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

 

生产者

RabbitMQConfig中配置一对正常交换机队列、一堆死信交换机队列、正常队列与死信交换机绑定

 //死信
    //先声明正常的交换机(test_exchange_dlx)和队列(test_queue_dlx)
    //声明死信交换机(exchange_dlx)和队列(queue_dlx)
    //正常队列绑定死信交换机
    //设置两个参数
    //x-dead-letter-exchange 死信交换机名称
    //x-dead-letter-routing-key 发送给死信交换机的routing key
    @Bean("test_exchange_dlx")
    public Exchange test_exchange_dlx(){
        return ExchangeBuilder.topicExchange("test_exchange_dlx").durable(true).build();
    }

    @Bean("test_queue_dlx")
    public Queue test_queue_dlx(){
        Map<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","exchange_dlx");
        map.put("x-dead-letter-routing-key","dlx.hehe");
        map.put("x-message-ttl",10000);
        map.put("x-max-length",10);
        return QueueBuilder.durable("test_queue_dlx").withArguments(map).build();
    }

    @Bean
    public Binding dlxCommonBinding(@Qualifier("test_exchange_dlx") Exchange exchange,
                              @Qualifier("test_queue_dlx") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();
    }

    @Bean("exchange_dlx")
    public Exchange exchange_dlx(){
        return ExchangeBuilder.topicExchange("exchange_dlx").durable(true).build();
    }

    @Bean("queue_dlx")
    public Queue queue_dlx(){
        return QueueBuilder.durable("queue_dlx").build();
    }

    @Bean
    public Binding dlxBinding(@Qualifier("exchange_dlx") Exchange exchange,
                              @Qualifier("queue_dlx") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
    }

测试类

    /**
     * 发送测试死信消息
     * 1.过期时间
     * 2.长度限制
     * 3.消息拒收
     */
    @Test
    public void testDlx(){
        //1.过期时间
//        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死亡");

        //2.长度限制
//        for (int i = 1; i <= 20 ; i++) {
//            rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死亡");
//        }

        //3.消息拒收
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死亡");

    }

消费者监听,监听正常队列

@Component
public class DLXListener {

    @RabbitListener(queues = "test_queue_dlx")
    public void ListenerQueue3(Message message,Channel channel) throws IOException {
        System.out.println("test_queue_dlx----->"+message);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("处理业务逻辑");
            Thread.sleep(3000);
            int i = 10/0;
            /**
             * long deliveryTag,收到消息的标识
             * boolean multiple,是否签收多条消息
             */
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //拒绝签收 第三个参数设置为true,消息重新回到queue,broker重新发送该消息给消费端
            // channel.basicNack(deliveryTag,true,true);
            // 如果真得出现了异常,我们采用消息重投,获取redelivered,判断是否为重投: false没有重投,true重投
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            System.out.println("redelivered = " + redelivered);
            try {
                // (已重投)拒绝确认
                if (redelivered) {
                    System.out.println("拒绝了,不重投");
                    /**
                     * 拒绝确认,从队列中删除该消息,防止队列阻塞(消息堆积)
                     * boolean requeue: false不重新入队列(丢弃消息)
                     */
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                    System.out.println();
                } else { // (没有重投) 消息重投
                    /**
                     * 消息重投,重新把消息放回队列中
                     * boolean multiple: 单条或批量
                     * boolean requeue: true重回队列
                     */
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    System.out.println("=========消息拒绝接收,重投了=======");
                }
            } catch (Exception ee) {
                ee.printStackTrace();
            }

        }
    }


}

观察死信队列中消息数量变化

6.1延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

1. 下单后,30分钟未支付,取消订单,回滚库存。

2. 新用户注册成功7天后,发送短信问候。

实现方式: 1. 定时器 2. 延迟队列

定时器对于性能消耗很大,所以采取延迟队列

由于RabbitMQ没有直接的延迟队列,故而采取ttl+死信队列来实现

生产者

RabbitMQConfig中配置

//延迟
    //先声明正常的交换机(order_exchange)和队列(order_queue)
    //声明死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
    //正常队列绑定死信交换机
    //设置两个参数
    //x-dead-letter-exchange 死信交换机名称
    //x-dead-letter-routing-key 发送给死信交换机的routing key
    @Bean("order_exchange")
    public Exchange order_exchange(){
        return ExchangeBuilder.topicExchange("order_exchange").durable(true).build();
    }

    @Bean("order_queue")
    public Queue order_queue(){
        Map<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","order_exchange_dlx");
        map.put("x-dead-letter-routing-key","dlx.order.cancel");
        map.put("x-message-ttl",10000);
        map.put("x-max-length",10);
        return QueueBuilder.durable("order_queue").withArguments(map).build();
    }

    @Bean
    public Binding yanchiCommonBinding(@Qualifier("order_exchange") Exchange exchange,
                                    @Qualifier("order_queue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }

    @Bean("order_exchange_dlx")
    public Exchange order_exchange_dlx(){
        return ExchangeBuilder.topicExchange("order_exchange_dlx").durable(true).build();
    }

    @Bean("order_queue_dlx")
    public Queue order_queue_dlx(){
        return QueueBuilder.durable("order_queue_dlx").build();
    }

    @Bean
    public Binding yanchiBinding(@Qualifier("order_exchange_dlx") Exchange exchange,
                              @Qualifier("order_queue_dlx") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();
    }

测试类

@Test
    public void testDelay() throws InterruptedException {
        //发送订单消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息 id:1");
        //打印倒计时10s
        for (int i = 0; i < 10; i++) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }

    }

消费者监听,监听的是死信队列

@Component
public class OrderListener {

    @RabbitListener(queues = "order_queue_dlx")
    public void ListenerQueue4(Message message,Channel channel) throws IOException {
        System.out.println("order_queue_dlx----->"+message);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("处理业务逻辑");
            Thread.sleep(3000);
            System.out.println("根据订单id查询状态");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚……");
            /**
             * long deliveryTag,收到消息的标识
             * boolean multiple,是否签收多条消息
             */
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //拒绝签收 第三个参数设置为true,消息重新回到queue,broker重新发送该消息给消费端
            // channel.basicNack(deliveryTag,true,true);
            // 如果真得出现了异常,我们采用消息重投,获取redelivered,判断是否为重投: false没有重投,true重投
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            System.out.println("redelivered = " + redelivered);
            try {
                // (已重投)拒绝确认
                if (redelivered) {
                    System.out.println("拒绝了,不重投");
                    /**
                     * 拒绝确认,从队列中删除该消息,防止队列阻塞(消息堆积)
                     * boolean requeue: false不重新入队列(丢弃消息)
                     */
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                    System.out.println();
                } else { // (没有重投) 消息重投
                    /**
                     * 消息重投,重新把消息放回队列中
                     * boolean multiple: 单条或批量
                     * boolean requeue: true重回队列
                     */
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    System.out.println("=========消息拒绝接收,重投了=======");
                }
            } catch (Exception ee) {
                ee.printStackTrace();
            }

        }
    }


}

小结

1.延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。

2. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。

7.1日志与监控

7.1.1日志

RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log

日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、 RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等

7.1.2监控

web管控台监控

rabbitmqctl管理和监控

8.1消息追踪

在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能 是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也 有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。

在RabbitMQ中可以使用Firehoserabbitmq_tracing插件功能来实现消息追踪

8.1.1Firehose

firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式 发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类 型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。

注意:打开 trace 会影响消息写入功能,适当打开后请关闭。

rabbitmqctl trace_on:开启Firehose命令

rabbitmqctl trace_off:关闭Firehose命令

步骤:

1.创建一个新的队列,然后将这个队列绑定到默认交换机上,Routing Key写#,表示所有消息都监听

2.然后在这个新的队列发一条消息,然后再查,发现只有一条

3.然后开启rabbitmqctl trace_on

4.再发一条消息,再查,发现有额外的消息

8.1.2rabbitmq_tracing

rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一 层GUI的包装,更容易使用和管理。

启用插件:rabbitmq-plugins enable rabbitmq_tracing

然后即可在web端右侧看到Tracing,点进去即可查看与操作,可以创建一个新的,Pattern写#

9.1消息可靠性保障(思路)

由于比较复杂,只提供一个思路,思路多种多样,这个不一定好,看看就好

 

9.2消息幂等性保障(思路)

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。

也就是说,其任 意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

比如发送两条相同的扣款信息,不能扣两次款。

采取乐观锁机制。

 

本文只是本人在学习复习过程中的笔记,有什么不足之处请指教。

相关文章:

  • 自己免费做网站(四)/市场推广怎么做
  • 厦门网站制作公司推荐/网站推广怎么弄
  • 中小学网站建设有什么好处/seo案例
  • 北流科技网站建设/天津网站优化
  • 织梦cms做网站/企业网站cms
  • 如何判断网站好坏/百度竞价平台官网
  • IB学生必看的时间表(二)
  • python中的设计模式:单例模式、工厂模式
  • 程序员必知必会 QPS TPS、URI URL、PV UV GMV
  • CDH6.3生产环境中禁用Kerberos
  • EMQX 在 Kubernetes 中如何进行优雅升级
  • Java---中间件---Redis的常见命令和客户端使用
  • C/C++数据结构(十)—— 二叉查找树
  • 装修--避坑--马桶
  • Android 音视频——直播推流技术指南
  • 基于2D Object Detection的目标几何中心三维位置估计(C++)
  • 质心标准差和分散程度
  • JavaScript中 join()、split()、slice()函数的用法及区别