当前位置: 首页 / 技术干货 / 正文
数据采集工具之Flume 的拦截器

2023-03-15

拦截器    interceptor 事件

  在Flume运行过程中 ,Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。拦截器有下面几个特点:

  拦截器需要实现org.apache.flume.interceptor.Interceptor接口。

  拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件。

  拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截。

  一个拦截器返回的事件列表被传递给链中的下一个拦截器。

  如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可。

  一、系统内置拦截器

  Timestamp Interceptor :时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多

  Host Interceptor:主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)

  Static Interceptor:静态拦截器,用于在events header中加入一组静态的key和value。

  二、内置拦截器的使用

  2.1. Timestamp+HTTP+File+HDFS

  通过时间拦截器,数据源为HTTP,传送的通道模式是FileChannel,最后输出的目的地为HDFS

  采集方案

  a1.sources = r1

  a1.channels = c1

  a1.sinks = s1

  a1.sources.r1.type=http

  a1.sources.r1.bind = qianfeng01

  a1.sources.r1.port = 6666

  a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

  a1.sources.r1.handler.nickname = JSON props

  a1.sources.r1.interceptors=i1 i2 i3

  a1.sources.r1.interceptors.i1.type=timestamp

  #如果拦截器中已经有了时间戳,直接替换成现在的

  a1.sources.r1.interceptors.i1.preserveExisting=false

  a1.sources.r1.interceptors.i2.type=host

  a1.sources.r1.interceptors.i2.preserveExisting=false

  a1.sources.r1.interceptors.i2.useIP=true

  a1.sources.r1.interceptors.i2.hostHeader=hostname

  a1.sources.r1.interceptors.i3.type=static

  a1.sources.r1.interceptors.i3.preserveExisting=false

  a1.sources.r1.interceptors.i3.key=hn

  a1.sources.r1.interceptors.i3.value=qianfeng01

  a1.channels.c1.type=memory

  a1.channels.c1.capacity=1000

  a1.channels.c1.transactionCapacity=100

  a1.channels.c1.keep-alive=3

  a1.channels.c1.byteCapacityBufferPercentage=20

  a1.channels.c1.byteCapacity=800000

  a1.sinks.s1.type=hdfs

  a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/%H%M

  a1.sinks.s1.hdfs.filePrefix=%{hostname}

  a1.sinks.s1.hdfs.fileSuffix=.log

  a1.sinks.s1.hdfs.inUseSuffix=.tmp

  a1.sinks.s1.hdfs.rollInterval=60

  a1.sinks.s1.hdfs.rollSize=1024

  a1.sinks.s1.hdfs.rollCount=10

  a1.sinks.s1.hdfs.idleTimeout=0

  a1.sinks.s1.hdfs.batchSize=100

  a1.sinks.s1.hdfs.fileType=DataStream

  a1.sinks.s1.hdfs.writeFormat=Text

  a1.sinks.s1.hdfs.round=true

  a1.sinks.s1.hdfs.roundValue=1

  a1.sinks.s1.hdfs.roundUnit=second

  a1.sinks.s1.hdfs.useLocalTimeStamp=true

  a1.sources.r1.channels=c1

  a1.sinks.s1.channel=c1

  启动 Agent

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./ts.conf -n a1 -Dflume.root.logger=INFO,console

  测试数据

  [root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"hn":"qianfeng01","pwd":"123456"},"body":"this is my content qianfeng01"}]' http://qianfeng01:6666

  三、自定义拦截器

  为了提高Flume的扩展性,用户可以自己定义一个拦截器, 对每一组的item_type和active_time都过滤出相应的HOST和USERID

  处理数据样例:

  log='{

  "host":"www.baidu.com",

  "user_id":"13755569427",

  "items":[

  {

  "item_type":"eat",

  "active_time":156234

  },

  {

  "item_type":"car",

  "active_time":156233

  }

  ]

  }'

  结果样例:

  {"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"}

  3.1. pom.xml

<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
  <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.48</version>
  </dependency>
</dependencies>

  3.2. 代码实现

  /**

  * @Author 千锋大数据教学团队

  * @Company 千锋好程序员大数据

  * @Description 自定义拦截器:对每一组的item_type和active_time都过滤出相应的HOST和USERID

  */

  public class MyInterceptor implements Interceptor {

  @Override

  public void initialize() {

  //初始化方法,写拦截器初始化时的业务

  }

  @Override

  public void close() {

  //关闭方法,写拦截器关闭时的代码

  }

  /**

  * 解析单条event

  * @param event

  * @return

  */

  @Override

  public Event intercept(Event event) {

  //输入

  String inputeBody=null;

  //输出

  byte[] outputBoday=null;

  //解析---这里定义对单条Event处理规则

  try {

  inputeBody=new String(event.getBody(), Charsets.UTF_8);

  ArrayListtemp = new ArrayList<>();

  JSONObject bodyObj = JSON.parseObject(inputeBody);

  //1)公共字段

  String host = bodyObj.getString("host");

  String user_id = bodyObj.getString("user_id");

  JSONArray data = bodyObj.getJSONArray("items");

  //2)Json数组=>every json obj

  for (Object item : data) {

  JSONObject itemObj = JSON.parseObject(item.toString());

  HashMap<string, object=""> fields = new HashMap<>();

  fields.put("host",host);

  fields.put("user_id",user_id);

  fields.put("item_type",itemObj.getString("item_type"));

  fields.put("active_time",itemObj.getLongValue("active_time"));

  temp.add(new JSONObject(fields).toJSONString());

  }

  //3)Json obj 拼接

  outputBoday=String.join("\n",temp).getBytes();

  }catch (Exception e){

  System.out.println("输入数据:"+inputeBody);

  e.printStackTrace();

  }

  event.setBody(outputBoday);

  return event;

  }

  /**

  * 解析一批event

  * @param events

  * @return

  */

  @Override

  public Listintercept(Listevents) {

  //输出---一批Event

  ArrayListresult = new ArrayList<>();

  //输入---一批Event

  try{

  for (Event event : events) {

  //一条条解析

  Event interceptedEvent = intercept(event);

  byte[] interceptedEventBody = interceptedEvent.getBody();

  if(interceptedEventBody.length!=0){

  String multiEvent = new String(interceptedEventBody, Charsets.UTF_8);

  String[] multiEventArr = multiEvent.split("\n");

  for (String needEvent : multiEventArr) {

  SimpleEvent simpleEvent = new SimpleEvent();

  simpleEvent.setBody(needEvent.getBytes());

  result.add(simpleEvent);

  }

  }

  }

  }catch (Exception e){

  e.printStackTrace();

  }

  return result;

  }

  /**

  * 实现内部类接口

  */

  public static class Builder implements Interceptor.Builder{

  @Override

  public Interceptor build() {

  return new MyInterceptor();

  }

  @Override

  public void configure(Context context) {

  }

  }

  }

  3.3. 打包上传

  使用maven将拦截器打包,然后把此包和依赖的fastjson一起上传到Flume lib目录下

  3.4. 采集方案制定

  a1.sources = s1

  a1.channels = c1

  a1.sinks = r1

  a1.sources.s1.type = TAILDIR

  #文件以JSON格式记录inode、绝对路径和每个跟踪文件的最后位置

  a1.sources.s1.positionFile = /root/flume/taildir_position.json

  #以空格分隔的文件组列表。每个文件组表示要跟踪的一组文件

  a1.sources.s1.filegroups = f1

  #文件组的绝对路径

  a1.sources.s1.filegroups.f1=/root/flume/data/.*log

  #是否添加存储绝对路径文件名的标题

  a1.sources.s1.fileHeader = true

  #使用自定义拦截器

  a1.sources.s1.interceptors = i1

  a1.sources.s1.interceptors.i1.type = flume.MyInterceptor$Builder

  a1.channels.c1.type = file

  a1.channels.c1.dataDirs = /root/flume/filechannle/dataDirs

  a1.channels.c1.checkpointDir = /root/flume/filechannle/checkpointDir

  a1.channels.c1.capacity = 1000

  a1.channels.c1.transactionCapacity = 100

  a1.sinks.r1.type = hdfs

  a1.sinks.r1.hdfs.path = hdfs://qianfeng01:8020/flume/spooldir

  a1.sinks.r1.hdfs.filePrefix =

  a1.sinks.r1.hdfs.round = true

  a1.sinks.r1.hdfs.roundValue = 10

  a1.sinks.r1.hdfs.roundUnit = minute

  a1.sinks.r1.hdfs.fileSuffix= .log

  a1.sinks.r1.hdfs.rollInterval=60

  a1.sinks.r1.hdfs.fileType=DataStream

  a1.sinks.r1.hdfs.writeFormat=Text

  a1.sources.s1.channels = c1

  a1.sinks.r1.channel = c1

  3.5. 启动 Agent

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console

  3.6. 测试数据

  [root@qianfeng01 ~]# vi my.sh

  #!/bin/bash

  log='{

  "host":"www.baidu.com",

  "user_id":"13755569427",

  "items":[

  {

  "item_type":"eat",

  "active_time":156234

  },

  {

  "item_type":"car",

  "active_time":156233

  }

  ]

  }'

  echo $log>> /root/flume/data/test.log

  [root@qianfeng01 ~]# bash my.sh

  执行后我们希望得到是数据格式:

  {"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"}

  {"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}


好程序员公众号

  • · 剖析行业发展趋势
  • · 汇聚企业项目源码

好程序员开班动态

More+
  • HTML5大前端 <高端班>

    开班时间:2021-04-12(深圳)

    开班盛况

    开班时间:2021-05-17(北京)

    开班盛况
  • 大数据+人工智能 <高端班>

    开班时间:2021-03-22(杭州)

    开班盛况

    开班时间:2021-04-26(北京)

    开班盛况
  • JavaEE分布式开发 <高端班>

    开班时间:2021-05-10(北京)

    开班盛况

    开班时间:2021-02-22(北京)

    开班盛况
  • Python人工智能+数据分析 <高端班>

    开班时间:2021-07-12(北京)

    预约报名

    开班时间:2020-09-21(上海)

    开班盛况
  • 云计算开发 <高端班>

    开班时间:2021-07-12(北京)

    预约报名

    开班时间:2019-07-22(北京)

    开班盛况
IT培训IT培训
在线咨询
IT培训IT培训
试听
IT培训IT培训
入学教程
IT培训IT培训
立即报名
IT培训

Copyright 2011-2023 北京千锋互联科技有限公司 .All Right 京ICP备12003911号-5 京公网安备 11010802035720号