Hello! 欢迎来到小浪资源网!

通过异步和非阻塞架构实现 Java 整体现代化以获得更好的性能

通过异步和非阻塞架构实现 Java 整体现代化以获得更好的性能

在最近的一个项目中,我对用 dropwizard 编写的老化的整体 Java web 服务进行了现代化改造。该服务通过 aws Lambda 函数处理许多第三方 (3p) 依赖项,但由于架构的同步、阻塞性质,性能滞后。该设置的 p99 延迟为 20 秒,在等待无服务器功能完成时阻塞请求线程。这种阻塞导致线程池饱和,导致流量高峰时请求频繁失败。


问题的症结是每个对 lambda 函数的请求都会占用 java 服务中的一个请求线程。由于这些 3p 函数通常需要相当长的时间才能完成,因此处理它们的线程将保持阻塞状态,从而消耗资源并限制可扩展性。以下是此阻塞行为在代码中的示例:

// blocking code example public string calllambdaservice(string payload) {     string response = externallambdaservice.invoke(payload);     return response; } 

在此示例中,calllambdaservice 方法会等待,直到 externallambdaservice.invoke() 返回响应。同时,没有其他任务可以使用该线程。


为了解决这些瓶颈,我使用异步和非阻塞方法重新构建了服务。此更改涉及使用调用 lambda 函数的 http 客户端来使用 org.asynchttpclient 库中的 asynchttpclient,该库在内部使用 eventloopgroup 异步处理请求。

使用 asynchttpclient 有助于卸载阻塞操作,而无需消耗池中的线程。以下是更新后的非阻塞调用的示例:

// non-blocking code example public completablefuture<string> calllambdaserviceasync(string payload) {     return completablefuture.supplyasync(() -> {         return asynchttpclient.invoke(payload);     }); } 

利用 java 的 completablefuture 来链接异步调用

除了使单个调用成为非阻塞之外,我还使用 completablefuture 链接了多个依赖项调用。使用 thencombine 和 thenapply 等方法,我可以异步获取并组合来自多个源的数据,从而显着提高吞吐量。

completablefuture<string> future1 = calllambdaserviceasync(payload1); completablefuture<string> future2 = calllambdaserviceasync(payload2);  completablefuture<string> combinedresult = future1.thencombine(future2, (result1, result2) -> {     return processresults(result1, result2); }); 

使用自定义 safeasyncresponse 类引入类型安全

在实现过程中,我观察到 java 的默认 asyncresponse 对象缺乏类型安全性,允许传递任意 java 对象。为了解决这个问题,我创建了一个带有泛型的 safeasyncresponse 类,它确保只能返回指定的响应类型,从而提高可维护性并降低运行时错误的风险。如果多次写入响应,此类还会记录错误。

public class safeasyncresponse<t> {     private static final logger logger = logger.getlogger(safeasyncresponse.class.getname());     private final asyncresponse asyncresponse;     private final atomicInteger invocationcount = new atomicinteger(0);      private safeasyncresponse(asyncresponse asyncresponse) {         this.asyncresponse = asyncresponse;     }      /**      * factory method to create a safeasyncresponse from an asyncresponse.      *      * @param asyncresponse the asyncresponse to wrap      * @param <t>           the type of the response      * @return a new instance of safeasyncresponse      */     public static <t> safeasyncresponse<t> from(asyncresponse asyncresponse) {         return new safeasyncresponse<>(asyncresponse);     }      /**      * resume the async response with a successful result.      *      * @param response the successful response of type t      */     public void withsuccess(t response) {         if (invocationcount.incrementandget() > 1) {             logerror("withsuccess");             return;         }         asyncresponse.resume(response);     }      /**      * resume the async response with an error.      *      * @param error the throwable representing the error      */     public void witherror(throwable error) {         if (invocationcount.incrementandget() > 1) {             logerror("witherror");             return;         }         asyncresponse.resume(error);     }      /**      * logs an error message indicating multiple invocations.      *      * @param methodname the name of the method that was invoked multiple times      */     private void logerror(string methodname) {         logger.severe(() -> string.format(             "safeasyncresponse method '%s' invoked more than once. ignoring subsequent invocations.", methodname         ));     } }  

safeasyncresponse 的示例用法

@get @path("/example") public void exampleendpoint(@suspended asyncresponse asyncresponse) {     safeasyncresponse<string> saferesponse = safeasyncresponse.from(asyncresponse);      // simulate success     saferesponse.withsuccess("operation successful!");      // simulate multiple invocations (only the first one will be processed)     saferesponse.witherror(new runtimeexception("this should not be processed"));     saferesponse.withsuccess("this will be ignored"); } 


为了验证这些更改的有效性,我使用虚拟线程编写了负载测试来模拟单台计算机上的最大吞吐量。我生成了不同级别的无服务器函数执行时间(范围从 1 到 20 秒),发现新的异步非阻塞实现在执行时间较短时将吞吐量提高了 8 倍,在执行时间较长时吞吐量提高了约 4 倍。



发现 http 客户端中的隐藏错误

在运行这些压力测试时,我在我们的自定义 http 客户端中发现了一个隐藏的错误。客户端使用连接超时设置为 integer.max_value 的信号量,这意味着如果客户端用完可用连接,它将无限期地阻塞线程。解决此错误对于防止高负载场景中潜在的死锁至关重要。




synchronized byte[] getData() {     byte[] buf = ...;     int nread = socket.getInputStream().read(buf);  // Can block here     ... } 


幸运的是,随着 jep 491 的出现,java 开发人员可以期待虚拟线程行为的改进,其中可以更有效地处理同步代码中的阻塞操作,而不会耗尽平台线程。


通过将我们的服务重构为异步非阻塞架构,我们实现了显着的性能改进。通过实现 asynchttpclient、引入 safeasyncresponse 来实现类型安全以及进行负载测试,我们能够优化 java 服务并极大提高吞吐量。该项目是单体应用程序现代化方面的一次有价值的实践,并揭示了适当的异步实践对可扩展性的重要性。

随着 java 的发展,我们将来也许能够更有效地利用虚拟线程,但就目前而言,异步和非阻塞架构仍然是高延迟、依赖第三方的服务中性能优化的重要方法。
