Flume real-time data collection to Kafka

Start ZooKeeper and the Kafka broker server

$ zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
[2018-05-03 23:06:58,125] INFO KafkaConfig values:
......
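Kafka auto-creates topics by default, but if auto.create.topics.enable has been turned off in server.properties you can create the target topic up front; a minimal sketch, assuming the standard Kafka scripts are on the PATH and the topic name matches the sink configuration below:

$ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic default-flume-topic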

Configure exec-memory-avro.conf and avro-memory-kafka.conf

Data produced from the console (by a script appending to a log file) is shipped to a TCP port;
the data arriving on that TCP port is then written out to Kafka.

exec-memory-avro.conf
# exec-memory-avro.conf
exec-memory-avro.sources = exec-source
exec-memory-avro.channels = memory-channel
exec-memory-avro.sinks = avro-sink

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sources.exec-source.command = tail -F /usr/local/apache-flume-1.8.0-bin/data/access.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 10000
exec-memory-avro.channels.memory-channel.transactionCapacity = 10000

exec-memory-avro.sinks.avro-sink.channel = memory-channel
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = 127.0.0.1
exec-memory-avro.sinks.avro-sink.port = 44444
exec-memory-avro.sinks.avro-sink.batchSize = 50
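The exec source tails /usr/local/apache-flume-1.8.0-bin/data/access.log, so make sure the data directory and log file exist before the agent starts; a quick sketch, assuming the same path as in the conf above:

$ mkdir -p /usr/local/apache-flume-1.8.0-bin/data
$ touch /usr/local/apache-flume-1.8.0-bin/data/access.log
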

avro-memory-kafka.conf

For the Kafka sink configuration, see https://flume.apache.org/FlumeUserGuide.html

(image: Kafka Sink configuration table from the Flume User Guide)

# avro-memory-kafka.conf

# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 127.0.0.1
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
# Must be set to org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 127.0.0.1:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = default-flume-topic

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.channels.memory-channel.capacity = 1000
avro-memory-kafka.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

Start the two flume-ng agents in the background

$ cd /usr/local/apache-flume-1.8.0-bin
$ nohup flume-ng agent --conf conf --conf-file conf/exec-memory-avro.conf --name exec-memory-avro > exec-memory-avro.out 2>&1 &
$ nohup flume-ng agent --conf conf --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka > avro-memory-kafka.out 2>&1 &
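The avro-memory-kafka agent opens the Avro listener on 127.0.0.1:44444; until it is up, the Avro sink of exec-memory-avro simply retries the connection, so connection-refused messages in exec-memory-avro.out right after startup are usually harmless. To confirm the listener is up, a quick check assuming netstat is installed:

$ netstat -tln | grep 44444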

Check the running processes with jps -m

$ jps -m
17728 Jps -m
2898
17445 Application --conf-file conf/exec-memory-avro.conf --name exec-memory-avro
17590 Application --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka
8249 Kafka /usr/local/kafka_2.11-0.9.0.1/config/server.properties
8078 QuorumPeerMain /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
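If either Application process is missing here, check the nohup output file that agent was redirected to for startup errors, for example:

$ tail -n 20 exec-memory-avro.out
$ tail -n 20 avro-memory-kafka.out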

Start a Kafka console consumer

$ kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic default-flume-topic
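The --zookeeper flag matches the old consumer shipped with the Kafka 0.9 build used here; on newer Kafka releases the console consumer talks to the brokers directly, so the rough equivalent (an assumption, check your version's docs) is the command below, optionally with --from-beginning to replay messages already in the topic:

$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic default-flume-topic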

Write data into the log file and watch the output on the consumer side

$ echo hahaha1 >> /usr/local/apache-flume-1.8.0-bin/data/access.log

$ kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic default-flume-topic
hahaha1
$ echo congratulations  >> /usr/local/apache-flume-1.8.0-bin/data/access.log
$ echo goon, you are great >> /usr/local/apache-flume-1.8.0-bin/data/access.log

$ kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic default-flume-topic
hahaha1
congratulations
goon, you are great

Mind the delivery speed: flumeBatchSize can be tuned on the Kafka sink; the default value is used here.
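If you do want to change it, the property goes on the Kafka sink in avro-memory-kafka.conf; a sketch with an example value (the documented default in Flume 1.7+ is 100):

# how many events the Kafka sink sends to Kafka in one batch (example value)
avro-memory-kafka.sinks.kafka-sink.flumeBatchSize = 500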
