refactor(yudao-module-llm): 重构流式聊天接口的异步处理逻辑

-移除了 ExecutorService 的创建和手动管理
- 使用 CompletableFuture.runAsync() 替代手动线程管理,简化异步处理逻辑
-优化了异常处理和 SseEmitter 的完成处理
- 注释掉了
This commit is contained in:
Liuyang 2025-03-03 17:38:05 +08:00
parent 0bd503f11d
commit 9e593db115

View File

@ -11,6 +11,7 @@ import cn.iocoder.yudao.module.llm.controller.admin.conversation.vo.*;
import cn.iocoder.yudao.module.llm.dal.dataobject.conversation.ConversationDO;
import cn.iocoder.yudao.module.llm.service.conversation.ConversationService;
import cn.iocoder.yudao.module.llm.service.http.vo.TextToImageReqVo;
import com.alibaba.fastjson.JSON;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
@ -25,6 +26,7 @@ import javax.servlet.http.HttpServletResponse;
import javax.validation.Valid;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -108,32 +110,43 @@ public class ConversationController {
* @return SseEmitter 对象用于流式发送响应
*/
@PostMapping("/stream-chat")
public SseEmitter streamChat (@Valid @RequestBody ChatReqVO chatReqVO, HttpServletResponse response) {
public void streamChat (@Valid @RequestBody ChatReqVO chatReqVO, HttpServletResponse response) {
log.info("收到对话推理请求,请求参数: {}", chatReqVO);
SseEmitter emitter = new SseEmitter(120_000L);
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
executor.execute(() -> {
try {
conversationService.chatStream(chatReqVO, emitter, response);
} catch (Exception e) {
emitter.completeWithError(e);
} finally {
executor.shutdown();
}
});
} catch (Exception e) {
log.error("处理对话推理请求时发生异常", e);
// ExecutorService executor = Executors.newSingleThreadExecutor();
// try {
// executor.execute(() -> {
// try {
// conversationService.chatStream(chatReqVO, emitter, response);
// } catch (Exception e) {
// emitter.completeWithError(e);
// } finally {
// executor.shutdown();
// }
// });
// } catch (Exception e) {
// log.error("处理对话推理请求时发生异常", e);
// try {
// emitter.completeWithError(e);
// } catch (Exception ex) {
// log.error("无法完成 SseEmitter 错误处理", ex);
// }
// }
// log.info("返回 SseEmitter 对象,准备进行流式响应");
// 异步处理避免阻塞主线程
CompletableFuture.runAsync(() -> {
try {
emitter.completeWithError(e);
} catch (Exception ex) {
log.error("无法完成 SseEmitter 错误处理", ex);
conversationService.chatStream(chatReqVO, emitter, response);
emitter.complete();
} catch (Exception e) {
log.error("处理对话推理请求时发生异常", e);
try {
emitter.completeWithError(e);
} catch (Exception ex) {
log.error("无法完成 SseEmitter 错误处理", ex);
}
}
}
log.info("返回 SseEmitter 对象,准备进行流式响应");
emitter.onCompletion(executor::shutdown);
emitter.onTimeout(executor::shutdown);
return emitter;
});
}
@PostMapping("/text-to-image")