首頁 > 軟體

Reactor 多工並行執行且結果按順序返回第一個

2022-09-24 14:00:39

1 場景

呼叫多個平級服務,按照服務優先順序返回第一個有效資料。

具體case:一個頁面可能有很多的彈窗,彈窗之間又有優先順序。每次只需要返回第一個有資料的彈窗。但是又希望所有彈窗之間的資料獲取是非同步的。這種場景使用 Reactor 怎麼實現呢?

2 建立 service

2.1 建立基本介面和實體類

public interface TestServiceI {
    Mono request();
}

提供一個 request 方法,返回一個 Mono 物件。

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class TestUser {
    private String name;
}

2.2 建立 service 實現

@Slf4j
public class TestServiceImpl1 implements TestServiceI {
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        return Mono.fromSupplier(() -> {
                    try {
                        System.out.println("service1.threadName=" + Thread.currentThread().getName());
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return "";
                })
                .map(name -> {
                    return new TestUser(name);
                });
    }
}

第一個 service 執行耗時 500ms。返回空物件;

建立第二個 service 執行耗時 1000ms。返回空物件;程式碼如上,改一下sleep時間即可。

繼續建立第三個 service 執行耗時 1000ms。返回 name3。程式碼如上,改一下 sleep 時間,以及返回為 name3。

3 主體方法

public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        TestServiceI testServiceImpl4 = new TestServiceImpl4();
        TestServiceI testServiceImpl5 = new TestServiceImpl5();
        TestServiceI testServiceImpl6 = new TestServiceImpl6();
        List<TestServiceI> serviceIList = new ArrayList<>();
        serviceIList.add(testServiceImpl4);
        serviceIList.add(testServiceImpl5);
        serviceIList.add(testServiceImpl6);

    // 執行 service 列表,這樣有多少個 service 都可以
        Flux<Mono<TestUser>> monoFlux = Flux.fromIterable(serviceIList)
                .map(service -> {
                    return service.request();
                });
    // flatMap(或者flatMapSequential) + map 實現異常繼續下一個執行
        Flux flux = monoFlux.flatMapSequential(mono -> {
            return mono.map(user -> {
                        TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class);
                        if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) {
                            return testUser;
                        }
            // null 在 reactor 中是異常資料。
                        return null;
                    })
                    .onErrorContinue((err, i) -> {
                        log.info("onErrorContinue={}", i);
                    });
        });
        Mono mono = flux.elementAt(0, Mono.just(""));
        Object block = mono.block();
        System.out.println(block + "blockFirst 執行耗時ms:" + (System.currentTimeMillis() - startTime));
    }
  • 1、Flux.fromIterable 執行 service 列表,可以隨意增刪 service 服務。
  • 2、flatMap(或者flatMapSequential) + map + onErrorContinue 實現異常繼續下一個執行。具體參考:Reactor中的onErrorContinue 和 onErrorResume
  • 3、Mono mono = flux.elementAt(0, Mono.just("")); 返回第一個正常資料。

執行輸出:

20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service1.threadName=main
20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service5.threadName=main
20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service6.threadName=main
TestUser(name=name3)blockFirst 執行耗時ms:2895

  • 1、service1 和 service2 因為返回空,所以繼續下一個,最終返回 name3。
  • 2、檢視總耗時:2895ms。service1 耗時 500,service2 耗時1000,service3 耗時 1000。發現耗時基本上等於 service1 + service2 + service3 。這是怎麼回事呢?檢視返回執行的執行緒,都是 main。

總結:這樣實現按照順序返回第一個正常資料。但是執行並沒有非同步。下一步:如何實現非同步呢?

4 實現非同步

4.1 subcribeOn 實現非同步

修改 service 實現。增加 .subscribeOn(Schedulers.boundedElastic())

如下:

@Slf4j
public class TestServiceImpl1 implements TestServiceI {
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        return Mono.fromSupplier(() -> {
                    try {
                        System.out.println("service1.threadName=" + Thread.currentThread().getName());
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return "";
                })
                //增加subscribeOn
                .subscribeOn(Schedulers.boundedElastic())
                .map(name -> {
                    return new TestUser(name);
                });
    }
}

再次執行輸出如下:

21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service4.threadName=boundedElastic-1
21:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
21:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service2.threadName=boundedElastic-2
service3.threadName=boundedElastic-3
21:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
TestUser(name=name6)blockFirst 執行耗時ms:1242

  • 1、發現具體實現 sleep 的執行緒都不是 main 執行緒,而是 boundedElastic
  • 2、最終執行耗時 1242ms,只比執行時間最長的 service2 和 service3 耗時 1000ms,多一些。證明是非同步了。

4.2 CompletableFuture 實現非同步

修改 service 實現,使用 CompletableFuture 執行耗時操作(這裡是sleep,具體到專案中可能是外部介面呼叫,DB 操作等);然後使用 Mono.fromFuture 返回 Mono 物件。

@Slf4j
public class TestServiceImpl1 implements TestServiceI{
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("service1.threadName=" + Thread.currentThread().getName());
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "testname1";
        });

        return Mono.fromFuture(uCompletableFuture).map(name -> {
            return new TestUser(name);
        });
    }
}

執行返回如下:

21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service2.threadName=ForkJoinPool.commonPool-worker-1
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service3.threadName=ForkJoinPool.commonPool-worker-2
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1 
service1.threadName=ForkJoinPool.commonPool-worker-3
21:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
TestUser(name=testname1)blockFirst 執行耗時ms:1238

  • 1、耗時操作都是使用 ForkJoinPool 執行緒池中的執行緒執行。
  • 2、最終耗時和方法1基本差不多。

到此這篇關於Reactor 多工並行執行且結果按順序返回第一個的文章就介紹到這了,更多相關Reactor 多工執行內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com