Please refer to the Flume user guide first:
http://flume.apache.org/FlumeUserGuide.html
and the Cloudera Flume blog:
http://blog.cloudera.com/blog/category/flume/
How to define JAVA_HOME, Java options, and add our customized libraries to flume-ng
All of this is defined in FLUME_CONF_DIR/flume-env.sh (the directory passed to flume-ng via --conf). For example:
JAVA_HOME=/opt/java
JAVA_OPTS="-Xms200m -Xmx200m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=3669 -Dflume.called.from.service"
FLUME_CLASSPATH=/opt/sponge/flume/lib/*
How to start flume-ng as an agent
Please note the agent must be named hostname_agent; this name is used in flume-conf-agent.properties.
$ /usr/lib/flume/bin/flume-ng agent --conf /opt/sponge/flume/config/ --conf-file /opt/sponge/flume/conf/flume-conf-agent.properties --name hostname_agent &
How to start flume-ng as a collector
Please note the collector must be named hostname_collector; this name is used in flume-conf-collector.properties.
$ /usr/lib/flume/bin/flume-ng agent --conf /opt/sponge/flume/config/ --conf-file /opt/sponge/flume/conf/flume-conf-collector.properties --name hostname_collector &
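Because the --name value must match the component-name prefix inside the chosen properties file, it can help to derive both from the hostname. A minimal start-helper sketch (paths are the ones used above; the script only prints the command so it can be reviewed before launching):

```shell
#!/bin/sh
# Sketch: build the flume-ng start command for this host.
# ROLE is "agent" or "collector"; the resulting name must match the
# component prefix used in flume-conf-${ROLE}.properties.
ROLE=${1:-agent}
HOST=$(hostname -s 2>/dev/null || uname -n)
NAME="${HOST}_${ROLE}"
CMD="/usr/lib/flume/bin/flume-ng agent --conf /opt/sponge/flume/config/ --conf-file /opt/sponge/flume/conf/flume-conf-${ROLE}.properties --name ${NAME}"
echo "$CMD"   # drop the echo (and append &) to actually launch
```

Run with no argument to print the agent command, or pass collector for the collector tier.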
How to define the flume agent and flume collector property files
I have already committed two property files to https://svn.nam.nsroot.net:9050/svn/153299/elf/sponge-branches/2013-03-14-FlumeNG/sponge/myflumeng/config
Please refer to flume-conf-agent.properties and flume-conf-collector.properties.
The basic naming conventions are:
1) each agent is named hostname_agent
2) each collector is named hostname_collector
3) source names are source1, source2, source3, ...
4) agent sink names are avroSink1, avroSink2, avroSink3, ...
5) each source's interceptor is named logIntercept1, logIntercept2, logIntercept3, ...
6) all agent sinks are Avro sinks
7) the default collector source is an Avro source
8) agent sinks are load-balanced round-robin
9) the file channel is the default for both agent and collector
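For example, on a hypothetical host named app01, these conventions produce component names like the following (the listings below use the literal placeholder hostname, which must be replaced with the real short hostname):

```properties
# Hypothetical host "app01": top-level component names under the conventions above
app01_agent.sources = source1, source2
app01_agent.channels = fileChannel
app01_agent.sinks = avroSink1, avroSink2
app01_collector.sources = avroSource
app01_collector.channels = fileChannel
app01_collector.sinks = hbaseSink
```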
flume-conf-agent.properties
hostname_agent.sources = source1, source2
hostname_agent.channels = fileChannel
hostname_agent.sinks = avroSink1, avroSink2
# For each one of the sources, the type is defined
hostname_agent.sources.source1.type = exec
hostname_agent.sources.source1.command = tail -F /var/log/audit/audit.log
hostname_agent.sources.source1.channels = fileChannel
hostname_agent.sources.source1.batchSize=10
hostname_agent.sources.source2.type = exec
hostname_agent.sources.source2.command = tail -F /var/log/flume/flume.log
hostname_agent.sources.source2.channels = fileChannel
hostname_agent.sources.source2.batchSize=10
# For each one of the sources, the log interceptor is defined
hostname_agent.sources.source1.interceptors = logIntercept1
hostname_agent.sources.source1.interceptors.logIntercept1.type = com.citi.sponge.flume.sink.LogInterceptor$Builder
hostname_agent.sources.source1.interceptors.logIntercept1.preserveExisting = false
hostname_agent.sources.source1.interceptors.logIntercept1.hostName = hostname
hostname_agent.sources.source1.interceptors.logIntercept1.env = PROD
hostname_agent.sources.source1.interceptors.logIntercept1.logType = AUDIT_LOG
hostname_agent.sources.source1.interceptors.logIntercept1.appId = 111111
hostname_agent.sources.source1.interceptors.logIntercept1.logFilePath = /var/log/audit
hostname_agent.sources.source1.interceptors.logIntercept1.logFileName = audit.log
hostname_agent.sources.source2.interceptors = logIntercept2
hostname_agent.sources.source2.interceptors.logIntercept2.type = com.citi.sponge.flume.sink.LogInterceptor$Builder
hostname_agent.sources.source2.interceptors.logIntercept2.preserveExisting = false
hostname_agent.sources.source2.interceptors.logIntercept2.hostName = hostname
hostname_agent.sources.source2.interceptors.logIntercept2.env = PROD
hostname_agent.sources.source2.interceptors.logIntercept2.logType = FLUME
hostname_agent.sources.source2.interceptors.logIntercept2.appId = 111111
hostname_agent.sources.source2.interceptors.logIntercept2.logFilePath = /var/log/flume
hostname_agent.sources.source2.interceptors.logIntercept2.logFileName = flume.log
# For each one of the sinks, the type is defined
hostname_agent.sinks.avroSink1.type = avro
hostname_agent.sinks.avroSink1.hostname=collector1
hostname_agent.sinks.avroSink1.port=1442
hostname_agent.sinks.avroSink1.batchSize=10
hostname_agent.sinks.avroSink1.channel = fileChannel
hostname_agent.sinks.avroSink2.type = avro
hostname_agent.sinks.avroSink2.hostname=collector2
hostname_agent.sinks.avroSink2.port=1442
hostname_agent.sinks.avroSink2.batchSize=10
hostname_agent.sinks.avroSink2.channel = fileChannel
#Specify the load-balancing configuration for the sinks
#(the prefix must match the agent name, or the sink group is ignored)
hostname_agent.sinkgroups = sinkGroup
hostname_agent.sinkgroups.sinkGroup.sinks = avroSink1 avroSink2
hostname_agent.sinkgroups.sinkGroup.processor.type = load_balance
hostname_agent.sinkgroups.sinkGroup.processor.backoff = true
hostname_agent.sinkgroups.sinkGroup.processor.selector = round_robin
hostname_agent.sinkgroups.sinkGroup.processor.selector.maxTimeOut = 30000
# Each channel's type is defined.
hostname_agent.channels.fileChannel.type = file
hostname_agent.channels.fileChannel.checkpointDir = /opt/sponge/file-channel/checkpoint
hostname_agent.channels.fileChannel.dataDirs = /opt/sponge/file-channel/dataDirs
hostname_agent.channels.fileChannel.transactionCapacity = 1000
hostname_agent.channels.fileChannel.checkpointInterval = 30000
hostname_agent.channels.fileChannel.maxFileSize = 2146435071
hostname_agent.channels.fileChannel.minimumRequiredSpace = 524288000
hostname_agent.channels.fileChannel.keep-alive = 5
hostname_agent.channels.fileChannel.write-timeout = 5
hostname_agent.channels.fileChannel.checkpoint-timeout = 600
flume-conf-collector.properties
hostname_collector.sources = avroSource
hostname_collector.channels = fileChannel
hostname_collector.sinks = hbaseSink
# For each one of the sources, the type is defined
hostname_collector.sources.avroSource.channels = fileChannel
hostname_collector.sources.avroSource.type = avro
hostname_collector.sources.avroSource.bind = hostname
hostname_collector.sources.avroSource.port = 1442
hostname_collector.sinks.hbaseSink.type=org.apache.flume.sink.hbase.HBaseSink
hostname_collector.sinks.hbaseSink.table=spong_flumeng_log2
hostname_collector.sinks.hbaseSink.columnFamily=content
hostname_collector.sinks.hbaseSink.serializer=com.citi.sponge.flume.sink.LogHbaseEventSerializer
hostname_collector.sinks.hbaseSink.timeout=120
hostname_collector.sinks.hbaseSink.column=log
hostname_collector.sinks.hbaseSink.batchSize=2
hostname_collector.sinks.hbaseSink.channel=fileChannel
# Each channel's type is defined.
hostname_collector.channels.fileChannel.type = file
hostname_collector.channels.fileChannel.checkpointDir = /opt/sponge/file-channel/checkpoint
hostname_collector.channels.fileChannel.dataDirs = /opt/sponge/file-channel/dataDirs
hostname_collector.channels.fileChannel.transactionCapacity = 1000
hostname_collector.channels.fileChannel.checkpointInterval = 30000
hostname_collector.channels.fileChannel.maxFileSize = 2146435071
hostname_collector.channels.fileChannel.minimumRequiredSpace = 524288000
hostname_collector.channels.fileChannel.keep-alive = 5
hostname_collector.channels.fileChannel.write-timeout = 5
hostname_collector.channels.fileChannel.checkpoint-timeout = 600