1 Star 0 Fork 72

DTC2 / light-spark

forked from hellowzk / light-spark 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
README.md 20.43 KB
一键复制 编辑 原始数据 按行查看 历史
zhaokui 提交于 2020-09-15 16:33 . 修改:format 代码,统一编码

Light-Spark 开发脚架

[toc]

1. 概述

本人从 2014 年开始使用 Apache Spark,基于以往项目经验,为了降低项目开发复杂度,减少开发成本,开发了 light-spark 框架。 本框架屏蔽了底层技术细节,让开发人员面向 SQL 编程,更专注于业务逻辑,减少开发过程中 Debug 工作,让 Apache Spark 更方便使用。

2. 适用场景

项目业务大部分功能可以用 SQL 实现,符合以下情况,建议使用本框架:

  • 逻辑链条较长,开发周期较长
  • 逻辑较复杂,开发时需要反复调试,从而反复修改 Debug 代码
  • 业务不稳定,功能频繁迭代
  • 开发人员对大数据技术细节不太熟悉

3. 设计思想

  • 大数据项目中整个处理流程如下:
    开始 -> 加载数据 -> 业务计算 -> 结果存储 -> 结束
    流程图
  • 面向配置编程
    核心思想是面向 Spark DataFrame(SQL) 编程,通过 DataFrame 串联起数据处理流程各个环节,使用 SparkSQL 处理业务逻辑,并定义到配置文件,以便于开发和后期维护,业务调整时避免修改代码。 同时,暴露接口支持开发人员自定义硬编码,实现不能使用 SQL 处理的复杂逻辑,通过配置,把这些逻辑纳入整个处理流程。
  • 使用样例 该样例背景为一个指标计算,需要加载 hive 表数据并关联 jdbc 中的表数据,经过 SQL 计算处理后,结果数据存储到 jdbc 表中,整个处理逻辑定义到一个配置文件中
    yaml1 yaml2 yaml3 yaml4

4. 使用

4.1 maven 引用

本项目已发布到 maven 中央仓库,暂时支持 Spark 2.2.x、Spark 2.3.x、Spark 2.4.x

针对 Spark 2.2.x

<dependency>
    <groupId>com.github.hellowzk</groupId>
    <artifactId>light-spark-core_2.2</artifactId>
    <version>1.0.3-release</version>
</dependency>

针对 Spark 2.3.x

<dependency>
    <groupId>com.github.hellowzk</groupId>
    <artifactId>light-spark-core_2.3</artifactId>
    <version>1.0.3-release</version>
</dependency>

针对 Spark 2.4.x

<dependency>
    <groupId>com.github.hellowzk</groupId>
    <artifactId>light-spark-core_2.4</artifactId>
    <version>1.0.3-release</version>
</dependency>

4.2 依赖配置

参考 pom 文件

4.3 启动脚本样例

参考 run

4.4 组件

4.4.1 debug 开发工具

开发过程中,可以通过配置,自动把指定步骤的结果持久化存储,支持 hdfs csv、hive 表

4.4.2 input - 数据加载

  1. 支持读取 hive 数据;
  2. 支持读取 hdfs 数据,配置中定义好字段信息后自动加载成表;
  3. 支持读取 classpath 数据,配置中定义好字段信息后自动加载成表;
  4. 支持数据源类型比较复杂时,支持 自定义 处理源数据为表;
  5. 支持读取 JDBC 的源数据,jdbcOpts 中参数请看 SparkSQL官方文档

4.4.3 process - 业务计算

  1. 支持 SparkSQL 处理。
  2. 处理逻辑较复杂,SQL 叫难实现时,支持硬编码实现业务需求。

4.4.4 output - 结果保存

  1. 存储到 HDFS ,格式可以选择为 txt / lzo 。
  2. 存储到 HIVE 。
  3. 存储到支持 JDBC 的数据库。

4.4.5 变量

4.4.5.1 定义及使用

本框架支持在配置中自定义及使用变量,支持变量嵌套使用

  • 灵活的定义位置
    自定义变量可以在 yaml 配置中,也可以定义在 class 类中,定义的变量名称必须唯一。
    定义在 class 中,例子 MyConstants

  • 变量使用

    • 在 yaml 中使用,可以直接通过 ${variable} 使用预定义的变量
    • 在类中使用,如下
      import com.hellowzk.light.spark.constants.AppConstants
      AppConstants.variables("mysql_url")
      // 类型 variables: Map[String, String]
    所有可用变量存放在 AppConstants.variables 中
    
4.4.5.2 日期表达式

支持通过日期表达式定义日期,表达式格式为

DATE([yyyy-MM-dd, 20200722, yyyyMMdd][+3d][-1m][+1y]...)
  • DATE() 为日期表达式声明符
  • 第一个 [ ] 中, 有三个参数
    • 第一个为表达式最终计算结果格式
    • 第二个参数为基准时间,后续的日期计算是基于该时间的,可以省略,代表当前日期
    • 第三个参数为第二个参数的时间格式,基于该格式解析第二个参数,第二个参数没有时,该参数同样不能定义
  • 后面的 [ ] 中,代表时间计算表达式,大小写不敏感
    • ±nd 加/减 n 天
    • ±nm 加/减 n 月
    • ±ny 加/减 n 年
  • 例子
...
constansMap:
  today: "${DATE([yyyy-MM-dd])}" # 当前日期  
  today2: "${DATE([yyyy-MM, 2020-08-05, yyyy-MM-dd])}" # 2020-08-05 所在月份,2020-08
  date1: "${DATE([yyyy-MM-dd, 2020-08-05][-1d])}" # 2020-08-05 减一天,2020-08-04
  date2: "${DATE([yyyy-MM-dd, ${date1}][-1d])}" # 2020-08-04 减 1 天,2020-08-03
  date3: "${DATE([yyyy-MM-dd, ${date2}][-1d])}" # 2020-08-03 减 1 天,2020-08-02
  date4: "${DATE([yyyy-MM, ${date3}, yyyy-MM-dd][-1m])}" # 2020-08-02 减 1 月月份,2020-07
  date5: "${DATE([yyyy-MM-dd, ${date4}, yyyy-MM][-1d])}" # 2020-07 月初减 1 天, 2020-06-30
...

5. 配置文件

5.1 整体结构

配置项 默认值 是否必填 说明
name 定义当前 yaml 文件流程名称
constansMap 否,Map 类型 自定义常量,可以在当前 yaml 中通过 ${variable} 使用;在自定义代码中通过 AppConstants.variables("variable" 使用)
persistType 使用 debug 组件,配置中间数据持久化的地址 hive/hdfs;开启条件,启动脚本中传入 --debug 参数启用 debug 组件
persistDir 持久化到 hdfs 的位置,需要启用 debug 组件
persistHiveDb 持久化到 hive 的库,需要启用 debug 组件
udaf 自定义 UDAF,Map类型, key 为 udaf 命名,value 为值
inputs 定义的输入数据
processes 处理逻辑
outputs 输出
envs 环境参数,暂时只支持 Spark 相关参数
# 流程名称
name: "xxx" 

# 自定义常量,可以在sql/文件路径中使用 ${variable},常量名称不可重复
constansMap: 
  hive_out_db: xxx
  mysql_url: xxx
  mysql_username: xxx
  mysql_passwd: xxx

# 启用 debug 工具,SQL处理时,如果 store=true , 结果会缓存到 $baseDir/{{EVENT_DATE}}/$tableName 目录,启动脚本必须传入参数 --debug, 否则该配置不生效
persistType: hive # hive hdfs
persistDir: /tmp/spark/app/full-batch
persistHiveDb: "${outputDB}"

# 输入定义,支持 3 种数据源
inputs:
  - ...
    
# 逻辑处理,支持SQL 与 自定义处理, 有 3 种
processes:
  - ...

# 结果输出, 支持 3 种
outputs:
  - ...

# 环境设置
envs:
  # 暂时只支持 spark 相关参数,如下
  spark:
    - ...

5.2 数据加载

支持类型

type 值 说明
classpathFile 加载 classpath 文件,通过配置字段、分隔符生成表
customClasspath 加载 hdfs 文件,自定义加载逻辑,生成表
hdfsfile 加载 hdfs 文件,通过配置字段、分隔符生成表
customHdfs 加载 hdfs 文件,自定义加载逻辑,生成表
hive 加载 hive 表
jdbc 加载 jdbc 表
5.2.1. 加载 classpathFile 数据
配置说明
配置项 默认值 是否必填 说明
name 定义该步生成的表的名称,可以在下面处理中引用
type 固定值 classpathFile
columns 数据生成的 columns, 用 "," 隔开
path 文件在 classpath 的路径
fs 列分割符
partations 0 进行重新分区的数量,不配置不重分区
cache false 是否缓存,如果下面的操作多次使用该表,建议设置为true
show 0 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
store false 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...
inputs:
  - name: person
    type: classpathFile
    columns: name, gen, age
    path: person.txt
    fs: ","
...
5.2.2. 加载 customClasspath 数据
配置说明

自定义加载 classpath 的数据,生成表

配置项 默认值 是否必填 说明
name 定义该步生成的表的名称,可以在下面处理中引用
type 固定值 customClasspath
path 文件在 classpath 的路径
clazz 自定义的加载逻辑类,必须继承 com.hellowzk.light.spark.stages.custom.CustomBaseInput 实现 process 方法
partations 0 进行重新分区的数量,不配置不重分区
cache false 是否缓存,如果下面的操作多次使用该表,建议设置为true
show 0 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
store false 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...
inputs:
  - name: custompersonClass
    type: customClasspath
    path: person2.txt
    clazz: com.hellowzk.light.spark.stages.Process1
...
5.2.3. 加载 hdfsfile 数据
配置说明
配置项 默认值 是否必填 说明
name 定义该步生成的表的名称,可以在下面处理中引用
type 固定值 hdfsfile
columns 数据生成的 columns, 用 "," 隔开
path 文件在 hdfs 的路径
fs 列分割符
partations 0 进行重新分区的数量,不配置不重分区
cache false 是否缓存,如果下面的操作多次使用该表,建议设置为true
show 0 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
store false 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...
inputs:
  - name: person_hdfs
    type: hdfsfile
    columns: name,gen,age
    path: /tmp/zhaokui/testdata/test/20200706/
    fs: ","
...
5.2.4. 加载 customHdfs 数据

自定义加载 customHdfs 的数据,生成表

  • 自定义加载逻辑类必须继承 com.hellowzk.light.spark.stages.custom.CustomBaseInput 特质,实现 process 方法,如 Process1
配置说明
配置项 默认值 是否必填 说明
name 定义该步生成的表的名称,可以在下面处理中引用
type 固定值 customHdfs
path 文件在 hdfs 的路径
clazz 自定义加载逻辑类
partations 0 进行重新分区的数量,不配置不重分区
cache false 是否缓存,如果下面的操作多次使用该表,建议设置为true
show 0 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
store false 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...
inputs:
  - name: customPersonHdfs
    type: customHdfs
    path: /tmp/zhaokui/testdata/test/20200706/
    clazz: com.hellowzk.light.spark.stages.Process1
...
5.2.5 加载 jdbc 数据

自定义加载 customHdfs 的数据,生成表

配置说明
配置项 默认值 是否必填 说明
name 定义该步生成的表的名称,可以在下面处理中引用
type 固定值 jdbc
driver jdbc driver
url jdbc url 地址
user 用户名
password 密码
dbtable 要加载的表,为 map 结构
jdbcOpts 请参看 SparkSQL官方文档
demo
...
inputs:
  - name: mysql1
    type: jdbc
    driver: "com.mysql.jdbc.Driver"
    url: "${jdbcurl}" # 在 constansMap 中定义的该值
    user: root
    password: 123456
    dbtable:
      b_osp_app_event_detail: b_osp_app_event_detail
      b_osp_app_area_analyze_total: b_osp_app_area_analyze_total
...
5.2.6 加载 hive 数据
配置说明
配置项 默认值 是否必填 说明
name 定义该步生成的表的名称,可以在下面处理中引用
type 固定值 hive
database hive 库名
dbtable 要加载的表,为 map 结构
demo
...
inputs:
  - name: hive1
    type: hive
    database: "${inputDB}"
    dbtable:
      dm_road_trackset_di: dm_road_trackset_di
      dm_trackset_di: dm_trackset_di
...

5.3 数据处理

支持类型

type 值 说明
sql 使用 SQL 处理数据
custom 硬编码处理数据
5.3.1 使用 SQL 处理
配置说明
配置项 默认值 是否必填 说明
name 定义该步生成的表的名称,可以在下面处理中引用
sql SQL 语句
dimKey 用于计算多维度数据
allPlaceholder all 多维度数据 select 占位符
partations 0 进行重新分区的数量,不配置不重分区
cache false 是否缓存,如果下面的操作多次使用该表,建议设置为true
show 0 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
store false 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...
processes:
  - name: table1
    sql: "select name,gen,age, '${DATE([yyyy-MM-dd, 2020-08-05][-3d])}' from person"
    cache: true # 出发action操作,缓存执行SQL后的表
    store: true # 是否保存到本地,debug用,保存目录 $baseDir/{{EVENT_DATE}}/table
    show: 20 # 在控制台打印表中数据,不打印则删除该节点,df.show(20)
    partations: 3 # reparation 个数,不reparation则删除节点
  # 多维度数据处理
  - name: table1_1
    dimKey: "gen, age" # 两个维度
    allPlaceholder: "ALL"
    sql: |-     #  |- :yaml 语法,支持字符串换行,适合格式化写 SQL
      select name, ${DIM_FIELDS}, count(1) as count from person group by name ${DIM_GROUP}
    cache: true # 出发action操作,缓存执行SQL后的表
    store: true # 是否保存到本地,debug用,保存目录 $baseDir/{{EVENT_DATE}}/table
    show: 20 # 在控制台打印表中数据,不打印则删除该节点,df.show(20)
    partations: 3 # reparation 个数,多维度数据处理,建议使用该功能,否则 task 数量会根据维度个数爆炸膨胀,严重降低后续计算过程
...
5.3.2 自定义编码处理

自定义处理类,必须继承 com.hellowzk.light.spark.stages.custom.CustomBaseTransform 类,实现 doProcess 方法,如 Process2

配置说明
配置项 默认值 是否必填 说明
name 定义该步生成的表的名称,可以在下面处理中引用
clazz 自定义处理类
partations 0 进行重新分区的数量,不配置不重分区
cache false 是否缓存,如果下面的操作多次使用该表,建议设置为true
show 0 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
store false 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...
processes:
  - name: customProcessShow
    clazz: "com.hellowzk.light.spark.stages.Process2"
    cache: true
    store: true
    show: 20
    partations: 1
...

5.4 结果输出

支持类型

type 值 说明
jdbc 保存数据到 jdbc
hive 保存数据到 hive
kafkaField 指定分隔符,数据发送到 kafka
kafkaJson JSON 格式,数据发送到 kafka
5.4.1 输出到 jdbc
配置说明
配置项 默认值 是否必填 说明
name 节点名称,用于打印日志,定位问题
type 固定值 jdbc
driver 数据库 driver
url 连接数据库的 url
user 用户名
password 密码
tables 要输出的表,为 map 类型
mode append 保存模式,默认为 append
opts jdbc 操作的其他参数
preSQL 执行保存操作前,要对目标数据库执行的 SQL语句
demo
...
outputs:
  - name: mysql1
    type: jdbc
    driver: "com.mysql.jdbc.Driver"
    url: "${jdbcurl}"
    user: root
    password: 123456
    opts:
      isolationLevel: NONE # 关闭事务
      numPartitions: 1 # 设置并发
      batchsize: 50000
    preSQL:
      - delete from b_osp_app_event_detail
      - delete from b_osp_app_area_analyze_total
    tables:
      table3: b_osp_app_event_detail
      b_osp_app_area_analyze_total: b_osp_app_area_analyze_total
...
5.4.2 输出到 HIVE
配置说明
配置项 默认值 是否必填 说明
name 节点名称,用于打印日志,定位问题
type 固定值 hive
database 要保存到的 hive 库
tables 要输出的表,为 map 类型
mode overwrite 保存模式,默认为 overwrite
preSQL 执行保存操作前,要对目标数据库执行的 SQL语句
demo
...
outputs:
  - name: mysql1
    type: hive
    database: test
    tables:
      table3: b_osp_app_event_detail
      b_osp_app_area_analyze_total: b_osp_app_area_analyze_total
...
5.4.3 输出到 HDFS
配置说明
配置项 默认值 是否必填 说明
name 节点名称,用于打印日志,定位问题
type 固定值 hdfsfile
srcName 要保存的数据,必须使用上面定义的表名
format 要输出的文件类型,暂时支持 csv, txt, lzo, json, parquet 这些类型
path 数据要保存的目录
fs \u0001 保存成文件时的列分隔符,只针对 txt lzo 类型输出生效
demo
...
outputs:
  - name: out1
    type: hdfsfile
    srcName: table1
    format: csv # 保存为 csv
    path: /user/osp/data/out1/${EVENT_DATE}

  - name: out2 
    type: hdfsfile
    srcName: table2
    format: txt # 保存为 txt,字段用逗号分隔
    fs: ","
    path: /user/osp/data/out2/${EVENT_DATE}  

  - name: out3
    type: hdfsfile
    srcName: table3
    format: lzo # 保存为 lzo,字段用逗号分隔
    fs: ","
    path: /user/osp/data/out3/${EVENT_DATE}  

  - name: out4
    type: hdfsfile
    srcName: table4
    format: json # 保存为 json
    path: /user/osp/data/out4/${EVENT_DATE}  

  - name: out5
    type: hdfsfile
    srcName: table5
    format: parquet # 保存为 parquet
    fs: ","
    path: /user/osp/data/out5/${EVENT_DATE}  
...
5.4.4 kafkaField 发送到 kafka
配置说明
配置项 默认值 是否必填 说明
name 节点名称,用于打印日志,定位问题
type 固定值 kafkaField
srcName 要保存的 table
brokers 目标 kafka brokers
topic 目标 kafka topic
fs 字段分隔符
demo
...
outputs:
  - name: kafkaFieldTest
    type: kafkaField
    srcName: table1
    brokers: ${outputKafka}
    topic: test
    fs: ","
...
5.4.5 kafkaJson 发送到 kafka
配置说明
配置项 默认值 是否必填 说明
name 节点名称,用于打印日志,定位问题
type 固定值 kafkaJson
srcName 要保存的 table
brokers 目标 kafka brokers
topic 目标 kafka topic
demo
...
outputs:
  - name: kafkaJsonTest
    type: kafkaJson
    srcName: table1
    brokers: ${outputKafka}
    topic: testJson
...

版权

请参考 LICENSE 文件.

沟通交流

请添加 QQ 群: 717436011

Scala
1
https://gitee.com/zymITsky/light-spark.git
git@gitee.com:zymITsky/light-spark.git
zymITsky
light-spark
light-spark
master

搜索帮助