行业资讯 2025年08月6日
0 收藏 0 点赞 263 浏览 7092 个字
摘要 :

文章目录 一、Maven依赖配置 二、文件供应商与文档读取器 三、文档转换器 四、文档写入器 五、配置ETL管道 六、执行ETL管道 七、演示 八、总结 在开发Spring AI数据摄……




  • 一、Maven依赖配置
  • 二、文件供应商与文档读取器
  • 三、文档转换器
  • 四、文档写入器
  • 五、配置ETL管道
  • 六、执行ETL管道
  • 七、演示
  • 八、总结

在开发Spring AI数据摄取微服务时,搭建高效的ETL(提取、转换、加载)管道是关键一环。本文将详细介绍如何利用Spring Cloud Function和Spring AI来构建这样的管道,通过一步步的操作指南,带你从环境搭建到功能实现,最后进行演示验证,帮助你轻松掌握这一技术,为实际项目开发提供有力支持。

一、Maven依赖配置

要基于Spring Cloud Function配置ETL管道,首先得添加Spring AI和Spring Cloud Function模块的相关依赖。在项目的pom.xml文件中进行如下配置:

<properties>
    <!-- 设置Spring Cloud版本 -->
    <spring.cloud.version>2023.0.1</spring.cloud.version> 
    <!-- 设置Spring Functions Catalog版本 -->
    <spring.functions.catalog.version>5.0.0-SNAPSHOT</spring.functions.catalog.version> 
</properties>
<dependencies>
    <!-- Spring Cloud Function上下文依赖 -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-function-context</artifactId>
    </dependency>
    <!-- 文件供应商依赖,用于提供文件流 -->
    <dependency>
        <groupId>org.springframework.cloud.fn</groupId>
        <artifactId>spring-file-supplier</artifactId>
    </dependency>
    <!-- Spring AI的Tika文档读取器依赖 -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-tika-document-reader</artifactId>
    </dependency>
    <!-- Spring AI与OpenAI集成的启动器依赖 -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-openai-spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring AI与Chroma数据库集成的启动器依赖 -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-chroma-store-spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring Boot与Docker Compose集成的依赖,用于运行时管理容器 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-docker-compose</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <!-- Spring Cloud依赖管理 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Spring Functions Catalog的BOM依赖管理 -->
        <dependency>
            <groupId>org.springframework.cloud.fn</groupId>
            <artifactId>spring-functions-catalog-bom</artifactId>
            <version>${spring.functions.catalog.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

通过这些依赖,项目就能引入所需的功能模块,为后续构建ETL管道打下基础。Spring AI 如何集成Spring Cloud Function构建ETL数据处理管道

二、文件供应商与文档读取器

spring-file-supplier模块提供了一个fileSupplier,它可以在其他应用中复用和组合。这个供应商能从指定目录生成一个文件的响应式流,开发者需要订阅这个Flux来获取数据。可以通过配置file.supplier.*相关属性,指定输入目录的位置和支持的文件格式。

# 设置输入目录路径
file.supplier.directory=c:/temp/ingestion-files 
# 设置文件名匹配正则表达式,只处理符合格式的文件
file.supplier.filename-regex=.*\\.(pdf|docx|txt|pages|csv) 

当Spring检测到Maven坐标和配置属性后,会自动启用一个函数,该函数读取文件内容并返回Flux<Message<byte[]>>。但在Spring AI的ETL管道中,我们需要的是org.springframework.ai.document.Document对象的Flux。所以,要创建一个新函数documentReader来进行格式转换。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.ai.document.Document;
import org.springframework.ai.reader.tika.TikaDocumentReader;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.stream.Collectors;

@Configuration
public class CloudFunctionConfig {
    // 定义一个名为documentReader的Bean,用于将文件流转换为Document对象列表的流
    @Bean
    public Function<Flux<Message<byte[]>>, Flux<List<Document>>> documentReader() {
        // 输入是包含文件字节数据的消息流,输出是Document对象列表的流
        return resourceFlux -> resourceFlux
               // 对每个消息进行转换操作
               .map(message -> {
                    // 使用TikaDocumentReader将字节数据转换为Document对象列表
                    List<Document> documents = new TikaDocumentReader(new ByteArrayResource(message.getPayload()))
                           .get()
                           .stream()
                           // 为每个Document对象添加文件来源元数据
                           .peek(document -> {
                                document.getMetadata().put(\"source\", message.getHeaders().get(\"file_name\"));
                            })
                           .collect(Collectors.toList());
                    return documents;
                });
    }
    // 其他可能的配置方法
    //... 
}

三、文档转换器

文档转换器的功能是使用TokenTextSplitter将读取到的Document对象分割成文本块。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.ai.document.Document;
import org.springframework.ai.transformer.splitter.TokenTextSplitter;
import reactor.core.publisher.Flux;
import java.util.List;

@Configuration
public class CloudFunctionConfig {
    // 定义一个名为documentTransformer的Bean,用于分割Document对象列表
    @Bean
    public Function<Flux<List<Document>>, Flux<List<Document>>> documentTransformer() {
        // 输入是Document对象列表的流,输出也是Document对象列表的流
        return documentListFlux -> documentListFlux
               // 对每个Document对象列表进行转换操作
               .map(unsplitList -> new TokenTextSplitter().apply(unsplitList));
    }
    // 其他可能的配置方法
    //... 
}

四、文档写入器

文档写入器负责接收分割后的Document列表,并将其文本和嵌入向量存储到向量数据库中。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.VectorStore;
import reactor.core.publisher.Flux;
import java.util.List;

@Configuration
public class CloudFunctionConfig {
    // 定义日志记录器
    private static final Logger LOGGER = LoggerFactory.getLogger(CloudFunctionConfig.class); 

    // 定义一个名为documentWriter的Bean,用于将Document对象列表写入向量数据库
    @Bean
    public Consumer<Flux<List<Document>>> documentWriter(VectorStore vectorStore) {
        // 输入是Document对象列表的流
        return documentFlux -> documentFlux
               // 对每个Document对象列表进行操作
               .doOnNext(documents -> {
                    // 记录写入向量数据库的文档数量
                    LOGGER.info(\"Writing {} documents to vector store.\", documents.size()); 
                    // 将Document对象列表写入向量数据库
                    vectorStore.accept(documents); 
                    // 记录已写入向量数据库的文档数量
                    LOGGER.info(\"{} documents have been written to vector store.\", documents.size()); 
                })
               // 订阅流,开始处理数据
               .subscribe(); 
    }
    // 其他可能的配置方法
    //... 
}

五、配置ETL管道

当所有的Bean都添加到@Configuration类中后,就可以在属性文件中组合函数,形成ETL管道。在属性文件中添加如下配置:

# 定义ETL管道中函数的执行顺序
spring.cloud.function.definition=fileSupplier|documentReader|documentTransformer|documentWriter 

这样就指定了文件供应商、文档读取器、文档转换器和文档写入器的执行顺序,构建起了完整的ETL管道。

六、执行ETL管道

要执行一个函数,可以通过FunctionCatalog.lookup()方法查找其实例,然后调用run()方法来执行。如果上下文中只有一个组合函数,catalog.lookup(null)就能返回这个组合函数;否则,需要在lookup()方法中指定函数名。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.catalog.FunctionCatalog;
import org.springframework.stereotype.Service;

@Service
public class IngestionService {
    // 注入FunctionCatalog实例
    private final FunctionCatalog catalog; 

    // 构造函数,初始化FunctionCatalog实例
    public IngestionService(FunctionCatalog catalog) {
        this.catalog = catalog;
    }

    // 定义数据摄取方法
    public void ingest() {
        // 获取组合函数实例
        Runnable composedFunction = catalog.lookup(null); 
        // 执行组合函数
        composedFunction.run(); 
    }
}

七、演示

完成上述配置后,可以从多种方式调用ingest()方法,比如通过REST端点、批处理定时任务,或者任何支持流处理的应用。在本文示例中,通过REST端点来调用。

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class IngestionController {
    // 注入IngestionService实例
    @Resource
    IngestionService ingestionService; 

    // 处理POST请求,执行数据摄取操作
    @PostMapping(\"run-ingestion\")
    public ResponseEntity<?> run() {
        // 调用IngestionService的ingest方法执行数据摄取
        ingestionService.ingest(); 
        // 返回表示请求已被接受的响应
        return ResponseEntity.accepted().build(); 
    }
}

在本地运行应用,调用/run-ingestion端点,它会按顺序执行所有函数,并将文件内容的嵌入向量存储到向量数据库中。通过查看控制台日志,可以验证操作是否成功。例如,日志中会显示类似如下信息:

Writing 7 documents to vector store.
7 documents have been written to vector store.

八、总结

通过这个Spring Cloud Function的教程,我们成功构建了一个适用于Spring AI应用的数据摄取ETL管道。该应用利用内置的供应商函数,从指定的文件系统目录读取多种格式的文档,然后通过自定义的函数将文档内容处理成文本块,并把嵌入向量存储到Chroma向量数据库中。

归属教程 Spring AI 快速入门教程汇总

文章目录 Spring AI是什么?有啥优势? 如何在项目中使用Spring AI? Spring AI详细功 […]

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://www.zuozi.net/10300.html

管理员

相关推荐
2025-08-06

文章目录 一、Reader 接口概述 1.1 什么是 Reader 接口? 1.2 Reader 与 InputStream 的区别 1.3 …

988
2025-08-06

文章目录 一、事件溯源 (一)核心概念 (二)Kafka与Golang的优势 (三)完整代码实现 二、命令…

465
2025-08-06

文章目录 一、证明GC期间执行native函数的线程仍在运行 二、native线程操作Java对象的影响及处理方…

348
2025-08-06

文章目录 一、事务基础概念 二、MyBatis事务管理机制 (一)JDBC原生事务管理(JdbcTransaction)…

456
2025-08-06

文章目录 一、SnowFlake算法核心原理 二、SnowFlake算法工作流程详解 三、SnowFlake算法的Java代码…

517
2025-08-06

文章目录 一、本地Jar包的加载操作 二、本地Class的加载方法 三、远程Jar包的加载方式 你知道Groo…

832
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:08:00-23:00

客服QQ

122325244

客服电话

400-888-8888

客服邮箱

122325244@qq.com

扫描二维码

关注微信客服号