10 Star 18 Fork 3

supermy / rule-interceptor

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

flume-interceptor-ex

Build Status

##介绍

可以数据采集端,利用采集设备的能力对数据进行非业务类 进行实时的 ETL 处理;适用于 hadoop 大数据生态的各种通道环境;特色是采用 groovy 作为规则语言,能够完成各种条件的处理。

flume 全能版本拦截器 flume-rule-interceptor. groovy 作为规则dsl,通过groovy 脚本编写逻辑,支持JSON 格式函数,脚本动态加载,灰度发布。

业务场景1:RuleSearchAndReplaceInterceptor, 互联网算进行数据传输的安全,通过拦截器进行加密解密;官方原有的正则不能实现此功能。

业务场景2:RuleFilteringInterceptor, 数据条件过滤,可以通过groovy 脚本进行条件过滤,非常灵活;官方原有的正则不支持条件。

业务场景3:RuleSearchAndReplaceInterceptor,数据格式变更,可以通过groovy 脚本进行数据格式转换;官方可以通过正则完成,效率较低。

业务场景3:RuleSearchAndReplaceInterceptor,定制head 属性,可以通过groovy 脚本配置head 属性;官方配置较为复杂,不能支持灵活业务。

[new]业务场景4:支持内容过滤;支持将内容转换为 redis-lua 支持的脚本,支持 flume-redis 组件进行数据处理。

Install

安装:

打包rule-interceptor-1.0-SNAPSHOT.jar拷贝到flume的plugins.d/flume-interceptor/lib目录
拷贝groovy-all-2.4.7.jar到flume的plugins.d/flume-interceptor/libext目录

Usage

a1.sources.r1.interceptors.i2.type = com.supermy.flume.interceptor.RuleFilteringInterceptor$Builder
a1.sources.r1.interceptors.i2.rule = /etc/flume/conf/g-filter.groovy
a1.sources.r1.interceptors.i2.ruleName = filterGroovy

a1.sources.r1.interceptors.i3.type = com.supermy.flume.interceptor.RuleSearchAndReplaceInterceptor$Builder
a1.sources.r1.interceptors.i3.searchReplaceDsl = /etc/flume/conf/g-search-replace.groovy
a1.sources.r1.interceptors.i3.searchReplaceKey = searchReplaceGroovy

g-filter.groovy

过滤脚本,可以使用head and body 的数据作为条件 判定词条数据是否过滤


    //#当它的值为true 的时候,过滤掉匹配到当前正则表达式的一行
    //#当它的值为false的时候,就接受匹配到正则表达式的一行
    println "netuser filter"
    //println head
    //println body
    body = "20170621162925,113.225.23.151,test_10056368,1"
    //body = "20170621162925,113.225.23.152,test_10056368,3"
    
    def split = body.split(",")
    
    println split.size()
    
    if(split.size()<4){
        //数据不合格过滤
        return false;
    }
    
    def type = split[3]
    
    println type.getClass()
    println type.substring(0,1) == '1'
    //println type.toInteger() == 1
    
    if (type.substring(0,1) == '1' || type.substring(0,1) == '2') {
        return true;  //不过滤
    } else {
        return false; //过滤
    }

g-search-replace.groovy

替换脚本,可以更改head and body 的数据,适配不同的业务场景,脚本支持动态更新;


        import  com.supermy.flume.interceptor.*
        import javax.crypto.Cipher;
        import javax.crypto.spec.SecretKeySpec;
        
        println head
        println body
        body=body.replace('a','aaa')
        head["newhead"]='abcd'
        
        
        
        String text = "Body 的数据 , I Love BONC"
        
        //
        def key = new SecretKeySpec("123456789987654321".bytes, "AES")
        def c = Cipher.getInstance("AES")
        
        //加密
        c.init(Cipher.ENCRYPT_MODE, key)
        e_text = new String(Hex.encodeHex(c.doFinal(text.getBytes("UTF-8"))))
        
        //解密
        c.init(Cipher.DECRYPT_MODE, key)
        text1 = new String(c.doFinal(Hex.decodeHex(e_text.toCharArray())))
        
        println text
        println e_text
        println text1
        
        
        def resultMap = [:]
        
        //加密数据,用于互联网数据传输
        
        
        resultMap["head"]=head
        resultMap["body"]=body
        
        return resultMap

    import com.google.gson.Gson
    
    //数据加工为 lua 脚本,插入到 redis
    println "netuser redis script 准备"
    //println head
    //println body
    
    //body = "20170621162925,113.225.23.151,test_10056368,1"
    body = "20170621162925,113.225.23.152,test_10056368,2"
    // eval "return redis.call('ZADD','KEYS[1]',ARGV[1],ARGV[2])" 1 keyset   123  u123
    
    
    
    def split = body.split(",")
    
    
    
    def type = split[3]
    if (type.substring(0,1) == '1') {
        split[2]=split[2]+"@Start";
    } else {
        split[2]=split[2]+"@End";
    }
    
    Gson gson = new Gson();
    
    Map full= new HashMap();
    full.put("script","return redis.call('ZADD',KEYS[2],KEYS[1],KEYS[3])");
    full.put("args",new ArrayList());
    full.put("keys",split);
    
    String json = gson.toJson(full);
    println json
    
    Map m=gson.fromJson(json, HashMap.class);
    println m
    
    
    //StringBuffer sb = new StringBuffer("return redis.call('ZADD','");
    //
    //sb.append(split[1]).append("',").append(split[0]).append(",'").append(split[2]);
    //if (type.substring(0,1) == '1') {
    //    sb.append("@Start'");
    //} else {
    //    sb.append("@End'");
    //}
    //sb.append(")");
    //println sb
    
    def resultMap = [:]
    
    resultMap["head"] = head
    resultMap["body"] = json
    
    return resultMap

空文件

简介

flume 规则过滤 展开 收起
Java
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/supermy/rule-interceptor.git
git@gitee.com:supermy/rule-interceptor.git
supermy
rule-interceptor
rule-interceptor
master

搜索帮助