首頁 > 軟體

java非同步程式設計的7種實現方式小結

2023-03-23 22:01:26

最近有很多小夥伴給我留言,能不能總結下非同步程式設計,今天就和大家簡單聊聊這個話題。

早期的系統是同步的,容易理解,我們來看個例子

同步程式設計

當用戶建立一筆電商交易訂單時,要經歷的業務邏輯流程還是很長的,每一步都要耗費一定的時間,那麼整體的RT就會比較長。

於是,聰明的人們開始思考能不能將一些非核心業務從主流程中剝離出來,於是有了非同步程式設計雛形。

非同步程式設計是讓程式並行執行的一種手段。它允許多個事件同時發生,當程式呼叫需要長時間執行的方法時,它不會阻塞當前的執行流程,程式可以繼續執行。

核心思路:採用多執行緒優化效能,將序列操作變成並行操作。非同步模式設計的程式可以顯著減少執行緒等待,從而在高吞吐量場景中,極大提升系統的整體效能,顯著降低時延。

接下來,我們來講下非同步有哪些程式設計實現方式

一、執行緒 Thread

直接繼承 Thread類 是建立非同步執行緒最簡單的方式。

首先,建立Thread子類,普通類或匿名內部類方式;然後建立子類範例;最後通過start()方法啟動執行緒。

public class AsyncThread extends Thread{
    @Override
    public void run() {
        System.out.println("當前執行緒名稱:" + this.getName() + ", 執行執行緒名稱:" + Thread.currentThread().getName() + "-hello");
    }
}
public static void main(String[] args) {
 
  // 模擬業務流程
  // .......
  
    // 建立非同步執行緒 
    AsyncThread asyncThread = new AsyncThread();
 
    // 啟動非同步執行緒
    asyncThread.start();
}
 

當然如果每次都建立一個 Thread執行緒,頻繁的建立、銷燬,浪費系統資源。我們可以採用執行緒池

@Bean(name = "executorService")
public ExecutorService downloadExecutorService() {
    return new ThreadPoolExecutor(20, 40, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
            new ThreadFactoryBuilder().setNameFormat("defaultExecutorService-%d").build(),
            (r, executor) -> log.error("defaultExecutor pool is full! "));
}

將業務邏輯封裝到 Runnable 或 Callable 中,交由 執行緒池 來執行

二、Future

上述方式雖然達到了多執行緒並行處理,但有些業務不僅僅要執行過程,還要獲取執行結果。

Java 從1.5版本開始,提供了 Callable 和 Future,可以在任務執行完畢之後得到任務執行結果。

當然也提供了其他功能,如:取消任務、查詢任務是否完成等

Future類位於java.util.concurrent包下,介面定義:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

方法描述:

  • cancel():取消任務,如果取消任務成功返回true,如果取消任務失敗則返回false
  • isCancelled():表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true
  • isDone():表示任務是否已經完成,如果完成,返回true
  • get():獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回
  • get(long timeout, TimeUnit unit):用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null

程式碼範例:

public class CallableAndFuture {
 
    public static ExecutorService executorService = new ThreadPoolExecutor(4, 40,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1024), new ThreadFactoryBuilder()
            .setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
 
 
    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            return "非同步處理,Callable 返回結果";
        }
    }
 
    public static void main(String[] args) {
        Future<String> future = executorService.submit(new MyCallable());
        try {
            System.out.println(future.get());
        } catch (Exception e) {
            // nodo
        } finally {
            executorService.shutdown();
        }
    }
}

Future 表示一個可能還沒有完成的非同步任務的結果,通過 get 方法獲取執行結果,該方法會阻塞直到任務返回結果。

三、FutureTask

FutureTask 實現了 RunnableFuture 介面,則 RunnableFuture 介面繼承了 Runnable 介面和 Future 介面,所以可以將 FutureTask 物件作為任務提交給 ThreadPoolExecutor 去執行,也可以直接被 Thread 執行;又因為實現了 Future 介面,所以也能用來獲得任務的執行結果。

FutureTask 建構函式:

public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)

FutureTask 常用來封裝 Callable 和 Runnable,可以作為一個任務提交到執行緒池中執行。除了作為一個獨立的類之外,也提供了一些功能性函數供我們建立自定義 task 類使用。

FutureTask 執行緒安全由CAS來保證。

ExecutorService executor = Executors.newCachedThreadPool();
// FutureTask包裝callbale任務,再交給執行緒池執行
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
    System.out.println("子執行緒開始計算:");
    Integer sum = 0;
    for (int i = 1; i <= 100; i++)
        sum += i;
    return sum;
});
 
// 執行緒池執行任務, 執行結果在 futureTask 物件裡面
executor.submit(futureTask);
 
try {
    System.out.println("task執行結果計算的總和為:" + futureTask.get());
} catch (Exception e) {
    e.printStackTrace();
}
executor.shutdown();

Callable 和 Future 的區別:Callable 用於產生結果,Future 用於獲取結果

如果是對多個任務多次自由序列、或並行組合,涉及多個執行緒之間同步阻塞獲取結果,Future 程式碼實現會比較繁瑣,需要我們手動處理各個交叉點,很容易出錯。

四、非同步框架 CompletableFuture

Future 類通過 get() 方法阻塞等待獲取非同步執行的執行結果,效能比較差。

JDK1.8 中,Java 提供了 CompletableFuture 類,它是基於非同步函數語言程式設計。相對阻塞式等待返回結果,CompletableFuture 可以通過回撥的方式來處理計算結果,實現了非同步非阻塞,效能更優。

優點

  • 非同步任務結束時,會自動回撥某個物件的方法
  • 非同步任務出錯時,會自動回撥某個物件的方法
  • 主執行緒設定好回撥後,不再關心非同步任務的執行

泡茶範例:

(內容摘自:極客時間的《Java 並行程式設計實戰》)

//任務1:洗水壺->燒開水
CompletableFuture<Void> f1 =
        CompletableFuture.runAsync(() -> {
            System.out.println("T1:洗水壺...");
            sleep(1, TimeUnit.SECONDS);
 
            System.out.println("T1:燒開水...");
            sleep(15, TimeUnit.SECONDS);
        });
 
//任務2:洗茶壺->洗茶杯->拿茶葉
CompletableFuture<String> f2 =
        CompletableFuture.supplyAsync(() -> {
            System.out.println("T2:洗茶壺...");
            sleep(1, TimeUnit.SECONDS);
 
            System.out.println("T2:洗茶杯...");
            sleep(2, TimeUnit.SECONDS);
 
            System.out.println("T2:拿茶葉...");
            sleep(1, TimeUnit.SECONDS);
            return "龍井";
        });
 
//任務3:任務1和任務2完成後執行:泡茶
CompletableFuture<String> f3 =
        f1.thenCombine(f2, (__, tf) -> {
            System.out.println("T1:拿到茶葉:" + tf);
            System.out.println("T1:泡茶...");
            return "上茶:" + tf;
        });
 
//等待任務3執行結果
System.out.println(f3.join());
 
}

CompletableFuture 提供了非常豐富的API,大約有50種處理序列,並行,組合以及處理錯誤的方法。

五、 SpringBoot 註解 @Async

除了寫死的非同步程式設計處理方式,SpringBoot 框架還提供了 註解式 解決方案,以 方法體 為邊界,方法體內部的程式碼邏輯全部按非同步方式執行。

首先,使用 @EnableAsync 啟用非同步註解

@SpringBootApplication
@EnableAsync
public class StartApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(StartApplication.class, args);
    }
}

自定義執行緒池:

@Configuration
@Slf4j
public class ThreadPoolConfiguration {
 
    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService() {
 
        return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
    }
}

在非同步處理的方法上新增註解 @Async ,當對 execute 方法 呼叫時,通過自定義的執行緒池 defaultThreadPoolExecutor 非同步化執行  execute 方法

@Service
public class AsyncServiceImpl implements AsyncService {
 
    @Async("defaultThreadPoolExecutor")
    public Boolean execute(Integer num) {
        System.out.println("執行緒:" + Thread.currentThread().getName() + " , 任務:" + num);
        return true;
    }
 
}

用 @Async 註解標記的方法,稱為非同步方法。在spring boot應用中使用 @Async 很簡單:

  • 呼叫非同步方法類上或者啟動類加上註解 @EnableAsync
  • 在需要被非同步呼叫的方法外加上 @Async
  • 所使用的 @Async 註解方法的類物件應該是Spring容器管理的bean物件;

六、Spring ApplicationEvent 事件

事件機制在一些大型專案中被經常使用,Spring 專門提供了一套事件機制的介面,滿足了架構原則上的解耦。

ApplicationContext 通過 ApplicationEvent 類和 ApplicationListener 介面進行事件處理。如果將實現 ApplicationListener 介面的 bean 注入到上下文中,則每次使用 ApplicationContext 釋出 ApplicationEvent 時,都會通知該 bean。本質上,這是標準的觀察者設計模式

ApplicationEvent 是由 Spring 提供的所有 Event 類的基礎類別

首先,自定義業務事件子類,繼承自 ApplicationEvent,通過泛型注入業務模型引數類。相當於 MQ 的訊息體。

public class OrderEvent extends AbstractGenericEvent<OrderModel> {
    public OrderEvent(OrderModel source) {
        super(source);
    }
}

然後,編寫事件監聽器。ApplicationListener 介面是由 Spring 提供的事件訂閱者必須實現的介面,我們需要定義一個子類,繼承 ApplicationListener。相當於 MQ 的消費端

@Component
public class OrderEventListener implements ApplicationListener<OrderEvent> {
    @Override
    public void onApplicationEvent(OrderEvent event) {
 
        System.out.println("【OrderEventListener】監聽器處理!" + JSON.toJSONString(event.getSource()));
 
    }
}

最後,釋出事件,把某個事件告訴所有與這個事件相關的監聽器。相當於 MQ 的生產端。

OrderModel orderModel = new OrderModel();
orderModel.setOrderId((long) i);
orderModel.setBuyerName("Tom-" + i);
orderModel.setSellerName("judy-" + i);
orderModel.setAmount(100L);
// 釋出Spring事件通知
SpringUtils.getApplicationContext().publishEvent(new OrderEvent(orderModel));

加個餐:

[消費端]執行緒:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[生產端]執行緒:http-nio-8090-exec-1,釋出事件 1
[消費端]執行緒:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[生產端]執行緒:http-nio-8090-exec-1,釋出事件 2
[消費端]執行緒:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
[生產端]執行緒:http-nio-8090-exec-1,釋出事件 3

上面是跑了個demo的執行結果,我們發現無論生產端還是消費端,使用了同一個執行緒 http-nio-8090-exec-1,Spring 框架的事件機制預設是同步阻塞的。只是在程式碼規範方面做了解耦,有較好的擴充套件性,但底層還是採用同步呼叫方式。

那麼問題來了,如果想實現非同步呼叫,如何處理?

我們需要手動建立一個 SimpleApplicationEventMulticaster,並設定 TaskExecutor,此時所有的消費事件採用非同步執行緒執行。

@Component
public class SpringConfiguration {
 
    @Bean
    public SimpleApplicationEventMulticaster applicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor") ThreadPoolExecutor defaultThreadPoolExecutor) {
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor);
        return simpleApplicationEventMulticaster;
    }
 
}
 

我們看下改造後的執行結果:

[生產端]執行緒:http-nio-8090-exec-1,釋出事件 1
[生產端]執行緒:http-nio-8090-exec-1,釋出事件 2
[生產端]執行緒:http-nio-8090-exec-1,釋出事件 3
[消費端]執行緒:default-executor-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[消費端]執行緒:default-executor-2,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[消費端]執行緒:default-executor-0,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}

SimpleApplicationEventMulticaster 這個我們自己範例化的 Bean 與系統預設的載入順序如何?會不會有衝突?

查了下 Spring 原始碼,處理邏輯在 AbstractApplicationContext#initApplicationEventMulticaster 方法中,通過 beanFactory 查詢是否有自定義的 Bean,如果沒有,容器會自己 new 一個 SimpleApplicationEventMulticaster 物件注入到容器中。

七、訊息佇列

非同步架構是網際網路系統中一種典型架構模式,與同步架構相對應。而訊息佇列天生就是這種非同步架構,具有超高吞吐量和超低時延。

訊息佇列非同步架構的主要角色包括訊息生產者、訊息佇列和訊息消費者。

訊息生產者就是主應用程式,生產者將呼叫請求封裝成訊息傳送給訊息佇列。

訊息佇列的職責就是緩衝訊息,等待消費者消費。根據消費方式又分為對等模式釋出訂閱模式兩種。

訊息消費者,用來從訊息佇列中拉取、消費訊息,完成業務邏輯處理。

當然市面上訊息佇列框架非常多,常見的有RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等

不同的訊息佇列的功能特性會略有不同,但整體架構類似,這裡就不展開了。

我們只需要記住一個關鍵點,藉助訊息佇列這個中介軟體可以高效的實現非同步程式設計。

到此這篇關於java非同步程式設計的7種實現方式小結的文章就介紹到這了,更多相關java非同步程式設計內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com