2. Apache Flume

2.1. 概述

官网:http://flume.apache.org/

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的工具。基于流式架构,容错性强,也很灵活简单。 Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中,一般的采集需求,通过对flume的简单配置即可实现 Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景。

  • Flume是一个海量数据采集的软件。

  • Flume是一款来自于apache java语言软件。

  • Flume身世

    • 0.9- 属于Cloudera 叫做flume-og

    • 1.0+ 属于apache 叫做flume-ng

2.2. 组件架构

1、Flume分布式系统中最核心的角色是agent,agent 本身是一个 Java 进程,一般运行在日志收集节点。flume采集系统就是由一个个agent所连接起来形成。 2、每一个agent相当于一个数据传递员,内部有三个组件: a)Source:采集组件,用于跟数据源对接,以获取数据 b)Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据 c)Channel:传输通道组件,用于从source将数据传递到sink

简单结构

image-20201024152821463

Source 到 Channel 到 Sink之间传递数据的形式是Event事件;Event事件是一个数据流单元

复杂结构

多级agent之间串联

image-20201024192847583

  • 部署架构

    • 单agent架构

      适合简单业务场景  只需要部署一个agent进程即可
      
    • 多agent级联(串联)

      适合复杂业务场景  在此场景下 各个agent之间没有地位区别 大家都一样。没有主从之分。
      因为在多台机器部署的 也称之为分布式架构。
      
  • 数据流程

Flume的一般流程是这样的:

source监控某个文件或数据流,数据源产生新的数据,拿到该数据后,将数据封装在一个Event中,并发送到channel后提交,channel队列先进先出,sink去channel队列中拉取数据,然后写入到HDFS/Kafka/或者其他的数据源,甚至是下一个Agent的Source。

  • 组件

    • source

      对接各个不同的数据源。 采集数据。
      

    ​ 常用的Source有:

    ​ 1) exec:可通过tail -f命令去tail一个文件,然后实时同步日志到sink

    ​ 2) spooldir:可监听一个目录,同步目录中的新文件到sink,被同步完的文件可被立即删除或被打上标记。 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步。

    ​ 3) taildir:可实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题

    ​ 4) 支持自定义

    • channel

      就是source和sink之间缓存数据的通道。
      

    ​ Channel是Agent 内部的数据传输通道,用于从 source 将数据传递到 sink;用于桥接Sources和Sinks, 类似于一个队列/缓存。Channel分为:

    ​ 1) Memory Channel是基于内存的,速度快

    ​ 2) File Channel是基于文件的,速度慢,因为会将所有事件写到磁盘,但数据更安全

    • sink

      对接各个不同下沉地(目的地)
      

    ​ Sink是下沉地/目的地,采集数据的传送目的地,用于从Channel收集数据,将数据写到目标源,Sink分为:

    ​ 1) 可以是HDFS、HBase、Kafka等

    ​ 2) 也可以是下一个 FlumeAgent的Source

    ​ 3) 支持自定义

    • 结论:上述3个组件就组成了flume的一个java进程。该进程的名字叫做agent。

  • Event

    • event是flume内部最小的数据单元。

    • flume采集的数据都是以event形式存在的。类似于数据包。

    • event分为event head /event body 数据都是存储在body中。

       Event: { headers:{} body: 6E 69 68 61 6F 0D                 }
      

2.2.1. Flume事务

FlumeTransaction

2.2.1.1. 数据传输的三大步骤

  • Source从数据源读取数据

  • Source将数据推送进入Channel

  • Sink从Channel中拉取数据

2.2.1.2. Flume如何实现传输数据的完整性、可靠性?

  • 数据为什么会丢失(没有事务机制的情况下)?

    • Channel是被动的,Source将数据主动推送给Channel,而Sink主动从Channel拉取数据(Take)

    • 一般Channel使用MemoryChannel,这样速度更快(FileChannel更安全但是速度慢),但是由于是基于内存,agent宕机的情况下可能导致内存中的数据丢失

    • Source端。

2.2.1.3. Flume的事务机制

2.2.1.3.1. Put事务

顺利的情况下:

Source采集数据调用duPut方法将一批数据(Event)封装在putList中,这批数据成功放入putList中之后,就会调用doCommit方法,将所有的Event放入到Channel中,成功放入就会清空putList。

问题:

第一种:

  • 如果Sink取出数据的速度过慢,而source放入数据过快,就会造成Channel中数据积压,这个时候putList中的数据就会放不进去,可是已经doCommit了,putList数据丢失了

  • 解决:调用doRollback方法:

    • 将putList清空,抛出ChannelException

    • 这个时候Source就会catch到doRollback的异常,Source就会将之前的一批数据重新采集,采集完成之后重新进行事务流程

第二种:

  • 如果Source采集数据使用的是tailDir source,由于某种情况下,监听分源目录被删除了,也会出现问题

  • 解决:调用doRollback来进行事务回滚

2.2.1.3.2. Take事务

顺利的情况下:

doTake方法会将Channel中的数据剪切到takeList中,然后等到takeList满后会调用doCommit方法将数据写入目的地,调用doCommit时会进行将数据写入目的地之后再清空takeList

问题:

  • 如果出现网络原因导致数据写入到目的地时传输失败了,这个时候如果不进行回滚而置之不理就会导致数据丢失

  • 解决:调用doRollback方法来进行回滚,takeList中存有备份数据,takeList中的数据就会原封不动地返还给Channel

  • 新的问题:如果在往目的地Sink数据的时候,刚好Sink“一半”的时候目的机宕机了,在回滚的时候takeList还是将全部数据原封不动返还给Channel,当目的机重新启动上线的时候,再进行Sink操作,这个时候数据就会重复了。所以从某种程度上来说,使用Flume采集数据不会丢失数据反而会使得数据重复。

2.3. Flume 安装

  • 提取将JDK配置好

  • 上传安装包进行解压

    tar -zxvf flume-ng-1.6.0-cdh5.14.0.tar.gz -C /export/servers/
    
  • 配置文件

    cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

    cp  flume-env.sh.template flume-env.sh
    

    vim flume-env.sh

    export JAVA_HOME=/export/servers/jdk1.8.0_141
    

​ 配置成自己得JDK安装版本和安装目录

2.4. 案例

2.4.1. 入门案例

需求:采集日志文件 如果有数据发送 采集数据打印到控制台。

2.4.1.1. 配置文件编写

  • 所谓的采集方案指的就是根据业务需求 确定3个组件的类型和参数。 flume只有根据采集方案文件才指的如何工作。

  • 确定3个组件

    source: TAILDIR  读取文件
    
    channel:memory channel 缓存在内存中
    
    sink: logger 把数据打印到控制台
    
    基于上述3个类型组件 编写采集方案。
    

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/

vim console-logger.conf

# Name the components on this agent
# 先定义这个agent中各组件的名字 a1
# 再给3个组件 分别起名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 描述和配置source组件:r1  
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/servers/tmp/flume/orderinfo.log

# Describe the sink
# 描述和配置source组件:r1
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

各组件得名称可以自定义

2.4.1.2. 数据源模拟

模拟日志发送数据

echo "订单金额:100" >> /export/servers/tmp/flume/orderinfo.log

2.4.1.3. Flume启动命令

#完整版命令
bin/flume-ng agent --conf conf --conf-file conf/console-logger.conf --name a1 -Dflume.root.logger=INFO,console

#精简版命令
bin/flume-ng agent -c ./conf -f ./conf/console-logger.conf  -n a1 -Dflume.root.logger=INFO,console


#bin/flume-ng agent  固定搭配
#--conf(-c)
	指定默认的配置文件路径  要求改路径下 必须有两个文件:flume-env.sh log4j.properties
#--conf-file  (-f)
	指定采集方案文件路径
#--name	 指定agent的名字 进程名称 (-n)
	该名称可以随便起 但是要保证和采集方案中一致
#-Dflume.root.logger=INFO,console
	开启日志 打印更详细的信息 在开发中建议打开

–name 指定agent的名字表示进程名称,一定

2.4.2. 采集数据到HDFS

  • 案例1

    • 需求:某目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到 HDFS中去

    • 确定agent组件

      source: TAILDIR
      channel: memory channel
      sink:  HDFS sink
      

2.4.2.1. 配置文件编写

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

vim fileToHdfs.conf

#为我们的source channel  sink起名
a1.sources = r1
a1.channels = c1
a1.sinks = s1
#指定我们的source收集到的数据发送到哪个管道
a1.sources.r1.channels = c1
#指定我们的source数据收集策略
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/servers/tmp/flume/orderinfo.log

#指定我们的channel为memory,即表示所有的数据都装进memory当中
a1.channels.c1.type = memory
#指定我们的sink为kafka  sink,并指定我们的sink从哪个channel当中读取数据
a1.sinks.s1.channel = c1
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://node1:8020/tmp/flume
a1.sinks.s1.hdfs.fileType=DataStream

2.4.2.2. 数据源模拟

模拟日志发送数据

echo "订单金额:100" >> /export/servers/tmp/flume/orderinfo.log

2.4.2.3. Flume启动命令

#精简版命令
bin/flume-ng agent -c ./conf -f ./conf/fileToHdfs.conf  -n a1 -Dflume.root.logger=INFO,console

2.4.2.4. 优化配置

  • 配置参数介绍

    #roll 控制文件以何种形式触发滚动 写入新文件
    a1.sinks.k1.hdfs.rollInterval = 3  #以时间间隔控制滚动  默认30 s
    a1.sinks.k1.hdfs.rollSize = 20    #以文件大小控制滚动  默认1024 bytes 
    a1.sinks.k1.hdfs.rollCount = 5    #以event数量控制滚动  默认 10 个
    
    #上述三个都配置的情况下 如果滚动?  谁先满足 谁触发滚动。
    #如果不想以某个属性滚动。 就把该属性设置为0  禁用该属性滚动。
    
    #注意事项  如果滚动条件设置不合理 会频繁触发文件切换 会造成大量小文件产生
    #在实际开发中 避免小文件产生  最喜欢根据128M大小进行滚动  134217728
    
    
    #round 是否开启时间上的舍弃  通俗解释:多少时间切换新的文件夹
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    
    • 注意事项 因为在flume中涉及到了动态提取时间**的功能 需要做下面两件事中任一即可:

      /flume/events/%y-%m-%d/%H%M/
      
      • 开启使用本地时间戳

        a1.sinks.k1.hdfs.useLocalTimeStamp = true
        
  • 完整配置文件

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

vim fileToHdfs2.conf

a1.sources = r1
a1.channels = c1
a1.sinks = s1
#指定channel
a1.sources.r1.channels = c1
#指定我们的source数据收集策略
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f  /export/servers/tmp/flume/test.log

#指定我们的channel为memory,即表示所有的数据都装进memory当中
a1.channels.c1.type = memory
#指定我们的sink到s1,并指定我们的sink从c1当中读取数据
a1.sinks.s1.channel = c1

a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://node1:8020/tmp/flume/%y-%m-%d/%H-%M/
a1.sinks.s1.hdfs.fileType=DataStream

a1.sinks.s1.hdfs.rollInterval = 3
a1.sinks.s1.hdfs.rollSize = 20
a1.sinks.s1.hdfs.rollCount = 5
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundValue = 10
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.filePrefix = test
a1.sinks.s1.hdfs.fileSuffix = log                    
  • 模拟日志增量数据

    • 日志内容不断增加,需要把采集数据追到日志文件中,并实时写入到 hdfs

    • 确定3个组件

      source:  exec   #支持在flume中运行shell命令 把命令执行的结果作为数据源进行采集。
      		 shell:tail -f 文件路径
      channel: memory channel  基于内存缓存数据
      sink: hdfs sink
      
    • 编写增量日志脚本

      cd /export/servers/tmp/flume

    ​ vim shell_log.sh

    while true;do date >> /export/servers/tmp/flume/test.log;sleep 0.5;done
    
    
    
    #!/bin/bash
    while true
    do
     date >>  /export/servers/tmp/flume/test.log
     sleep 0.5
    done
    

sh脚本赋权限:chmod 755 shell_log.sh

执行脚本:

sh  shell_log.sh

查看日志:

tail -f /export/servers/tmp/flume/test.log 

启动flume命令:

bin/flume-ng agent -c ./conf -f ./conf/fileToHdfs2.conf  -n a1 -Dflume.root.logger=INFO,console

2.4.3. 采集数据到Kafka

2.4.3.1. 配置文件编写

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

vim fileToKafka.conf

#为我们的source channel  sink起名
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#指定我们的source收集到的数据发送到哪个管道
a1.sources.r1.channels = c1
#指定我们的source数据收集策略
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/servers/tmp/flume/orderinfo.log

#指定我们的channel为memory,即表示所有的数据都装进memory当中
a1.channels.c1.type = memory
#指定我们的sink为kafka  sink,并指定我们的sink从哪个channel当中读取数据
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test2
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

2.4.3.2. 数据源模拟

模拟日志发送数据

echo "订单金额:100" >> /export/servers/tmp/flume/orderinfo.log

2.4.3.3. Flume启动命令

#精简版命令
bin/flume-ng agent -c ./conf -f ./conf/fileToKafka.conf  -n a1 -Dflume.root.logger=INFO,console

2.4.3.4. kafka消费命令

bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test2