diff --git a/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/async/AsyncKnowledgeBase.java b/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/async/AsyncKnowledgeBase.java index 10b1e1e48..137f0103e 100644 --- a/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/async/AsyncKnowledgeBase.java +++ b/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/async/AsyncKnowledgeBase.java @@ -39,47 +39,68 @@ public class AsyncKnowledgeBase { // 向向量知识库创建文件 // @Async - public void createKnowledgeBase (List knowledgeList, List ids) { + public void createKnowledgeBase(List knowledgeList, List ids) { + log.info("开始执行 createKnowledgeBase 方法。knowledgeList 大小: {}, ids 大小: {}", knowledgeList.size(), ids.size()); + + // 如果提供了 ids,则删除现有的知识库文档 if (!CollectionUtils.isAnyEmpty(ids)) { + log.info("正在删除现有的知识库文档,ids: {}", ids); String mes = ragHttpService.ragDocumentsDel(llmBackendProperties.getRagDocumentsDel(), ids); - log.info("delete knowledge base info {}", mes); + log.info("删除知识库信息: {}", mes); + } else { + log.info("未提供 ids,跳过删除操作。"); } - // 注释调试 + // 处理 knowledgeList 中的每个知识文档 if (!CollectionUtils.isAnyEmpty(knowledgeList)) { + log.info("开始处理知识文档列表,列表大小: {}", knowledgeList.size()); knowledgeList.forEach(knowledge -> { try { - log.info("knowledge base begin create {}", knowledge); + log.info("开始为文档创建知识库,文档 ID: {}", knowledge.getId()); - // 修改状态为 未上传 + // 将文件状态更新为 NOT_UPLOADED(未上传) + log.info("将文档状态更新为 NOT_UPLOADED,文档 ID: {}", knowledge.getId()); updateFileState(knowledge, KnowledgeStatusEnum.NOT_UPLOADED); + // 准备 RegUploadReqVO 用于文档上传 + log.info("为文档准备 RegUploadReqVO,文档 ID: {}", knowledge.getId()); RegUploadReqVO regUploadReqVO = new RegUploadReqVO() .setUrl(llmBackendProperties.getRagEmbed()) .setFileId(String.valueOf(knowledge.getId())) .setFileName(knowledge.getDocumentName()) .setFileUrl(knowledge.getFileUrl()); + // 检查文件扩展名并根据扩展名处理 int lastIndex = knowledge.getDocumentName().lastIndexOf("."); if (lastIndex != -1) { - String extension = knowledge.getDocumentName().substring(lastIndex + 1).toLowerCase(); + log.info("文档扩展名: {}", extension); + if ("txt".equals(extension)) { + log.info("文档为 txt 文件,直接上传嵌入,文档 ID: {}", knowledge.getId()); ragHttpService.embedUploadFile(regUploadReqVO); } else { + log.info("文档为非 txt 文件,调用知识嵌入方法,文档 ID: {}", knowledge.getId()); knowledgeEmbed(knowledge, knowledge.getKnowledgeBaseId()); } + } else { + log.warn("文档无扩展名,跳过处理,文档 ID: {}", knowledge.getId()); } + + log.info("文档处理完成,文档 ID: {}", knowledge.getId()); } catch (Exception e) { - log.error("the creation of the knowledge base error {}", e.getMessage()); - // 修改状态为 上传失败 + log.error("创建知识库时发生错误,文档 ID: {},错误信息: {}", knowledge.getId(), e.getMessage()); + // 如果发生异常,将文件状态更新为 UPLOAD_FAILED(上传失败) + log.info("将文档状态更新为 UPLOAD_FAILED,文档 ID: {}", knowledge.getId()); updateFileState(knowledge, KnowledgeStatusEnum.UPLOAD_FAILED); throw exception(new ErrorCode(10047, "文件上传到知识库失败!")); } - }); + } else { + log.info("知识文档列表为空,跳过处理。"); } + log.info("createKnowledgeBase 方法执行完成。"); } /** @@ -102,19 +123,11 @@ public class AsyncKnowledgeBase { */ public void knowledgeEmbed (KnowledgeDocumentsDO knowledge, Long id) { - // TODO:本地调试时打开 - // String tmpUrl = "http://xhllm.xinnuojinzhi.com/admin-api/infra/file/29/get/ca3d06d24f80c127ec0300408a035176f5e0bf90ce319fda17018303226e2298.doc"; - // log.info("knowledge url {}", tmpUrl); - // knowledge.setFileUrl(tmpUrl); - // 创建知识向量 KnowledgeRagEmbedReqVO ragEmbedReqVo = new KnowledgeRagEmbedReqVO() .setFileId(String.valueOf(knowledge.getId())) .setFileName(knowledge.getDocumentName()) .setFileUrl(knowledge.getFileUrl()); -// .setFileInputStream(new ByteArrayInputStream(Objects.requireNonNull(getFileByte(knowledge.getFileUrl())))) -// .setFileBytes(getFileByte(knowledge.getFileUrl() - try { ragHttpService.knowledgeEmbed(ragEmbedReqVo, id); @@ -123,34 +136,4 @@ public class AsyncKnowledgeBase { } } - - /** - * 获取文件字节数组 - * - * @param fileUrl 文件地址 - * @return 文件字节数组 - */ - public static byte[] getFileByte (String fileUrl) { - try (InputStream inputStream = new URL(fileUrl).openStream(); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - - // 缓冲区大小 - byte[] buffer = new byte[1024]; - int bytesRead; - - // 读取文件内容并写入 ByteArrayOutputStream - while ((bytesRead = inputStream.read(buffer)) != -1) { - outputStream.write(buffer, 0, bytesRead); - } - - // 返回字节数组 - return outputStream.toByteArray(); - - } catch (IOException e) { - log.error("Failed to read remote file: {}", e.getMessage()); - - throw exception(new ErrorCode(10001_001, "文件读取错误")); - } - } - } diff --git a/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/http/RagHttpService.java b/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/http/RagHttpService.java index 58a970d8f..9eeaa76a0 100644 --- a/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/http/RagHttpService.java +++ b/yudao-module-llm/yudao-module-llm-biz/src/main/java/cn/iocoder/yudao/module/llm/service/http/RagHttpService.java @@ -95,67 +95,91 @@ public class RagHttpService { /** * 向量知识库文档上传 * - * @param ragUploadReqVO - * @throws UnirestException - * @throws IOException + * @param ragUploadReqVO 上传请求参数 + * @throws UnirestException 如果 Unirest 请求失败 + * @throws IOException 如果发生 I/O 错误 */ - public void embedUploadFile (RegUploadReqVO ragUploadReqVO) throws UnirestException, IOException { + public void embedUploadFile(RegUploadReqVO ragUploadReqVO) throws UnirestException, IOException { + log.info("开始向量知识库文档上传流程"); + // 根据 fileId 查询知识库文档 + log.info("根据 fileId 查询知识库文档,fileId: {}", ragUploadReqVO.getFileId()); KnowledgeDocumentsDO documents = getKnowledgeDocuments(ragUploadReqVO.getFileId()); if (documents == null) { + log.error("知识库文档不存在,fileId: {}", ragUploadReqVO.getFileId()); throw exception(new ErrorCode(10047, "知识库文档不存在")); } + log.info("成功获取知识库文档: {}", documents); - // 修改状态为 上传中 + // 修改状态为上传中 + log.info("更新文件状态为上传中,fileId: {}", ragUploadReqVO.getFileId()); updateFileState(documents, KnowledgeStatusEnum.UPLOADING); + // 创建 HTTP 客户端 + log.info("创建 HTTP 客户端"); CloseableHttpClient httpClient = HttpClients.createDefault(); RagEmbedRespVO ragEmbedRespVO = new RagEmbedRespVO(); + + // 创建 HTTP GET 请求 + log.info("创建 HTTP GET 请求,文件URL: {}", ragUploadReqVO.getFileUrl()); HttpGet request = new HttpGet(ragUploadReqVO.getFileUrl()); + try (CloseableHttpResponse response = httpClient.execute(request)) { + log.info("HTTP GET 请求执行成功,响应状态: {}", response.getStatusLine()); + HttpEntity entity = response.getEntity(); if (entity != null) { + log.info("响应实体不为空,开始处理文件内容"); + try (InputStream inputStream = entity.getContent(); BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream)) { + log.info("创建 BufferedInputStream,准备检测文件编码"); + bufferedInputStream.mark(Integer.MAX_VALUE); String encoding = detectCharset(bufferedInputStream); + log.info("检测到文件编码: {}", encoding); + bufferedInputStream.reset(); // 重置流以便重新读取 + log.info("重置 BufferedInputStream,准备重新读取文件内容"); + // 使用检测到的编码读取文件内容 try (InputStreamReader reader = new InputStreamReader(bufferedInputStream, encoding); BufferedReader bufferedReader = new BufferedReader(reader)) { + log.info("创建 BufferedReader,开始读取文件内容"); + StringBuilder fileContentBuilder = new StringBuilder(); String line; while ((line = bufferedReader.readLine()) != null) { fileContentBuilder.append(line).append(System.lineSeparator()); } String fileContent = fileContentBuilder.toString(); - byte[] utf8Bytes = fileContent.getBytes(StandardCharsets.UTF_8); + log.info("成功读取文件内容,文件大小: {} 字符", fileContent.length()); + byte[] utf8Bytes = fileContent.getBytes(StandardCharsets.UTF_8); + log.info("将文件内容转换为 UTF-8 字节数组,字节大小: {} 字节", utf8Bytes.length); try { - // 配置 Unirest - /* Unirest.config().reset(); - Unirest.config().socketTimeout(86400000);*/ - // 发送上传请求 + log.info("准备上传文件内容"); ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(utf8Bytes); + int bufferSize = 1024; + byte[] byteArray = new byte[bufferSize]; + int bytesRead; + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - int bufferSize = 1024; // 定义缓冲区大小为1024字节 - byte[] byteArray = new byte[bufferSize]; // 创建字节数组 - - int bytesRead; // 记录实际读取的字节数 - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // 输出流用于存储字节 - + log.info("开始读取字节数组并写入输出流"); while ((bytesRead = byteArrayInputStream.read(byteArray)) != -1) { - // 如果没有读取到数据,bytesRead会返回-1 - outputStream.write(byteArray, 0, bytesRead); // 将读取的字节写入输出流 + outputStream.write(byteArray, 0, bytesRead); } - // 将ByteArrayOutputStream转换为byte数组 byte[] result = outputStream.toByteArray(); + log.info("成功将文件内容转换为字节数组,字节大小: {} 字节", result.length); - inputStream.close(); // 关闭InputStream - outputStream.close(); // 关闭ByteArrayOutputStream + // 关闭流 + byteArrayInputStream.close(); + outputStream.close(); + log.info("关闭 ByteArrayInputStream 和 ByteArrayOutputStream"); + // 获取向量嵌入接口 URL String ragEmbed = ragUploadReqVO.getUrl(); log.info("知识库向量嵌入接口URL: {}", ragEmbed); @@ -167,43 +191,55 @@ public class RagHttpService { String curlCommand = String.format("curl -X POST -F \"file_id=%s\" -F \"file=@%s\" \"%s\"", ragUploadReqVO.getFileId(), ragUploadReqVO.getFileName(), ragEmbed); log.info("生成的 curl 命令: {}", curlCommand); - long startTime = System.currentTimeMillis(); // 记录开始时间 + // 记录开始时间 + long startTime = System.currentTimeMillis(); + log.info("开始发送上传请求,时间戳: {}", startTime); + + // 发送上传请求 String body = HttpRequest.post(ragUploadReqVO.getUrl()) .form("file", result, ragUploadReqVO.getFileName()) .form("file_id", ragUploadReqVO.getFileId()) .timeout(60000) .executeAsync().body(); - // 打印响应内容 + // 记录结束时间并计算耗时 long endTime = System.currentTimeMillis(); long duration = endTime - startTime; + log.info("上传请求完成,耗时: {}", formatDuration(duration)); + // 打印响应内容 printLogs(); - log.info("请求耗时: {}", formatDuration(duration)); log.info("响应原始内容: {}", body); printLogs(); + // 解析响应 ragEmbedRespVO = JSON.parseObject(body, RagEmbedRespVO.class); - log.info("ragEmbedRespVO:{}", ragEmbedRespVO); + log.info("解析响应结果: {}", ragEmbedRespVO); if (ragEmbedRespVO.isStatus()) { - // 修改状态为 上传成功 + log.info("文件上传成功,更新文件状态为上传成功"); updateFileState(documents, KnowledgeStatusEnum.UPLOAD_SUCCESS); } else { - // 修改状态为 上传失败 + log.error("文件上传失败,错误信息: {}", ragEmbedRespVO.getMessage()); updateFileState(documents, KnowledgeStatusEnum.UPLOAD_FAILED); throw new RuntimeException("文件上传失败:" + ragEmbedRespVO.getMessage()); } - } catch (UnirestException e) { - + log.error("文件上传失败,UnirestException: {}", e.getMessage(), e); throw new RuntimeException("文件上传失败: " + e.getMessage()); } - } } + } else { + log.error("响应实体为空,无法获取文件内容"); + throw new IOException("响应实体为空"); } + } catch (IOException e) { + log.error("HTTP GET 请求执行失败: {}", e.getMessage(), e); + throw e; } + + log.info("向量知识库文档上传流程结束"); } public void printLogs(){ @@ -317,6 +353,7 @@ public class RagHttpService { return res; } + /** * 知识库向量嵌入 * @@ -324,9 +361,12 @@ public class RagHttpService { * @param id 知识库ID * @throws IOException 如果发生I/O错误 */ - public void knowledgeEmbed (KnowledgeRagEmbedReqVO reqVO, Long id) throws IOException { + public void knowledgeEmbed(KnowledgeRagEmbedReqVO reqVO, Long id) throws IOException { + log.info("开始知识库向量嵌入流程,知识库ID: {}", id); + // 获取向量嵌入接口的URL String ragEmbed = llmBackendProperties.getEmbed(); + log.info("向量嵌入接口URL: {}", ragEmbed); // 从请求参数中获取文件ID和文件名 String fileId = reqVO.getFileId(); @@ -334,31 +374,36 @@ public class RagHttpService { String fileUrl = reqVO.getFileUrl(); String mediaType = getMediaType(fileName); - log.info("URL: {}, fileId: {} ,fileName: {}, fileUrl: {}, mediaType: {} ", ragEmbed, fileId, fileName, fileUrl, mediaType); + log.info("文件ID: {}, 文件名: {}, 文件URL: {}, 文件类型: {}", fileId, fileName, fileUrl, mediaType); // 获取知识库文档 + log.info("开始获取知识库文档,知识库ID: {}, 文件ID: {}", id, fileId); KnowledgeDocumentsDO documents = getKnowledgeDocuments(id, fileId); if (documents == null) { + log.error("知识库文档不存在,知识库ID: {}, 文件ID: {}", id, fileId); throw exception(new ErrorCode(10047, "知识库文档不存在")); } + log.info("成功获取知识库文档: {}", documents); // 更新文件状态为上传中 + log.info("更新文件状态为上传中,文件ID: {}", fileId); updateFileState(documents, KnowledgeStatusEnum.UPLOADING); - + // 获取文件字节数组 + log.info("开始获取文件字节数组,文件URL: {}", fileUrl); byte[] fileBytes = Objects.requireNonNull(getFileByte(fileUrl)); + log.info("成功获取文件字节数组,文件大小: {} 字节", fileBytes.length); // 创建 OkHttpClient 实例 + log.info("创建 OkHttpClient 实例,设置超时时间为 30 分钟"); OkHttpClient client = new OkHttpClient.Builder() - // 连接超时时间 .connectTimeout(30, TimeUnit.MINUTES) - // 读取超时时间 .readTimeout(30, TimeUnit.MINUTES) - // 写入超时时间 .writeTimeout(30, TimeUnit.MINUTES) .build(); // 创建 MultipartBody + log.info("创建 MultipartBody,文件ID: {}, 文件名: {}", fileId, fileName); RequestBody requestBody = new MultipartBody.Builder() .setType(MultipartBody.FORM) .addFormDataPart("file_id", fileId) @@ -369,8 +414,10 @@ public class RagHttpService { // 记录开始时间 long startTime = System.currentTimeMillis(); + log.info("开始发送请求,时间戳: {}", startTime); // 创建请求 + log.info("创建请求,URL: {}", ragEmbed); Request sendRequest = new Request.Builder() .url(ragEmbed) .post(requestBody) @@ -378,40 +425,46 @@ public class RagHttpService { .build(); // 发送请求 + log.info("发送请求..."); try (Response sendResponse = client.newCall(sendRequest).execute()) { if (sendResponse.body() != null) { String body = sendResponse.body().string(); - // 打印响应内容 long endTime = System.currentTimeMillis(); long duration = endTime - startTime; printLogs(); - log.info("请求耗时: {}", formatDuration(duration)); - log.info("!!!!!!!!!! 响应原始内容 Response: {}", body); + log.info("请求成功,耗时: {}", formatDuration(duration)); + log.info("响应原始内容: {}", body); printLogs(); JSONObject resJson = JSONObject.parseObject(body); - // 1: 先判断是否存在 detail, + // 判断是否存在 detail String detail = resJson.getString("detail"); if (StringUtils.isNotEmpty(detail)) { + log.error("请求失败,错误详情: {}", detail); handleFailure(documents, detail); } else { Boolean status = resJson.getBoolean("status"); if (!status) { - handleFailure(documents, resJson.getString("message")); + String message = resJson.getString("message"); + log.error("请求失败,错误信息: {}", message); + handleFailure(documents, message); } else { + log.info("请求成功,开始处理响应"); processResponse(body, documents); } } } else { + log.error("请求失败,响应体为空"); handleFailure(documents, FILE_UPLOAD_FAILED_MSG); } } catch (IOException e) { + log.error("请求发生IO异常: {}", e.getMessage(), e); handleFailure(documents, FILE_UPLOAD_FAILED_MSG, e); } - + log.info("知识库向量嵌入流程结束,知识库ID: {}", id); } /** @@ -420,25 +473,25 @@ public class RagHttpService { * @param fileUrl 文件地址 * @return 文件字节数组 */ - public static byte[] getFileByte (String fileUrl) { + public static byte[] getFileByte(String fileUrl) { + log.info("开始读取远程文件,文件URL: {}", fileUrl); try (InputStream inputStream = new URL(fileUrl).openStream(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - // 缓冲区大小 byte[] buffer = new byte[1024]; int bytesRead; + int totalBytesRead = 0; - // 读取文件内容并写入 ByteArrayOutputStream while ((bytesRead = inputStream.read(buffer)) != -1) { outputStream.write(buffer, 0, bytesRead); + totalBytesRead += bytesRead; } - // 返回字节数组 + log.info("成功读取远程文件,文件大小: {} 字节", totalBytesRead); return outputStream.toByteArray(); } catch (IOException e) { - log.error("Failed to read remote file: {}", e.getMessage()); - + log.error("读取远程文件失败: {}", e.getMessage(), e); throw exception(new ErrorCode(10001_001, "文件读取错误")); } } @@ -449,22 +502,30 @@ public class RagHttpService { * @param fileName 文件名 * @return 文件类型 */ - private static String getMediaType (String fileName) { + private static String getMediaType(String fileName) { + log.info("获取文件类型,文件名: {}", fileName); String fileSuffix = fileName.substring(fileName.lastIndexOf(".") + 1); + String mediaType; switch (fileSuffix) { case "pdf": - return "application/pdf"; + mediaType = "application/pdf"; + break; case "md": - return "text/x-markdown"; + mediaType = "text/x-markdown"; + break; case "docx": - return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"; + mediaType = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"; + break; case "txt": - return "text/plain"; + mediaType = "text/plain"; + break; + default: + mediaType = "application/octet-stream"; + break; } - return "application/octet-stream"; + log.info("文件类型: {}", mediaType); + return mediaType; } - - /** * 处理响应结果 */ @@ -506,17 +567,7 @@ public class RagHttpService { throw new RuntimeException(errorMsg); } - /** - * 重置 Unirest 连接 - */ - private void resetUnirestConnection () { - try { - Unirest.shutDown(); - Unirest.config().socketTimeout(86400000); - } catch (Exception e) { - log.warn("重置Unirest连接失败: {}", e.getMessage()); - } - } + /** * 修改知识库文档状态 @@ -552,16 +603,6 @@ public class RagHttpService { return knowledgeDocumentsMapper.selectById(fileId); } - /** - * 判断异常是否由 Socket 关闭引起 - * - * @param e 异常 - * @return 是否为 Socket 关闭异常 - */ - private boolean isSocketClosedException (UnirestException e) { - Throwable cause = e.getCause(); - return cause instanceof IOException; - } public static void main (String[] args) { // 创建 OkHttpClient 实例