首頁 > 軟體

盤點Java中延時任務的多種實現方式

2022-12-03 14:01:32

場景描述

①需要實現一個定時釋出系統通告的功能,如何實現? ②支付超時,訂單自動取消,如何實現?

實現方式

一、掛起執行緒

推薦指數:★★☆ 優點: JDK原生(JUC包下)支援,無需引入新的依賴; 缺點: (1)基於記憶體,應用重啟(或宕機)會導致任務丟失 (2)基於記憶體掛起執行緒實現延時,不支援叢集 (3)程式碼耦合性大,不易維護 (4)一個任務就要新建一個執行緒繫結任務的執行,容易造成資源浪費

①設定延遲任務專用執行緒池

/**
 * 執行緒池設定
 */
@Configuration
@EnableAsync
@EnableConfigurationProperties(ThreadPoolProperties.class)
public class ThreadPoolConfig {

	//ThreadPoolProperties的設定依據需求和伺服器設定自行設定
    @Resource
    private ThreadPoolProperties threadPoolProperties;
    //延遲任務佇列容量
    private final static int DELAY_TASK_QUEUE_CAPACITY = 100;

    @Bean
    public ThreadPoolTaskExecutor delayTaskExecutor() {
        log.info("start delayTaskExecutor");
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //設定核心執行緒數
        threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize());
        //設定最大執行緒數
        threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
        //設定佇列大小
        threadPool.setQueueCapacity(DELAY_TASK_QUEUE_CAPACITY);
        //執行緒最大存活時間
        threadPool.setKeepAliveSeconds (threadPoolProperties.getKeepAliveSeconds());
        //設定執行緒池中的執行緒的名稱字首
        threadPool.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix());

        // rejection-policy:當pool已經達到max size的時候執行的策略
        threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        //執行初始化
        threadPool.initialize();
        return threadPool;
    }
}

②建立延時任務

在需要執行的程式碼塊建立延時任務

delayTaskExecutor.execute(() -> {
    try {
        //執行緒掛起指定時間
        TimeUnit.MINUTES.sleep(time);
        //執行業務邏輯
        doSomething();
    } catch (InterruptedException e) {
        log.error("執行緒被打斷,執行業務邏輯失敗");
    }
});

二、ScheduledExecutorService 延遲任務執行緒池

推薦指數:★★★ 優點: 程式碼簡潔,JDK原生支援 缺點: (1)基於記憶體,應用重啟(或宕機)會導致任務丟失 (2)基於記憶體存放任務,不支援叢集 (3)一個任務就要新建一個執行緒繫結任務的執行,容易造成資源浪費

class Task implements Runnable{

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getId()+":"+Thread.currentThread().getName());
        System.out.println("scheduledExecutorService====>>>延時器");
    }
}
public class ScheduleServiceTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService=new ScheduledThreadPoolExecutor(10);
        scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS);
        scheduledExecutorService.schedule(new Task(),2, TimeUnit.SECONDS);
        scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS);
    }
}

三、DelayQueue(延時佇列)

推薦指數:★★★☆ 優點: (1)JDK原生(JUC包下)支援,無需引入新的依賴; (2)可以用一個執行緒對整個延時佇列按序執行; 缺點: (1)基於記憶體,應用重啟(或宕機)會導致任務丟失 (2)基於記憶體存放佇列,不支援叢集 (3)依據compareTo方法排列佇列,呼叫take阻塞式的取出第一個任務(不呼叫則不取出),比較不靈活,會影響時間的準確性

①新建一個延時任務

public class DelayTask implements Delayed {

    private Integer taskId;

    private long executeTime;

    DelayTask(Integer taskId, long executeTime) {
        this.taskId = taskId;
        this.executeTime = executeTime;
    }

    /**
     * 該任務的延時時長
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return executeTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        DelayTask t = (DelayTask) o;
        if (this.executeTime - t.executeTime <= 0) {
            return -1;
        } else {
            return 1;
        }
    }

    @Override
    public String toString() {
        return "延時任務{" +
                "任務編號=" + taskId +
                ", 執行時間=" + new Date(executeTime) +
                '}';
    }

    /**
     * 執行具體業務程式碼
     */
    public void doTask(){
        System.out.println(this+":");
        System.out.println("執行緒ID-"+Thread.currentThread().getId()+":執行緒名稱-"+Thread.currentThread().getName()+":do something!");
    }
}

②執行延時任務

public class TestDelay {
    public static void main(String[] args) throws InterruptedException {
        // 新建3個任務,並依次設定超時時間為 30s 10s 60s
        DelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 3000L);
        DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 1000L);
        DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 6000L);

        DelayQueue<DelayTask> queue = new DelayQueue<>();
        queue.add(d1);
        queue.add(d2);
        queue.add(d3);

        System.out.println("開啟延時佇列時間:" + new Date()+"n");

        // 從延時佇列中獲取元素
        while (!queue.isEmpty()) {
            queue.take().doTask();
        }
        System.out.println("n任務結束");
    }
}

執行結果:

四、Redis-為key指定超時時長,並監聽失效key

推薦指數:★★★☆ 優點: 對於有依賴redis的業務且有延時任務的需求,能夠快速對接 缺點: (1)使用者端斷開後重連會導致所有事件丟失 (2)高並行場景下,存在大量的失效key場景會匯出失效時間存在延遲 (3)若有多個監聽器監聽該key,是會重複消費這個過期事件的,需要特定邏輯判斷

① 修改Redis組態檔並重啟Redis

notify-keyspace-events Ex

注意: redis組態檔不能有空格,否則會啟動報錯

②Java中關於Redis的設定類

redisTemplate範例bean需要自定義生成; RedisMessageListenerContainer 是redis-key過期監聽需要的監聽器容器;

@Configuration
@Slf4j
public class RedisConfiguration {
    /**
     * Redis設定
     * @param factory
     * @return
     */
    @Bean(name = "redisTemplate")
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();

        template.setConnectionFactory(factory);
        //key序列化方式
        template.setKeySerializer(redisSerializer);
        //value序列化
        template.setValueSerializer(redisSerializer);
        //value hashmap序列化
        template.setHashValueSerializer(redisSerializer);
        //key hashmap序列化
        template.setHashKeySerializer(redisSerializer);

        return template;
    }

    /**
     * 訊息監聽器容器bean
     * @param connectionFactory
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

③監聽器程式碼

@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
    private static final String TEST_REDIS_KEY = "testExpired";
    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer,
                                      RedisTemplate redisTemplate) {
        super(listenerContainer);
        /**
         * 設定一個Redis延遲過期key(key名:testExpired,過期時間:30秒)
         */
        redisTemplate.opsForValue().set(TEST_REDIS_KEY, "1", 20, TimeUnit.SECONDS);
        log.info("設定redis-key");
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String expiredKey = message.toString();
            if (TEST_REDIS_KEY.equals(expiredKey)) {
                //業務處理
                log.info(expiredKey + "過期,觸發回撥");
            }
        } catch (Exception e) {
            log.error("key 過期通知處理異常,{}", e);
        }

    }
}

測試結果:

五、時間輪

推薦指數:★★★★ 優點: (1)對於大量定時任務,時間輪可以僅用一個工作執行緒對編排的任務進行順序執行; (2)自動執行,可以自定義時間輪每輪的tick數,tick間隔,靈活且時間精度可控 缺點: (1)基於記憶體,應用重啟(或宕機)會導致任務丟失 (2)基於記憶體存放任務,不支援叢集

public class WheelTimerTest {

    public static void main(String[] args) {

        //設定每個格子是 100ms, 總共 256 個格子
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256);

        //加入三個任務,依次設定超時時間是 10s 5s 20s

        System.out.println("加入一個任務,ID = 1, time= " + LocalDateTime.now());
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("執行一個任務,ID = 1, time= " + LocalDateTime.now());
        }, 10, TimeUnit.SECONDS);

        System.out.println("加入一個任務,ID = 2, time= " + LocalDateTime.now());
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("執行一個任務,ID = 2, time= " + LocalDateTime.now());
        }, 5, TimeUnit.SECONDS);

        System.out.println("加入一個任務,ID = 3, time= " + LocalDateTime.now());
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("執行一個任務,ID = 3, time= " + LocalDateTime.now());
        }, 20, TimeUnit.SECONDS);
        System.out.println("加入一個任務,ID = 4, time= " + LocalDateTime.now());
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("執行一個任務,ID = 4, time= " + LocalDateTime.now());
        }, 20, TimeUnit.SECONDS);

        System.out.println("等待任務執行===========");
    }
}

六、訊息佇列-延遲佇列

針對任務丟失的代價過大,高並行的場景 推薦指數:★★★★ 優點: 支援叢集,分散式,高並行場景; 缺點: 引入額外的訊息佇列,增加專案的部署和維護的複雜度。

場景:為一個委託指定期限,委託到期後,委託關係終止,相關業務許可權移交回原擁有者 這裡採用的是RabbitMq的死信佇列加TTL訊息轉化為延遲佇列的方式(RabbitMq沒有延時佇列)

①宣告一個佇列設定其的死信佇列

@Configuration
public class MqConfig {
    public static final String GLOBAL_RABBIT_TEMPLATE = "rabbitTemplateGlobal";

    public static final String DLX_EXCHANGE_NAME = "dlxExchange";
    public static final String AUTH_EXCHANGE_NAME = "authExchange";

    public static final String DLX_QUEUE_NAME = "dlxQueue";
    public static final String AUTH_QUEUE_NAME = "authQueue";
    public static final String DLX_AUTH_QUEUE_NAME = "dlxAuthQueue";

    @Bean
    @Qualifier(GLOBAL_RABBIT_TEMPLATE)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    @Qualifier(AUTH_EXCHANGE_NAME)
    public Exchange authExchange() {
        return ExchangeBuilder.directExchange (AUTH_EXCHANGE_NAME).durable (true).build ();
    }

    /**
     * 死信交換機
     * @return
     */
    @Bean
    @Qualifier(DLX_EXCHANGE_NAME)
    public Exchange dlxExchange() {
        return ExchangeBuilder.directExchange (DLX_EXCHANGE_NAME).durable (true).build ();
    }

    /**
     * 記錄紀錄檔的死信佇列
     * @return
     */
    @Bean
    @Qualifier(DLX_QUEUE_NAME)
    public Queue dlxQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        return QueueBuilder.durable (DLX_QUEUE_NAME).build ();
    }

    /**
     * 委託授權專用佇列
     * @return
     */
    @Bean
    @Qualifier(AUTH_QUEUE_NAME)
    public Queue authQueue() {
        return QueueBuilder
                .durable (AUTH_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", "dlx_auth")
                .build ();
    }

    /**
     * 委託授權專用死信佇列
     * @return
     */
    @Bean
    @Qualifier(DLX_AUTH_QUEUE_NAME)
    public Queue dlxAuthQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        return QueueBuilder
                .durable (DLX_AUTH_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", "dlx_key")
                .build ();
    }

    @Bean
    public Binding bindDlxQueueExchange(@Qualifier(DLX_QUEUE_NAME) Queue dlxQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){
        return BindingBuilder.bind (dlxQueue).to (dlxExchange).with ("dlx_key").noargs ();
    }

    /**
     * 委託授權專用死信佇列繫結關係
     * @param dlxAuthQueue
     * @param dlxExchange
     * @return
     */
    @Bean
    public Binding bindDlxAuthQueueExchange(@Qualifier(DLX_AUTH_QUEUE_NAME) Queue dlxAuthQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){
        return BindingBuilder.bind (dlxAuthQueue).to (dlxExchange).with ("dlx_auth").noargs ();
    }

    /**
     * 委託授權專用佇列繫結關係
     * @param authQueue
     * @param authExchange
     * @return
     */
    @Bean
    public Binding bindAuthQueueExchange(@Qualifier(AUTH_QUEUE_NAME) Queue authQueue, @Qualifier(AUTH_EXCHANGE_NAME) Exchange authExchange){
        return BindingBuilder.bind (authQueue).to (authExchange).with ("auth").noargs ();
    }

}

②傳送含過期時間的訊息

向授權交換機,傳送路由為"auth"的訊息(指定了業務所需的超時時間) =》發向MqConfig.AUTH_QUEUE_NAME 佇列

rabbitTemplate.convertAndSend(MqConfig.AUTH_EXCHANGE_NAME, "auth", "型別:END,資訊:{id:1,fromUserId:111,toUserId:222,beginData:20201204,endData:20211104}", message -> {
            /**
             * MessagePostProcessor:訊息後置處理
             * 為訊息設定屬性,然後返回訊息,相當於包裝訊息的類
             */

            //業務邏輯:過期時間=xxxx
            String ttl = "5000";
            //設定訊息的過期時間
            message.getMessageProperties ().setExpiration (ttl);
            return message;
        });

③超時後佇列MqConfig.AUTH_QUEUE_NAME會將訊息轉發至其設定的死信路由"dlx_auth",監聽該死信佇列即可消費定時的訊息

 	/**
     * 授權定時處理
     * @param channel
     * @param message
     */
    @RabbitListener(queues = MqConfig.DLX_AUTH_QUEUE_NAME)
    public void dlxAuthQ(Channel channel, Message message) throws IOException {
        System.out.println ("n死信原因:" + message.getMessageProperties ().getHeaders ().get ("x-first-death-reason"));
        //1.判斷訊息型別:1.BEGIN 2.END
        try {
            //2.1 型別為授權到期(END)
            //2.1.1 修改報件辦理人
            //2.1.2 修改授權狀態為0(失效)

            //2.2 型別為授權開啟(BEGIN)
            //2.2.1 修改授權狀態為1(開啟)
            System.out.println (new String(message.getBody (), Charset.forName ("utf8")));
            channel.basicAck (message.getMessageProperties ().getDeliveryTag (),  false);
            System.out.println ("已處理,授權相關資訊修改成功");
        } catch (Exception e) {
            //拒籤訊息
            channel.basicNack (message.getMessageProperties ().getDeliveryTag (), false, false);
            System.out.println ("授權相關資訊處理失敗, 進入死信佇列記錄紀錄檔");
        }
    }

以上就是盤點Java中延時任務的多種實現方式的詳細內容,更多關於Java延時任務的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com