Ingestion Pipeline 文档入库流水线
如果说 Retriever / Reranker / RagService 解决的是“查的时候怎么查”,那么 IngestionPipeline 解决的是“资料一开始怎么进库”。
它把原来分散的几步串成了一条统一主线:
source
-> DocumentLoader
-> LoadedDocumentProcessor
-> Chunker
-> MetadataEnricher
-> embedding
-> VectorStore.upsert
1. 适合解决什么问题
在很多项目里,RAG 第一版通常是手写这些步骤:
- 用
TikaUtil把文件转文本 - 用
RecursiveCharacterTextSplitter切块 - 手动给每个 chunk 绑定 metadata
- 调
IEmbeddingService - 拼
VectorUpsertRequest - 写入
VectorStore
这条链路并不复杂,但如果每个项目都重复拼一次,很快就会出现:
- 元数据字段不一致
- chunk id 规则不一致
- 批量 embedding 写法不一致
- 文件和纯文本两套入口各写一遍
IngestionPipeline 就是把这条最小工程主线收拢成一套轻量编排。
2. 当前内置的组成
当前内置的最小实现包括:
IngestionPipelineIngestionRequestIngestionResultIngestionSourceDocumentLoaderTextDocumentLoaderTikaDocumentLoaderChunkerRecursiveTextChunkerLoadedDocumentProcessorWhitespaceNormalizingDocumentProcessorOcrNoiseCleaningDocumentProcessorOcrTextExtractorOcrTextExtractingDocumentProcessorMetadataEnricherDefaultMetadataEnricher
设计原则很明确:
- 不引入新的大框架
- 不替代你现有的向量库选型
- 只把“文档入库”这条通用链路标准化
3. 最小示例
IEmbeddingService embeddingService = aiService.getEmbeddingService(PlatformType.OPENAI);
VectorStore vectorStore = new QdrantVectorStore(qdrantConfig);
IngestionPipeline ingestionPipeline = new IngestionPipeline(embeddingService, vectorStore);
IngestionResult ingestResult = ingestionPipeline.ingest(IngestionRequest.builder()
.dataset("kb_docs")
.embeddingModel("text-embedding-3-small")
.document(RagDocument.builder()
.sourceName("员工手册")
.sourcePath("/docs/handbook.md")
.tenant("acme")
.biz("hr")
.version("2026.03")
.build())
.source(IngestionSource.text("第一章 假期政策。第二章 报销政策。"))
.build());
执行后会完成:
- 文本装载
- 默认分块
- 默认 metadata 绑定
- 批量 embedding
VectorStore.upsert(...)
4. 文件入库示例
如果你的输入是本地文件,不需要自己先调 TikaUtil:
IngestionResult ingestResult = ingestionPipeline.ingest(IngestionRequest.builder()
.dataset("kb_files")
.embeddingModel("text-embedding-3-small")
.source(IngestionSource.file(new File("docs/employee-handbook.pdf")))
.build());
默认会走:
TikaDocumentLoaderWhitespaceNormalizingDocumentProcessorRecursiveTextChunkerDefaultMetadataEnricher
其中 TikaDocumentLoader 会补齐:
sourceNamesourcePathsourceUrimimeType
5. 默认写入哪些 metadata
默认会统一写入这些关键字段:
contentdocumentIdchunkIdsourceNamesourcePathsourceUripageNumbersectionTitlechunkIndextenantbizversion
这意味着后面的:
DenseRetriever- 引用展示
- trace 调试
- 租户过滤
- 版本隔离
都可以直接复用这套元数据约定。
6. 如何自定义分块
如果你不想继续用默认字符分块,可以直接实现 Chunker:
public class MarkdownHeadingChunker implements Chunker {
@Override
public List<RagChunk> chunk(RagDocument document, String content) {
// 先按标题切,再构造 RagChunk
return ...;
}
}
然后在请求里替换:
IngestionRequest request = IngestionRequest.builder()
.dataset("kb_docs")
.embeddingModel("text-embedding-3-small")
.source(IngestionSource.file(file))
.chunker(new MarkdownHeadingChunker())
.build();
适合场景:
- Markdown / Wiki
- FAQ
- API 文档
- 表格型知识库
7. 如何自定义元数据
如果你有自己的业务字段,比如:
docTypedepartmentupdatedAtlanguage
可以追加自己的 MetadataEnricher:
MetadataEnricher customEnricher = new MetadataEnricher() {
@Override
public void enrich(RagDocument document, RagChunk chunk, Map<String, Object> metadata) {
metadata.put("docType", "policy");
metadata.put("department", "hr");
}
};
IngestionRequest request = IngestionRequest.builder()
.dataset("kb_docs")
.embeddingModel("text-embedding-3-small")
.source(IngestionSource.text(content))
.metadataEnrichers(Collections.singletonList(customEnricher))
.build();
默认 DefaultMetadataEnricher 仍然会先执行,你追加的是业务增强,而不是把基础字段打散。
8. 如何扩展新的文档来源
如果你的资料不是本地文件,也不是已经得到的纯文本,而是:
- OSS / S3
- 数据库 blob
- CMS 页面
- 内部知识平台 API
可以实现自己的 DocumentLoader:
public class CmsDocumentLoader implements DocumentLoader {
@Override
public boolean supports(IngestionSource source) {
return source != null && source.getUri() != null && source.getUri().startsWith("cms://");
}
@Override
public LoadedDocument load(IngestionSource source) {
return LoadedDocument.builder()
.content(fetchCmsText(source.getUri()))
.sourceName(source.getName())
.sourceUri(source.getUri())
.build();
}
}
然后把它注册到自定义 pipeline:
IngestionPipeline ingestionPipeline = new IngestionPipeline(
embeddingService,
vectorStore,
Arrays.asList(new CmsDocumentLoader(), new TextDocumentLoader(), new TikaDocumentLoader()),
new RecursiveTextChunker(1000, 200),
Collections.<MetadataEnricher>singletonList(new DefaultMetadataEnricher())
);
9. OCR 与复杂文档清洗扩展点
现在 IngestionPipeline 在 DocumentLoader 和 Chunker 之间增加了一层:
LoadedDocumentProcessor
这层适合做:
- OCR 回填
- PDF / 扫描件文本修复
- 多余空白清理
- 连字符断行修复
- 页眉页脚、噪声文本清洗
9.1 内置清洗器
当前内置了:
WhitespaceNormalizingDocumentProcessorOcrNoiseCleaningDocumentProcessor
例如:
IngestionRequest request = IngestionRequest.builder()
.dataset("kb_docs")
.embeddingModel("text-embedding-3-small")
.source(IngestionSource.text("docu-\nment\n\n\nH e l l o"))
.documentProcessors(Collections.singletonList(
new OcrNoiseCleaningDocumentProcessor()
))
.build();
9.2 接入你自己的 OCR 引擎
如果你要接 PaddleOCR、Tesseract、云 OCR 或内部文档解析服务,可以实现 OcrTextExtractor:
OcrTextExtractor extractor = new OcrTextExtractor() {
@Override
public boolean supports(IngestionSource source, LoadedDocument document) {
return source != null && source.getFile() != null;
}
@Override
public String extractText(IngestionSource source, LoadedDocument document) {
return callYourOcrEngine(source.getFile());
}
};
IngestionPipeline ingestionPipeline = new IngestionPipeline(
embeddingService,
vectorStore,
Arrays.asList(new TextDocumentLoader(), new TikaDocumentLoader()),
new RecursiveTextChunker(1000, 200),
Collections.singletonList(new OcrTextExtractingDocumentProcessor(extractor)),
Collections.<MetadataEnricher>singletonList(new DefaultMetadataEnricher())
);
这个设计的重点是:
- Pipeline 不强绑具体 OCR 依赖
- 你可以按项目接任意 OCR 后端
- 清洗链和分块链天然解耦
9.3 Qdrant / Milvus / pgvector 接线方式
IngestionPipeline 自身不绑具体向量库,接线点始终是 VectorStore。
如果你直接用 AiService,可以这样接:
AiService aiService = new AiService(configuration);
IngestionPipeline qdrantPipeline = aiService.getIngestionPipeline(
PlatformType.OPENAI,
aiService.getQdrantVectorStore()
);
IngestionPipeline milvusPipeline = aiService.getIngestionPipeline(
PlatformType.OPENAI,
aiService.getMilvusVectorStore()
);
IngestionPipeline pgvectorPipeline = aiService.getIngestionPipeline(
PlatformType.OPENAI,
aiService.getPgVectorStore()
);
如果你是 Spring Boot,通常就是直接注入具体 VectorStore Bean,或者自己显式构造:
IEmbeddingService embeddingService = aiService.getEmbeddingService(PlatformType.OPENAI);
VectorStore qdrantStore = aiService.getQdrantVectorStore();
VectorStore milvusStore = aiService.getMilvusVectorStore();
VectorStore pgvectorStore = aiService.getPgVectorStore();
对应配置前缀分别是:
ai.vector.qdrant.*ai.vector.milvus.*ai.vector.pgvector.*
示例:
ai:
vector:
qdrant:
enabled: true
host: http://localhost:6333
api-key: ""
milvus:
enabled: false
host: http://localhost:19530
token: ""
db-name: default
pgvector:
enabled: false
jdbc-url: jdbc:postgresql://localhost:5432/postgres
username: postgres
password: postgres
table-name: ai4j_vectors
建议:
- 业务代码尽量面向
VectorStore - 只在配置层决定底层是
Qdrant / Milvus / pgvector - Flowgram 知识检索节点也复用这层统一抽象
10. IngestionResult 可以拿到什么
当前结果里可以直接拿到:
- 标准化后的
RagDocument - 最终
RagChunk列表 - 即将入库的
VectorRecord列表 - 实际 upsert 数量
这对两类场景很有用:
- 入库后做审计和调试
- 做“预览模式”,先看 chunk 和 metadata 再决定是否真正写库
如果你只是想预览,不真正写入,可以:
IngestionRequest request = IngestionRequest.builder()
.dataset("kb_docs")
.embeddingModel("text-embedding-3-small")
.source(IngestionSource.text(content))
.upsert(Boolean.FALSE)
.build();
11. 当前边界
这套内置实现故意保持轻量,所以当前边界也很明确:
- 默认还是字符分块,不是语义分块
- 不直接内置父子块、层级块、表格专用块
- 不直接承担重排序,它只负责“写库前”的链路
也就是说:
- OCR / 文档清洗现在已经有扩展点
- 但具体 OCR 引擎、复杂版面解析、表格结构恢复仍然由你按项目接入
也就是说,它是一个稳定起点,不是最终形态的知识库工程平台。
12. 和后续检索链路怎么接
典型接法就是:
IngestionPipeline
-> VectorStore
-> DenseRetriever / Bm25Retriever / HybridRetriever
-> 可选 ModelReranker
-> RagService
所以它和检索层是上下游关系:
IngestionPipeline负责把资料变成可检索索引Retriever / Reranker / RagService负责把索引真正用起来