Flume

Flume

Flume1.8 启动报空指针报错

回复

文泽路小男孩OuO 回复了问题 1 人关注 1 个回复 2222 次浏览 2020-05-12 16:22 来自相关话题

Flume启动报错

回复

梅西忘 回复了问题 0 人关注 1 个回复 1244 次浏览 2019-05-04 17:13 来自相关话题

启动Flume失败

Vincentayd 回复了问题 3 人关注 2 个回复 3357 次浏览 2019-04-24 13:35 来自相关话题

flume FileChannel消费过慢

西西里ecf 回复了问题 2 人关注 1 个回复 2911 次浏览 2018-10-25 16:31 来自相关话题

Flume监控文件夹采集文件到HDFS,文件名有时间戳后缀,怎么保持文件名不变?

回复

wahahahah 发起了问题 1 人关注 0 个回复 4429 次浏览 2018-08-23 17:46 来自相关话题

flume channel堆积

回复

code4j 发起了问题 1 人关注 0 个回复 2161 次浏览 2018-08-09 15:20 来自相关话题

求助,flume不能从kafka中读取数据并且写入hdfs中

回复

peter_mu 发起了问题 1 人关注 0 个回复 3845 次浏览 2018-07-25 10:20 来自相关话题

Flume启动控制台不会输出运行日志

fish 回复了问题 2 人关注 1 个回复 4070 次浏览 2018-07-03 10:09 来自相关话题

flume出现java.lang.ClassNotFoundException: org.apache.flume.sink.kafka.KafkaSink

回复

anchao 发起了问题 1 人关注 0 个回复 3905 次浏览 2018-06-02 00:07 来自相关话题

flume向kafkaSink发送消息,kafkaSink接收不到

依风逗春 回复了问题 4 人关注 3 个回复 12336 次浏览 2018-04-02 15:50 来自相关话题

flume+kafka整合时,flume启动报错

fish 回复了问题 2 人关注 1 个回复 1587 次浏览 2018-03-30 10:51 来自相关话题

flume收集syslog(udp source)时,丢失数据严重

men3cheng 回复了问题 4 人关注 3 个回复 8160 次浏览 2018-03-12 17:37 来自相关话题

flume kafkaSource 的问题

九天 回复了问题 4 人关注 4 个回复 6076 次浏览 2018-03-01 14:22 来自相关话题

flume使用file channel报错

fish 回复了问题 3 人关注 2 个回复 2509 次浏览 2017-11-28 19:24 来自相关话题

flume配置sources的ignorePattern问题

fish 回复了问题 3 人关注 2 个回复 3264 次浏览 2017-11-28 17:07 来自相关话题

Flume 使用Hive Sink 运行后 接收数据错误

nh0823 回复了问题 4 人关注 6 个回复 8354 次浏览 2017-11-03 18:08 来自相关话题

开启Flume-ng报错

回复

jane3von 回复了问题 1 人关注 1 个回复 2680 次浏览 2017-08-23 15:11 来自相关话题

Flume 如何一天的数据追加在一个XXX.txt文件中

fish 回复了问题 2 人关注 4 个回复 2315 次浏览 2017-07-24 12:22 来自相关话题

flume syslongudp 丢数据,如何提升flume效率?增加channel和sink数量?

Dong 回复了问题 2 人关注 1 个回复 3839 次浏览 2017-05-18 13:09 来自相关话题

条新动态, 点击查看
ERROR提示你的地址被使用了。JDWP是远程调试用的,不能被初始化。 1、可以把远程调试的关了试试。 2、使用命令查查调试端口与其它应用的调试端口是否冲突, 例如查当前是否有8004端口占用:netstat -na|grep 8004 或者是你配置的端口号。
ERROR提示你的地址被使用了。JDWP是远程调试用的,不能被初始化。 1、可以把远程调试的关了试试。 2、使用命令查查调试端口与其它应用的调试端口是否冲突, 例如查当前是否有8004端口占用:netstat -na|grep 8004 或者是你配置的端口号。

Flume1.8 启动报空指针报错

回复

文泽路小男孩OuO 回复了问题 1 人关注 1 个回复 2222 次浏览 2020-05-12 16:22 来自相关话题

Flume启动报错

回复

梅西忘 回复了问题 0 人关注 1 个回复 1244 次浏览 2019-05-04 17:13 来自相关话题

启动Flume失败

回复

Vincentayd 回复了问题 3 人关注 2 个回复 3357 次浏览 2019-04-24 13:35 来自相关话题

flume FileChannel消费过慢

回复

西西里ecf 回复了问题 2 人关注 1 个回复 2911 次浏览 2018-10-25 16:31 来自相关话题

Flume监控文件夹采集文件到HDFS,文件名有时间戳后缀,怎么保持文件名不变?

回复

wahahahah 发起了问题 1 人关注 0 个回复 4429 次浏览 2018-08-23 17:46 来自相关话题

flume channel堆积

回复

code4j 发起了问题 1 人关注 0 个回复 2161 次浏览 2018-08-09 15:20 来自相关话题

求助,flume不能从kafka中读取数据并且写入hdfs中

回复

peter_mu 发起了问题 1 人关注 0 个回复 3845 次浏览 2018-07-25 10:20 来自相关话题

Flume启动控制台不会输出运行日志

回复

fish 回复了问题 2 人关注 1 个回复 4070 次浏览 2018-07-03 10:09 来自相关话题

flume出现java.lang.ClassNotFoundException: org.apache.flume.sink.kafka.KafkaSink

回复

anchao 发起了问题 1 人关注 0 个回复 3905 次浏览 2018-06-02 00:07 来自相关话题

flume向kafkaSink发送消息,kafkaSink接收不到

回复

依风逗春 回复了问题 4 人关注 3 个回复 12336 次浏览 2018-04-02 15:50 来自相关话题

flume+kafka整合时,flume启动报错

回复

fish 回复了问题 2 人关注 1 个回复 1587 次浏览 2018-03-30 10:51 来自相关话题

flume收集syslog(udp source)时,丢失数据严重

回复

men3cheng 回复了问题 4 人关注 3 个回复 8160 次浏览 2018-03-12 17:37 来自相关话题

flume kafkaSource 的问题

回复

九天 回复了问题 4 人关注 4 个回复 6076 次浏览 2018-03-01 14:22 来自相关话题

flume使用file channel报错

回复

fish 回复了问题 3 人关注 2 个回复 2509 次浏览 2017-11-28 19:24 来自相关话题

flume配置sources的ignorePattern问题

回复

fish 回复了问题 3 人关注 2 个回复 3264 次浏览 2017-11-28 17:07 来自相关话题

Flume 使用Hive Sink 运行后 接收数据错误

回复

nh0823 回复了问题 4 人关注 6 个回复 8354 次浏览 2017-11-03 18:08 来自相关话题

开启Flume-ng报错

回复

jane3von 回复了问题 1 人关注 1 个回复 2680 次浏览 2017-08-23 15:11 来自相关话题

Flume 如何一天的数据追加在一个XXX.txt文件中

回复

fish 回复了问题 2 人关注 4 个回复 2315 次浏览 2017-07-24 12:22 来自相关话题

flume syslongudp 丢数据,如何提升flume效率?增加channel和sink数量?

回复

Dong 回复了问题 2 人关注 1 个回复 3839 次浏览 2017-05-18 13:09 来自相关话题

Flume NG source和sink实现

唐半张 发表了文章 0 个评论 2038 次浏览 2015-10-06 11:07 来自相关话题

Source实现: Avro source,监听avro端口并从外部avro客户端流接受events; Thrift source,监听thrift端口并从外部thrift客户端流接收events; Exec source, ...查看全部
Source实现:
Avro source,监听avro端口并从外部avro客户端流接受events;
Thrift source,监听thrift端口并从外部thrift客户端流接收events;
Exec source,在启动时运行一个Unix命令,在标准输出上持续产生数据;
JMS source,从一个Java消息服务(JMS)终端上读取信息;
Spooling directory source,监听指定目录,当有新文件产生时,将新文件转化成events;
Twitter 1% firehose source,这个source还处于试验阶段;
Netcat source,类似于netcat,监听指定网络端口,将text文档的每一行转化成events;
Sequence generator source,一个简单的序列生成器,用一个计数器按+1递增连续events;
Syslog source,读取系统日志数据生成events;  
HTTP source,通过HTTP协议接收events;
Legacey source,这个source允许接受Flume0.9.4agents生成的events
Custom source,用于实现自定义source的接口;
Scribe source,另一种注入系统;
 
Sink 实现:
  HDFS sink,将events写入HDFS;
  Logger sink,INFO层的日志event,常用语测试目的;
  Avro sink,Flume将events转化成Avro events然后发送Avro sink;
   Thriftsink,Flume将events转化成Thrift events然后发送Thrift sink;
  IRC sink,将数据发送到配置好的IRC终端;
   Fileroll sink,将events存储在本地文件系统;
   Nullsink ,丢弃所有events;
  Hbase sinks,将数据写入HBase,分HBaseSink和AsyncHBaseSink两种;
  MorphineSolrSink,将数据从events抽取出来,转化后,几乎实时的存储在Apache Solr服务器,用于检索或搜索服务;
  ElasticSearchSink,将数据写入elasticsearch 集群中;
   Custom Sink,用于实现自定义sink的接口。

Flume og agent source和sink数据源格式

唐半张 发表了文章 0 个评论 2046 次浏览 2015-10-06 10:23 来自相关话题

Flume自带了很多直接可用的数据源(source),如: ·                text(“filename”):将文件filename作为数据源,按行发送 ·              ...查看全部
Flume自带了很多直接可用的数据源(source),如:
·                text(“filename”):将文件filename作为数据源,按行发送
·                tail(“filename”):探测filename新产生的数据,按行发送出去
·                fsyslogTcp(5140):监听TCP的5140端口,并且接收到的数据发送出去
·                tailDir("dirname"[,fileregex=".*"[, startFromEnd=false[, recurseDepth=0]]]):监听目录中的文件末尾,使用正则去选定需要监听的文件(不包含目录),recurseDepth为递归监听其下子目录的深度
同时提供了很多sink,如:
·                console[("format")]:直接将将数据显示在consolr上
·                text(“txtfile”):将数据写到文件txtfile中
·                dfs(“dfsfile”):将数据写到HDFS上的dfsfile文件中
·                syslogTcp(“host”,port):将数据通过TCP传递给host节点
·                agentSink[("machine"[,port])]:等价于agentE2ESink,如果省略,machine参数,默认使用flume.collector.event.host与flume.collector.event.port作为默认collecotr
·                agentDFOSink[("machine"[,port])]:本地热备agent,agent发现collector节点故障后,不断检查collector的存活状态以便重新发送event,在此间产生的数据将缓存到本地磁盘中
·                agentBESink[("machine"[,port])]:不负责的agent,如果collector故障,将不做任何处理,它发送的数据也将被直接丢弃
·                agentE2EChain:指定多个collector提高可用性。 当向主collector发送event失效后,转向第二个collector发送,当所有的collector失败后,它会非常执着的再来一遍

Flume进阶-将日志文件写入HDFS

唐半张 发表了文章 0 个评论 2724 次浏览 2015-10-06 10:15 来自相关话题

配置将指定目录下的日志文件写入HDFS 步骤 1:通过hadoop创建目录/flume/log 2:复制hadoop-core-1.1.1.jar到flume/lib 3:配置一个hdfs.conf如下 ...查看全部
配置将指定目录下的日志文件写入HDFS
步骤
1:通过hadoop创建目录/flume/log
2:复制hadoop-core-1.1.1.jar到flume/lib
3:配置一个hdfs.conf如下
agent1.sources = spooldirSource
agent1.channels = memoryChannel
agent1.sinks = hdfsSink

agent1.sources.spooldirSource.type=spooldir
agent1.sources.spooldirSource.spoolDir=/tmp/flume
agent1.sources.spooldirSource.channels=memoryChannel

agent1.sinks.hdfsSink.type=hdfs
agent1.sinks.hdfsSink.hdfs.path=hdfs://pg2:9000/flume/log
agent1.sinks.hdfsSink.filePrefix=log-
agent1.sinks.hdfsSink.channel=memoryChannel

agent1.channels.memoryChannel.type=memory
agent1.channels.memoryChannel.capacity=100
4:启动
[root@pg1 apache-flume-1.4.0-bin]# ./bin/flume-ng agent -n agent1 -c conf -f conf/hdfs.conf                                                                                                                                                   Info: Sourcing environment configuration script /flume/apache-flume-1.4.0-bin/conf/flume-env.sh
Info: Including Hadoop libraries found via (/hadoop/hadoop-1.1.1/bin/hadoop) for HDFS access
Info: Excluding /hadoop/hadoop-1.1.1/libexec/../lib/slf4j-api-1.4.3.jar from classpath
Info: Excluding /hadoop/hadoop-1.1.1/libexec/../lib/slf4j-log4j12-1.4.3.jar from classpath
Info: Including HBASE libraries found via (/hbase/hbase-0.94.4/bin/hbase) for HBASE access
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flume/tools/GetJavaProperty
Caused by: java.lang.ClassNotFoundException: org.apache.flume.tools.GetJavaProperty
 at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Could not find the main class: org.apache.flume.tools.GetJavaProperty.  Program will exit.
Info: Excluding /hbase/hbase-0.94.4/lib/slf4j-api-1.4.3.jar from classpath
Info: Excluding /hbase/hbase-0.94.4/lib/slf4j-log4j12-1.4.3.jar from classpath
Info: Excluding /hadoop/hadoop-1.1.1/libexec/../lib/slf4j-api-1.4.3.jar from classpath
Info: Excluding /hadoop/hadoop-1.1.1/libexec/../lib/slf4j-log4j12-1.4.3.jar from classpath
 
报错内容org/apache/flume/tools/GetJavaProperty在该包中。
flume-ng-core-1.4.0.jar
5:测试
[root@pg1 conf]# cd /tmp/flume
[root@pg1 flume]# ls
[root@pg1 flume]# echo "Test hello flume write data to hdfs">test123.txt
[root@pg1 flume]# ls
test123.txt.COMPLETED 
[root@pg1 flume]#
可以看到已经被处理
6:查看Flume日志 $FLUME_HOME/logs下
2013 08:19:42,859 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:145)  - Starting Channel memoryChannelemoryChannel type memory
2013 08:19:42,916 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:110)  - Monitoried counter group for type: CHANNEL, name: memoryChannel, registered successfully.
2013 08:19:42,917 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:94)  - Component type: CHANNEL, name: memoryChannel started
2013 08:19:42,917 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:173)  - Starting Sink hdfsSinkdfsSink, type: hdfs
2013 08:19:42,918 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:184)  - Starting Source spooldirSourcee
2013 08:19:42,921 INFO  [lifecycleSupervisor-1-1] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:110)  - Monitoried counter group for type: SINK, name: hdfsSink, registered successfully.
2013 08:19:42,921 INFO  [lifecycleSupervisor-1-1] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:94)  - Component type: SINK, name: hdfsSink startedce=EventDrivenSourceRunner: { source:org.apache.flume.source.Spooll
7:查看HDFS文件

Apache Flume - HDFSSink

唐半张 发表了文章 0 个评论 1655 次浏览 2015-10-06 10:14 来自相关话题

        HDFSSink用来将数据写入Hadoop分布式文件系统(HDFS)中。支持创建text和sequence文件。支持这2种文件类型的压缩。支持文件周期性滚动(就是关闭当前文件在建立一个新 ...查看全部
        HDFSSink用来将数据写入Hadoop分布式文件系统(HDFS)中。支持创建text和sequence文件。支持这2种文件类型的压缩。支持文件周期性滚动(就是关闭当前文件在建立一个新的),滚动可以基于时间、数据大小、事件数量。也支持通过event hearder属性timestamp或host分割数据。HDFS目录路径或文件名支持格式化封装,相应的封装串在HDFSSink生成目录或文件时被恰当的替换。使用HDFSSink需要首先安装hadoop,HDFSSink是通过hadoop jar和HDFS集群通信的。注意Hadoop版本需要支持sync().
    以下是HDFS支持的封装串:

      使用中的文件以".tmp"结尾,一旦文件关闭,这个扩展名将被移除。这样我们就可以排除没有完成的文件。时间相关的封装串都依赖于event的header中的timestamp属性。HDFSSink的属性如下:

配置示例:
log-collector.sources=screen
log-collector.sinks=screen
log-collector.channels=screen

# Describe the source


# Describe the sink
log-collector.sinks.screen.type = hdfs
#日志保存路径,这里按天存放
log-collector.sinks.screen.hdfs.path=hdfs://nn.uusee.com:9000/user/flume/original/screen/%Y/%m/%d
#文件前缀,也可以使用封装串
log-collector.sinks.screen.hdfs.filePrefix=screen-%Y%m%d-%H0000000
#使用LZO压缩算法,注意这里有一个bug,使用LzopCodec时文档没有说明
log-collector.sinks.screen.hdfs.codeC=LzopCodec
log-collector.sinks.screen.hdfs.fileType=CompressedStream
#每10分钟滚动
log-collector.sinks.screen.hdfs.rollInterval=600
#不根据文件大小滚动
log-collector.sinks.screen.hdfs.rollSize=0
#不根据事件条数滚动
log-collector.sinks.screen.hdfs.rollCount=0
log-collector.sinks.screen.hdfs.writeFormat=Text
#批量保存大小,需要和channel的匹配
log-collector.sinks.screen.hdfs.batchSize=1000
log-collector.sinks.screen.hdfs.threadsPoolSize=15
#hadoop集群响应时间较长时需要配置
log-collector.sinks.screen.hdfs.callTimeout=30000


# Describe the channel
log-collector.channels.screen.type = file
log-collector.channels.screen.checkpointDir = /data0/flume-ng-channel/log/screen/checkpoint
log-collector.channels.screen.dataDirs = /data0/flume-ng-channel/log/screen/data
log-collector.channels.screen.capacity = 2000000
log-collector.channels.screen.transactionCapacity = 1000

# Bind the source and sink to the channel
log-collector.sources.screen.channels = screen
log-collector.sinks.screen.channel = screen

flume+kafka+storm+mysql 数据流

唐半张 发表了文章 0 个评论 3513 次浏览 2015-10-06 10:04 来自相关话题

天终于将 flume + kafka + storm + mysql 这条数据流走通了,虽然只是一个简单的测试例子,但是依据这条数据流可以做的事情很多。 ...查看全部
天终于将 flume + kafka + storm + mysql 这条数据流走通了,虽然只是一个简单的测试例子,但是依据这条数据流可以做的事情很多。
先简单看一下这几个工具的架构吧,架构图会更好说明:
flume的架构图:


kafka的架构图:



storm的架构图:


我们使用的  flume + kafka + storm +mysql的数据流架构图:

下面介绍一下kafka到storm的配置:
其实这些都是通过java代码实现的,这里用到了 KafkaSpout类,RDBMSDumperBolt类(以后这些可以作为工具类打包上传到集群中)
storm作业中,我们写了一个KafkaStormRdbms类,作业具体配置如下:
首先设置连接mysql的参数:
[java] view plaincopyprint?
  • ArrayList columnNames = new ArrayList();  
  • ArrayList columnTypes = new ArrayList();  
  • String tableName = "stormTestTable_01";  
  • // Note: if the rdbms table need not to have a primary key, set the variable 'primaryKey' to 'N/A'  
  • // else set its value to the name of the tuple field which is to be treated as primary key  
  • String primaryKey = "N/A";  
  • String rdbmsUrl = "jdbc:mysql://$hostname:3306/fuqingwuDB" ;  
  • String rdbmsUserName = "fuqingwu";  
  • String rdbmsPassword = "password";  
  •   
  • //add the column names and the respective types in the two arraylists  
  • columnNames.add("word");  
  •   
  • //add the types  
  • columnTypes.add("varchar (100)");  
配置 KafkaSpout 及 Topology:[java] view plaincopyprint?
  • TopologyBuilder builder = new TopologyBuilder();  
  •          
  •         List hosts = new ArrayList();  
  •         hosts.add("hadoop01");  
  •         SpoutConfig spoutConf = SpoutConfig.fromHostStrings(hosts, 1, "flume_kafka", "/root", "id");  
  •         spoutConf.scheme = new StringScheme();  
  •         spoutConf.forceStartOffsetTime(-2);  
  •          
  •         spoutConf.zkServers = new ArrayList() {{  
  •                       add("hadoop01");   
  •                     }};  
  •         spoutConf.zkPort = 2181;  
  •          
  •         //set the spout for the topology  
  •         builder.setSpout("spout",  new KafkaSpout(spoutConf), 1);  
  •   
  •         //dump the stream data into rdbms table      
  •         RDBMSDumperBolt dumperBolt = new RDBMSDumperBolt(primaryKey, tableName, columnNames, columnTypes, rdbmsUrl, rdbmsUserName, rdbmsPassword);  
  •         builder.setBolt("dumperBolt",dumperBolt, 1).shuffleGrouping("spout");  

flume 几个比较有用的source、sink和decorator

唐半张 发表了文章 0 个评论 2260 次浏览 2015-09-30 11:11 来自相关话题

Source catalog: 1.Console 控制台输出,可以带输出内容格式的参数 比如console(“raw”), console(“json”) ...查看全部
Source catalog:
1.Console 控制台输出,可以带输出内容格式的参数
比如console(“raw”), console(“json”)
2.text("filename"[, format])
一次读取文件内容,每一个换行做为一个事件。
3.tail("filename"[,startFromEnd=false]{,delim="regex", delimMode="exclude|prev|next"})
   读取文件尾部内容跟linux上tail类似。
   Filename:读取的文件名,
   StartFromEnd 每次重读时,是否重头开始读。默认false每次重头开始重读,
   Delim 是分割参数,
   DelimMode分隔符数据属于哪一部分的指定。
4.tailDir("dirname"[,fileregex=".*"[, startFromEnd=false[,recurseDepth=0]]]{,delim="regex",delimMode="exclude|prev|next"})
对一个目录下面有改动文件的事件发送
Dirname:目录名
Fileregex:文件名正则表达式匹配,需要符合java中正则表达式规则
StartFromeEnd 跟tail中的参数一样。
RecurseDepth :指定目录下是否递归对其子目录中文件的监控。指定递归层数。
其他剩余参数跟tail中一致。
Sinks catalog
CollectorTier Event Sinks
1.   collectorSink("fsdir","fsfileprefix"[, rollmillis[, format]])
收集发送过来的事件。
Fsdir:目录,
Fsfileprefix:文件前缀,
Rollmillis:对于hdfs来就是文件的打开是关闭这段时间
Format是输出文件格式
AgentTier Event Sinks 1、agentSink[("machine"[,port])]Defaults to agentE2ESink
2、agentE2ESink[("machine"[,port])]
先讲event内容写入文件防止在缓冲丢失和点对点确认的数据传输
Machine:collectorSource的ip,
Port:collectorSource的端口
3、agentDFOSink[("machine"[,port])]
当连接断开时,会把event写到本地,并且会不断重试发送event给对方,把之前未发送成功的event再次发送。
4、agentBESink[("machine"[,port])]
BestEffort Agent sink,最有效率的agent sink,顾名思义,就是只管发送,不管有没有发送成功。就可能存在丢失event的情况存在。
1、     agentE2EChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])
有多个collector可供选择发送。如果第一丢失连接了,就会向第二发送event,以此类推。并且会不定期回来查看原来没反应的collector是否已经恢复了,如果恢复了,就会跟原来的节点交互了。发送机制跟agentE2ESink一样
2、     agentDFOChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])
3、 agentBEChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])
上面两个跟6类似 
autoE2EChain,autoDFOChain,autoBEChain,自动的会想master要可以连接的collector 
装饰器catalog
1、writeAhead 在sink发送前先写入文件后,在本地缓冲机制,可让接收和发送分别在不同的线程中进行。
2、ackedWriteAhead[(maxmillis)]
3、diskFailover[(maxmillis)]
如果event发送失败,就在本地磁盘缓存,然后每隔maxmillis时间去重试。
4、ackInjector:注入ack确认操作。
5、ackChecker:计算发送组的checksum值,插入到ackInjector中
6、lazyOpen:只在条用append的时候,才会做真正的open,closed操作 7、 format("pattern"):改变输出格式,具体格式有pattern参数指定。对于需要做checksum的events来说由于改变了数据格式将导致checksum的值也不会改变。从而导致消息的丢失。这个装饰器一般用在对消息可靠性要求不是很高,或者reports中。

8、 batch(n,maxlatency):缓存n个events然后统一发送。如果等待时间操作maxlatency,即使未有n个events也会发送。

9、 unbatch:对于上面被batch操作过的events,同步unbatch来拆分开。

10、    gzip:将events事件序列化后压缩

11、    gunzip:对gzip压缩过的events,解压

12、     intervalSampler(n)每隔n个events发送一次,算是一个优化手段,特别是通过网络传输时。

13、     probSampler(p):通过概率p来做决定是否将events接着往下发送

14、     reservoirSampler(k):对接收到events根据发送时的顺序重新来组织。

15、     delay(ms)对要发送的events内容延迟ms

16、    choke[(choke-id)]:限制发送速度

flume ng配置拓扑图

唐半张 发表了文章 0 个评论 2589 次浏览 2015-09-30 11:06 来自相关话题

生产环境flume ng配置拓扑图。超级详细 ...查看全部
生产环境flume ng配置拓扑图。超级详细

Flume(ng) 自定义sink实现和属性注入

唐半张 发表了文章 0 个评论 2192 次浏览 2015-09-30 11:03 来自相关话题

远端日志收集的整体思路是远端自定义实现log4j的appender把消息发送到flume端,flume端自定义实现一个sink来按照我们的规则保存日志。 ...查看全部
远端日志收集的整体思路是远端自定义实现log4j的appender把消息发送到flume端,flume端自定义实现一个sink来按照我们的规则保存日志。

自定义Sink代码:
public class LocalFileLogSink extends AbstractSink implements Configurable {
     private static final Logger logger = LoggerFactory
              . getLogger(LocalFileLogSink .class );
            private static final String PROP_KEY_ROOTPATH = "rootPath";
      private String rootPath;
     @Override
     public void configure(Context context) {
          String rootPath = context.getString(PROP_KEY_ROOTPATH );
          setRootPath(rootPath);
     }
           
          @Override
          public Status process() throws EventDeliveryException {
           logger .debug("Do process" );
       }
}






实现Configurable接口,即可在初始化时,通过configure方法从context中获取配置的参数的值。这里,我们是想从flume的配置文件中获取rootPath的值,也就是日志保存的根路径。在flume-conf.properties中配置如下:
agent.sinks = loggerSink
agent.sinks.loggerSink.rootPath = ./logs

loggerSink是自定义sink的名称,我们取值时的key,只需要loggerSink后面的部分即可,即这里的rootPath。

实际业务逻辑的执行,是通过继承复写AbstractSink中的process方法实现。从基类的getChannel方法中获取信道,从中取出Event处理即可。


Channel ch = getChannel();
           Transaction txn = ch.getTransaction();
         txn.begin();
          try {
              logger .debug("Get event." );
             Event event = ch.take();
             txn.commit();
             status = Status. READY ;
             return status;
                   }finally {
             Log. info( "trx close.");
             txn.close();
         }

Flume NG flume-hdfs-sink 源代码分析

唐半张 发表了文章 0 个评论 1559 次浏览 2015-09-30 10:59 来自相关话题

SINK的分享C1: HDFSEventSink 0. HDFSEventSink.configure() also needs to implement a Configurab ...查看全部
SINK的分享C1: HDFSEventSink
0. HDFSEventSink.configure() also needs to implement a Configurable interface for processing its own configuration settings.
0.1 从context中读取配置参数configure;
0.2 设置编码,
      codeC = getCodec(codecName);
      // TODO : set proper compression type
      compType = CompressionType.BLOCK;
0.2.1 getCodec()
(1) 通过 CompressionCodecFactory.getCodecClasses(conf); 获取所能兼容的编码类型codecs
(2) 通过codecMatches(cls, codecName)判断是否相等,以获取编码名codecName所对应的编码类;
(3) 获取codec = cls.newInstance(),
(4) 
0.3 set writeFormat
if writeFormat = null,
then set format according to file type, if fileType= DataStreamType or CompStreamType, set 
1. HDFSEventSink.start()  method should initialize the sink and bring it to a state where it can forward the events to its next destination.
 
2. HDFSEventSink.process() method from sink interface is should do the core processing of extracting the event from channel and forwarding it.
 
3. HDFSEventSink.stop() method should do the necessary cleanup.
 
 

HDFSEventSink will call (2) HDFSFormatterFactory  
(2.1) HDFSWriterableFormatter
(2.2) HDFSTextFormatter
(3) HDFSWriterFactory
(3.1) HDFSSequenceFile
(3.2) HDFSDataStream
(3.3) HDFSCompressDataStream
(4) BucketWriter
(5) HDFSWriter
 

Flume OG 常用source sink cllector

唐半张 发表了文章 0 个评论 1564 次浏览 2015-09-30 10:20 来自相关话题

Flume OG  source ...查看全部
Flume OG 
source
text(“filename”):将文件filename作为数据源,按行发送
tail(“filename”):探测filename新产生的数据,按行发送出去
fsyslogTcp(5140):监听TCP的5140端口,并且接收到的数据发送出去
tailDir("dirname"[, fileregex=".*"[, startFromEnd=false[, recurseDepth=0]]]):监

听目录中的文件末尾,使用正则去选定需要监听的文件(不包含目录),recurseDepth为递

归监听其下子目录的深度
 
sink
console[("format")] :直接将将数据显示在consolr上
text(“txtfile”):将数据写到文件txtfile中
dfs(“dfsfile”):将数据写到HDFS上的dfsfile文件中
syslogTcp(“host”,port):将数据通过TCP传递给host节点
agentSink[("machine"[,port])]:等价于agentE2ESink,如果省略,machine参数,默认使

用flume.collector.event.host与flume.collector.event.port作为默认collecotr
agentDFOSink[("machine" [,port])]:本地热备agent,agent发现collector节点故障后,

不断检查collector的存活状态以便重新发送event,在此间产生的数据将缓存到本地磁盘中
agentBESink[("machine"[,port])]:不负责的agent,如果collector故障,将不做任何处

理,它发送的数据也将被直接丢弃
agentE2EChain:指定多个collector提高可用性。 当向主collector发送event失效后,转

向第二个collector发送,当所有的collector失败后,它会非常执着的再来一遍

cllector
collectorSource[(port)]:Collector source,监听端口汇聚数据
autoCollectorSource:通过master协调物理节点自动汇聚数据

Flume-ng HDFS sink原理解析

唐半张 发表了文章 0 个评论 2193 次浏览 2015-09-30 10:18 来自相关话题

HDFS sink主要处理过程在process方法: //循环batchSize次或者Channel为空 for (txnEventCount = 0; txnEventCount < batchSiz ...查看全部
HDFS sink主要处理过程在process方法:
//循环batchSize次或者Channel为空
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
//该方法会调用BasicTransactionSemantics的具体实现
Event event = channel.take();
if (event == null) {
break;
}
......
//sfWriter是一个LRU缓存,缓存对文件Handler,最大打开文件由参数maxopenfiles控制
BucketWriter bucketWriter = sfWriters.get(lookupPath);
// 如果不存在,则构造一个缓存
if (bucketWriter == null) {
//通过HDFSWriterFactory根据filetype生成一个hdfswriter,由参数hdfs.Filetype控制;eg:HDFSDataStream
HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
//idleCallback会在bucketWriter flush完毕后从LRU中删除;
bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
suffix, codeC, compType, hdfsWriter, timedRollerPool,
proxyTicket, sinkCounter, idleTimeout, idleCallback,
lookupPath, callTimeout, callTimeoutPool);
sfWriters.put(lookupPath, bucketWriter);
}
......
// track一个事务内的bucket
if (!writers.contains(bucketWriter)) {
writers.add(bucketWriter);
}
// 写数据到HDFS;
bucketWriter.append(event);->
open();//如果底层支持append,则通过open接口打开;否则create接口
//判断是否进行日志切换
//根据复制的副本书和目标副本数做对比,如果不满足则doRotate=false
if (doRotate) {
close();
open();
}
HDFSWriter.append(event);
if (batchCounter == batchSize) {//如果达到batchSize行进行一次flush
flush();->
doFlush()->
HDFSWriter.sync()->
FSDataoutputStream.flush/sync
}
// 提交事务之前,刷新所有的bucket
for (BucketWriter bucketWriter : writers) {
bucketWriter.flush();
}
transaction.commit();
这里,无论是BucketWriter执行append,sync还是rename等操作都是提交到一个后台线程池进行异步处理:callWithTimeout,这个线程池的大小是由hdfs.threadsize来设置;

Flume配合hadoop的使用

唐半张 发表了文章 0 个评论 1566 次浏览 2015-09-29 10:48 来自相关话题

下载最新版0.9.4  ...查看全部
下载最新版0.9.4 https://github.com/downloads/clo ... on-0.9.4-bin.tar.gz
1、设置环境变量:FLUME_CONF_DIR  解压后conf路径(JAVA路径等)
2、启动master:  ./flume master 
       启动node  ./flume node 
       以上启动方式信息直接输出在console,要将信息输出到日志文件:修改flume-daemon.sh中FLUME_HOME、FLUME_LOG_DIR、FLUME_PID_DIR 值
3、master启动后,可以通过 http://机器名:35871 访问管理界面,通过点击config 配置node(下拉选择对应节点)的Source 和 Sink ,也可以通过下面配置多个node ,格式:  host : Source |  Sink;  (勿忘分号)
4、配置多个master 需要配置conf下配置文件:
     以下几点:
flume.master.servers
host1,host2,...


flume.master.serverid
index
The unique identifier for a machine in a
Flume Master ensemble. Must be different on every
master instance.

< /property>
< property>
< name>flume.master.gossip.port
< value>57890
< /property>
5、通过./flume shell 以命令方式进行配置
connect localhost
  通过 ./flume shell  -c "localhost:35873" -q -s "命令文件"      执行包含命令的文件
           ./flume shell  -c "localhost:35873" -q  -e "command"     执行一条命令 (注意要用\"进行转义)
6、通过tailDir可以监控文件目录变化,通过%{tailSrcFile} 可以获取文件名,并可以给Event添加键值对
eg:
  exec host1 'collectorSource' 'collectorSink("hdfs://hostname:9000/data/%{category}/%{host}","%{tailSrcFile}-")'
exec host2 'collectorSource' 'collectorSink("hdfs://hostname:9000/data/%{category}/%{host}","%{tailSrcFile}-")'
exec config agent1 'tailDir("filepath",fileregex=".*\\.log$",startFromEnd=true,recurseDepth=1)' '{value("category","A") => agentE2EChain("host1","host2")}'
exec config agent2 'tailDir("filepath",fileregex=".*\\.log$",startFromEnd=true,recurseDepth=1)' '{value("category","B") => agentE2EChain("host1","host2")}'
exec map host3 agent1
exec map host3 agent1
7、还在熟悉中,有时数据会重复、传不上hadoop

     以下是 提取文件目录下的正则表达式:
     
 (?:^money)_\d+_\d+_\d{8}(?:\.log$)    形如:money_30_34_20110922.log  在配置中写成 (?:^money)_\\d+_\\d+_\\d{8}(?:\\.log$)

Flume分布式环境部署

唐半张 发表了文章 0 个评论 2338 次浏览 2015-09-29 10:45 来自相关话题

$wget http://cloud.github.com/downloads/cloudera/flume/flume-distribution-0.9.4-bin.tar.gz $tar -xzvf flume-distributio ...查看全部
$wget http://cloud.github.com/downloads/cloudera/flume/flume-distribution-0.9.4-bin.tar.gz
$tar -xzvf flume-distribution-0.9.4-bin.tar.gz
$cp -rf flume-distribution-0.9.4-bin /usr/local/flume
$vi /etc/profile #添加环境配置
export FLUME_HOME=/usr/local/flume
export PATH=.:$PATH::$FLUME_HOME/bin
$source /etc/profile

$flume #验证安装

Flume介绍

唐半张 发表了文章 0 个评论 1361 次浏览 2015-09-29 10:44 来自相关话题

Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各 ...查看全部
Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
设计目标:
(1) 可靠性
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Best effort(数据发送到接收方后,不会进行确认)。
(2) 可扩展性
Flume采用了三层架构,分别为agent,collector和storage,每一层均可以水平扩展。其中,所有agent和collector由master统一管理,这使得系统容易监控和维护,且master允许有多个(使用ZooKeeper进行管理和负载均衡),这就避免了单点故障问题。
(3) 可管理性
所有agent和colletor由master统一管理,这使得系统便于维护。多master情况,Flume利用ZooKeeper和gossip,保证动态配置数据的一致性。用户可以在master上查看各个数据源或者数据流执行情况,且可以对各个数据源配置和动态加载。Flume提供了web 和shell script command两种形式对数据流进行管理。
(4) 功能可扩展性
用户可以根据需要添加自己的agent,collector或者storage。此外,Flume自带了很多组件,包括各种agent(file, syslog等),collector和storage(file,HDFS等)。

Flume OG自带sink、source、collector

唐半张 发表了文章 0 个评论 1609 次浏览 2015-09-29 10:43 来自相关话题

# Flume OG自带Source console                      监听用户编辑历史和快捷 ...查看全部
# Flume OG自带Source
console                      监听用户编辑历史和快捷键输入,只在node_nowatch模式下可用
stdin                        监听标准输入,只在node_nowatch模式下可用,每行将作为一个event source
rpcSource(port)              由rpc框架(thrift/avro)监听tcp端口
text("filename")             一次性读取一个文本,每行为一个event
tail("filename"[, startFromEnd=false])每行为一个event。监听文件尾部的追加行,如果startFromEnd为true,tail将从文件尾读取,如果为false,tail将从文件开始读取全部数据
multitail("filename"[, file2 [,file3… ] ]) 同上,同时监听多个文件的末尾
tailDir("dirname"[, fileregex=".*"[, startFromEnd=false[, recurseDepth=0]]])  监听目录中的文件末尾,使用正则去选定需要监听的文件(不包含目录),recurseDepth为递归监听其下子目录的深度

分布式日志系统类比

唐半张 发表了文章 0 个评论 1891 次浏览 2015-09-28 10:27 来自相关话题

背景 Google、Facebook、Amazon等互联网巨头对于数据的创造性使用,创造出了很多辉煌的商业产品。如Amazon创造出的新的推荐模式:”查询此 ...查看全部
背景
Google、Facebook、Amazon等互联网巨头对于数据的创造性使用,创造出了很多辉煌的商业产品。如Amazon创造出的新的推荐模式:”查询此商品的顾客也查询了。。。。。”、“看过此商品的后的顾客买的其他商品有。。。。。。”、“购买了您最近浏览过的商品的顾客同时购买了。。。。。。”,还有LinkedIn公司创造的“你可能认识的人”。这些机制无不是建立在大量数据分析的基础上。
 
分布式日志方案

作为互联网公司,每天庞大的日志数据将是一笔宝贵的财富,对大规模日志数据进行采集、追踪、处理将是非常有收益的。一些开源项目的出现,也极快地促进了这个方面工作的进展。

本文针对据目前流行的日志框架,主要根据互联网的一些资料,结合自己的一些了解,对一些关键因素进行横向的对比,比便对技术选型提供参考。

关键要素

配置(Configuration)
客服端如何发现服务端?(不好的方式:固定配置。好的方式:基于zookeeper的pub-sub)
怎么配置消息路由?(点对点;广播;通过brokers路由消息(kafka))

容错(Failure and Recovery)
当集群中的一个节点出错,系统是如何什么方式进行buffer的?
系统是否保证数据传输过程的高可用,一旦发送,即保证到达。
系统是否支持树状集群。

维护管理(Maintenance)
本地的日志是否可以被配置成持久化,都提供了哪些支持?
如果日志数据被发送,消息是否是有序的?

Kafka

Kafka是一个大型分布式日志框架,实际更是消息中间件(类似RabbitMQ,ActiveMQ,etc)。但是他不同于一般的消息中间件,不遵从消息队列的协议,一般消息中间件的协议里面消息是不能被消费者删除掉的(或者标记成删除)。

体系架构:
 


Kafka设计的目标:高吞吐量

数据被存储在OS pagecache,这使得消息的数据存储不能可能造成out of memory。Kafka
宣称他们改进了Varnish的pagecache-centric design。消息在brokers上被保存在文件上并建索引。消息根据到达的时间戳是有序的, 不产生二次索引,这样就可以很容易进行顺序文件访问和高效的磁盘扫描,即便是在数据量很大的情况下。文件通过Java中的 FileChannel.transferTo发送,操作系统层面是sendFile(),这样避免了额外的数据考虑。这个号称“Zero-copy”的机制比直接存内存的消息队列性能好上很多,具体原因可以看主创团队如何解释。

消息的状态维护是消费者自己负责的,通过Zookeeper来协调一致性。负载均衡和分布式消息的分区也是Zookeeper来实现的。这意味着,所有消息随机的分布在各个broker上,每个broker都注册到Zooker上。每个消息的生产者可以通过broker列表来选择一个broker去发送消息。Kafka在producer和broker层都不提供保障机制。消费端负责保存消息状态。但是Broker可以保留缓存一个固定时间段的消息。比如LinkderIn保留一个星期的数据在每个broker上,这意味着如果消费者fail,那么在一个星期内,这个消费者如果重启,它能够衔接上原来的顺序接着消费。


Flume

Flume是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力。
Flume 在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG,0.9.x的称为Flume OG。New和Old的分界岭如下。

体系架构:
 
 

                           Flume NG 体系架构
 
 

                           Flume OG 体系架构

Flume NG在之前版本的基础上进行了重构和精简,去除了Zookeeper对于集中式配置管理、负载均衡和集群管理的支持,而专注于对数据流的采集和传输。对于每个服务器Node的配置,都需要自行在Node上配置。没法说这样的简化是好是坏,需求不一样,评价自然不同。因为Flume OG已经不再进行版本更新,所以后面讨论的Flume都是指Flume NG。

Flume Age是一个运行在JVM中的进程,主要任务是把Event(消息)从外部的一个source开始流向到下一个结点。Flune NG 有一个“数据流”的概念。

数据源(source):消费Events消息,并把消息传递出去。比如一个Avro的source能够接收来自Avro客户端采集到的或者其他Agent的sink发送过来的Event消息。当source接收到Event,它把消息保存到一个或者多个channel中,直到消费者通过sink来消费Event。sink把消息从channel中移出,并放到外部的存储(如HDFS,通过HDFS sink)或者是把消息推向另外一个Flume Agent的source。

Flume提供了各种source的实现,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。
Flume也提供了各种sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。
Flume对于Channel,则提供了Memory Channel、JDBC Chanel、File Channel,etc。


Chuwa

Chukwa是Hadoop的一个子项目,致力于大规模日志收集和分析。Chukwa是建立在Hadoop分布式文件系统(HDFS)和MapReduce框架之上,并继承了Hadoop的可扩展性和鲁棒性。Chukwa还包括一个灵活,功能强大的工具包,用于显示监测和分析结果,以充分利用此收集的数据。

体系架构:



其中主要的部件为: 
1. Agent:负责采集最原始的数据,并发送给collectors。Agent添加了“watchdog”机制,一旦agent出现故障,会自动重启终止的数据采集进程,防止数据源的数据丢失。
2. Adaptor: 直接采集数据的接口和工具,chukwa对以下常见的数据来源包括命令行输出、log 文件和 httpSender等提供了实现。一个 agent 可以管理多个 adaptor 的数据采集。
3. Collectors:负责收集 agents 收送来的数据,并定时写入集群中。Collector负责把大量小文件合并成少量大文件,再写入集群,以发挥hadoop在处理少量大文件上的优势。Collertor层实现了负载均衡,agent随机从列表中选择collertor来发送数据。
4. map/reduce jobs:定时启动,负责把集群中的数据分类、排序、去重和合并 
5. HICC:负责数据的展示。


Scribe

Scribe是facebook开源的日志收集系统,在facebook内部已经得到大量的应用。 Scribe是基于一个使用非阻断C++服务器的thrift服务的实现。它能够从各种日志源上收集日志,存储到
一个中央存储系统 (可以是NFS,分布式文件系统等)上,以便于进行集中统计分析处理。它为日志的“分布式收集,统一处理”提供了一个可扩展的,高容错的方案。

体系架构:


Scribe从各种数据源上收集数据,放到一个共享队列上,然后push到后端的中央存储系上。当中央存储系统出现故障时,scribe可以暂时把日志写到本地文件中,待中央存储系统恢复性能后,scribe把本地日志续传到中央存储系统上。

(1) scribe agent
scribe agent实际上是一个thrift client。 向scribe发送数据的唯一方法是使用thrift client,scribe内部定义了一个thrift接口,用户使用该接口将数据发送给server。
(2) scribe
scribe接收到thrift client发送过来的数据,根据配置文件,将不同topic的数据发送给不同的对象。scribe提供了各种各样的store,如 file, HDFS等,scribe可将数据加载到这些store中。