Flume in Practice: Worked Examples

Downloading and Installing Flume

Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.
http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
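Availability: All agents are started under supervise, so if a process dies the system restarts it immediately and service continues. In addition, every agent is monitored for liveness and an alert is raised as soon as one is found dead. Finally, for very important logs, the application should write the logs directly to disk and the agent should pick up the newest logs with a spooldir source. (source: Meituan)
Reliability: First, events are exchanged between agents transactionally. Second, the channel in the data flow can be durable: MemoryChannel can lose data when the agent dies, whereas FileChannel is persistent and provides a log mechanism similar to the MySQL binlog, so data is not lost. (source: Meituan)
Scalability: The cluster scales out linearly.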
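
As a rough illustration of the availability and reliability points above, the following sketch writes a hypothetical agent configuration that pairs a spooldir source with a file channel. The agent name a1, the file name durable-example.conf and every directory path are assumptions chosen for illustration, not values from the original setup; spoolDir, checkpointDir and dataDirs are the property names the Flume user guide lists for these component types.

```shell
# Write a hypothetical Flume config into a scratch directory (all paths are assumptions).
CONF_DIR="${CONF_DIR:-$(mktemp -d)}"
CONF_FILE="$CONF_DIR/durable-example.conf"

cat > "$CONF_FILE" <<'EOF'
# Hypothetical agent: spooling-directory source -> file channel -> logger sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# The application drops completed log files here and Flume ingests them
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/myapp/spool
a1.sources.r1.channels = c1

# The file channel keeps events on disk, so they survive an agent restart
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /var/flume/checkpoint
a1.channels.c1.dataDirs = /var/flume/data

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
EOF

echo "wrote $CONF_FILE"
```

The trade-off is throughput: a memory channel is faster, so the examples below keep using it for local experiments.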

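# closer.lua serves a mirror-selection page, so fetch the tarball from the Apache archive
wget https://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
# Extract under /usr/local to match FLUME_HOME below (may require sudo)
tar zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local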

Set the environment variables

# Add the three export lines below to ~/.bash_profile, then reload it
vi ~/.bash_profile
export FLUME_HOME=/usr/local/apache-flume-1.8.0-bin
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin
source ~/.bash_profile
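
To confirm that the PATH change took effect, a small check along these lines may help. The function name flume_status is made up for this sketch; flume-ng is the launcher script shipped in the bin directory of the Flume distribution.

```shell
# Report whether the flume-ng launcher is reachable through PATH.
# (flume_status is a hypothetical helper name, not part of Flume.)
flume_status() {
  if command -v flume-ng >/dev/null 2>&1; then
    echo "found: $(command -v flume-ng)"
  else
    echo "missing: flume-ng is not on PATH; check FLUME_HOME and PATH"
  fi
}

flume_status
```

If the launcher is found, running flume-ng version prints the installed Flume version.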

Flume Example 1: Output data from a specified network port to the console

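The console shows at most 16 bytes of each event body; anything beyond that is left out of the log line.
The port is passed in from the command line through the NC_PORT environment variable.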
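
According to the Flume user guide, the 16-byte display limit is the default of the logger sink's maxBytesToLog property. The sketch below assumes the agent a1 and sink k1 used in this example; the value 64 is an arbitrary choice, and CONF defaults to a scratch file so that nothing real is modified. It also shows, with plain shell tools, what a 16-byte cut of a longer line looks like.

```shell
# Scratch file so the sketch does not touch a real config (the path is an assumption).
CONF="${CONF:-$(mktemp)}"

# Raise the logger sink's display limit from the default 16 bytes to 64.
echo 'a1.sinks.k1.maxBytesToLog = 64' >> "$CONF"

# What the default limit does to a longer line: only the first 16 bytes are shown.
shown=$(printf '%s' 'a single example but not zh_cn' | head -c 16)
echo "$shown"   # prints: a single example
```

The limit only affects what the logger sink prints; the event body itself is not shortened.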

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = ${NC_PORT}

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent

NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties
......
2018-04-23 10:46:34,531 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
2018-04-23 10:49:44,657 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 77 6F 72 6C 64 0D helloworld. }
2018-04-23 10:50:08,370 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 61 20 73 69 6E 67 6C 65 20 65 78 61 6D 70 6C 65 a single example }
2018-04-23 10:50:55,956 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6B 6A 64 6E 62 6B 67 66 6E 6A 6B 6E 66 0D kjdnbkgfnjknf. }
2018-04-23 10:51:04,969 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6B 6A 64 6E 6D 6B 66 76 6E 64 6B 66 6E 6B 62 6E kjdnmkfvndkfnkbn }
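
The NC_PORT=44444 prefix on the start command sets the variable only for that one process, which is where EnvVarResolverProperties picks it up when it resolves ${NC_PORT} in the config. The shell side of this can be seen without Flume; in the sketch below the sh -c child merely stands in for the flume-ng process.

```shell
# Start from a clean state so the result does not depend on the caller's environment.
unset NC_PORT

# The prefix assignment is visible to the child process...
child_view=$(NC_PORT=44444 sh -c 'echo "port=${NC_PORT}"')
echo "$child_view"     # prints: port=44444

# ...but it does not leak into the calling shell.
parent_view="port=${NC_PORT:-unset}"
echo "$parent_view"    # prints: port=unset
```

Exporting NC_PORT beforehand would work as well, at the cost of leaving it set for every later command in the session.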

Use telnet to send data to the port (telnet localhost 44444)

Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
helloworld
OK
a single example but not zh_cn
OK
kjdnbkgfnjknf
OK
kjdnmkfvndkfnkbnkfgjbnkfnbkfjb
OK
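
In the agent log for this session, the short lines typed through telnet end in the byte 0D, printed as a dot. That is presumably the carriage return telnet sends at the end of each line: the netcat source splits events on the newline and the carriage return stays in the body. A small sketch with od reproduces the same bytes for the first line:

```shell
# Hex-dump "helloworld" plus a carriage return, as telnet would send it
# (without the final newline, which the source consumes as the event delimiter).
hex=$(printf 'helloworld\r' | od -An -tx1 | tr -d ' \n')
echo "$hex"   # prints: 68656c6c6f776f726c640d
```

Lines of 16 bytes or more show no trailing 0D in the log simply because the display is cut off before the carriage return.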

Flume Example 2: Monitor a file and collect newly appended data to the console in real time

Use tail -F with an uppercase F, which keeps following the file by name even if it is rotated or recreated.

# example2.conf: A single-node Flume configuration

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /usr/local/apache-flume-1.8.0-bin/data/data.log
a2.sources.r2.shell = /bin/sh -c

# Describe the sink
a2.sinks.k2.type = logger

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

Start the agent

bin/flume-ng agent --conf conf --conf-file conf/example2.conf --name a2 -Dflume.root.logger=INFO,console
......
2018-04-23 11:05:22,718 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6C 73 6B 6C 64 6B 6C 66 6B 6C 73 lskldklfkls }
2018-04-23 11:05:22,718 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F hello }
2018-04-23 11:05:22,719 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 6F 72 6C 64 world }
2018-04-23 11:05:22,719 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6B 6B 6B 6B 6B 6B 6B 6B kkkkkkkk }
2018-04-23 11:05:22,719 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6C 6B 73 6C 64 6D 66 6C 73 64 6B lksldmflsdk }
2018-04-23 11:05:22,719 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 68 68 hhh }
2018-04-23 11:05:37,745 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 68 68 6B 6C 6B 6C hhhklkl }
2018-04-23 11:05:46,756 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 68 68 6B 6C 6B 6C 73 73 73 hhhklklsss }
2018-04-23 11:12:42,344 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: E6 AD A4 E5 A4 84 E8 83 BD E9 87 87 E9 9B 86 E4 ................ }
2018-04-23 11:13:12,801 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 66 61 69 6C 75 65 20 75 73 65 20 63 68 69 6E 65 failue use chine }
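
The row of dots in the log line for the Chinese text does not mean the data was lost. The logger sink appears to print non-ASCII bytes as dots and, by default, only the first 16 bytes, which here are five complete UTF-8 characters plus the first byte of a sixth. The sketch below reproduces the hex column from the text itself, assuming the script is saved and run as UTF-8:

```shell
# First 16 bytes of the UTF-8 encoded text, as lowercase hex without separators.
hex=$(printf '%s' '此处能采集中文吗' | head -c 16 | od -An -tx1 | tr -d ' \n')
echo "$hex"   # prints: e6ada4e5a484e883bde98787e99b86e4
```

Example 3 writes the same line to a file through the file_roll sink, where it comes out intact.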

Append data to the file

$ echo hhh >> data.log
$ echo hhhklkl >> data.log
$ echo hhhklklsss >> data.log
$ echo 此处能采集中文吗?试试看 >> data.log
$ echo failue use chinese character >> data.log

Flume Example 3: Collect logs from server B to server A in real time and merge them with A's logs

Here server A and server B are the same machine, simulated locally with IP 127.0.0.1.

Agent configuration on server A

agenta.sources = sa sb
agenta.channels = ca
agenta.sinks = ka

agenta.sources.sa.type = avro
agenta.sources.sa.bind = 127.0.0.1
agenta.sources.sa.port = 33333
agenta.sources.sa.channels = ca
agenta.sources.sa.batchSize = 1000
agenta.sources.sa.batchTimeout = 1000

agenta.sources.sb.type = exec
agenta.sources.sb.channels = ca
agenta.sources.sb.command = tail -F /usr/local/apache-flume-1.8.0-bin/data/data.log
agenta.sources.sb.shell = /bin/sh -c
agenta.sources.sb.batchSize = 1000
agenta.sources.sb.batchTimeout = 1000

agenta.channels.ca.type = memory
agenta.channels.ca.capacity = 50000
agenta.channels.ca.transactionCapacity = 10000

agenta.sinks.ka.channel = ca
agenta.sinks.ka.type = file_roll
agenta.sinks.ka.sink.directory = /Users/shaozhipeng/tmp/flume
agenta.sinks.ka.sink.rollInterval = 0
agenta.sinks.ka.sink.batchSize = 100

Data on server A

$ tail -F /usr/local/apache-flume-1.8.0-bin/data/data.log
hello
world
kkkkkkkk
lksldmflsdk
hhh
hhhklkl
hhhklklsss
此处能采集中文吗?试试看
failue use chinese character
hi

Agent configuration on server B

agentb.sources = src1
agentb.channels = ch1
agentb.sinks = sink1

agentb.sources.src1.type = exec
agentb.sources.src1.channels = ch1
agentb.sources.src1.command = tail -F /usr/local/apache-flume-1.8.0-bin/data/access.log
agentb.sources.src1.shell = /bin/sh -c

agentb.channels.ch1.type = memory
agentb.channels.ch1.capacity = 10000
agentb.channels.ch1.transactionCapacity = 10000

agentb.sinks.sink1.channel = ch1
agentb.sinks.sink1.type = avro
agentb.sinks.sink1.hostname = 127.0.0.1
agentb.sinks.sink1.port = 33333
agentb.sinks.sink1.batchSize = 50

Data on server B

$ tail -F /usr/local/apache-flume-1.8.0-bin/data/access.log
模拟B服务器日志
B Server nginx access log file
XXXX
what

Start the two agents separately (running in the background)

nohup bin/flume-ng agent --conf conf --conf-file conf/examplea.conf --name agenta > examplea.out 2>&1 &
nohup bin/flume-ng agent --conf conf --conf-file conf/exampleb.conf --name agentb > exampleb.out 2>&1 &

Check the output directory for the merged file (create the directory manually first if necessary).
The sink generated the file 1524467365741-1.

$ cd /Users/shaozhipeng/tmp/flume
$ tail -f 1524467365741-1
hhh
hhhklkl
hhhklklsss
此处能采集中文吗?试试看
failue use chinese character
hi
模拟B服务器日志
B Server nginx access log file
XXXX
what
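
For the note about creating the output directory by hand, a minimal sketch follows. It assumes OUT_DIR stands for the directory configured in sink.directory; the default used here is a scratch path, not the one from the config.

```shell
# Directory the file_roll sink writes into; set OUT_DIR to the real path before running.
OUT_DIR="${OUT_DIR:-${TMPDIR:-/tmp}/flume-out-demo}"

# -p creates missing parent directories and does nothing if the directory exists.
mkdir -p "$OUT_DIR"

# Show the most recently modified file, if the sink has written one yet.
latest=$(ls -t "$OUT_DIR" | head -n 1)
echo "latest file: ${latest:-none yet}"
```

Because the agent config sets sink.rollInterval to 0, the sink keeps writing to a single file, so the newest file is the one to tail.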

Real-time collection and merged output

$ echo test accessfile >> /usr/local/apache-flume-1.8.0-bin/data/access.log
$ echo test datafile >> /usr/local/apache-flume-1.8.0-bin/data/data.log

Check the merged file again

$ tail -f 1524467365741-1
hhh
hhhklkl
hhhklklsss
此处能采集中文吗?试试看
failue use chinese character
hi
模拟B服务器日志
B Server nginx access log file
XXXX
what
test accessfile
test datafile