feat(llm): 优化知识库文件上传稳定性

- 在 KnowledgeBaseServiceImpl 中注释掉 Unirest 配置代码
- 在 RagHttpService 中增加文件上传重试机制
- 添加最大重试次数常量 MAX_RETRIES
- 在文件上传方法中加入异常捕获和重试逻辑
-增加日志记录和异常处理
This commit is contained in:
Liuyang 2025-02-12 16:19:19 +08:00
parent c5f65c9787
commit 1a0e6ebe20
2 changed files with 105 additions and 38 deletions

View File

@ -51,6 +51,11 @@ public class RagHttpService {
@Resource
private KnowledgeDocumentsMapper knowledgeDocumentsMapper;
/**
* 最大重试次数
*/
private static final int MAX_RETRIES = 3;
/**
* RAG健康检查API
*/
@ -115,25 +120,50 @@ public class RagHttpService {
String fileContent = fileContentBuilder.toString();
byte[] utf8Bytes = fileContent.getBytes(StandardCharsets.UTF_8);
// 上传文件
Unirest.config().reset();
Unirest.config().socketTimeout(86400000);
HttpResponse<String> uploadResponse = Unirest.post(ragUploadReqVO.getUrl())
.field("file_id", ragUploadReqVO.getFileId())
.field("file", new ByteArrayInputStream(utf8Bytes), ragUploadReqVO.getFileName()) // 使用文件名 "file.txt" 作为示例
.asString();
int retryCount = 0;
boolean uploadSuccess = false;
log.info("Response Body: {}", uploadResponse.getBody());
ragEmbedRespVO = JSON.parseObject(uploadResponse.getBody(), RagEmbedRespVO.class);
log.info("ragEmbedRespVO:{}", ragEmbedRespVO);
while (retryCount < MAX_RETRIES && !uploadSuccess) {
try {
// 配置 Unirest
Unirest.config().reset();
Unirest.config().socketTimeout(86400000);
if (ragEmbedRespVO.isStatus()) {
// 修改状态为 上传成功
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS);
} else {
// 修改状态为 上传失败
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED);
throw new RuntimeException("文件上传失败:" + ragEmbedRespVO.getMessage());
// 发送上传请求
HttpResponse<String> uploadResponse = Unirest.post(ragUploadReqVO.getUrl())
.field("file_id", ragUploadReqVO.getFileId())
.field("file", new ByteArrayInputStream(utf8Bytes), ragUploadReqVO.getFileName())
.asString();
// 检查响应状态
log.info("Response Body: {}", uploadResponse.getBody());
ragEmbedRespVO = JSON.parseObject(uploadResponse.getBody(), RagEmbedRespVO.class);
log.info("ragEmbedRespVO:{}", ragEmbedRespVO);
if (ragEmbedRespVO.isStatus()) {
// 修改状态为 上传成功
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS);
} else {
// 修改状态为 上传失败
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED);
throw new RuntimeException("文件上传失败:" + ragEmbedRespVO.getMessage());
}
} catch (UnirestException e) {
if (isSocketClosedException(e)) {
log.warn("Socket 连接已关闭,尝试重新上传...");
retryCount++;
} else {
throw new RuntimeException("文件上传失败: " + e.getMessage(), e);
}
} finally {
Unirest.shutDown();
}
}
if (!uploadSuccess) {
throw new RuntimeException("文件上传失败,已达到最大重试次数: " + MAX_RETRIES);
}
}
}
@ -263,23 +293,49 @@ public class RagHttpService {
// 修改状态为 上传中
updateFileState(documents, KnowledgeStatusEnum.UPLOADING);
// 构建请求参数
HttpResponse<String> response = Unirest.post(ragEmbed)
.field("file_id", fileId)
.field("file", reqVO.getFileInputStream(), fileName)
.asString();
int retryCount = 0;
boolean uploadSuccess = false;
String responseBody = response.getBody();
JSONObject parseObject = JSON.parseObject(responseBody);
log.info(" ========= Response Body Result: {}", responseBody);
while (retryCount < MAX_RETRIES && !uploadSuccess) {
try {
// 配置 Unirest
Unirest.config().reset();
Unirest.config().socketTimeout(86400000);
if (parseObject.containsKey("status") && parseObject.getBoolean("status")) {
// 修改状态为 上传成功
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS);
} else {
// 修改状态为 上传失败
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED);
throw exception(new ErrorCode(10047, responseBody));
// 构建请求参数
HttpResponse<String> response = Unirest.post(ragEmbed)
.field("file_id", fileId)
.field("file", reqVO.getFileInputStream(), fileName)
.asString();
// 检查响应状态
String responseBody = response.getBody();
JSONObject parseObject = JSON.parseObject(responseBody);
log.info(" ========= Response Body Result: {}", responseBody);
if (parseObject.containsKey("status") && parseObject.getBoolean("status")) {
// 修改状态为 上传成功
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS);
} else {
// 修改状态为 上传失败
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED);
throw exception(new ErrorCode(10047, responseBody));
}
} catch (UnirestException e) {
if (isSocketClosedException(e)) {
log.warn("knowledgeEmbed Socket 连接已关闭,尝试重新上传...");
retryCount++;
} else {
throw new RuntimeException("文件上传失败: " + e.getMessage(), e);
}
} finally {
Unirest.shutDown();
}
}
if (!uploadSuccess) {
throw new RuntimeException("文件上传失败,已达到最大重试次数: " + MAX_RETRIES);
}
}
@ -317,4 +373,15 @@ public class RagHttpService {
private KnowledgeDocumentsDO getKnowledgeDocuments (String fileId) {
return knowledgeDocumentsMapper.selectById(fileId);
}
/**
* 判断异常是否由 Socket 关闭引起
*
* @param e 异常
* @return 是否为 Socket 关闭异常
*/
private boolean isSocketClosedException (UnirestException e) {
Throwable cause = e.getCause();
return cause instanceof IOException;
}
}

View File

@ -78,12 +78,12 @@ public class KnowledgeBaseServiceImpl implements KnowledgeBaseService {
KnowledgeBaseDO updateObj = BeanUtils.toBean(updateReqVO, KnowledgeBaseDO.class);
knowledgeBaseMapper.updateById(updateObj);
Unirest.config().reset();
Unirest.config()
.socketTimeout(86400000)
.connectTimeout(100000)
.concurrency(10, 5)
.setDefaultHeader("Accept", "application/json");
// Unirest.config().reset();
// Unirest.config()
// .socketTimeout(86400000)
// .connectTimeout(100000)
// .concurrency(10, 5)
// .setDefaultHeader("Accept", "application/json");
// 4. 处理附表知识文档数据
if (!CollectionUtils.isAnyEmpty(updateReqVO.getKnowledgeDocuments())) {