refactor(module-llm):重构知识库向量嵌入功能

- 优化了知识库向量嵌入的请求和响应处理逻辑
- 引入了重试机制和更详细的错误处理
- 提高了代码的可读性和可维护性
This commit is contained in:
Liuyang 2025-02-12 17:27:04 +08:00
parent 764c9455f2
commit 2f2e1257cc

View File

@ -12,6 +12,7 @@ import cn.iocoder.yudao.module.llm.framework.backend.config.LLMBackendProperties
import cn.iocoder.yudao.module.llm.service.http.vo.*;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.BeanUtils;
import com.google.gson.JsonArray;
@ -20,6 +21,7 @@ import kong.unirest.Unirest;
import kong.unirest.UnirestException;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@ -55,6 +57,10 @@ public class RagHttpService {
* 最大重试次数
*/
private static final int MAX_RETRIES = 3;
/** Message logged and thrown when the embedding response cannot be parsed as JSON. */
private static final String JSON_PARSE_ERROR_MSG = "返回结果解析为JSON格式错误";
/** Message passed to the failure handler when the document upload fails. */
private static final String FILE_UPLOAD_FAILED_MSG = "文件上传失败";
/** Name of the HTTP header that carries the request content type. */
private static final String CONTENT_TYPE_HEADER = "Content-Type";
/** MIME type of Office Open XML word-processing (.docx) documents, sent with the embedding upload. */
private static final String WORD_DOCUMENT_CONTENT_TYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.document";
/**
* RAG健康检查API
@ -270,78 +276,104 @@ public class RagHttpService {
* 知识库向量嵌入
*
* @param reqVO 请求参数
* @param id 知识库ID
*/
public void knowledgeEmbed (KnowledgeRagEmbedReqVO reqVO, Long id) {
// 获取知识库向量嵌入的url
String ragEmbed = llmBackendProperties.getEmbed();
log.info("知识库向量嵌入接口URL: {}", ragEmbed);
log.info("url : {}", ragEmbed);
// fileId llm_knowledge_documents ID
String fileId = reqVO.getFileId();
String fileName = reqVO.getFileName();
// 根据 fileId 查询知识库文档
// 获取知识库文档
KnowledgeDocumentsDO documents = getKnowledgeDocuments(id, fileId);
if (documents == null) {
throw exception(new ErrorCode(10047, "知识库文档不存在"));
}
// 修改状态为 上传中
// 更新文件状态为上传中
updateFileState(documents, KnowledgeStatusEnum.UPLOADING);
int retryCount = 0;
boolean uploadSuccess = false;
// 初始化 Unirest 配置只需一次
Unirest.config().socketTimeout(86400000);
while (retryCount < MAX_RETRIES && !uploadSuccess) {
// 发送 POST 请求
for (int retryCount = 0; retryCount < MAX_RETRIES; retryCount++) {
try {
// 配置 Unirest
Unirest.config().reset();
Unirest.config().socketTimeout(86400000);
// 构建请求参数
HttpResponse<String> response = Unirest.post(ragEmbed)
.header("Content-Type", "application/vnd.openxmlformats-officedocument.wordprocessingml.document")
.header(CONTENT_TYPE_HEADER, WORD_DOCUMENT_CONTENT_TYPE)
.field("file_id", fileId)
.field("file", reqVO.getFileInputStream(), fileName)
.asString();
// 检查响应状态
String responseBody = response.getBody();
log.info(" ========= String Response Body Result: {}", responseBody);
try {
JSONObject parseObject = JSON.parseObject(responseBody);
log.info(" ========= JSON Response Body Result: {}", responseBody);
if (parseObject.containsKey("status") && parseObject.getBoolean("status")) {
// 修改状态为 上传成功
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS);
} else {
// 修改状态为 上传失败
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED);
throw exception(new ErrorCode(10047, responseBody));
}
} catch (Exception e) {
log.error("返回结果 解析为 JSON格式错误: {}", e.getMessage());
throw new RuntimeException("返回结果 解析为 JSON格式错误: " + e.getMessage(), e);
// 检查 HTTP 状态码
if (response.getStatus() != HttpStatus.SC_OK) {
log.error("HTTP请求失败状态码: {}", response.getStatus());
throw new RuntimeException("HTTP请求失败状态码: " + response.getStatus());
}
String responseBody = response.getBody();
log.info("响应原始内容: {}", responseBody);
processResponse(responseBody, documents);
return;
} catch (UnirestException e) {
if (isSocketClosedException(e)) {
log.warn("knowledgeEmbed Socket 连接已关闭,尝试重新上传...");
retryCount++;
if (isSocketClosedException(e) && retryCount < MAX_RETRIES - 1) {
log.warn("Socket连接异常尝试重试 ({}/{})", retryCount + 1, MAX_RETRIES);
resetUnirestConnection();
} else {
throw new RuntimeException("文件上传失败: " + e.getMessage(), e);
handleFailure(documents, FILE_UPLOAD_FAILED_MSG, e);
}
} catch (Exception e) {
handleFailure(documents, FILE_UPLOAD_FAILED_MSG, e);
}
}
if (!uploadSuccess) {
throw new RuntimeException("文件上传失败,已达到最大重试次数: " + MAX_RETRIES);
}
throw new RuntimeException("文件上传失败,已达到最大重试次数: " + MAX_RETRIES);
}
/**
 * Interprets the embedding endpoint's response body and records the result on the document.
 *
 * <p>The body is parsed as a JSON object. A true {@code status} field marks the document
 * {@code UPLOAD_SUCCESS}; otherwise the document is marked {@code UPLOAD_FAILED} and a
 * business exception with code 10047 is raised, carrying the response's {@code error}
 * field or, when that field is absent, the raw body. This method does not change the
 * document state when the body is empty or cannot be parsed.
 *
 * @param responseBody raw response text returned by the embedding endpoint
 * @param documents    document record whose state is updated
 * @throws RuntimeException if the body is empty or cannot be parsed as JSON
 */
private void processResponse (String responseBody, KnowledgeDocumentsDO documents) {
    try {
        JSONObject parseObject = JSON.parseObject(responseBody);
        // parseObject can yield null instead of throwing (e.g. for a null body); fail with
        // a descriptive error rather than a NullPointerException on the next access.
        if (parseObject == null) {
            log.error("{},原始响应内容: {}", JSON_PARSE_ERROR_MSG, responseBody);
            throw new RuntimeException(JSON_PARSE_ERROR_MSG + ": 响应内容为空");
        }
        log.info("解析后的JSON响应: {}", parseObject);

        if (parseObject.getBooleanValue("status")) {
            updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS);
            return;
        }

        // Failure reported by the backend: prefer its error text, fall back to the raw body
        // so the business exception never carries a null message.
        String errorMsg = parseObject.getString("error");
        if (errorMsg == null) {
            errorMsg = responseBody;
        }
        updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED);
        throw exception(new ErrorCode(10047, errorMsg));
    } catch (JSONException e) {
        log.error("{},原始响应内容: {}", JSON_PARSE_ERROR_MSG, responseBody);
        throw new RuntimeException(JSON_PARSE_ERROR_MSG + ": " + e.getMessage(), e);
    }
}
/**
 * Marks the document as failed, logs the cause, and aborts by throwing.
 *
 * <p>This method never returns normally. If updating the document state itself fails,
 * that secondary failure is logged and attached to the thrown exception as a suppressed
 * exception, so the original cause is still reported.
 *
 * @param documents document record to mark {@code UPLOAD_FAILED}
 * @param errorMsg  message used for the log entry and the thrown exception
 * @param e         original cause of the failure
 * @throws RuntimeException always, wrapping {@code e}
 */
private void handleFailure (KnowledgeDocumentsDO documents, String errorMsg, Exception e) {
    RuntimeException failure = new RuntimeException(errorMsg, e);
    try {
        updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED);
    } catch (RuntimeException stateUpdateError) {
        // Do not let a failed status update hide the root cause of the upload failure.
        log.error("更新文件状态为上传失败时出错: {}", stateUpdateError.getMessage(), stateUpdateError);
        failure.addSuppressed(stateUpdateError);
    }
    log.error("{}: {}", errorMsg, e.getMessage(), e);
    throw failure;
}
/**
 * Resets the Unirest connection before a retry: shuts Unirest down, then re-applies the
 * long socket timeout (86400000 ms = 24 hours).
 *
 * <p>Best effort: failures are logged as warnings and never propagated. The two steps are
 * guarded separately so that a failed shutdown does not skip re-applying the timeout.
 */
private void resetUnirestConnection () {
    try {
        Unirest.shutDown();
    } catch (Exception e) {
        log.warn("重置Unirest连接失败: {}", e.getMessage(), e);
    }
    try {
        // NOTE(review): relies on Unirest accepting configuration changes once it has been
        // shut down - confirm against the Unirest version in use.
        Unirest.config().socketTimeout(86400000);
    } catch (Exception e) {
        log.warn("重置Unirest连接失败: {}", e.getMessage(), e);
    }
}
/**