当前位置: 首页 / 技术干货 / 正文
数据采集工具之Flume的选择器

2023-03-16

channel 事件 写入   

数据采集工具之Flume的选择器

  说明

  Flume中的Channel选择器作用于source阶段 ,是决定Source接受的特定事件写入到哪个Channel的组件,他们告诉Channel处理器,然后由其将事件写入到Channel。

  Agent中各个组件的交互

  由于Flume不是两阶段提交,事件被写入到一个Channel,然后事件在写入下一个Channel之前提交,如果写入一个Channel出现异常,那么之前已经写入到其他Channel的相同事件不能被回滚。当这样的异常发生时,Channel处理器抛出ChannelException异常,事务失败,如果Source试图再次写入相同的事件(大多数情况下,会再次写入,只有Syslog,Exec等Source不能重试,因为没有办法生成相同的数据),重复的事件将写入到Channel中,而先前的提交是成功的,这样在Flume中就发生了重复。

  Channel选择器的配置是通过Channel处理器完成的,Channel选择器可以指定一组Channel是必须的,另一组的可选的。

  Flume分类两种选择器,如果Source配置中没有指定选择器,那么会自动使用复制Channel选择器.

  ●replicating:该选择器复制每个事件到通过Source的Channels参数指定的所有Channel中。

  ●multiplexing:是一种专门用于动态路由事件的Channel选择器,通过选择事件应该写入到哪个Channel,基于一个特定的事件头的值进行路由

  案例演示:replicating selector

  配置方案

  [root@qianfeng01 flumeconf]# vi rep.conf

  a1.sources = r1

  a1.channels = c1 c2

  a1.sinks = s1 s2

  a1.sources.r1.type=syslogtcp

  a1.sources.r1.host = qianfeng01

  a1.sources.r1.port = 6666

  a1.sources.r1.selector.type=replicating

  a1.channels.c1.type=memory

  a1.channels.c1.capacity=1000

  a1.channels.c1.transactionCapacity=100

  a1.channels.c1.keep-alive=3

  a1.channels.c1.byteCapacityBufferPercentage=20

  a1.channels.c1.byteCapacity=800000

  a1.channels.c2.type=memory

  a1.channels.c2.capacity=1000

  a1.channels.c2.transactionCapacity=100

  a1.sinks.s1.type=hdfs

  a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/rep

  a1.sinks.s1.hdfs.filePrefix=s1sink

  a1.sinks.s1.hdfs.fileSuffix=.log

  a1.sinks.s1.hdfs.inUseSuffix=.tmp

  a1.sinks.s1.hdfs.rollInterval=60

  a1.sinks.s1.hdfs.rollSize=1024

  a1.sinks.s1.hdfs.rollCount=10

  a1.sinks.s1.hdfs.idleTimeout=0

  a1.sinks.s1.hdfs.batchSize=100

  a1.sinks.s1.hdfs.fileType=DataStream

  a1.sinks.s1.hdfs.writeFormat=Text

  a1.sinks.s1.hdfs.round=true

  a1.sinks.s1.hdfs.roundValue=1

  a1.sinks.s1.hdfs.roundUnit=second

  a1.sinks.s1.hdfs.useLocalTimeStamp=true

  a1.sinks.s2.type=hdfs

  a1.sinks.s2.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/rep

  a1.sinks.s2.hdfs.filePrefix=s2sink

  a1.sinks.s2.hdfs.fileSuffix=.log

  a1.sinks.s2.hdfs.inUseSuffix=.tmp

  a1.sinks.s2.hdfs.rollInterval=60

  a1.sinks.s2.hdfs.rollSize=1024

  a1.sinks.s2.hdfs.rollCount=10

  a1.sinks.s2.hdfs.idleTimeout=0

  a1.sinks.s2.hdfs.batchSize=100

  a1.sinks.s2.hdfs.fileType=DataStream

  a1.sinks.s2.hdfs.writeFormat=Text

  a1.sinks.s2.hdfs.round=true

  a1.sinks.s2.hdfs.roundValue=1

  a1.sinks.s2.hdfs.roundUnit=second

  a1.sinks.s2.hdfs.useLocalTimeStamp=true

  a1.sources.r1.channels=c1 c2

  a1.sinks.s1.channel=c1

  a1.sinks.s2.channel=c2

  启动agent的服务

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./rep.conf -n a1 -Dflume.root.logger=INFO,console

  测试

  [root@qianfeng01 ~]# echo "hello world hello qianfeng" | nc qianfeng01 6666

  案例演示:Multiplexing selector

  配置方案

  [root@qianfeng01 flumeconf]# vi mul.conf

  a1.sources = r1

  a1.channels = c1 c2

  a1.sinks = s1 s2

  a1.sources.r1.type=http

  a1.sources.r1.bind = qianfeng01

  a1.sources.r1.port = 6666

  a1.sources.r1.selector.type=multiplexing

  a1.sources.r1.selector.header = state

  a1.sources.r1.selector.mapping.USER = c1

  a1.sources.r1.selector.mapping.ORDER = c2

  a1.sources.r1.selector.default = c1

  a1.channels.c1.type=memory

  a1.channels.c1.capacity=1000

  a1.channels.c1.transactionCapacity=100

  a1.channels.c1.keep-alive=3

  a1.channels.c1.byteCapacityBufferPercentage=20

  a1.channels.c1.byteCapacity=800000

  a1.channels.c2.type=memory

  a1.channels.c2.capacity=1000

  a1.channels.c2.transactionCapacity=100

  a1.sinks.s1.type=hdfs

  a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/mul

  a1.sinks.s1.hdfs.filePrefix=s1sink

  a1.sinks.s1.hdfs.fileSuffix=.log

  a1.sinks.s1.hdfs.inUseSuffix=.tmp

  a1.sinks.s1.hdfs.rollInterval=60

  a1.sinks.s1.hdfs.rollSize=1024

  a1.sinks.s1.hdfs.rollCount=10

  a1.sinks.s1.hdfs.idleTimeout=0

  a1.sinks.s1.hdfs.batchSize=100

  a1.sinks.s1.hdfs.fileType=DataStream

  a1.sinks.s1.hdfs.writeFormat=Text

  a1.sinks.s1.hdfs.round=true

  a1.sinks.s1.hdfs.roundValue=1

  a1.sinks.s1.hdfs.roundUnit=second

  a1.sinks.s1.hdfs.useLocalTimeStamp=true

  a1.sinks.s2.type=hdfs

  a1.sinks.s2.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/mul

  a1.sinks.s2.hdfs.filePrefix=s2sink

  a1.sinks.s2.hdfs.fileSuffix=.log

  a1.sinks.s2.hdfs.inUseSuffix=.tmp

  a1.sinks.s2.hdfs.rollInterval=60

  a1.sinks.s2.hdfs.rollSize=1024

  a1.sinks.s2.hdfs.rollCount=10

  a1.sinks.s2.hdfs.idleTimeout=0

  a1.sinks.s2.hdfs.batchSize=100

  a1.sinks.s2.hdfs.fileType=DataStream

  a1.sinks.s2.hdfs.writeFormat=Text

  a1.sinks.s2.hdfs.round=true

  a1.sinks.s2.hdfs.roundValue=1

  a1.sinks.s2.hdfs.roundUnit=second

  a1.sinks.s2.hdfs.useLocalTimeStamp=true

  a1.sources.r1.channels=c1 c2

  a1.sinks.s1.channel=c1

  a1.sinks.s2.channel=c2

  启动Agent的服务

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./mul.conf -n a1 -Dflume.root.logger=INFO,console

  测试

  [root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://qianfeng01:6666

  [root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my content"}]' http://qianfeng01:6666


好程序员公众号

  • · 剖析行业发展趋势
  • · 汇聚企业项目源码

好程序员开班动态

More+
  • HTML5大前端 <高端班>

    开班时间:2021-04-12(深圳)

    开班盛况

    开班时间:2021-05-17(北京)

    开班盛况
  • 大数据+人工智能 <高端班>

    开班时间:2021-03-22(杭州)

    开班盛况

    开班时间:2021-04-26(北京)

    开班盛况
  • JavaEE分布式开发 <高端班>

    开班时间:2021-05-10(北京)

    开班盛况

    开班时间:2021-02-22(北京)

    开班盛况
  • Python人工智能+数据分析 <高端班>

    开班时间:2021-07-12(北京)

    预约报名

    开班时间:2020-09-21(上海)

    开班盛况
  • 云计算开发 <高端班>

    开班时间:2021-07-12(北京)

    预约报名

    开班时间:2019-07-22(北京)

    开班盛况
IT培训IT培训
在线咨询
IT培训IT培训
试听
IT培训IT培训
入学教程
IT培训IT培训
立即报名
IT培训

Copyright 2011-2023 北京千锋互联科技有限公司 .All Right 京ICP备12003911号-5 京公网安备 11010802035720号