refactor(module-llm):重构知识库文档上传流程

- 优化了 createKnowledgeBase 方法的逻辑,增加了日志记录和错误处理
- 重构了 embedUploadFile 方法,使用 HttpClient 替代 Unirest
- 改进了 knowledgeEmbed 方法,使用 OkHttpClient 替代 Unirest
- 优化了 getFileByte 方法,增加了日志记录和错误处理
- 移除了未使用的代码和冗余的注释
This commit is contained in:
Liuyang 2025-02-21 17:28:41 +08:00
parent 758b00fefb
commit b046efa65b
2 changed files with 148 additions and 124 deletions

View File

@ -39,47 +39,68 @@ public class AsyncKnowledgeBase {
// 向向量知识库创建文件
// @Async
public void createKnowledgeBase (List<KnowledgeDocumentsDO> knowledgeList, List<Long> ids) {
public void createKnowledgeBase(List<KnowledgeDocumentsDO> knowledgeList, List<Long> ids) {
log.info("开始执行 createKnowledgeBase 方法。knowledgeList 大小: {}, ids 大小: {}", knowledgeList.size(), ids.size());
// 如果提供了 ids则删除现有的知识库文档
if (!CollectionUtils.isAnyEmpty(ids)) {
log.info("正在删除现有的知识库文档ids: {}", ids);
String mes = ragHttpService.ragDocumentsDel(llmBackendProperties.getRagDocumentsDel(), ids);
log.info("delete knowledge base info {}", mes);
log.info("删除知识库信息: {}", mes);
} else {
log.info("未提供 ids跳过删除操作。");
}
// 注释调试
// 处理 knowledgeList 中的每个知识文档
if (!CollectionUtils.isAnyEmpty(knowledgeList)) {
log.info("开始处理知识文档列表,列表大小: {}", knowledgeList.size());
knowledgeList.forEach(knowledge -> {
try {
log.info("knowledge base begin create {}", knowledge);
log.info("开始为文档创建知识库,文档 ID: {}", knowledge.getId());
// 修改状态为 未上传
// 将文件状态更新为 NOT_UPLOADED未上传
log.info("将文档状态更新为 NOT_UPLOADED文档 ID: {}", knowledge.getId());
updateFileState(knowledge, KnowledgeStatusEnum.NOT_UPLOADED);
// 准备 RegUploadReqVO 用于文档上传
log.info("为文档准备 RegUploadReqVO文档 ID: {}", knowledge.getId());
RegUploadReqVO regUploadReqVO = new RegUploadReqVO()
.setUrl(llmBackendProperties.getRagEmbed())
.setFileId(String.valueOf(knowledge.getId()))
.setFileName(knowledge.getDocumentName())
.setFileUrl(knowledge.getFileUrl());
// 检查文件扩展名并根据扩展名处理
int lastIndex = knowledge.getDocumentName().lastIndexOf(".");
if (lastIndex != -1) {
String extension = knowledge.getDocumentName().substring(lastIndex + 1).toLowerCase();
log.info("文档扩展名: {}", extension);
if ("txt".equals(extension)) {
log.info("文档为 txt 文件,直接上传嵌入,文档 ID: {}", knowledge.getId());
ragHttpService.embedUploadFile(regUploadReqVO);
} else {
log.info("文档为非 txt 文件,调用知识嵌入方法,文档 ID: {}", knowledge.getId());
knowledgeEmbed(knowledge, knowledge.getKnowledgeBaseId());
}
} else {
log.warn("文档无扩展名,跳过处理,文档 ID: {}", knowledge.getId());
}
log.info("文档处理完成,文档 ID: {}", knowledge.getId());
} catch (Exception e) {
log.error("the creation of the knowledge base error {}", e.getMessage());
// 修改状态为 上传失败
log.error("创建知识库时发生错误,文档 ID: {},错误信息: {}", knowledge.getId(), e.getMessage());
// 如果发生异常将文件状态更新为 UPLOAD_FAILED上传失败
log.info("将文档状态更新为 UPLOAD_FAILED文档 ID: {}", knowledge.getId());
updateFileState(knowledge, KnowledgeStatusEnum.UPLOAD_FAILED);
throw exception(new ErrorCode(10047, "文件上传到知识库失败!"));
}
});
} else {
log.info("知识文档列表为空,跳过处理。");
}
log.info("createKnowledgeBase 方法执行完成。");
}
/**
@ -102,19 +123,11 @@ public class AsyncKnowledgeBase {
*/
public void knowledgeEmbed (KnowledgeDocumentsDO knowledge, Long id) {
// TODO:本地调试时打开
// String tmpUrl = "http://xhllm.xinnuojinzhi.com/admin-api/infra/file/29/get/ca3d06d24f80c127ec0300408a035176f5e0bf90ce319fda17018303226e2298.doc";
// log.info("knowledge url {}", tmpUrl);
// knowledge.setFileUrl(tmpUrl);
// 创建知识向量
KnowledgeRagEmbedReqVO ragEmbedReqVo = new KnowledgeRagEmbedReqVO()
.setFileId(String.valueOf(knowledge.getId()))
.setFileName(knowledge.getDocumentName())
.setFileUrl(knowledge.getFileUrl());
// .setFileInputStream(new ByteArrayInputStream(Objects.requireNonNull(getFileByte(knowledge.getFileUrl()))))
// .setFileBytes(getFileByte(knowledge.getFileUrl()
try {
ragHttpService.knowledgeEmbed(ragEmbedReqVo, id);
@ -123,34 +136,4 @@ public class AsyncKnowledgeBase {
}
}
/**
* 获取文件字节数组
*
* @param fileUrl 文件地址
* @return 文件字节数组
*/
public static byte[] getFileByte (String fileUrl) {
try (InputStream inputStream = new URL(fileUrl).openStream();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
// 缓冲区大小
byte[] buffer = new byte[1024];
int bytesRead;
// 读取文件内容并写入 ByteArrayOutputStream
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
// 返回字节数组
return outputStream.toByteArray();
} catch (IOException e) {
log.error("Failed to read remote file: {}", e.getMessage());
throw exception(new ErrorCode(10001_001, "文件读取错误"));
}
}
}

View File

@ -95,67 +95,91 @@ public class RagHttpService {
/**
* 向量知识库文档上传
*
* @param ragUploadReqVO
* @throws UnirestException
* @throws IOException
* @param ragUploadReqVO 上传请求参数
* @throws UnirestException 如果 Unirest 请求失败
* @throws IOException 如果发生 I/O 错误
*/
public void embedUploadFile (RegUploadReqVO ragUploadReqVO) throws UnirestException, IOException {
public void embedUploadFile(RegUploadReqVO ragUploadReqVO) throws UnirestException, IOException {
log.info("开始向量知识库文档上传流程");
// 根据 fileId 查询知识库文档
log.info("根据 fileId 查询知识库文档fileId: {}", ragUploadReqVO.getFileId());
KnowledgeDocumentsDO documents = getKnowledgeDocuments(ragUploadReqVO.getFileId());
if (documents == null) {
log.error("知识库文档不存在fileId: {}", ragUploadReqVO.getFileId());
throw exception(new ErrorCode(10047, "知识库文档不存在"));
}
log.info("成功获取知识库文档: {}", documents);
// 修改状态为 上传中
// 修改状态为上传中
log.info("更新文件状态为上传中fileId: {}", ragUploadReqVO.getFileId());
updateFileState(documents, KnowledgeStatusEnum.UPLOADING);
// 创建 HTTP 客户端
log.info("创建 HTTP 客户端");
CloseableHttpClient httpClient = HttpClients.createDefault();
RagEmbedRespVO ragEmbedRespVO = new RagEmbedRespVO();
// 创建 HTTP GET 请求
log.info("创建 HTTP GET 请求文件URL: {}", ragUploadReqVO.getFileUrl());
HttpGet request = new HttpGet(ragUploadReqVO.getFileUrl());
try (CloseableHttpResponse response = httpClient.execute(request)) {
log.info("HTTP GET 请求执行成功,响应状态: {}", response.getStatusLine());
HttpEntity entity = response.getEntity();
if (entity != null) {
log.info("响应实体不为空,开始处理文件内容");
try (InputStream inputStream = entity.getContent();
BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream)) {
log.info("创建 BufferedInputStream准备检测文件编码");
bufferedInputStream.mark(Integer.MAX_VALUE);
String encoding = detectCharset(bufferedInputStream);
log.info("检测到文件编码: {}", encoding);
bufferedInputStream.reset(); // 重置流以便重新读取
log.info("重置 BufferedInputStream准备重新读取文件内容");
// 使用检测到的编码读取文件内容
try (InputStreamReader reader = new InputStreamReader(bufferedInputStream, encoding);
BufferedReader bufferedReader = new BufferedReader(reader)) {
log.info("创建 BufferedReader开始读取文件内容");
StringBuilder fileContentBuilder = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
fileContentBuilder.append(line).append(System.lineSeparator());
}
String fileContent = fileContentBuilder.toString();
byte[] utf8Bytes = fileContent.getBytes(StandardCharsets.UTF_8);
log.info("成功读取文件内容,文件大小: {} 字符", fileContent.length());
byte[] utf8Bytes = fileContent.getBytes(StandardCharsets.UTF_8);
log.info("将文件内容转换为 UTF-8 字节数组,字节大小: {} 字节", utf8Bytes.length);
try {
// 配置 Unirest
/* Unirest.config().reset();
Unirest.config().socketTimeout(86400000);*/
// 发送上传请求
log.info("准备上传文件内容");
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(utf8Bytes);
int bufferSize = 1024;
byte[] byteArray = new byte[bufferSize];
int bytesRead;
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
int bufferSize = 1024; // 定义缓冲区大小为1024字节
byte[] byteArray = new byte[bufferSize]; // 创建字节数组
int bytesRead; // 记录实际读取的字节数
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // 输出流用于存储字节
log.info("开始读取字节数组并写入输出流");
while ((bytesRead = byteArrayInputStream.read(byteArray)) != -1) {
// 如果没有读取到数据bytesRead会返回-1
outputStream.write(byteArray, 0, bytesRead); // 将读取的字节写入输出流
outputStream.write(byteArray, 0, bytesRead);
}
// 将ByteArrayOutputStream转换为byte数组
byte[] result = outputStream.toByteArray();
log.info("成功将文件内容转换为字节数组,字节大小: {} 字节", result.length);
inputStream.close(); // 关闭InputStream
outputStream.close(); // 关闭ByteArrayOutputStream
// 关闭流
byteArrayInputStream.close();
outputStream.close();
log.info("关闭 ByteArrayInputStream 和 ByteArrayOutputStream");
// 获取向量嵌入接口 URL
String ragEmbed = ragUploadReqVO.getUrl();
log.info("知识库向量嵌入接口URL: {}", ragEmbed);
@ -167,43 +191,55 @@ public class RagHttpService {
String curlCommand = String.format("curl -X POST -F \"file_id=%s\" -F \"file=@%s\" \"%s\"", ragUploadReqVO.getFileId(), ragUploadReqVO.getFileName(), ragEmbed);
log.info("生成的 curl 命令: {}", curlCommand);
long startTime = System.currentTimeMillis(); // 记录开始时间
// 记录开始时间
long startTime = System.currentTimeMillis();
log.info("开始发送上传请求,时间戳: {}", startTime);
// 发送上传请求
String body = HttpRequest.post(ragUploadReqVO.getUrl())
.form("file", result, ragUploadReqVO.getFileName())
.form("file_id", ragUploadReqVO.getFileId())
.timeout(60000)
.executeAsync().body();
// 打印响应内容
// 记录结束时间并计算耗时
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
log.info("上传请求完成,耗时: {}", formatDuration(duration));
// 打印响应内容
printLogs();
log.info("请求耗时: {}", formatDuration(duration));
log.info("响应原始内容: {}", body);
printLogs();
// 解析响应
ragEmbedRespVO = JSON.parseObject(body, RagEmbedRespVO.class);
log.info("ragEmbedRespVO:{}", ragEmbedRespVO);
log.info("解析响应结果: {}", ragEmbedRespVO);
if (ragEmbedRespVO.isStatus()) {
// 修改状态为 上传成功
log.info("文件上传成功,更新文件状态为上传成功");
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS);
} else {
// 修改状态为 上传失败
log.error("文件上传失败,错误信息: {}", ragEmbedRespVO.getMessage());
updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED);
throw new RuntimeException("文件上传失败:" + ragEmbedRespVO.getMessage());
}
} catch (UnirestException e) {
log.error("文件上传失败UnirestException: {}", e.getMessage(), e);
throw new RuntimeException("文件上传失败: " + e.getMessage());
}
}
}
} else {
log.error("响应实体为空,无法获取文件内容");
throw new IOException("响应实体为空");
}
} catch (IOException e) {
log.error("HTTP GET 请求执行失败: {}", e.getMessage(), e);
throw e;
}
log.info("向量知识库文档上传流程结束");
}
public void printLogs(){
@ -317,6 +353,7 @@ public class RagHttpService {
return res;
}
/**
* 知识库向量嵌入
*
@ -324,9 +361,12 @@ public class RagHttpService {
* @param id 知识库ID
* @throws IOException 如果发生I/O错误
*/
public void knowledgeEmbed (KnowledgeRagEmbedReqVO reqVO, Long id) throws IOException {
public void knowledgeEmbed(KnowledgeRagEmbedReqVO reqVO, Long id) throws IOException {
log.info("开始知识库向量嵌入流程知识库ID: {}", id);
// 获取向量嵌入接口的URL
String ragEmbed = llmBackendProperties.getEmbed();
log.info("向量嵌入接口URL: {}", ragEmbed);
// 从请求参数中获取文件ID和文件名
String fileId = reqVO.getFileId();
@ -334,31 +374,36 @@ public class RagHttpService {
String fileUrl = reqVO.getFileUrl();
String mediaType = getMediaType(fileName);
log.info("URL: {}, fileId: {} ,fileName: {}, fileUrl: {}, mediaType: {} ", ragEmbed, fileId, fileName, fileUrl, mediaType);
log.info("文件ID: {}, 文件名: {}, 文件URL: {}, 文件类型: {}", fileId, fileName, fileUrl, mediaType);
// 获取知识库文档
log.info("开始获取知识库文档知识库ID: {}, 文件ID: {}", id, fileId);
KnowledgeDocumentsDO documents = getKnowledgeDocuments(id, fileId);
if (documents == null) {
log.error("知识库文档不存在知识库ID: {}, 文件ID: {}", id, fileId);
throw exception(new ErrorCode(10047, "知识库文档不存在"));
}
log.info("成功获取知识库文档: {}", documents);
// 更新文件状态为上传中
log.info("更新文件状态为上传中文件ID: {}", fileId);
updateFileState(documents, KnowledgeStatusEnum.UPLOADING);
// 获取文件字节数组
log.info("开始获取文件字节数组文件URL: {}", fileUrl);
byte[] fileBytes = Objects.requireNonNull(getFileByte(fileUrl));
log.info("成功获取文件字节数组,文件大小: {} 字节", fileBytes.length);
// 创建 OkHttpClient 实例
log.info("创建 OkHttpClient 实例,设置超时时间为 30 分钟");
OkHttpClient client = new OkHttpClient.Builder()
// 连接超时时间
.connectTimeout(30, TimeUnit.MINUTES)
// 读取超时时间
.readTimeout(30, TimeUnit.MINUTES)
// 写入超时时间
.writeTimeout(30, TimeUnit.MINUTES)
.build();
// 创建 MultipartBody
log.info("创建 MultipartBody文件ID: {}, 文件名: {}", fileId, fileName);
RequestBody requestBody = new MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart("file_id", fileId)
@ -369,8 +414,10 @@ public class RagHttpService {
// 记录开始时间
long startTime = System.currentTimeMillis();
log.info("开始发送请求,时间戳: {}", startTime);
// 创建请求
log.info("创建请求URL: {}", ragEmbed);
Request sendRequest = new Request.Builder()
.url(ragEmbed)
.post(requestBody)
@ -378,40 +425,46 @@ public class RagHttpService {
.build();
// 发送请求
log.info("发送请求...");
try (Response sendResponse = client.newCall(sendRequest).execute()) {
if (sendResponse.body() != null) {
String body = sendResponse.body().string();
// 打印响应内容
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
printLogs();
log.info("请求耗时: {}", formatDuration(duration));
log.info("!!!!!!!!!! 响应原始内容 Response: {}", body);
log.info("请求成功,耗时: {}", formatDuration(duration));
log.info("响应原始内容: {}", body);
printLogs();
JSONObject resJson = JSONObject.parseObject(body);
// 1: 判断是否存在 detail
// 判断是否存在 detail
String detail = resJson.getString("detail");
if (StringUtils.isNotEmpty(detail)) {
log.error("请求失败,错误详情: {}", detail);
handleFailure(documents, detail);
} else {
Boolean status = resJson.getBoolean("status");
if (!status) {
handleFailure(documents, resJson.getString("message"));
String message = resJson.getString("message");
log.error("请求失败,错误信息: {}", message);
handleFailure(documents, message);
} else {
log.info("请求成功,开始处理响应");
processResponse(body, documents);
}
}
} else {
log.error("请求失败,响应体为空");
handleFailure(documents, FILE_UPLOAD_FAILED_MSG);
}
} catch (IOException e) {
log.error("请求发生IO异常: {}", e.getMessage(), e);
handleFailure(documents, FILE_UPLOAD_FAILED_MSG, e);
}
log.info("知识库向量嵌入流程结束知识库ID: {}", id);
}
/**
@ -420,25 +473,25 @@ public class RagHttpService {
* @param fileUrl 文件地址
* @return 文件字节数组
*/
public static byte[] getFileByte (String fileUrl) {
public static byte[] getFileByte(String fileUrl) {
log.info("开始读取远程文件文件URL: {}", fileUrl);
try (InputStream inputStream = new URL(fileUrl).openStream();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
// 缓冲区大小
byte[] buffer = new byte[1024];
int bytesRead;
int totalBytesRead = 0;
// 读取文件内容并写入 ByteArrayOutputStream
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
totalBytesRead += bytesRead;
}
// 返回字节数组
log.info("成功读取远程文件,文件大小: {} 字节", totalBytesRead);
return outputStream.toByteArray();
} catch (IOException e) {
log.error("Failed to read remote file: {}", e.getMessage());
log.error("读取远程文件失败: {}", e.getMessage(), e);
throw exception(new ErrorCode(10001_001, "文件读取错误"));
}
}
@ -449,22 +502,30 @@ public class RagHttpService {
* @param fileName 文件名
* @return 文件类型
*/
private static String getMediaType (String fileName) {
private static String getMediaType(String fileName) {
log.info("获取文件类型,文件名: {}", fileName);
String fileSuffix = fileName.substring(fileName.lastIndexOf(".") + 1);
String mediaType;
switch (fileSuffix) {
case "pdf":
return "application/pdf";
mediaType = "application/pdf";
break;
case "md":
return "text/x-markdown";
mediaType = "text/x-markdown";
break;
case "docx":
return "application/vnd.openxmlformats-officedocument.wordprocessingml.document";
mediaType = "application/vnd.openxmlformats-officedocument.wordprocessingml.document";
break;
case "txt":
return "text/plain";
mediaType = "text/plain";
break;
default:
mediaType = "application/octet-stream";
break;
}
return "application/octet-stream";
log.info("文件类型: {}", mediaType);
return mediaType;
}
/**
* 处理响应结果
*/
@ -506,17 +567,7 @@ public class RagHttpService {
throw new RuntimeException(errorMsg);
}
/**
* 重置 Unirest 连接
*/
private void resetUnirestConnection () {
try {
Unirest.shutDown();
Unirest.config().socketTimeout(86400000);
} catch (Exception e) {
log.warn("重置Unirest连接失败: {}", e.getMessage());
}
}
/**
* 修改知识库文档状态
@ -552,16 +603,6 @@ public class RagHttpService {
return knowledgeDocumentsMapper.selectById(fileId);
}
/**
* 判断异常是否由 Socket 关闭引起
*
* @param e 异常
* @return 是否为 Socket 关闭异常
*/
private boolean isSocketClosedException (UnirestException e) {
Throwable cause = e.getCause();
return cause instanceof IOException;
}
public static void main (String[] args) {
// 创建 OkHttpClient 实例