行业资讯 2025年08月6日
0 收藏 0 点赞 492 浏览 4569 个字
摘要 :

文章目录 一、Flume安装 二、Flume典型应用 1、本地数据读取 2、收集至HDFS 3、基于日期分区的数据收集 一、Flume安装 1)下载 官网下载地址:http://flume.apache.or……




  • 一、Flume安装
  • 二、Flume典型应用
    • 1、本地数据读取
    • 2、收集至HDFS
    • 3、基于日期分区的数据收集

    一、Flume安装

    1)下载
    官网下载地址:http://flume.apache.org/download.html
    这里我们下载1.9版本的:
    Flume安装与配置及Flume应用典型实例
    2)上传至/usr/flume目录,然后执行如下指令解压:

    tar -zxvf apache-flume-1.9.0-bin.tar.gz
    

    3)配置Flume的环境变量,在/etc/profile中新增如下:

    #配置新增
    export FLUME_HOME=/usr/flume/apache-flume-1.9.0-bin
    export PATH=$PATH:$FLUME_HOME/bin
    #执行生效指令
    source /etc/profile
    

    到这里,Flume就安装完成了。

    二、Flume典型应用

    1、本地数据读取

    首先我们来看一个从本地目录下搜集数据的实例。具体操作步骤如下:
    1)在flume目录下新建名为flume1.conf的配置文件,内容如下:

    #指定sources、sinks、channels
    a1.sources=r1
    a1.sinks=k1
    a1.channels=c1
    #配置Source
    a1.sources.r1.type=spooldir
    a1.sources.r1.spoolDir=/usr/flume/source
    #配置Sink
    a1.sinks.k1.type=file_roll
    a1.sinks.k1.sink.directory=/usr/flume/sink
    #将Channel类型设置为memory
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    #把Source和Sink绑到Channel上
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
    

    此配置定义了一个名为a1的代理。a1具有侦听端口/usr/flume/source目录中的数据的源,在内存中缓冲事件数据的通道以及将事件数据记录到/usr/flume/sink目录。配置文件为各个组件命名,然后描述它们的类型和配置参数。给定的配置文件可能会定义几个命名的代理。当启动给定的Flume进程时,会传递一个标志,告诉它要显示哪个命名的代理。
    2)创建相应的文件夹
    /usr/flume/下分别创建source(作为数据源)和sink(作为输出目录)文件夹,

    mkdir source
    mkdir sink
    

    3)启动Flume代理
    使用flume-ng命令启动Flume代理,同时在console控制台打印日志,命令如下:

    flume-ng agent --conf conf --conf-file /usr/flume/flume1.conf --name a1 -Dflume.root.logger=INFO,console
    

    4)测试数据
    /usr/flume/source文件夹下导入一个test.txt文本文件Flume客户端会有如下显示:
    Flume安装与配置及Flume应用典型实例
    一旦Flume成功启动并完成日志收集,再查看source文件夹的test.txt文件时,则变成了test.txt.COMPLETED,sink文件夹中会收集到文件。
    Flume安装与配置及Flume应用典型实例

    2、收集至HDFS

    1)在前一个案例基础上,我们参考flume1.conf配置,新建flume2.conf配置文件如下:

    #指定sources、sinks、channels
    a1.sources=r1
    a1.sinks=k1
    a1.channels=c1
    #配置Source
    a1.sources.r1.type=spooldir
    a1.sources.r1.spoolDir=/usr/flume/source
    #配置Sink
    a1.sinks.k1.type=hdfs
    #配置收集后的文件,放在HDFS的哪个位置
    a1.sinks.k1.hdfs.path=hdfs://master:9820/flume/data
    a1.sinks.k1.hdfs.rollInterval=0
    a1.sinks.k1.hdfs.rollSize=10240000
    a1.sinks.k1.hdfs.rollCount=0
    a1.sinks.k1.hdfs.idleTimeout=3
    a1.sinks.k1.hdfs.fileType=DataStream
    a1.sinks.k1.hdfs.round=true
    a1.sinks.k1.hdfs.roundValue=10
    a1.sinks.k1.hdfs.roundUnit=minute
    a1.sinks.k1.hdfs.useLocalTimeStamp=true
    #将Channel类型设置为memory
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    #把Source和Sink绑到Channel上
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
    

    2)启动hadoop

    start-all.sh
    

    3)启动Flume代理

    flume-ng agent --conf conf --conf-file /usr/flume/flume2.conf --name a1 -Dflume.root.logger=INFO,console
    

    4)在source目录下新建test2.txt文件,这时我们可能或发现如下报错:

    2021-05-06 17:57:19,198 ERROR hdfs.HDFSEventSink: process failed
    java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
        at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
        at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)
        at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1679)
        at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221)
        at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
    

    这是由于flume内依赖的guava.jar和hadoop内的版本不一致会造成,我们需要查看hadoop安装目录下share/hadoop/common/libguava.jar版本,再查看flume安装目录下libguava.jar的版本,如果两者不一致,删除版本低的,并拷贝高版本过去。此处我们发现flume中版本为guava-11.0.2.jar,hadoop中版本为guava-27.0-jre.jar,所以,此处我把flume中的删除,把hadoop中的复制过去

    #删除
    rm /usr/flume/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar
    #复制
    cp /usr/hadoop/hadoop-3.2.1/share/hadoop/common/lib/guava-27.0-jre.jar /usr/flume/apache-flume-1.9.0-bin/lib/
    

    5)重启flume,发现正常采集到hdfs
    Flume安装与配置及Flume应用典型实例

    3、基于日期分区的数据收集

    有些时候,由于收集的文件很大,或者因业务需求,需要将收集到的数据按照日期分开存储。本案例基于上面的HDFS收集,再根据年、月、日、时分分别生成文件,具体操作步骤如下:
    1)在前一个案例基础上,我们参考flume2.conf配置,新建flume3.conf配置文件如下:

    #指定sources、sinks、channels
    a1.sources=r1
    a1.sinks=k1
    a1.channels=c1
    #配置Source
    a1.sources.r1.type=spooldir
    a1.sources.r1.spoolDir=/usr/flume/source
    #配置Sink
    a1.sinks.k1.type=hdfs
    #配置收集后的文件,放在HDFS的哪个位置,并按日期分别存储
    a1.sinks.k1.hdfs.path=hdfs://master:9820/flume/data/%Y-%m-%d/%H%M
    a1.sinks.k1.hdfs.rollInterval=0
    a1.sinks.k1.hdfs.rollSize=10240000
    a1.sinks.k1.hdfs.rollCount=0
    a1.sinks.k1.hdfs.idleTimeout=3
    a1.sinks.k1.hdfs.fileType=DataStream
    a1.sinks.k1.hdfs.round=true
    a1.sinks.k1.hdfs.roundValue=10
    a1.sinks.k1.hdfs.roundUnit=minute
    a1.sinks.k1.hdfs.useLocalTimeStamp=true
    #将Channel类型设置为memory
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    #把Source和Sink绑到Channel上
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
    

    2)启动hadoop和flume代理:

    flume-ng agent --conf conf --conf-file /usr/flume/flume3.conf --name a1 -Dflume.root.logger=INFO,console
    

    3)在source添加test3.txt文件,发现正常采集到hdfs中并按日期分别存储
    Flume安装与配置及Flume应用典型实例

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://www.zuozi.net/7971.html

管理员

相关推荐
2025-08-06

文章目录 一、Reader 接口概述 1.1 什么是 Reader 接口? 1.2 Reader 与 InputStream 的区别 1.3 …

988
2025-08-06

文章目录 一、事件溯源 (一)核心概念 (二)Kafka与Golang的优势 (三)完整代码实现 二、命令…

465
2025-08-06

文章目录 一、证明GC期间执行native函数的线程仍在运行 二、native线程操作Java对象的影响及处理方…

348
2025-08-06

文章目录 一、事务基础概念 二、MyBatis事务管理机制 (一)JDBC原生事务管理(JdbcTransaction)…

456
2025-08-06

文章目录 一、SnowFlake算法核心原理 二、SnowFlake算法工作流程详解 三、SnowFlake算法的Java代码…

517
2025-08-06

文章目录 一、本地Jar包的加载操作 二、本地Class的加载方法 三、远程Jar包的加载方式 你知道Groo…

832
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:08:00-23:00

客服QQ

122325244

客服电话

400-888-8888

客服邮箱

122325244@qq.com

扫描二维码

关注微信客服号