异步
使用场景
大文件上传及上传后进行语音识别的应用场景
- 大文件上传: 企业员工上传完整的会议录音或录像文件
- 上传后进行语音识别: 对会议内容进行自动转写,生成会议纪要,便于后续查阅和分析讨论点
如果上传完成后不采用异步任务对文件进行语音识别,可能会出现以下问题:
- 响应延迟: 语音识别是一个计算密集型的操作,特别是当处理大文件时。如果在同一请求中直接进行语音识别,服务器响应时间将大幅增加,导致用户体验变差。
- 超时错误: 许多 Web 服务器和 HTTP 客户端都有请求超时的限制。长时间的同步处理可能导致请求超时,从而使得语音识别任务失败。
- 资源占用: 大量的同步语音识别任务可能会占用大量服务器资源,包括 CPU 和内存,影响服务器处理其他请求的能力,降低整体服务的可用性和稳定性。
- 用户界面冻结: 对于基于 Web 的应用,如果在前端直接等待语音识别的结果,可能导致用户界面冻结,无法进行其他操作,影响用户体验。
目前比较通用的做法是采用异步任务
- 文件上传
- 用户通过客户端(如 Web 页面、移动应用)上传文件。
- 服务器接收到文件后,存储在服务器上或云存储服务中,并立即返回一个响应告诉用户文件已成功上传。
- 异步任务创建
- 在文件上传成功后,服务器不直接进行语音识别处理,而是创建一个异步任务。
- 这个任务被发送到消息队列中,这样可以在系统资源允许的情况下按顺序或并行处理。
- 任务处理
- 后台工作进程监听消息队列。一旦发现新任务,就开始处理该任务。
- 在处理过程中,工作进程执行语音识别操作,将音频文件转写为文本。
- 更新状态和通知
- 一旦异步任务完成,系统会更新任务的状态(例如,从“处理中”变为“已完成”)。
- 系统可以通过不同的方式通知用户任务已完成,比如电子邮件通知、短信、应用内通知等。
- 用户可以通过客户端查询任务状态,或者直接获取任务结果(如转写好的文本)。
异步线程池
TioThreadUtils
- tio-boot 内置了线程池工具类 com.litongjava.tio.utils.thread.TioThreadUtils,并且 tio-boot 在关闭是 会自动释放线程资源 示例
TioThreadUtils.submit(() -> {
try {
RespBodyVo result = Aop.get(AsrSubmitService.class).submit(uploadFile, email);
log.info("异步任务执行完成, 结果: {}", result);
} catch (Exception e) {
log.error("异步任务执行异常", e);
}
});
CompletableFuture.runAsync
CompletableFuture.runAsync
是 Java CompletableFuture
类的一部分,用于在异步执行一个无返回值的任务。它通常用于需要在后台线程中执行一些操作,而不阻塞主线程的场景。runAsync
方法有两个主要重载形式:
runAsync(Runnable runnable)
:使用默认的ForkJoinPool
作为线程池执行任务。runAsync(Runnable runnable, Executor executor)
:使用指定的Executor
执行任务。
用法
CompletableFuture.runAsync(() -> {
// 在异步线程中执行的代码
});
底层运行机制
CompletableFuture.runAsync
的底层运行机制主要涉及以下几个方面:
线程池管理:
- 当使用
runAsync(Runnable runnable)
时,CompletableFuture
默认使用全局的ForkJoinPool.commonPool()
线程池来执行异步任务。 ForkJoinPool
是一个适合并行任务处理的线程池,支持任务分割和工作窃取机制,以提高线程利用率和执行效率。- 如果使用
runAsync(Runnable runnable, Executor executor)
,任务会由你提供的自定义Executor
执行,这给了你更大的灵活性来控制线程的使用。
- 当使用
任务提交和执行:
- 当
runAsync
被调用时,传入的Runnable
任务会被提交到线程池中,线程池会在适当的时候执行这个任务。 - 任务被执行时,
runAsync
方法立即返回一个CompletableFuture
对象,而不等待任务执行完毕。
- 当
非阻塞操作:
CompletableFuture
的异步方法(如runAsync
)不会阻塞调用线程,这意味着主线程可以继续执行后续操作。- 异步操作完成后,可以通过
thenRun
,thenAccept
,thenApply
等方法对结果进行处理。
任务的生命周期:
CompletableFuture
的状态在任务的执行过程中会发生变化,从未完成状态转变为完成状态。- 你可以在
CompletableFuture
完成后设置回调函数,如.thenRun(...)
,这使得异步任务的结果可以在完成后被处理。
ForkJoinPool
的运行机制
默认情况下,CompletableFuture
使用的 ForkJoinPool.commonPool()
是一个并行的工作窃取线程池,适合处理大量的小任务。其运行机制包括:
- 任务分割:
ForkJoinPool
可以将任务分解成更小的子任务并行处理。 - 工作窃取:线程池中的每个线程都有自己的双端队列,线程首先从自己的队列中获取任务执行。如果自己的队列为空,它会尝试从其他线程的队列末尾"窃取"任务以保持高效利用。
- 线程调度:
ForkJoinPool
动态管理线程数,以平衡任务执行和资源消耗。
注意事项
- 线程池的选择:默认的
ForkJoinPool.commonPool()
适用于 CPU 密集型任务。如果任务是 IO 密集型或需要更严格的线程管理,最好提供自定义的Executor
。 - 任务的阻塞:在异步任务中避免长时间的阻塞操作(如等待网络或文件 IO),以免耗尽线程池中的线程。
示例
以下是一个使用 CompletableFuture.runAsync
执行异步任务的示例:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Running in a separate thread: " + Thread.currentThread().getName());
// 模拟长时间运行的任务
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
这个代码片段会在 ForkJoinPool
中的一个线程上运行,而不会阻塞主线程。
tio-boot TioThreadUtils 实现异步示例
在 Controller 中使用线程池,开启异步任务
import com.litongjava.apps.asrgpt.services.AsrSubmitService;
import com.litongjava.apps.asrgpt.validator.AsrSubmitValidator;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.common.UploadFile;
import com.litongjava.tio.http.server.util.Resps;
import com.litongjava.tio.utils.resp.RespBodyVo;
import com.litongjava.tio.utils.thread.TioThreadUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AsrSubmitController {
public HttpResponse submit(HttpRequest request) {
UploadFile uploadFile = request.getUploadFile("file");
String email = request.getParam("email");
log.info("upload file size:{}", uploadFile.getData().length);
log.info("email:{}", email);
//validator
RespBodyVo respVo = Aop.get(AsrSubmitValidator.class).submit(email);
if (respVo != null) {
return Resps.json(request, respVo);
}
TioThreadUtils.submit(() -> {
try {
RespBodyVo result = Aop.get(AsrSubmitService.class).submit(uploadFile, email);
log.info("异步任务执行完成, 结果: {}", result);
} catch (Exception e) {
log.error("异步任务执行异常", e);
}
});
// 立即响应客户端,表示文件上传请求已接收,正在处理中
RespBodyVo ok = RespBodyVo.ok();
ok.setMsg("文件上传成功,正在处理中...");
return Resps.json(request, ok);
}
}