首頁 > 軟體

springboot整合RabbitMQ 中的 TTL範例程式碼

2022-09-30 14:02:08

TTL簡介

TTL 是什麼呢?TTL 是 RabbitMQ 中一個訊息或者佇列的屬性,表明一條訊息或者該佇列中的所有訊息的最大存活時間,單位是毫秒。換句話說,如果一條訊息設定了 TTL 屬性或者進入了設定 TTL 屬性的佇列,那麼這條訊息如果在 TTL 設定的時間內沒有被消費,則會成為"死信"。
下面就根據這個圖片來驗證程式碼

設定類程式碼

這裡寫一些設定,比如建立佇列 交換機 和它們之間的繫結關係

  • @Qualifier 註解與我們想要使用的特定 Spring bean 的名稱一起進行裝配,Spring 框架就能從多個相同型別並滿足裝配要求的 bean 中找到我們想要的,避免讓Spring腦裂。我們需要做的是@Component或者@Bean註解中宣告的value屬性以確定名稱

注意 包別導錯了

package com.xbfinal.springbootrabbitmq.config;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * ttl佇列 組態檔類
 *
 */
@Configuration
public class TtlQueueConfig {

    //普通交換機名稱
    public static final String X_EXCHANGE="X";
    //死信交換機名稱
    public static final String Y_DEAD_LETTER_EXCHANGE="Y";
    //普通佇列名稱
    public static final String QUEUE_A="QA";
    public static final String QUEUE_B="QB";
    //死信佇列名稱
    public static final String DEAD_LETTER_QUEUE_D="QD";

    /**
     * 宣告x交換機
     * @return
     */
    @Bean("xExchange")//別名和方法名取一樣
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    /**
     * 宣告y交換機
     * @return
     */
    @Bean("yExchange")//別名和方法名取一樣
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //宣告佇列A
    @Bean("queueA")
    public Queue queueA(){
        final HashMap<String, Object> arguments
                = new HashMap<>();
        //設定死信交換機
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //設定死信RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        //設定TTL設定10秒過期
        arguments.put("x-message-ttl",10000);

        return QueueBuilder.durable(QUEUE_A)
                .withArguments(arguments)
                .build();
    }

    //宣告佇列B
    @Bean("queueB")
    public Queue queueB(){
         HashMap<String, Object> arguments
                = new HashMap<>();
        //設定死信交換機
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //設定死信RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        //設定TTL設定40秒過期
        arguments.put("x-message-ttl",40000);

        return QueueBuilder.durable(QUEUE_B)
                .withArguments(arguments)
                .build();
    }

    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE_D)
                .build();
    }

    /**
     * A佇列繫結X交換機
     * @param queueA
     * @return
     */
    @Bean
    public Binding queueABindingX(@Qualifier("queueA")Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    /**
     * B佇列繫結X交換機
     * @param queueB
     * @param xExchange
     * @return
     */
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB")Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    /**
     * D佇列繫結死信y交換機
     * @param queueD
     * @param yExchange
     * @return
     */
    @Bean
    public Binding queueDBindingX(@Qualifier("queueD")Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

}

生產者程式碼

我們用Controller寫,通過網頁提交的方式 生產訊息
url:http://localhost:8080/ttl/sendMsg/message

package com.xbfinal.springbootrabbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 生產者
 * 傳送延遲訊息
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("當前時間:{},傳送了一條訊息({})給兩個佇列", new Date().toString(),message);
        //傳送訊息
        rabbitTemplate.convertAndSend("X","XA","10秒"+message);
        rabbitTemplate.convertAndSend("X","XB","40秒"+message);
    }
}


訊息消費者程式碼

注意@RabbitListener註解
@RabbitListener註解指定目標方法來作為消費訊息的方法,通過註解引數指定所監聽的佇列或者Binding。使用@RabbitListener可以設定一個自己明確預設值的RabbitListenerContainerFactory物件。

  • @RabbitListener標註在方法上,直接監聽指定的佇列,此時接收的引數需要與傳送市型別一致
  • 3.@RabbitListener 可以標註在類上面,需配合 @RabbitHandler 註解一起使用

@RabbitListener 標註在類上面表示當有收到訊息的時候,就交給 @RabbitHandler 的方法處理,根據接受的引數型別進入具體的方法中。

package com.xbfinal.springbootrabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;

/**
 * 佇列TTL的消費者
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    //接收訊息
    @RabbitListener(queues = "QD")
    public void receivedD(Message message, Channel channel)throws Exception{
        String msg=new String(message.getBody());
        log.info("當前時間:{},收到訊息:{}",new Date().toString(),msg);
    }
}

驗證程式碼

先在歷覽器輸入http://localhost:8080/ttl/sendMsg/%E7%AC%91%E9%9C%B8fianl

檢視控制檯:

到此這篇關於springboot整合RabbitMQ 中的 TTL的文章就介紹到這了,更多相關springboot整合RabbitMQ內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com