diff --git a/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/http/RagHttpService.java b/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/http/RagHttpService.java index 166533df7..e500727ad 100644 --- a/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/http/RagHttpService.java +++ b/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/http/RagHttpService.java @@ -51,6 +51,11 @@ public class RagHttpService { @Resource private KnowledgeDocumentsMapper knowledgeDocumentsMapper; + /** + * 最大重试次数 + */ + private static final int MAX_RETRIES = 3; + /** * RAG健康检查API */ @@ -115,25 +120,50 @@ public class RagHttpService { String fileContent = fileContentBuilder.toString(); byte[] utf8Bytes = fileContent.getBytes(StandardCharsets.UTF_8); - // 上传文件 - Unirest.config().reset(); - Unirest.config().socketTimeout(86400000); - HttpResponse uploadResponse = Unirest.post(ragUploadReqVO.getUrl()) - .field("file_id", ragUploadReqVO.getFileId()) - .field("file", new ByteArrayInputStream(utf8Bytes), ragUploadReqVO.getFileName()) // 使用文件名 "file.txt" 作为示例 - .asString(); + int retryCount = 0; + boolean uploadSuccess = false; - log.info("Response Body: {}", uploadResponse.getBody()); - ragEmbedRespVO = JSON.parseObject(uploadResponse.getBody(), RagEmbedRespVO.class); - log.info("ragEmbedRespVO:{}", ragEmbedRespVO); + while (retryCount < MAX_RETRIES && !uploadSuccess) { + try { + // 配置 Unirest + Unirest.config().reset(); + Unirest.config().socketTimeout(86400000); - if (ragEmbedRespVO.isStatus()) { - // 修改状态为 上传成功 - updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS); - } else { - // 修改状态为 上传失败 - updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED); - throw new RuntimeException("文件上传失败:" + ragEmbedRespVO.getMessage()); + // 发送上传请求 + HttpResponse uploadResponse = Unirest.post(ragUploadReqVO.getUrl()) + .field("file_id", ragUploadReqVO.getFileId()) + .field("file", new ByteArrayInputStream(utf8Bytes), ragUploadReqVO.getFileName()) + .asString(); + + // 检查响应状态 + + log.info("Response Body: {}", uploadResponse.getBody()); + ragEmbedRespVO = JSON.parseObject(uploadResponse.getBody(), RagEmbedRespVO.class); + log.info("ragEmbedRespVO:{}", ragEmbedRespVO); + + if (ragEmbedRespVO.isStatus()) { + // 修改状态为 上传成功 + updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS); + } else { + // 修改状态为 上传失败 + updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED); + throw new RuntimeException("文件上传失败:" + ragEmbedRespVO.getMessage()); + } + + } catch (UnirestException e) { + if (isSocketClosedException(e)) { + log.warn("Socket 连接已关闭,尝试重新上传..."); + retryCount++; + } else { + throw new RuntimeException("文件上传失败: " + e.getMessage(), e); + } + } finally { + Unirest.shutDown(); + } + } + + if (!uploadSuccess) { + throw new RuntimeException("文件上传失败,已达到最大重试次数: " + MAX_RETRIES); } } } @@ -263,23 +293,49 @@ public class RagHttpService { // 修改状态为 上传中 updateFileState(documents, KnowledgeStatusEnum.UPLOADING); - // 构建请求参数 - HttpResponse response = Unirest.post(ragEmbed) - .field("file_id", fileId) - .field("file", reqVO.getFileInputStream(), fileName) - .asString(); + int retryCount = 0; + boolean uploadSuccess = false; - String responseBody = response.getBody(); - JSONObject parseObject = JSON.parseObject(responseBody); - log.info(" ========= Response Body Result: {}", responseBody); + while (retryCount < MAX_RETRIES && !uploadSuccess) { + try { + // 配置 Unirest + Unirest.config().reset(); + Unirest.config().socketTimeout(86400000); - if (parseObject.containsKey("status") && parseObject.getBoolean("status")) { - // 修改状态为 上传成功 - updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS); - } else { - // 修改状态为 上传失败 - updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED); - throw exception(new ErrorCode(10047, responseBody)); + // 构建请求参数 + HttpResponse response = Unirest.post(ragEmbed) + .field("file_id", fileId) + .field("file", reqVO.getFileInputStream(), fileName) + .asString(); + + // 检查响应状态 + String responseBody = response.getBody(); + JSONObject parseObject = JSON.parseObject(responseBody); + log.info(" ========= Response Body Result: {}", responseBody); + + if (parseObject.containsKey("status") && parseObject.getBoolean("status")) { + // 修改状态为 上传成功 + updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS); + } else { + // 修改状态为 上传失败 + updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED); + throw exception(new ErrorCode(10047, responseBody)); + } + + } catch (UnirestException e) { + if (isSocketClosedException(e)) { + log.warn("knowledgeEmbed Socket 连接已关闭,尝试重新上传..."); + retryCount++; + } else { + throw new RuntimeException("文件上传失败: " + e.getMessage(), e); + } + } finally { + Unirest.shutDown(); + } + } + + if (!uploadSuccess) { + throw new RuntimeException("文件上传失败,已达到最大重试次数: " + MAX_RETRIES); } } @@ -317,4 +373,15 @@ public class RagHttpService { private KnowledgeDocumentsDO getKnowledgeDocuments (String fileId) { return knowledgeDocumentsMapper.selectById(fileId); } + + /** + * 判断异常是否由 Socket 关闭引起 + * + * @param e 异常 + * @return 是否为 Socket 关闭异常 + */ + private boolean isSocketClosedException (UnirestException e) { + Throwable cause = e.getCause(); + return cause instanceof IOException; + } } diff --git a/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/knowledgebase/KnowledgeBaseServiceImpl.java b/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/knowledgebase/KnowledgeBaseServiceImpl.java index 383887a90..67b682844 100644 --- a/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/knowledgebase/KnowledgeBaseServiceImpl.java +++ b/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/knowledgebase/KnowledgeBaseServiceImpl.java @@ -78,12 +78,12 @@ public class KnowledgeBaseServiceImpl implements KnowledgeBaseService { KnowledgeBaseDO updateObj = BeanUtils.toBean(updateReqVO, KnowledgeBaseDO.class); knowledgeBaseMapper.updateById(updateObj); - Unirest.config().reset(); - Unirest.config() - .socketTimeout(86400000) - .connectTimeout(100000) - .concurrency(10, 5) - .setDefaultHeader("Accept", "application/json"); +// Unirest.config().reset(); +// Unirest.config() +// .socketTimeout(86400000) +// .connectTimeout(100000) +// .concurrency(10, 5) +// .setDefaultHeader("Accept", "application/json"); // 4. 处理附表(知识文档)数据 if (!CollectionUtils.isAnyEmpty(updateReqVO.getKnowledgeDocuments())) {