内部资料,请扫码登录
pigcloud
知识库问答源码解析
# 后端入口
AiChatController.msg 端点获取到前端的消息会建立 SSE 双向请求链接
AiChatController.msg 是整个聊天流程的入口点,它接收前端传来的消息 key,并建立 Server-Sent Events (SSE) 连接,实现服务器向客户端的实时推送。
@Inner(value = false)
@GetMapping(value = "/msg/list", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<AiMessageResultDTO> msg(@RequestParam Long key) {
try {
return chatService.chatList(key).concatWithValues(new AiMessageResultDTO("[DONE]"));
}
catch (Exception e) {
log.error("chat error", e);
return Flux.just(new AiMessageResultDTO(e.getMessage())).concatWithValues(new AiMessageResultDTO("[DONE]"));
}
}
这个端点依赖于之前的 createConnection 调用,即使接口设置为公开,没有有效的 message key 也无法调用。
# 消息预处理
AiChatServiceImpl.chatList 进行消息的处理
当 msg 端点接收到请求后,会调用 AiChatServiceImpl.chatList 方法进行消息处理:
- 首先根据 key 查询聊天记录
- 如果记录不存在,返回"链接已失效"
- 构建 ChatMessageDTO 对象,填充必要信息
- 执行风控逻辑
# 风控逻辑:flowRisk
// 如果开启了规则引擎
if (flowExecutorOptional.isPresent()) {
Flux<AiMessageResultDTO> aiMessageResultDTO = flowRisk(chatMessageDTO);
if (aiMessageResultDTO != null)
return aiMessageResultDTO;
}
风控逻辑通过规则引擎实现,遵循以下流程:
<chain name="chat">
THEN(
// 敏感词判断,触发直接退出流程
sensitive,
// 判断是否执行风控规则,触发直接退出流程; 内部调用按用户名+总量控制,外部调用按IP+总量控制
IF(isNoLimit, noLimit, IF(isInner, WHEN(user,tokens), WHEN(ip,tokens)))
);
</chain>
风控逻辑主要包括:
- 敏感词检测:检测用户输入是否包含敏感词
- 限流控制:根据调用方式不同采取不同的限流策略
- 内部调用:按用户名+总量控制
- 外部调用:按IP+总量控制
# 根据请求类型匹配处理规则
ChatTypeEnums chatTypeEnums = ChatTypeEnums.fromCode(chatMessageDTO.getDatasetId());
ChatMessageContextHolder.set(chatMessageDTO);
return chatRuleMap.get(chatTypeEnums.getType()).process(chatMessageDTO);
系统根据 datasetId 确定请求消息类型,主要包括:
Chat Type | Code | Description |
---|---|---|
FUNCTION_CHAT | -1L | 功能聊天 |
SIMPLE_CHAT | 0L | 简单聊天 |
DATABASE_CHAT | -2L | 数据库聊天 |
IMAGE_CHAT | -3L | 生成图片 |
MARKMAP_CHAT | -4L | 生成脑图 |
FLOW_CHAT | -5L | 编排 |
JSON_CHAT | -6L | JSON 聊天 |
REASON_CHAT | -7L | 推理聊天 |
VECTOR_CHAT | 1L | 知识库聊天 |

# 知识库聊天处理流程 (VectorChatRule)
当请求类型为知识库聊天时,系统会调用 VectorChatRule 进行处理:
@Override
public Flux<AiMessageResultDTO> process(ChatMessageDTO chatMessageDTO) {
AiDatasetEntity dataset = aiDatasetService.getById(chatMessageDTO.getDatasetId());
DimensionAwareEmbeddingModel embeddingModel = modelProvider.getEmbeddingModel(dataset.getEmbeddingModel());
Embedding queryEmbedding = embeddingModel.embed(chatMessageDTO.getContent()).content();
// 使用标注数据处理结果
if (YesNoEnum.YES.getCode().equals(dataset.getStandardFlag())) {
Flux<AiMessageResultDTO> q2qFluxResult = q2QStandardRagChatHandler
.process(queryEmbedding, dataset, chatMessageDTO)
.cache();
// 如果 q2qFluxResult 不是 empty 则直接返回,如果是 empty 则继续执行 q2AVectorRagChatHandler
return q2qFluxResult
.switchIfEmpty(q2AVectorRagChatHandler.process(queryEmbedding, dataset, chatMessageDTO));
}
return q2AVectorRagChatHandler.process(queryEmbedding, dataset, chatMessageDTO);
}
处理流程包括:
- 获取知识库数据集信息
- 获取嵌入模型并将用户问题转换为向量
- 根据知识库配置决定处理方式:
- 如果启用了标准问答(standardFlag=YES),先尝试问题匹配(Q2QStandardRagChatHandler)
- 如果问题匹配无结果,或未启用标准问答,则使用答案匹配(Q2AVectorRagChatHandler)
# 向量搜索处理 (Q2AVectorRagChatHandler)
public Flux<AiMessageResultDTO> process(Embedding embeddedList, AiDatasetEntity dataset,
ChatMessageDTO chatMessageDTO) {
double minScore = NumberUtil.div(Double.parseDouble(dataset.getScore().toString()), Double.parseDouble("100"),
2);
EmbeddingSearchRequest embeddingSearchRequest = EmbeddingSearchRequest.builder()
.queryEmbedding(embeddedList)
.maxResults(dataset.getTopK())
.filter(metadataKey(AiDocumentEntity.Fields.datasetId).isEqualTo(dataset.getId().toString())
.and(metadataKey(DocumentTypeEnums.Fields.type).isEqualTo(DocumentTypeEnums.ANSWER.getType())))
.minScore(minScore)
.build();
EmbeddingSearchResult<TextSegment> searchResult = embeddingStoreService
.embeddingStore(dataset.getCollectionName())
.search(embeddingSearchRequest);
List<EmbeddingMatch<TextSegment>> embeddingMatchList = searchResult.matches();
// 未匹配
if (CollUtil.isEmpty(embeddingMatchList)) {
return Flux.just(new AiMessageResultDTO(dataset.getEmptyDesc()));
}
// 更新命中次数
List<String> embeddingIdList = embeddingMatchList.stream().map(EmbeddingMatch::embeddingId).toList();
aiSliceService.updateHitCount(embeddingIdList);
// 对向量结果进行总结
Flux<AiMessageResultDTO> aiMessageResultDTOFlux = summaryResult(dataset, chatMessageDTO,
embeddingMatchList.stream().map(EmbeddingMatch::embedded).map(TextSegment::text).toList())
.cache();
// 修改 map 逻辑在最后拼接一下参考资料
AiMessageResultDTO aiMessageResultDTO = new AiMessageResultDTO();
aiMessageResultDTO.setMessage(StrUtil.EMPTY);
List<AiMessageResultDTO.ExtLink> extLinks = buildExtMessage(embeddingMatchList);
aiMessageResultDTO.setExtLinks(extLinks);
return aiMessageResultDTOFlux.concatWithValues(aiMessageResultDTO);
}
向量搜索处理流程:
- 根据知识库配置设置最小相似度分数
- 构建向量搜索请求,包括:
- 查询向量
- 最大结果数
- 过滤条件(数据集ID和文档类型)
- 最小分数
- 执行向量搜索
- 处理搜索结果:
- 如果没有匹配结果,返回知识库配置的空结果描述
- 如果有匹配结果,更新命中次数
- 调用大模型对搜索结果进行总结(summaryResult)
- 构建参考资料链接
- 返回总结结果和参考资料
# 结果总结处理
结果总结处理流程:
- 创建提示模板,加载系统提示(knowledge-system.st)
- 向模板添加参数:
- 搜索结果内容
- 用户问题
- 空结果描述
- 获取大模型服务
- 调用大模型进行聊天,生成总结
- 将总结结果转换为 AiMessageResultDTO 对象并返回
# summaryResult 方法代码
public Flux<AiMessageResultDTO> summaryResult(AiDatasetEntity dataset, ChatMessageDTO chatMessageDTO,
List<String> resultList) {
// 对结果进行总结
PromptTemplate userTemplate = new PromptTemplate(systemResource);
userTemplate.add("contents", CollUtil.join(resultList, StrUtil.CRLF));
userTemplate.add("userMessage", chatMessageDTO.getContent());
userTemplate.add("emptyDesc", dataset.getEmptyDesc());
AiStreamAssistantService streamAssistantService = modelProvider
.getAiStreamAssistant(chatMessageDTO.getModelName())
.getValue();
Flux<String> summaryResult = streamAssistantService.chat(chatMessageDTO.getConversationId(),
userTemplate.render());
return summaryResult.map(AiMessageResultDTO::new);
}
