行业资讯 2025年08月6日
0 收藏 0 点赞 586 浏览 7266 个字
摘要 :

文章目录 一、ETL管道的主要接口 二、构建ETL管道的详细步骤 (一)Maven依赖配置 (二)文档读取器的实现 (三)文档转换器的设置 (四)文档写入器的配置 (五)整合……




  • 一、ETL管道的主要接口
  • 二、构建ETL管道的详细步骤
    • (一)Maven依赖配置
    • (二)文档读取器的实现
    • (三)文档转换器的设置
    • (四)文档写入器的配置
    • (五)整合构建ETL管道
    • (六)执行ETL管道
  • 三、演示过程
  • 四、总结

本文将详细介绍如何利用Spring AI搭建ETL(提取、转换、加载)管道,将各种格式的原始数据(如文本、JSON/XML、音频、视频等)处理后存储到结构化向量存储中,以便后续进行相似性搜索,为基于检索增强生成(RAG)的应用提供数据支持。

一、ETL管道的主要接口

ETL管道主要由三个部分组成,在Spring AI中通过相应的接口来实现:

  • 文档读取器(DocumentReader):负责从指定的数据源读取文档。比如从文件系统中读取文本文件,或者从网络接口获取数据等。
  • 文档转换器(DocumentTransformer):对读取到的文档进行转换操作。例如,把长文本分割成合适的页面或段落,方便后续处理。
  • 文档写入器(DocumentWriter):将处理后的文档写入存储介质,比如文件系统或者向量数据库。

Spring AI为这些接口提供了许多内置的实现类,像TikaDocumentReader就实现了DocumentReader接口。Spring AI 如何实现ETL数据处理管道:从数据提取到存储实战当然,开发者也可以根据实际需求创建自定义的实现类。在创建好所需的bean之后,就可以按照下面的方式构建ETL管道:

// 创建TikaDocumentReader实例,用于读取文档
TikaDocumentReader documentReader = ...; 
// 创建TokenTextSplitter实例,用于转换文档
TokenTextSplitter documentTransformer = ...; 
// 创建VectorStore实例,用于写入文档
VectorStore documentWriter = ...; 

// 执行ETL操作,读取文档、转换后写入存储
documentWriter.write(documentTransformer.split(documentReader.read())); 

二、构建ETL管道的详细步骤

下面从头开始构建一个ETL管道,以便更深入地理解其工作原理。本次构建的需求如下:

  1. 所有的内容文件(如文本文件、CSV文件等)都存储在文件系统的特定目录中。
  2. 程序需要读取该目录下指定扩展名的文件,忽略其他文件。
  3. 转换过程会将文件内容分割成特定大小的块。
  4. 分割后的内容由文档写入器存储到向量数据库中。

(一)Maven依赖配置

在本次演示中,我们使用以下依赖:

  • spring-ai-tika-document-reader:用于读取各种格式的输入文档,支持多种常见文件类型。
  • spring-ai-openai-spring-boot-starter:借助OpenAiEmbeddingModel将文本块转换为向量。
  • spring-ai-chroma-store-spring-boot-starter:把Chroma数据库作为向量存储使用。
  • spring-boot-docker-compose:通过容器启动Chroma数据库。
<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-tika-document-reader</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-chroma-store-spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-docker-compose</artifactId>
    <scope>runtime</scope>
</dependency>

(二)文档读取器的实现

这里使用TikaDocumentReader,它基于Apache Tika,能够从PDF、DOC/DOCX、PPT/PPTX、HTML等多种格式的文件中提取文本,并将提取的文本封装在Document实例中。由于要读取项目目录外的多个文件,我们利用JDK的Files API列出并遍历所有符合条件的文件。在遍历过程中,使用Files.readAllBytes()读取文件内容,再通过TikaDocumentReader将字节数据转换为org.springframework.ai.document.Document对象。

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.document.Document;
import org.springframework.ai.document.DocumentReader;
import org.springframework.ai.reader.tika.TikaDocumentReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.stereotype.Component;

@Component
public class CustomDocumentReader implements DocumentReader {
    // 定义日志记录器,用于记录类相关的日志信息
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomDocumentReader.class); 

    // 从配置文件中读取输入目录路径
    @Value(\"${input.directory}\") 
    private String inputDir;

    // 从配置文件中读取文件名匹配正则表达式,用于筛选文件
    @Value(\"${input.filename.regex}\") 
    private String pattern;

    // 重写get方法,用于获取文档列表
    @SneakyThrows
    @Override
    public List<Document> get() {
        List<Document> documentList = new ArrayList<>();
        TikaDocumentReader tikaDocumentReader;

        // 遍历指定目录下符合正则表达式的文件
        Files.newDirectoryStream(Path.of(inputDir), pattern).forEach(path -> { 
            List<Document> documents = null;
            try {
                // 读取文件字节数据并转换为Document对象,同时添加文件来源元数据
                documents = new TikaDocumentReader(new ByteArrayResource(Files.readAllBytes(path))).get() 
                       .stream().peek(document -> {
                            document.getMetadata().put(\"source\", path.getFileName());
                            // 记录读取新文档的日志信息
                            LOGGER.info(\"Reading new document :: {}\", path.getFileName()); 
                        }).toList();
            } catch (IOException e) {
                // 读取文件出错时抛出运行时异常
                throw new RuntimeException(\"Error while reading the file : \" + path.toUri() + \"::\" + e); 
            }
            // 将读取到的文档添加到文档列表中
            documentList.addAll(documents); 
        });
        return documentList;
    }
}

输入目录和支持的文件扩展名可以在属性文件中进行配置:

# 输入文件目录路径
input.directory=c:/temp/ingestion-files/ 
# 文件名匹配正则表达式,只读取指定扩展名的文件
input.filename.regex=*.{pdf,docx,txt,pages,csv} 

(三)文档转换器的设置

当处理一些大文本文件时,将文本分割成小块是很有必要的,这样可以使文本更好地适配大语言模型(LLM)的上下文窗口。如果直接发送大文本,转换后的向量在进行相似性搜索时可能无法得到预期结果。这里我们使用TokenTextSplitter,并采用默认配置。如果有需要,也可以通过构造函数传入参数来配置不同的分块大小。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.ai.transformer.splitter.TextSplitter;
import org.springframework.ai.transformer.splitter.TokenTextSplitter;

@Configuration
public class AppConfiguration {
    // 定义一个bean,创建TokenTextSplitter实例
    @Bean 
    TextSplitter textSplitter() {
        return new TokenTextSplitter();
    }
}

(四)文档写入器的配置

在项目中添加Chroma DB依赖后,Spring AI会自动创建ChromaVectorStore类型的bean,其连接属性会从docker-compose文件中自动获取。docker-compose文件配置如下:

version: \'3.9\'

networks:
  net:
    driver: bridge
services:
  server:
    image: ghcr.io/chroma-core/chroma:latest
    environment:
      - IS_PERSISTENT=TRUE
    volumes:
      - chroma-data:/chroma/chroma/
    ports:
      - 8000:8000
    networks:
      - net
volumes:
  chroma-data:
    driver: local

如果是在本地安装的ChromaDB,则需要在属性文件中指定连接详细信息。配置完成后,就可以在Spring管理的bean中访问VectorStore bean了:

import org.springframework.stereotype.Component;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.beans.factory.annotation.Autowired;

@Component
public class EtlPipeline {
    // 注入VectorStore实例,用于将处理后的文档写入向量数据库
    @Autowired 
    VectorStore vectorStore;
    // 其他可能的属性和方法
    //... 
}

(五)整合构建ETL管道

在创建好文档读取器、转换器和写入器后,就可以将它们整合起来构建ETL管道了。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.transformer.splitter.TextSplitter;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.stereotype.Component;

@Component
public class EtlPipeline {
    // 定义日志记录器,用于记录类相关的日志信息
    private static final Logger LOGGER = LoggerFactory.getLogger(EtlPipeline.class); 

    // 注入自定义文档读取器实例
    private final CustomDocumentReader documentReader; 
    // 注入向量存储实例
    private final VectorStore vectorStore; 
    // 注入文本分割器实例
    private final TextSplitter textSplitter; 

    // 构造函数,用于初始化ETL管道所需的组件
    public EtlPipeline(VectorStore vectorStore,
                       TextSplitter textSplitter,
                       CustomDocumentReader documentReader) {
        this.vectorStore = vectorStore;
        this.textSplitter = textSplitter;
        this.documentReader = documentReader;
    }

    // 定义执行数据摄取的方法
    public void runIngestion() {
        // 记录数据摄取开始的日志信息
        LOGGER.info(\"RunIngestion() started\"); 
        // 执行ETL操作,读取文档、分割后写入向量存储
        vectorStore.write(textSplitter.apply(documentReader.get())); 
        // 记录数据摄取结束的日志信息
        LOGGER.info(\"RunIngestion() finished\"); 
    }
}

(六)执行ETL管道

根据项目需求不同,执行文档摄取过程的方式也多种多样。例如,可以通过定时任务调度、批处理程序,或者按需通过REST API端点来触发。在本次演示中,我们通过REST API端点来调用。

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class IngestionController {
    // 注入ETL管道实例
    EtlPipeline etlPipeline; 

    // 构造函数,用于初始化ETL管道实例
    public IngestionController(EtlPipeline etlPipeline){ 
        this.etlPipeline = etlPipeline;
    }

    // 处理POST请求,执行ETL管道的数据摄取操作
    @PostMapping(\"run-ingestion\") 
    public ResponseEntity<?> run(){
        etlPipeline.runIngestion();
        // 返回已接受请求的响应
        return ResponseEntity.accepted().build(); 
    }
}

三、演示过程

当所有组件都配置完成后,在输入目录中放入一些符合条件的文件,启动服务器,然后通过API客户端发送/run-ingestion请求。观察控制台中的服务器日志,可以看到ETL管道处理的所有文件记录。

...c.h.ai.demo.EtlPipeline : RunIngestion() started
...c.h.ai.demo.EtlPipeline : Reading new document :: current-affairs.pdf 
...c.h.ai.demo.EtlPipeline : Reading new document :: data.txt 
...c.h.ai.demo.EtlPipeline : Reading new document :: pan-format-file.csv 
...c.h.ai.demo.EtlPipeline : RunIngestion() finished

四、总结

通过这个Spring AI ETL管道的示例,我们成功创建了一个数据摄取服务。该服务能够从指定的文件系统目录读取多种格式的文档,将文档内容处理成小块,并把这些小块的嵌入向量存储到Chroma向量数据库中。希望通过本文的学习,大家能够在实际项目中灵活运用Spring AI的ETL技术,实现高效的数据处理和存储。

归属教程 Spring AI 快速入门教程汇总

文章目录 Spring AI是什么?有啥优势? 如何在项目中使用Spring AI? Spring AI详细功 […]

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://www.zuozi.net/10302.html

管理员

相关推荐
2025-08-06

文章目录 一、Reader 接口概述 1.1 什么是 Reader 接口? 1.2 Reader 与 InputStream 的区别 1.3 …

988
2025-08-06

文章目录 一、事件溯源 (一)核心概念 (二)Kafka与Golang的优势 (三)完整代码实现 二、命令…

465
2025-08-06

文章目录 一、证明GC期间执行native函数的线程仍在运行 二、native线程操作Java对象的影响及处理方…

348
2025-08-06

文章目录 一、事务基础概念 二、MyBatis事务管理机制 (一)JDBC原生事务管理(JdbcTransaction)…

456
2025-08-06

文章目录 一、SnowFlake算法核心原理 二、SnowFlake算法工作流程详解 三、SnowFlake算法的Java代码…

517
2025-08-06

文章目录 一、本地Jar包的加载操作 二、本地Class的加载方法 三、远程Jar包的加载方式 你知道Groo…

832
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:08:00-23:00

客服QQ

122325244

客服电话

400-888-8888

客服邮箱

122325244@qq.com

扫描二维码

关注微信客服号