Flume 1.8 Kafka Channel + HDFS sink (without sources)
To copy data from Kafka into HDFS for offline computation, Flume already provides everything we need: add a config file and start flume-ng.
The Kafka channel can be used for multiple scenarios:
- With Flume source and sink - it provides a reliable and highly available channel for events
- With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
- With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr
- $FLUME_HOME/conf/kafka-hdfs.conf
    # Kafka Channel + HDFS sink (without sources)
    a1.channels = c1
    a1.sinks = k1

    # Define the KafkaChannel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.parseAsFlumeEvent = false
    a1.channels.c1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
    a1.channels.c1.kafka.topic = user
    a1.channels.c1.kafka.consumer.group.id = g1

    # Define the HDFS sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop-1:9000/flume/%Y%m%d/%H
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.hdfs.filePrefix = log
    a1.sinks.k1.hdfs.fileType = DataStream
    # Do not roll files based on event count
    a1.sinks.k1.hdfs.rollCount = 0
    # Roll to a new file once the current file on HDFS reaches 128 MB
    a1.sinks.k1.hdfs.rollSize = 134217728
    # Roll to a new file after 10 minutes
    a1.sinks.k1.hdfs.rollInterval = 600
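Remember to configure hosts entries so that the hostnames used in the config (kafka-1, kafka-2, kafka-3, hadoop-1) resolve on the machine running Flume.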
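Two values in the config are easier to read with a quick calculation: `hdfs.rollSize` is given in bytes, and `hdfs.path` contains time escapes that the sink fills in per event. The sketch below reproduces both with plain shell. The helper names `mib_to_bytes` and `current_bucket` are made up for illustration, and `date` is used only because its `%Y`, `%m`, `%d` and `%H` specifiers have the same meaning as the Flume escapes used here.

```shell
#!/usr/bin/env bash

# Convert a size in MiB to bytes, the unit hdfs.rollSize expects.
mib_to_bytes() {
  echo $(( $1 * 1024 * 1024 ))
}

# Expand the time escapes from hdfs.path using the local clock.
# This mirrors hdfs.useLocalTimeStamp = true, where the sink takes the
# timestamp from the agent's local time instead of an event header.
current_bucket() {
  date +"/flume/%Y%m%d/%H"
}

mib_to_bytes 128      # rollSize for 128 MB: prints 134217728
echo $(( 10 * 60 ))   # rollInterval for 10 minutes: prints 600
current_bucket        # directory the sink would write to right now
```

Once the agent is running, something like `hdfs dfs -ls "$(current_bucket)"` should show the files for the current hour, assuming the `hdfs` client on that machine points at the same NameNode.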
- Add the HDFS-related jars and configuration files
    commons-configuration-1.6.jar
    commons-io-2.4.jar
    hadoop-auth-2.8.3.jar
    hadoop-common-2.8.3.jar
    hadoop-hdfs-2.8.3.jar
    hadoop-hdfs-client-2.8.3.jar
    htrace-core4-4.0.1-incubating.jar
    core-site.xml
    hdfs-site.xml
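A small check can confirm that nothing from the list was forgotten. This is only a sketch: the helper `missing_files` is hypothetical, and it assumes the jars go into `$FLUME_HOME/lib` and the two XML files into `$FLUME_HOME/conf` (the post does not say where they are placed). `/usr/local/flume` is taken from the shell prompt in the start command.

```shell
#!/usr/bin/env bash

# Print every file name from the argument list that does not exist in dir.
missing_files() {
  dir="$1"
  shift
  for f in "$@"; do
    [ -e "$dir/$f" ] || echo "$f"
  done
}

# Assumption: Flume lives in /usr/local/flume unless FLUME_HOME is set.
FLUME_HOME="${FLUME_HOME:-/usr/local/flume}"

HDFS_JARS="commons-configuration-1.6.jar commons-io-2.4.jar
hadoop-auth-2.8.3.jar hadoop-common-2.8.3.jar hadoop-hdfs-2.8.3.jar
hadoop-hdfs-client-2.8.3.jar htrace-core4-4.0.1-incubating.jar"

# $HDFS_JARS is left unquoted on purpose so it splits into separate names.
missing_files "$FLUME_HOME/lib" $HDFS_JARS
missing_files "$FLUME_HOME/conf" core-site.xml hdfs-site.xml
```

No output means every file was found in the assumed locations.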
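- Flume 1.8 bundles Kafka client version 0.9 by default. It is nominally compatible with newer brokers, but don't use it: it has serious pitfalls. Use these jars instead:
    kafka-clients-2.0.0.jar
    kafka_2.11-2.0.0.jar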
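To see whether older Kafka jars are still lying next to the 2.0.0 ones, a check along these lines may help. It is a sketch under assumptions: `stale_kafka_jars` is a hypothetical helper, and it assumes the Kafka jars sit in `$FLUME_HOME/lib` and follow the `kafka*-<version>.jar` naming seen above.

```shell
#!/usr/bin/env bash

# Print Kafka jars in dir whose file name does not end in -<want>.jar.
stale_kafka_jars() {
  dir="$1"
  want="$2"
  for path in "$dir"/kafka*.jar; do
    [ -e "$path" ] || continue      # the glob matched nothing
    name=$(basename "$path")
    case "$name" in
      *"-$want.jar") ;;             # already the wanted version
      *) echo "$name" ;;            # some other version: report it
    esac
  done
}

# Assumption: Flume lives in /usr/local/flume unless FLUME_HOME is set.
FLUME_HOME="${FLUME_HOME:-/usr/local/flume}"
stale_kafka_jars "$FLUME_HOME/lib" 2.0.0
```

Any jar it prints is a candidate for removal, so that only one Kafka client version ends up on Flume's classpath.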
- Start ZooKeeper, Kafka and HDFS first (otherwise you will get all sorts of errors).
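Before launching the agent, a quick reachability check can catch a service that is not up yet. The sketch below assumes an `nc` (netcat) build that supports `-z` and `-w`; the function names are hypothetical, and the endpoints to pass in come from the config file.

```shell
#!/usr/bin/env bash

# Split "host:port,host:port" into one "host port" pair per line.
split_endpoints() {
  echo "$1" | tr ',' '\n' | tr ':' ' '
}

# Try a TCP connection to every endpoint and report the result.
# Assumption: nc supports -z (connect only) and -w (timeout in seconds).
check_endpoints() {
  split_endpoints "$1" | while read -r host port; do
    if nc -z -w 2 "$host" "$port" 2>/dev/null; then
      echo "ok   $host:$port"
    else
      echo "FAIL $host:$port"
    fi
  done
}
```

For the setup in this post, the call would be `check_endpoints "kafka-1:9092,kafka-2:9092,kafka-3:9092,hadoop-1:9000"`. ZooKeeper is not listed because its address does not appear in the Flume config; add it if you know the host and port.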
- Go to $FLUME_HOME and start Flume:
    root@common:/usr/local/flume# ./bin/flume-ng agent -c conf/ -f conf/kafka-hdfs.conf -n a1 -Dflume.root.logger=INFO,console