##介绍
可以数据采集端,利用采集设备的能力对数据进行非业务类 进行实时的 ETL 处理;适用于 hadoop 大数据生态的各种通道环境;特色是采用 groovy 作为规则语言,能够完成各种条件的处理。
flume 全能版本拦截器 flume-rule-interceptor. groovy 作为规则dsl,通过groovy 脚本编写逻辑,支持JSON 格式函数,脚本动态加载,灰度发布。
业务场景1:RuleSearchAndReplaceInterceptor, 互联网算进行数据传输的安全,通过拦截器进行加密解密;官方原有的正则不能实现此功能。
业务场景2:RuleFilteringInterceptor, 数据条件过滤,可以通过groovy 脚本进行条件过滤,非常灵活;官方原有的正则不支持条件。
业务场景3:RuleSearchAndReplaceInterceptor,数据格式变更,可以通过groovy 脚本进行数据格式转换;官方可以通过正则完成,效率较低。
业务场景3:RuleSearchAndReplaceInterceptor,定制head 属性,可以通过groovy 脚本配置head 属性;官方配置较为复杂,不能支持灵活业务。
[new]业务场景4:支持内容过滤;支持将内容转换为 redis-lua 支持的脚本,支持 flume-redis 组件进行数据处理。
安装:
打包rule-interceptor-1.0-SNAPSHOT.jar拷贝到flume的plugins.d/flume-interceptor/lib目录
拷贝groovy-all-2.4.7.jar到flume的plugins.d/flume-interceptor/libext目录
a1.sources.r1.interceptors.i2.type = com.supermy.flume.interceptor.RuleFilteringInterceptor$Builder
a1.sources.r1.interceptors.i2.rule = /etc/flume/conf/g-filter.groovy
a1.sources.r1.interceptors.i2.ruleName = filterGroovy
a1.sources.r1.interceptors.i3.type = com.supermy.flume.interceptor.RuleSearchAndReplaceInterceptor$Builder
a1.sources.r1.interceptors.i3.searchReplaceDsl = /etc/flume/conf/g-search-replace.groovy
a1.sources.r1.interceptors.i3.searchReplaceKey = searchReplaceGroovy
过滤脚本,可以使用head and body 的数据作为条件 判定词条数据是否过滤
//#当它的值为true 的时候,过滤掉匹配到当前正则表达式的一行
//#当它的值为false的时候,就接受匹配到正则表达式的一行
println "netuser filter"
//println head
//println body
body = "20170621162925,113.225.23.151,test_10056368,1"
//body = "20170621162925,113.225.23.152,test_10056368,3"
def split = body.split(",")
println split.size()
if(split.size()<4){
//数据不合格过滤
return false;
}
def type = split[3]
println type.getClass()
println type.substring(0,1) == '1'
//println type.toInteger() == 1
if (type.substring(0,1) == '1' || type.substring(0,1) == '2') {
return true; //不过滤
} else {
return false; //过滤
}
替换脚本,可以更改head and body 的数据,适配不同的业务场景,脚本支持动态更新;
import com.supermy.flume.interceptor.*
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
println head
println body
body=body.replace('a','aaa')
head["newhead"]='abcd'
String text = "Body 的数据 , I Love BONC"
//
def key = new SecretKeySpec("123456789987654321".bytes, "AES")
def c = Cipher.getInstance("AES")
//加密
c.init(Cipher.ENCRYPT_MODE, key)
e_text = new String(Hex.encodeHex(c.doFinal(text.getBytes("UTF-8"))))
//解密
c.init(Cipher.DECRYPT_MODE, key)
text1 = new String(c.doFinal(Hex.decodeHex(e_text.toCharArray())))
println text
println e_text
println text1
def resultMap = [:]
//加密数据,用于互联网数据传输
resultMap["head"]=head
resultMap["body"]=body
return resultMap
import com.google.gson.Gson
//数据加工为 lua 脚本,插入到 redis
println "netuser redis script 准备"
//println head
//println body
//body = "20170621162925,113.225.23.151,test_10056368,1"
body = "20170621162925,113.225.23.152,test_10056368,2"
// eval "return redis.call('ZADD','KEYS[1]',ARGV[1],ARGV[2])" 1 keyset 123 u123
def split = body.split(",")
def type = split[3]
if (type.substring(0,1) == '1') {
split[2]=split[2]+"@Start";
} else {
split[2]=split[2]+"@End";
}
Gson gson = new Gson();
Map full= new HashMap();
full.put("script","return redis.call('ZADD',KEYS[2],KEYS[1],KEYS[3])");
full.put("args",new ArrayList());
full.put("keys",split);
String json = gson.toJson(full);
println json
Map m=gson.fromJson(json, HashMap.class);
println m
//StringBuffer sb = new StringBuffer("return redis.call('ZADD','");
//
//sb.append(split[1]).append("',").append(split[0]).append(",'").append(split[2]);
//if (type.substring(0,1) == '1') {
// sb.append("@Start'");
//} else {
// sb.append("@End'");
//}
//sb.append(")");
//println sb
def resultMap = [:]
resultMap["head"] = head
resultMap["body"] = json
return resultMap
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
1. 开源生态
2. 协作、人、软件
3. 评估模型