如何利用CompletableFuture保证批量接口请求结果的顺序一致性?

如何利用CompletableFuture保证批量接口请求结果的顺序一致性?

高效并发处理批量接口请求:确保结果顺序一致

高效率地处理大量数据时,并发调用多个第三方接口能显著提升效率。然而,简单的线程并发可能导致返回结果顺序错乱,与原始数据列表对应不上。本文将介绍如何利用Java的CompletableFuture解决这个问题,确保接口调用结果与原始数据顺序完全一致。

问题:

假设需要并发调用1000多个第三方接口并处理返回结果。如果使用简单的for循环启动多个线程,接口调用的顺序无法保证,最终结果的顺序与原始数据列表不符。某些示例代码使用CompletableFuture.runAsync执行异步任务,但忽略了结果的收集和顺序的维护。

解决方案:

为了保证结果顺序与原始数据列表一致,关键在于使用CompletableFuture.supplyAsync代替CompletableFuture.runAsync。supplyAsync方法可以返回一个结果,而runAsync没有返回值。通过supplyAsync返回每个接口调用的结果,再利用流式处理将结果收集到列表中,即可确保结果顺序与原始数据列表一致。

改进后的代码:

public static void main(String[] args) {     List<String> dataList = new ArrayList<>(); // 原始数据列表     // ... 初始化 dataList ...      ExecutorService executorService = new ThreadPoolExecutor(             //核心线程数             Runtime.getRuntime().availableProcessors(),             //最大线程数             Runtime.getRuntime().availableProcessors() * 2,             //线程存活时间             60L, TimeUnit.SECONDS,             new LinkedBlockingQueue<>(),             new ThreadPoolExecutor.CallerRunsPolicy());      List<CompletableFuture<String>> futures = new ArrayList<>();     for (String data : dataList) {         futures.add(CompletableFuture.supplyAsync(() -> {             logger.info("开始执行异步线程->>" + data);             // 调用接口,传入 data             // 根据接口返回值判断 data 是否匹配             // 返回处理后的结果             return processData(data); // 处理数据并返回结果         }, executorService)); // 使用自定义线程池     }      // 所有请求完成后处理逻辑     CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenRun(() -> {         List<String> results = futures.stream()                 .map(CompletableFuture::join)                 .collect(Collectors.toList());         logger.info("线程执行完毕:{}", JSON.toJSONString(results));         // 调用发送短信     }).thenRun(() -> executorService.shutdown()); }  // 处理数据的方法,根据实际情况修改 private static String processData(String data) {     // ...  接口调用和数据处理逻辑 ...     return data + "处理后的结果"; }

通过将每个CompletableFuture的结果存储在futures列表中,并在最后使用futures.stream().map(CompletableFuture::join).collect(Collectors.toList())收集结果,就保证了结果顺序与原始数据列表一致。CompletableFuture::join方法会阻塞直到获取到CompletableFuture的结果。 这样就有效解决了原代码中结果顺序错乱的问题。

© 版权声明
THE END
喜欢就支持一下吧
点赞8 分享