博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flume例子回想
阅读量:6153 次
发布时间:2019-06-21

本文共 8190 字,大约阅读时间需要 27 分钟。

a1.sources = r1a1.sinks = s1a1.channels = c1a1.sources.r1.type = avroa1.sources.r1.bind =mastera1.sources.r1.port =8888a1.sinks.s1.type = loggera1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1a1.sinks.s1.channel = c1a1.sources.r1.interceptors=i1a1.sources.r1.interceptors.i1.type=search_replacea1.sources.r1.interceptors.i1.searchPattern=(\\d{3})\\d{4}(\\d{4})a1.sources.r1.interceptors.i1.replaceString=$1xxxx$2

手机号码抹去中间四位

这个还需要java类去制造avro序列化的文件

package flumeTest;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

/**
 * Connects to an Avro Flume source and sends events to the Flume agent.
 *
 * <p>Pushes RpcClient-built Avro events to {@code hostName:port}.
 */
public class FlumeClient {

    private RpcClient flumeClient;
    private final String hostName;
    private final int port;

    /**
     * Creates a client bound to the given Avro source endpoint.
     *
     * @param hostname host running the Flume Avro source
     * @param port     port the Avro source listens on
     */
    public FlumeClient(String hostname, int port) {
        this.hostName = hostname;
        this.port = port;
        this.flumeClient = RpcClientFactory.getDefaultInstance(hostname, port);
    }

    /**
     * Wraps the message in an event (with a "timestamp" header) and appends
     * it to the Avro source.
     *
     * <p>On failure the stack trace is printed and the underlying client is
     * closed and rebuilt so the next call can retry; the current event is
     * dropped.
     *
     * @param msg UTF-8 message body to send
     */
    public void sendEvent(String msg) {
        Map<String, String> headers = new HashMap<>();
        headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
        Event event = EventBuilder.withBody(msg, Charset.forName("UTF-8"), headers);
        try {
            flumeClient.append(event);
        } catch (Exception e) {
            e.printStackTrace();
            // Reconnect so subsequent sends can succeed; this event is lost.
            flumeClient.close();
            flumeClient = RpcClientFactory.getDefaultInstance(hostName, port);
        }
    }

    /** Releases the underlying RPC connection. */
    public void close() {
        flumeClient.close();
    }

    /**
     * Sends 100 numbered test messages to the Avro source listening on
     * master:8888.
     */
    public static void main(String[] args) {
        FlumeClient flumeClient = new FlumeClient("master", 8888);
        String bMsg = "fromjava-msg";
        for (int i = 0; i < 100; i++) {
            flumeClient.sendEvent(bMsg + i);
        }
        flumeClient.close();
    }
}
package flumeTest;import java.util.Random;public class SendPhoneNo {    public static void main(String[] args) {        FlumeClient flumeClient = new FlumeClient("master", 8888);        Random random = new Random();        for (int i = 0; i < 100; i++) {            String phoneNo = "1" + random.nextInt(10) + random.nextInt(10) + random.nextInt(10) + random.nextInt(10)                    + random.nextInt(10) + random.nextInt(10) + random.nextInt(10) + random.nextInt(10)                    + random.nextInt(10) + random.nextInt(10);            flumeClient.sendEvent(phoneNo);                    }        flumeClient.close();    }    }

例子二

a1.sources=r1a1.channels=c1 c2 c3 c4a1.sinks=s1 s2 s3 s4a1.sources.r1.type=avroa1.sources.r1.bind=mastera1.sources.r1.port=8888a1.channels.c1.type=memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100a1.channels.c2.type=memorya1.channels.c2.capacity=1000a1.channels.c2.transactionCapacity=100a1.channels.c3.type=memorya1.channels.c3.capacity=1000a1.channels.c3.transactionCapacity=100a1.channels.c4.type=memorya1.channels.c4.capacity=1000a1.channels.c4.transactionCapacity=100a1.sinks.s1.type=hdfsa1.sinks.s1.hdfs.path=/flumelog/henana1.sinks.s1.hdfs.fileSuffix=.loga1.sinks.s1.hdfs.filePrefix=test_loga1.sinks.s1.hdfs.rollInterval=0a1.sinks.s1.hdfs.rollSize=0a1.sinks.s1.hdfs.fileType=DataStreama1.sinks.s1.hdfs.writeFormat=Texta1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sinks.s2.type=hdfsa1.sinks.s2.hdfs.path=/flumelog/hebeia1.sinks.s2.hdfs.fileSuffix=.loga1.sinks.s2.hdfs.filePrefix=test_loga1.sinks.s2.hdfs.rollInterval=0a1.sinks.s2.hdfs.rollSize=0a1.sinks.s2.hdfs.fileType=DataStreama1.sinks.s2.hdfs.writeFormat=Texta1.sinks.s2.hdfs.useLocalTimeStamp=true

这个conf是为了将events按照不同地区拦截写入到hdfs上的不同文件夹中,相当于按区归类处理。source为avro类型,所以写了一个java类来传送到master

package flumeTest;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

/**
 * Client for an agent whose selector is "multiplexing": the agent inspects
 * the event header key "province" and routes each event to the channel
 * configured for that value.
 */
public class ForFanoutSelectorClient {

    private final RpcClient client;
    private final String[] provinces = {"henan", "hebei", "shanghai", "shandong"};
    private final Random random = new Random();

    public ForFanoutSelectorClient(String hostname, int port) {
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    }

    /**
     * Builds an event carrying the message body plus a randomly chosen
     * "province" header, which the multiplexing selector uses for routing.
     *
     * @param msg UTF-8 message body
     * @return event with a random province header
     */
    public Event getRandomEvent(String msg) {
        Map<String, String> headers = new HashMap<>();
        String province = provinces[random.nextInt(provinces.length)];
        headers.put("province", province);
        return EventBuilder.withBody(msg, Charset.forName("UTF-8"), headers);
    }

    /** Appends one event to the Avro source; delivery failures are printed. */
    public void sendEvent(Event event) {
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        }
    }

    /** Releases the underlying RPC connection. */
    public void close() {
        client.close();
    }

    /** Sends 300 test events with random province headers to master:8888. */
    public static void main(String[] args) {
        ForFanoutSelectorClient fanoutSelectorClient = new ForFanoutSelectorClient("master", 8888);
        String msg = "peopleinfo_";
        for (int i = 0; i < 300; i++) {
            Event event = fanoutSelectorClient.getRandomEvent(msg + i + "_");
            fanoutSelectorClient.sendEvent(event);
        }
        fanoutSelectorClient.close();
    }
}

例子三

a1.sources = r1a1.sinks = s1a1.channels = c1a1.sources.r1.type = spooldira1.sources.r1.spoolDir=/opt/spooldirnewa1.sources.r1.fileHeader=truea1.sinks.s1.type = loggera1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1a1.sinks.s1.channel = c1a1.sources.r1.interceptors=i1a1.sources.r1.interceptors.i1.type=regex_filtera1.sources.r1.interceptors.i1.regex=^[a-z0-9]+([._\\-]*[a-z0-9])*@([a-z0-9]+[-a-z0-9]*[a-z0-9]+.){1,63}[a-z0-9]+$a1.sources.r1.interceptors.i1.excludeEvents=false

筛选出在spooldirnew文件夹里面有关邮箱的event事件,输出在logger中

a1.sources=r1a1.sinks=k1a1.channels=c1a1.sources.r1.type=spooldira1.sources.r1.spoolDir=/opt/spooldira1.sources.r1.fileHeader=truea1.sinks.k1.type=hdfsa1.sinks.k1.hdfs.path=/flumelog/%Y%m%da1.sinks.k1.hdfs.fileSuffix=.loga1.sinks.k1.hdfs.rollInterval=0a1.sinks.k1.hdfs.rollSize=0a1.sinks.k1.hdfs.rollCount=100a1.sinks.k1.hdfs.fileType=DataStreama1.sinks.k1.hdfs.writeFormat=Texta1.sinks.k1.hdfs.useLocalTimeStamp=truea1.channels.c1.type=memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100a1.sources.r1.channels=c1a1.sinks.k1.channel=c1

这个conf是用来动态分布采集到的数据的

动态即按照年月日创建文件夹在集群上 需要注意的一点是得写useLocalTimeStamp=true 不然flume无法参照时间做事

a1.sources = r1a1.sinks = s1a1.channels = c1a1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444a1.sinks.s1.type = loggera1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1a1.sinks.s1.channel = c1

采集netcat类型的数据以日志形式展示 (官网例子)

a1.sources=r1a1.sinks=s1a1.channels=c1a1.sources.r1.type=avroa1.sources.r1.bind=mastera1.sources.r1.port=8888a1.channels.c1.type=memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100a1.sinks.s1.type=loggera1.sources.r1.channels=c1a1.sinks.s1.channel=c1a1.sources.r1.interceptors=i1a1.sources.r1.interceptors.i1.type=host

host拦截器 之前总结过 就是在header中添加host Ip

a1.sources= r1a1.sinks=s1 s2a1.channels=c1 c2a1.sources.r1.type=avroa1.sources.r1.bind=mastera1.sources.r1.port=8888a1.channels.c1.type=memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100a1.sinks.s1.type=loggera1.sinks.s2.type=hdfsa1.sinks.s2.hdfs.path=/flumelog/%Y%m%da1.sinks.s2.hdfs.fileSuffix=.loga1.sinks.s2.hdfs.filePrefix=test_loga1.sinks.s2.hdfs.rollInterval=0a1.sinks.s2.hdfs.rollSize=0a1.sinks.s2.hdfs.fileType=DataStreama1.sinks.s2.hdfs.writeFormat=Texta1.sinks.s2.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1 c2a1.sinks.s1.channel=c1a1.sinks.s2.channel=c2

多sinks和channels 一个用来输出到logger中 一个用于保存在hdfs上

转载地址:http://ypzfa.baihongyu.com/

你可能感兴趣的文章
并行程序设计学习心得1——并行计算机存储
查看>>
bulk
查看>>
js document.activeElement 获得焦点的元素
查看>>
C++ 迭代器运算
查看>>
【支持iOS11】UITableView左滑删除自定义 - 实现多选项并使用自定义图片
查看>>
JavaWeb学习笔记(十四)--JSP语法
查看>>
【算法笔记】多线程斐波那契数列
查看>>
java8函数式编程实例
查看>>
jqgrid滚动条宽度/列显示不全问题
查看>>
在mac OS10.10下安装 cocoapods遇到的一些问题
查看>>
angularjs表达式中的HTML内容,如何不转义,直接表现为html元素
查看>>
css技巧
查看>>
Tyvj 1728 普通平衡树
查看>>
javascript性能优化
查看>>
多路归并排序之败者树
查看>>
java连接MySql数据库
查看>>
转:Vue keep-alive实践总结
查看>>
深入python的set和dict
查看>>
C++ 11 lambda
查看>>
Android JSON数据解析
查看>>