首頁 > 軟體

springboot使用ThreadPoolTaskExecutor多執行緒批次插入百萬級資料的實現方法

2023-09-06 06:25:28

前言

開發目的:提高百萬級資料插入效率。
採取方案:利用ThreadPoolTaskExecutor多執行緒批次插入。
採用技術:springboot2.1.1+mybatisPlus3.0.6+swagger2.5.0+Lombok1.18.4+postgresql+ThreadPoolTaskExecutor等。

具體實現細節

application-dev.properties新增執行緒池設定資訊

# 非同步執行緒設定
# 設定核心執行緒數
async.executor.thread.core_pool_size = 30
# 設定最大執行緒數
async.executor.thread.max_pool_size = 30
# 設定佇列大小
async.executor.thread.queue_capacity = 99988
# 設定執行緒池中的執行緒的名稱字首
async.executor.thread.name.prefix = async-importDB-

spring容器注入執行緒池bean物件

@Configuration
 
@EnableAsync
 
@Slf4j
 
public class ExecutorConfig {
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;
 
    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.warn("start asyncServiceExecutor");
        //在這裡修改
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //設定核心執行緒數
        executor.setCorePoolSize(corePoolSize);
        //設定最大執行緒數
        executor.setMaxPoolSize(maxPoolSize);
        //設定佇列大小
        executor.setQueueCapacity(queueCapacity);
        //設定執行緒池中的執行緒的名稱字首
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新執行緒中執行任務,而是有呼叫者所在的執行緒來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        return executor;
    }
}

建立非同步執行緒 業務類

@Service
 
@Slf4j
 
public class AsyncServiceImpl implements AsyncService {
@Override
    @Async("asyncServiceExecutor")
    public void executeAsync(List<LogOutputResult> logOutputResults, LogOutputResultMapper logOutputResultMapper, CountDownLatch countDownLatch) {
        try{
            log.warn("start executeAsync");
            //非同步執行緒要做的事情
            logOutputResultMapper.addLogOutputResultBatch(logOutputResults);
            log.warn("end executeAsync");
        }finally {
            countDownLatch.countDown();// 很關鍵, 無論上面程式是否異常必須執行countDown,否則await無法釋放
        }
    }
}

建立多執行緒批次插入具體業務方法

@Override
    public int testMultiThread() {
        List<LogOutputResult> logOutputResults = getTestData();
        //測試每100條資料插入開一個執行緒
        List<List<LogOutputResult>> lists = ConvertHandler.splitList(logOutputResults, 100);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        for (List<LogOutputResult> listSub:lists) {
            asyncService.executeAsync(listSub, logOutputResultMapper,countDownLatch);
        }
        try {
            countDownLatch.await(); //保證之前的所有的執行緒都執行完成,才會走下面的;
            // 這樣就可以在下面拿到所有執行緒執行完的集合結果
        } catch (Exception e) {
            log.error("阻塞異常:"+e.getMessage());
        }
        return logOutputResults.size();
    }

模擬2000003 條資料進行測試

多執行緒 測試 2000003  耗時如下:耗時1.67分鐘

本次開啟30個執行緒,截圖如下:

單執行緒測試2000003  耗時如下:耗時5.75分鐘

檢查多執行緒入庫的資料,檢查是否存在重複入庫的問題:

根據id分組,檢視是否有id重複的資料,通過sql語句檢查,沒有發現重複入庫的問題

檢查資料完整性: 通過sql語句查詢,多執行緒錄入資料完整

測試結果

不同執行緒數測試:

總結

通過以上測試案列,同樣是匯入2000003  條資料,多執行緒耗時1.67分鐘,單執行緒耗時5.75分鐘。通過對不同執行緒數的測試,發現不是執行緒數越多越好,具體多少合適,網上有一個不成文的演演算法:
CPU核心數量*2 +2 個執行緒。
附:測試電腦設定

到此這篇關於springboot使用ThreadPoolTaskExecutor多執行緒批次插入百萬級資料的實現方法的文章就介紹到這了,更多相關springboot ThreadPoolTaskExecutor多執行緒內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com