1 Star 0 Fork 17

hbwhypw / raptor-databus

forked from lance / raptor-databus 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
MIT

一、DataBus部署清单

##1.1 江苏测试环境 ###1.1.1 主机信息

主机地址 用户名 密码 备注
132.252.4.56 databus databus123 抽取和分发用户
132.252.4.56 amq amq1234 消息队列用户
###1.1.2 数据库信息
主机地址 端口 用户名 密码 SID 备注
132.252.5.64 1621 databus_mq1 S17F*RvT CRMTEST02 存放普通业务
132.252.5.64 1621 databus_mq2 K3Co!RCf CRMTEST02 存放停复机业务
132.252.5.64 1621 databus_mq3 QO6ad!!L CRMTEST02 批量业务
132.252.5.64 1621 databus_mq4 BGu4Nh#k CRMTEST02 业务中心之间事件
132.252.5.64 1621 databus pYA#Zi8A CRMTEST02 抽取和分发用户
###1.1.3 应用部署说明
应用名称 部署路径 主机地址 服务端口 控制台端口 应用说明
----------------- ------------------------------- ------------ ----- ----- --------------
nm-extractor /home/databus/nm-extractor 132.252.4.56 9001 普通业务抽取器
dispatcher-server /home/databus/dispatcher-server 132.252.4.56 9000 数据分发器
amq-1 /home/amq/amq-1 132.252.4.56 61616 8161 消息队列(存放普通业务)
amq-2 /home/amq/amq-2 132.252.4.56 61617 8162 消息队列(存放停复机业务)
amq-3 /home/amq/amq-3 132.252.4.56 61618 8163 消息队列(批量业务)
amq-4 /home/amq/amq-4 132.252.4.56 61619 8164 消息队列(业务中心之间事件)

###1.1.4 其他组件说明

应用名称 部署路径 主机地址 服务端口
zookeeper /home/databus/zookeeper 132.252.4.56 2181
redis /home/databus/redis 132.252.4.56 6379

##1.2 四川测试环境 ###1.2.1 主机信息

主机地址 用户名 密码 备注
132.252.4.56 databus databus123 抽取和分发用户
132.252.4.56 amq amq1234 消息队列用户
###1.2.2 数据库信息
主机地址 端口 用户名 密码
------------- ---- ----------- ----
133.37.135.40 1521 databus_mq1 bss3
133.37.135.40 1521 databus_mq2 bss3
133.37.135.40 1521 databus_mq3 bss3
133.37.135.40 1521 databus_mq4 bss3
133.37.135.40 1521 databus bss3
###1.2.3 应用部署说明
应用名称 部署路径 主机地址 服务端口
----------------- ------------------------------- ------------- -----
nm-extractor /home/databus/nm-extractor 133.37.135.38 9001
dispatcher-server /home/databus/dispatcher-server 133.37.135.38 9000
amq-1 /home/amq/amq-1 133.37.135.38 61616
amq-2 /home/amq/amq-2 133.37.135.38 61617
amq-3 /home/amq/amq-3 133.37.135.38 61618
amq-4 /home/amq/amq-4 133.37.135.38 61619

###1.2.4 其他组件说明

应用名称 部署路径 主机地址 服务端口
zookeeper /home/databus/zookeeper 133.37.135.38 2181
redis /home/databus/redis 133.37.135.38 6379

#二 抽取说明

3.1 地址

https://git.oschina.net/f150/raptor-databus

3.2 代码说明

- extractor-agent:抽取器代理,监控抽取器运行时状态
- extractor-common:公共模块,存放常量类,工具类
- extractor-segment:抽取器核心代码模块,负责普通业务、停复机业务的报文获取、拼装、发送等功能
- nm-extractor-server:普通业务模块,负责普通业务抽取器的启动和初始化配置
- sas-extractor-server:停服级业务模块,负责停复机业务抽取器的启动和初始化配置

3.3 配置说明

抽取器启动时,从远程配置中心获取application.properties、extractor.properties两个配置文件。

3.3.1 application.properties

配置名称 配置说明
spring.application.name 应用服务器名称
spring.redis.database Redis数据库索引(默认为0)
spring.redis.host Redis服务器地址
spring.redis.port Redis服务器连接端口
spring.redis.password Redis服务器连接密码(默认为空)
spring.redis.pool.max-active 连接池最大连接数(使用负值表示没有限制)
spring.redis.pool.max-wait 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.pool.max-idle 连接池中的最大空闲连接
spring.redis.pool.min-idle 连接池中的最小空闲连接
spring.redis.timeout 连接超时时间(毫秒)

参考配置样例:

spring.application.name = nm-extractor-server
spring.redis.database = 0
spring.redis.host = 127.0.01
spring.redis.port = 6379
spring.redis.password = 
spring.redis.pool.max-active = 8
spring.redis.pool.max-wait = -1
spring.redis.pool.max-idle = 8
spring.redis.pool.min-idle = 0
spring.redis.timeout = 0

3.3.2 extractor.properties

3.3.2.1 数据源配置

配置名称 配置说明
extractor.datasource.jdbcUrl 数据库连接地址
extractor.datasource.type 连接池类
extractor.datasource.username 数据库用户名
extractor.datasource.password 数据库密码
extractor.datasource.driver-class-name 驱动包
extractor.datasource.hikari.connection-timeout 连接超时设置
extractor.datasource.hikari.auto-commit 自动提交
extractor.datasource.hikari.maximum-pool-size 最大连接数

配置参考样例:

extractor.datasource.jdbcUrl = jdbc:oracle:thin:@133.37.135.40:1521/crmso
extractor.datasource.type = com.zaxxer.hikari.HikariDataSource
extractor.datasource.username = databus
extractor.datasource.password = bss3
extractor.datasource.driver-class-name = oracle.jdbc.OracleDriver
extractor.datasource.hikari.connection-timeout = 60000
extractor.datasource.hikari.auto-commit = false
extractor.datasource.hikari.maximum-pool-size = 5

3.3.2.2 http连接设置

配置名称 配置说明
extractor.http.maxTotal 连接池连接数
extractor.http.defaultMaxPerRoute 每个服务地址连接数
extractor.http.socketTimeout socket超时时间
extractor.http.connectTimeout 连接超时时间
extractor.http.connectionRequestTimeout 请求超时时间

配置参考样例:

extractor.http.maxTotal = 10
extractor.http.defaultMaxPerRoute = 10
extractor.http.socketTimeout = 3000
extractor.http.connectTimeout = 5000
extractor.http.connectionRequestTimeout = 5000

3.3.2.3 抽取队列配置

配置名称 配置说明
extractor.mq.eventKindId databus队列,受理中心event事件队列kindId

配置参考样例:

extractor.mq.eventKindId[0] = 1008
extractor.mq.eventKindId[1] = 1009

3.3.2.4 zookeeper配置

配置名称 配置说明
extractor.zk.servers zk服务地址
extractor.zk.namespace zk节点名称
extractor.zk.digest zk访问节点控制

配置参考样例:

extractor.zk.servers = 127.0.0.1:2181
extractor.zk.namespace = extractor
extractor.zk.digest = 

3.3.2.5 队列大小配置

配置名称 配置说明
extractor.queue.eventQueueSize 存放受理中心event事件队列大小
extractor.queue.eventDataQueueSize 存放各节点报文队列大小
extractor.queue.buildJsonQueueSize event元信息+全量报文 队列大小

配置参考样例:

extractor.queue.eventQueueSize = 100
extractor.queue.eventDataQueueSize = 100
extractor.queue.buildJsonQueueSize = 100

3.3.2.6 获取报文超时设置

配置名称 配置说明
extractor.thread.overTime 获取某一事件的超时时间(单位:毫秒)

配置参考样例:

extractor.thread.overTime = 100000

3.4 表说明

3.4.1抽取器表清单

表名 用途
EVENT_META 事件元信息表,存储受理中心Event对象信息
EVENT_SOURCE_CONFIG 业务事件表,配置各程序对应的业务事件
EVENT_NODE_SERVER 节点报文服务表,配置各节点报文服务路径
EVENT_PARAMS 参数表,存储参数信息,比如cust_id","acct_id","prod_id","offer_id","offer_prod_id"

3.4.2各个表语句

CREATE TABLE EVENT_META(
  "META_ID" NUMBER(19,0) not null,
  "EVENT_KIND_ID" NUMBER(5,0) not null,
  "DELIVER_TIME" date not null,
  "OBJECT_KEY" VARCHAR2(20) not null,
  "EVENT_TYPE" NUMBER(1,0) not null,
  "REGION_ID" NUMBER(10,0),
  "BEGIN_TIME" TIMESTAMP default CURRENT_TIMESTAMP,
  "FINISH_TIME" TIMESTAMP default CURRENT_TIMESTAMP,
  "RETRY_TIMES" NUMBER(2,0) default 0,
  "STATUS" VARCHAR2(4) not null,
  "ORDER_TYPE_ID" VARCHAR2(1000),
  "OFFER_IDS" VARCHAR2(1000),
  "PROD_IDS" VARCHAR2(1000),
  "SERVICE_OFFER_IDS" VARCHAR2(1000),
  "OFFER_SPEC_ATTR" VARCHAR2(100),
  "SERVICE_SPEC_ATTR" VARCHAR2(100),
  "REMARK" VARCHAR2(1024)
);
comment on column "EVENT_META"."EVENT_KIND_ID"  is '事件来源id';
comment on column "EVENT_META"."DELIVER_TIME"  is '事件信息投递时间';
comment on column "EVENT_META"."OBJECT_KEY"  is '归档组id or 订单id';
comment on column "EVENT_META"."EVENT_TYPE"  is '事件类型,1:停复机,0:普通业务';
comment on column "EVENT_META"."ORDER_TYPE_ID"  is '一个归档组竣工消息事件的业务动作集,多个业务动作,用英文逗号分隔';
comment on column "EVENT_META"."RETRY_TIMES"  is '抽取报文次数';
comment on column "EVENT_META"."BEGIN_TIME"  is '开始抽取报文时间';
comment on column "EVENT_META"."FINISH_TIME"  is '抽取报文完成时间';
comment on column "EVENT_META"."STATUS"  is '事件状态:W(初始状态) P(处理中) C(抽取成功) F(抽取失败,超时或者系统异常)';
comment on table "EVENT_META" is '事件元信息表';


CREATE TABLE EVENT_SOURCE_CONFIG
(
  "EVENT_KIND_ID" NUMBER(10,0) not null,
  "EVENT_NAME" VARCHAR2(100) not null,
  "EVENT_SOURCE" VARCHAR2(50) not null,
  "EVENT_SORT" NUMBER (1,0) not null,
  "DATABUS_KIND_ID" NUMBER(10,0) not null
);
comment on column "EVENT_SOURCE_CONFIG"."EVENT_KIND_ID"  is '事件序号';
comment on column "EVENT_SOURCE_CONFIG"."EVENT_NAME"  is '业务事件名称';
comment on column "EVENT_SOURCE_CONFIG"."EVENT_SOURCE"  is '业务中心名称';
comment on column "EVENT_SOURCE_CONFIG"."EVENT_SORT"  is '事件是否需要按照获取顺寻进行发送,1:需要,0:不需要';
comment on column "EVENT_SOURCE_CONFIG"."DATABUS_KIND_ID"  is 'databus内部队列kindid';
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1002,'so_order_committing','Extractor_NRM',0,8001);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1008,'so_order_construct','Extractor_NRM',0,8002);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1007,'so_order_orderitemfinish','Extractor_NRM',0,8003);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1009,'so_order_archgrpfinish','Extractor_NRM',1,8004);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1003,'so_order_complete','Extractor_NRM',0,8005);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1010,'stopAndRestart','Extractor_NRM',0,8006);




CREATE TABLE EVENT_NODE_SERVER
(
  "EVENT_KIND_ID" NUMBER(10,0) not null,
  "NODE_NAME"  VARCHAR2(100) not null,
  "SERVER_URL" VARCHAR2(1000) not null
);
comment on column "EVENT_NODE_SERVER"."EVENT_KIND_ID"  is '事件序号';
comment on column "EVENT_NODE_SERVER"."NODE_NAME"  is '节点名称';
comment on column "EVENT_NODE_SERVER"."SERVER_URL"  is '服务url';
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'customerOrder', 'http://133.37.135.36:9003/so-service/service/so_order_queryCustomerOrderDetailsByCodId');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'customerDetails', 'http://133.37.135.36:9002/cust-service/service/cust_cust_qryCustomerDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'accountDetails', 'http://133.37.135.36:9001/acct-service/service/cust_acct_qryAccountListForMulti');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'offerProdInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryAccProdInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'offerInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryOfferInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'eventMsg', '');

INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'customerOrders', 'http://133.37.135.36:9003/so-service/service/so_order_queryCustomerOrdersByOrderItemIds');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'customerDetails', 'http://133.37.135.36:9002/cust-service/service/cust_cust_qryCustomerDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'accountDetails', 'http://133.37.135.36:9001/acct-service/service/cust_acct_qryAccountListForMulti');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'offerProdInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryAccProdInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'offerInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryOfferInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'eventMsg', '');


CREATE TABLE EVENT_PARAMS
(
  "META_ID"   NUMBER(19) not null,
  "PARAM_NAME"  VARCHAR2(16) not null,
  "PARAM_VALUE" VARCHAR2(32) not null
);

comment on table "EVENT_PARAMS" is '事件参数表';
comment on column "EVENT_PARAMS"."META_ID"  is '事件元信息表主键';
comment on column "EVENT_PARAMS"."PARAM_NAME"  is '参数名称';
comment on column "EVENT_PARAMS"."PARAM_VALUE"  is '参数值';

3.5 部署说明

3.5.1 部署抽取器依赖应用清单

应用名称 | 应用用途 zookpeer | 生成全局序列 redis | 缓存受理中心event报文 cmdb | 配置中心,存放抽取程序配置文件:application.properties、extractor.properties

3.5.2 工程中配置读取的配置中心位置

NRM抽取程序config 修改文件(nm-extractor-server\src\main\resources\cmdb-env.properties)如下:

env=dev
dev.meta=http://133.37.135.38:7001
配置名称 配置说明
env 环境用户
dev.meta 服务地址
修改文件如下:
app.id=8001

3.5.3 打出部署环境的war包

找到抽取器代码根目录,例如:...\raptor-databus\data-extractor,执行如下命令:

mvn clean install -Dmaven.test.skip=true

就会生成对应war包,nm-extractor war包如下:...\raptor-databus\data-extractor\nm-extractor-server\target,名称为:nm-extractor-server-1.0-SNAPSHOT.war

3.5.4 上传war到指定路径

/home/databus/nm-extractor/webapp 下面

3.5.5 添加数据库表已经默认配置

INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1002,'so_order_committing','Extractor_NRM',0,8001);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1008,'so_order_construct','Extractor_NRM',0,8002);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1007,'so_order_orderitemfinish','Extractor_NRM',0,8003);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1009,'so_order_archgrpfinish','Extractor_NRM',1,8004);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1003,'so_order_complete','Extractor_NRM',0,8005);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1010,'stopAndRestart','Extractor_NRM',0,8006);
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'customerOrder', 'http://133.37.135.36:9003/so-service/service/so_order_queryCustomerOrderDetailsByCodId');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'customerDetails', 'http://133.37.135.36:9002/cust-service/service/cust_cust_qryCustomerDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'accountDetails', 'http://133.37.135.36:9001/acct-service/service/cust_acct_qryAccountListForMulti');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'offerProdInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryAccProdInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'offerInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryOfferInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'eventMsg', '');

INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'customerOrders', 'http://133.37.135.36:9003/so-service/service/so_order_queryCustomerOrdersByOrderItemIds');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'customerDetails', 'http://133.37.135.36:9002/cust-service/service/cust_cust_qryCustomerDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'accountDetails', 'http://133.37.135.36:9001/acct-service/service/cust_acct_qryAccountListForMulti');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'offerProdInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryAccProdInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'offerInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryOfferInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'eventMsg', '');

3.5.6 启动抽取程序

到服务器路径:/home/databus/nm-extractor/bin,执行命令:

sh startup.sh

启动程序。对应的程序启动日志路径: /home/databus/nm-extractor/bin/catalina.log

3.5.7 停止抽取程序

抽取器停止

到服务器路径:/home/databus/nm-extractor/bin,执行命令:

sh shutdown.sh

停止程序。对应的程序停止日志路径: /home/databus/nm-extractor/bin/catalina.log

#三 分发说明(data-dispatcher)

3.1 地址

https://git.oschina.net/f150/raptor-databus

3.2 代码说明

3.2.1 模块说明

- dispatcher-agent:分发器代理,监控分发器运行时状态
- dispatcher-common:公共模块
- dispatcher-scheduler:分发调度器模块,负责调度分发器运行哪些分发任务
- dispatcher-server:分发器启动模块,负责配置加载、消息的获取、幂等性校验、排序以及发送等操作

3.3 配置说明

3.3.1 配置中心配置

1. 文件目录:data-diapatcher/dispatcher-server/src/main/resources
2. 文件说明
 - app.properties:配置应用的appid
   app.id=7001
 - cmdb-env.properties:配置应用中心地址
    env=dev #配置环境类型
    dev.meta =http://133.37.135.38:7001 #配置中心地址

3.3.2 分发器配置

分发器运行时配置从远程配置中心获取,文件名为application.properties

属性名 备注 示例
spring.application.name 应用名称 dispatcher-server
spring.profiles.active 分发运行时环境配置,江苏普通业务:jiangsu,ord_biz;江苏停复机:jiangsu,sar_biz,四川普通业务:sichuan,ord_biz,四川停复机:sichuan,sar_biz sichuang,ord_biz
server.port 服务端口 9000
dispatcher.datasource.jdbcUrl jdbcurl jdbc:oracle:thin:@192.168.199.24:1521:orcla
dispatcher.datasource.type 数据源类型 com.zaxxer.hikari.HikariDataSource
dispatcher.datasource.driver-class-name 驱动类型 oracle.jdbc.OracleDriver
dispatcher.datasource.username 数据库用户名 crm
dispatcher.datasource.password 数据库密码 crm123
dispatcher.datasource.hikari.connection-timeout 数据源连接最大超时 60000
dispatcher.datasource.hikari.auto-commit 自动提交 false
dispatcher.datasource.hikari.maximum-pool-size 数据源连接池最大数 50
spring.redis.host redis主机地址 127.0.0.1
spring.redis.port redis端口 6379
spring.redis.password redis密码
spring.redis.pool.max-active redis连接池最大连接 100
spring.redis.pool.max-wait redis连接池最大等待数量 1
spring.redis.pool.max-idle redis连接池最大空闲数 100
spring.redis.pool.min-idle redis连接池最小空闲数 1
spring.redis.timeout redis超时时间
zk.servers 127.0.0.1:2181 zookeeper地址
zk.namespace dispatcher 分发器zk命名空间
zk.digest zk节点权限
logging.config 日志配置文件 classpath:logback.xml
mq.bizKey 消息队列中应用名称 Dispatcher_NRM
mq.receiveTimeout mq接受消息最大超时时间 10000
thread.pool.coreSize 线程池核心线程数量 100
thread.pool.capacity 线程池队列最大数量 100000
thread.pool.maxSize 线程池最大数量 200

3.4 部署说明

3.4.1 修改配置中心配置

根据部署的环境和分发的消息不同,修改分发工程的appid和配置中心地址

3.4.2 应用打包

打包脚本目录:data-dispatcher/scripts 执行脚本:sh build.sh

3.4.3 执行建表脚本(系统初始化时执行一次)

建表脚本目录:data-dispatcher/scripts/sql

3.5 表说明

事件分发跟踪表: EVENT_TRACE
订阅关系配置表:SUB_BASE_ITEM、SUB_CHANNEL、SUB_CONFIG、SUB_RELATION

表配置说明及表字段说明见建表脚本

#四 配置中心使用说明

  1. 根据应用的appid和应用名称,找到对应的配置

  2. 以分发普通业务为例,appid:7001 应用名:Dispatcher_NRM

  3. 找对应的配置文件之后,在此文件中点击新增配置,添加配置项

  4. 配置提交后,需要发布才能生效

#五 接口分发开发说明

5.1 接口工程说明

5.1.1 工程地址

https://git.oschina.net/f150/raptor-databus

5.1.2 模块说明

service-hub服务集线器,集成了所有服务的调用(包括crm内部以及外域系统)

  • hub-in:集成crm内部服务的调用,主要用于数据的抽取
  • hub-out:集成外域系统,提供给分发器使用,用于将消息发送给外域系统
  • hub-out-common:公共模块
  • hub-out-js:江苏调用外域接口模块
  • hub-out-sc:四川调用外域接口模块
  • hub-out-sdk:调用外域系统服务开发包,提供消息的发送,转换等功能
  • hub-out-example:接口开发demo

5.2 新增接口说明

  1. 所有接口基于spring 4.x开发,接口、配置、连接等都有spring容器初始化,无需手动创建
  2. 基于spring boot零配置的思想,彻底告别xml配置文件。利用spring的annotation来代替各类xml文件,所有xml配置都用java config的形式代替
  3. 新增接口包含以下部分:
    • 接口代码:需要实现三部分功能,协议转换、消息发送、发送响应处理
    • 接口配置:接口配置包括接口编码、名称、订阅事件、数据源等信息
    • 接口连接配置:连接配置包含数据源配置,http连接配置等信息

5.3 配置说明

  • 本地开发测试使用工程src/main/resources目录下的配置
    • 测试环境读取从配置读取配置文件
    • 配置文件属性说明附录

5.4 新增接口案例

5.4.1 表接口

  1. proxy.properties增加接口的配置

    dbExample.code=1025214000000046
    dbExample.name=dbExample
    dbExample.kindId=8001
    dbExample.level=1
    dbExample.sort=true
    dbExample.sender=studio.raptor.hutout.example.sender.DBExampleProxySender
    dbExample.db.dataSourceName=usbDataSource
    dbExample.db.sqls[0]=INSERT INTO teacher(sno,sname,sex,dept,birth,age) VALUES(?,?,?,?,?,?)
    dbExample.db.sqls[1]=INSERT INTO student(sno,sname,sex,dept,birth,age) VALUES(?,?,?,?,?,?)
  2. connection.properties增加数据源配置

    usb.datasource.jdbcUrl=jdbc:mysql://127.0.0.1:3306/ds_0
    usb.datasource.username=root
    usb.datasource.password=root
    usb.datasource.maximumPoolSize=10
    usb.datasource.minimumIdle=1
    usb.datasource.autoCommit=false
    usb.datasource.poolName=usbDataSourcePool
  3. 在DataSourceConfig.java文件中增加数据源的实例bean

  //name必须与proxy文件中example.db.dataSourceName保持一致
   @Bean(name = "usbDataSource")
   //数据源属性前缀
   @ConfigurationProperties(prefix = "usb.datasource")
   public DataSource usbDataSource() {
     return DataSourceBuilder.create().build();
   }
  1. 创建接口的的java config,类的命名规则为接口名+ProxyConfig

    DBExampleProxyConfig.java

    	@Configuration
    	@AutoConfigureAfter(value = {DataSourceConfig.class})
    	//创建接口配置条件,当配置文件中存在前缀为dbExample,且有属性code时才会初始化bean
    	@ConditionalOnProperty(prefix = "dbExample", name = "code")
    	public class DBExampleProxyConfig {
    	 @Bean(name = "dbExample")
    	 @ConfigurationProperties(prefix = "dbExample")
    	 public ProxyProperties proxyConfig() {
    	   return new ProxyProperties();
    	 }
    	
    	 @Bean(name = "dbExampleTableConfig")
    	 @ConfigurationProperties(prefix = "dbExample.db")
    	 public TableProxyProperties tableProxyConfig() {
    	   return new TableProxyProperties();
    	 }
    	
    	 @Bean(name = "dbExampleDBMessageSender")
    	 public DBMessageSender dbMessageSender(@Qualifier("usbDataSource") DataSource dataSource) {
    	   return new DBMessageSender(dataSource);
    	 }
     
  2. 创建接口的代理发送实现类,需要继承DefaultProxySender.java

    @Component
    //当容器中存在接口配置的时候才会初始化接口发送类
    @ConditionalOnBean(DBExampleProxyConfig.class)
    public class DBExampleProxySender extends DefaultProxySender {
    
      private static final Logger log = LoggerFactory.getLogger(DBExampleProxySender.class);

    @Autowired @Qualifier("dbExample") ProxyProperties proxyConfig;

    @Autowired @Qualifier("dbExampleDBMessageSender") DBMessageSender dbMessageSender;

    @Autowired @Qualifier("dbExampleTableConfig") TableProxyProperties tableProxyConfig;

    //协议转换 @Override public TranslateResult translate(String archiveMsg) { log.debug("translate the message");

    return null;

    } //发送消息 @Override public Response doSend(TranslateResult message) { return dbMessageSender.sendByDB(message.getTableResults()); } //响应处理 @Override public void handleResponse(Response response) { log.debug("handle response"); }

    }

    
    

5.4.2 http接口

  1. proxy.properties增加接口的配置

    httpExample.code=1025214000000047
    httpExample.name=httpExample
    httpExample.kindId=sar_biz
    httpExample.level=1
    httpExample.sort=true
    httpExample.sender=studio.raptor.hutout.example.sender.HttpExampleProxySender
    
    //http接口地址
    httpExample.http.url=http://127.0.0.1:8080
    httpExample.http.headers[0]=Connection:Keep-Alive
    httpExample.http.headers[1]=Accept-Encoding:gzip,deflate
    httpExample.http.headers[2]=Content-Type:text/xml;charset=UTF-8
     ​```
    
  2. 创建接口的的java config,类的命名规则为接口名+ProxyConfig

    HttpExampleProxyConfig.java

    	@Configuration
    	@AutoConfigureAfter(value = {HttpConnectionConfig.class})
    	@ConditionalOnProperty(prefix = "httpExample", name = "code")
    	public class HttpExampleProxyConfig {
    	@Bean(name = "httpExample")
    	 @ConfigurationProperties(prefix = "httpExample")
    	 public ProxyProperties proxyConfig() {
    	   return new ProxyProperties();
    	 }
    	
    	 @Bean(name = "httpExampleConfig")
    	 @ConfigurationProperties(prefix = "httpExample.http")
    	 public HttpProxyProperties httpProxyConfig() {
    	   return new HttpProxyProperties();
    	 }
    	
    	 @Bean(name = "httpExampleMessageSender")
    	 public HttpMessageSender httpMessageSender(HttpClient httpClient,
    	     HttpProxyProperties httpProxyProperties) {
    	   return new HttpMessageSender(httpClient, httpProxyProperties);
    	 }
    	}
  3. 创建接口的代理发送实现类

 @Component
 @ConditionalOnBean(HttpExampleProxyConfig.class)
 public class HttpExampleProxySender extends DefaultProxySender {
   
   //注入当前接口的配置
   @Autowired
   @Qualifier("httpExample")
   ProxyProperties proxyConfig;

   //注入http发送代理
   @Autowired
   @Qualifier("httpExampleMessageSender")
   HttpMessageSender httpMessageSender;

   @Override
   public TranslateResult translate(String archiveMsg) {
     return null;
   }

   @Override
   public Response doSend(TranslateResult message) {
     return null;
   }

   @Override
   public void handleResponse(Response response) {

   }
 }

5.4.3 httpinvoker接口

  1. proxy.properties增加接口的配置

    httpinvokerExample.code=1025214000000048
    httpinvokerExample.name=httpinvokerExample
    httpinvokerExample.kindId=8001
    httpinvokerExample.level=1
    httpinvokerExample.sort=true
    httpinvokerExample.sender=studio.raptor.hutout.example.sender.HttpInvokerExampleSender
    httpinvokerExample.httpinvoker.serviceUrl=http://localhost:9090/booking
    httpinvokerExample.httpinvoker.serviceInterface=studio.raptor.httpinvoker.service.CabBookingService
  2. 创建接口的的java config,类的命名规则为接口名+ProxyConfig

    HttpInvokerExampleProxyConfig.java

    @Configuration
    @ConditionalOnProperty(prefix = "httpinvokerExample", name = "code")
    public class HttpInvokerExampleProxyConfig {
    
      @Bean(name = "httpinvokerExample")
      @ConfigurationProperties(prefix = "httpinvokerExample")
      public ProxyProperties proxyConfig() {
        return new ProxyProperties();
      }
     
      @Bean("httpinvokerExampleProxyFactoryBean")
      public HttpInvokerProxyFactoryBean httpInvokerProxyFactoryBean() throws ClassNotFoundException {
        HttpInvokerProxyFactoryBean invoker = new HttpInvokerProxyFactoryBean();
        invoker.setServiceUrl(httpInvokerProxyProperties().getServiceUrl());
        Class serviceInterface = Class.forName(httpInvokerProxyProperties().getServiceInterface());
        invoker.setServiceInterface(serviceInterface);
        return invoker;
      }
     
      @Bean("testHttpInvokerProxyProperties")
      @ConfigurationProperties(prefix = "httpinvokerExample.httpinvoker")
      public HttpInvokerProxyProperties httpInvokerProxyProperties() {
        return new HttpInvokerProxyProperties();
      }
    
    }
    1. 创建接口的代理发送实现类
    @Component
    @ConditionalOnBean(HttpInvokerExampleProxyConfig.class)
    public class HttpInvokerExampleSender extends DefaultProxySender {
    
      @Autowired
      CabBookingService cabBookingService;
    
      @Override
      public TranslateResult translate(String archiveMsg) {
        return null;
      }
    
      @Override
      public Response doSend(TranslateResult message) {
    
        Booking  booking = cabBookingService.bookRide("HttpInvoker");
    
        return null;
      }
    
      @Override
      public void handleResponse(Response response) {
    
      }
    }

5.4.4 webservice接口

  1. proxy.properties增加接口的配置
  webSeviceExample.code=1025214000000049
  webSeviceExample.name=webSeviceExample
  webSeviceExample.kindId=8001
  webSeviceExample.level=1
  webSeviceExample.sender=studio.raptor.hutout.example.sender.WebServiceExampleSender
  webSeviceExample.webservice.url=http://133.37.117.160:8000/VsopWebService/WorkSheetAcceptSV?wsdl
  webSeviceExample.webservice.soapHeader=<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"><SOAP-ENV:Body><m:WorkListFKToVSOPReq xmlns:m="http://www.mbossvsop.com.cn/vsop"><request><![CDATA[
  webSeviceExample.webservice.soapTail=]]></request></m:WorkListFKToVSOPReq></SOAP-ENV:Body></SOAP-ENV:Envelope>
  1. 创建接口的的java config,类的命名规则为接口名+ProxyConfig

WebServiceExampleProxyConfig.java

@Configuration
@AutoConfigureAfter(value = {HttpConnectionConfig.class})
@ConditionalOnProperty(prefix = "webSeviceExample", name = "code")
public class WebServiceExampleProxyConfig {

  @Bean(name = "webSeviceExample")
  @ConfigurationProperties(prefix = "webSeviceExample")
  public ProxyProperties proxyConfig() {
    return new ProxyProperties();
  }

  @Bean(name = "webServiceExampleConfig")
  @ConfigurationProperties(prefix = "webSeviceExample.webservice")
  public WebServiceProxyProperties webServiceProxyProperties() {
    return new WebServiceProxyProperties();
  }

  @Bean(name = "webServiceExampleMessageSender")
  public WebServiceMessageSender webserviceMessageSender(HttpClient httpClient,
      WebServiceProxyProperties webServiceProxyProperties) {
    return new WebServiceMessageSender(httpClient, webServiceProxyProperties);
  }

}
  1. 创建接口的代理发送实现类

    WebServiceExampleSender.java

    @Component
    @ConditionalOnBean(WebServiceExampleProxyConfig.class)
    public class WebServiceExampleSender extends DefaultProxySender {
    
      @Autowired
      @Qualifier("webSeviceExample")
      ProxyProperties proxyConfig;
    
      @Autowired
      @Qualifier("webServiceExampleMessageSender")
      WebServiceMessageSender webServiceExampleMessageSender;
    
      @Override
      public TranslateResult translate(String archiveMsg) {
        return null;
      }
    
      @Override
      public Response doSend(TranslateResult message) {
        return null;
      }
    
      @Override
      public void handleResponse(Response response) {
    
      }
    }

5.4.5 socket接口

  1. proxy.properties增加接口的配置

    socketExample.code=1025214000000050
    socketExample.name=socketExample
    socketExample.kindId=8001
    socketExample.level=1
    socketExample.sender=studio.raptor.hutout.example.sender.SocketExampleSender
    socketExample.socket.url=127.0.0.1:8080
  2. 创建接口的的java config,类的命名规则为接口名+ProxyConfig

    SocketExampleProxyConfig.java

    @Configuration
    @ConditionalOnProperty(prefix = "socketExample", name = "code")
    public class SocketExampleProxyConfig {
    
      @Bean(name = "socketExample")
      @ConfigurationProperties(prefix = "socketExample")
      public ProxyProperties proxyConfig() {
        return new ProxyProperties();
      }
    
      @Bean("socketExampleProxyProperties")
      @ConfigurationProperties(prefix = "socketExample.socket")
      public SocketProxyProperties socketProxyProperties() {
        return new SocketProxyProperties();
      }
    
      @Bean(name="socketExampleMessageSender")
      public SocketMessageSender socketMessageSender(SocketProxyProperties socketProxyProperties) {
        return new SocketMessageSender(socketProxyProperties);
      }
    
    }
  3. 创建接口的代理发送实现类

    SocketExampleSender.java

    @Component
    @ConditionalOnBean(SocketExampleProxyConfig.class)
    public class SocketExampleSender extends DefaultProxySender {
    
      @Autowired
      @Qualifier("socketExample")
      ProxyProperties proxyConfig;
    
      @Autowired
      @Qualifier("socketExampleMessageSender")
      SocketMessageSender socketMessageSender;
    
      @Override
      public TranslateResult translate(String archiveMsg) {
        return null;
      }
    
      @Override
      public Response doSend(TranslateResult message) {
        return null;
      }
    
      @Override
      public void handleResponse(Response response) {
    
      }
    }

5.4.6 MQ接口

  1. proxy.properties增加接口的配置

    mqExample.code=1025214000000050
    mqExample.name=socketExample
    mqExample.kindId=8001
    mqExample.level=1
    mqExample.sender=studio.raptor.hutout.example.sender.MQExampleSender
    mqExample.mq.url=127.0.0.1:8080
    mqExample.mq.type=kakfa
    mqExample.mq.topic=test
  2. 创建接口的的java config,类的命名规则为接口名+ProxyConfig

    MQExampleProxyConfig.java

    @Configuration
    @ConditionalOnProperty(prefix = "mqExample", name = "dynamic")
    @AutoConfigureAfter(value = {MQConnectionConfig.class})
    public class MQExampleProxyConfig {
    
      @Bean(name = "mqExample")
      @ConfigurationProperties(prefix = "mqExample")
      public ProxyProperties proxyConfig() {
        return new ProxyProperties();
      }
    
      @Bean(name = "mqExampleProxyProperties")
      @ConfigurationProperties(prefix = "mqExample.mq")
      public MQProxyProperties socketProxyProperties() {
        return new MQProxyProperties();
      }
    
      @Bean(name = "mqExampleMessageSender")
      public MQMessageSender mqMessageSender(MQProxyProperties mqProxyProperties) {
        return new MQMessageSender(mqProxyProperties);
      }
    
    }
    ​``` ​
  3. 创建接口的代理发送实现类

    MQExampleSender.java

    @Component
    @ConditionalOnBean(MQExampleProxyConfig.class)
    public class MQExampleSender extends DefaultProxySender {
    
      @Autowired
      @Qualifier("mqExample")
      ProxyProperties proxyConfig;
    
      @Autowired
      @Qualifier("mqExampleMessageSender")
      MQMessageSender mqMessageSender;
    
      @Override
      public TranslateResult translate(String archiveMsg) {
        return null;
      }
    
      @Override
      public Response doSend(TranslateResult message) {
        return null;
      }
    
      @Override
      public void handleResponse(Response response) {
      }
    }

##附录 ###1.接口配置(proxy.properties)

接口配置文件,包括接口属性,发送信息等信息。属性名以xxxx.key形式定义,xxxx表示接口名称,同一个接口的属性前缀必须一致。

属性名 属性名 示例 是否必填
xxxx.code 接口编码 10000
xxxx.name 接口名称 billing
xxxx.kindId 接口消费的事件种类 Extractor_NRM
xxxx.level 接口消息处理界别,用数字定义,越大,需要处理的数据量越大
xxxx.sender 接口的代理发送实现类 studio.raptor.hubout.sc.sender.ExampleProxySender
xxxx.sort 接口消息是否需要排序 true
xxxx.name 接口名称,全局唯一 billing
xxxx.kindId 接口消费的事件种类
xxxx.level 接口消息处理界别,用数字定义,越大,需要处理的数据量越大 1
xxxx.sender 接口的代理发送实现类 studio.raptor.hubout.sc.sender.ExampleProxySender
xxxx.db.dataSourceName 表接口数据源 usbDataSource
xxxx.db.sqls[i] 表接口写入语句,多个语句用数组表示 example.db.sqls[0]=INSERT INTO student(id,name,age) VALUES(?,?,?) example.db.sqls[1]=INSERT INTO teacher(id,name,address,age) VALUES(?,?,?,?)
xxxx.http.url http接口地址
xxxx.http.headers[i] http请求头信息 xxxx.http.headers[0]=Content-Type:text/xml;charset=UTF-8 xxxx.http.headers[1]=Connection:Keep-Alive key和value使用冒号分隔,可不填,默认值如上
xxxx.httpinvoker.serviceUrl httpinvoker接口地址
xxxx.httpinvoker.serviceInterface httpinvoker服务接口类
xxxx.webservice.url webservice接口地址
xxxx.socket.url socket接口地址 127.0.0.1:8080
xxxx.mq.url mq接口
xxxx.mq.topic 消息主题
xxxx.mq.type mq类型 kafaka,rocketmq,ibmmq

###2.接口连接配置(connection.properties) 接口连接配置文件,包括数据源连接、http连接属性、webservice连接属性信息。数据源连接对应具体的一个数据库,以xxx.datasource为前缀,xxx表示数据源名称。http和webservice的连接池信息不与具体某个连接关联,因此属于全局的配置,用http和webservice作为前缀

属性名 属性名 示例 是否必填
xxx.datasource.jdbcUrl jdbc地址
xxx.datasource.username 数据库用户名
xxx.datasource.password 数据库密码
xxx.datasource.maximumPoolSize 数据库连接池最大连接数
xxx.datasource.minimumIdle 数据库连接池最小连接数
xxx.datasource.autoCommit 是否自动提交
xxx.datasource.poolName 连接池名称
http.maxTotal http连接池最大连接数 100
http.defaultMaxPerRoute 最大路由数 10
http.socketTimeout 3000
http.connectTimeout 50000
http.connectionRequestTimeout 50000

###3.配置中心APPID规划

业务中心中文名称 业务中心应用名称 业务中心应用英文名 appid
分发 普通业务分发 Dispatcher_NRM 7001
分发 停复机业务分发 Dispatcher_SAR 7002
抽取 普通业务抽取 Extractor_NRM 8001
抽取 停复机业务抽取 Extractor_SAR 8002
The MIT License (MIT) Copyright (c) 2017 bruce Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

简介

数据总线(Data Bus)是以消息队列(MQ)为技术核心的,用于系统间数据的高效、可靠、异步交互的中间件,是分布式系统不可或缺的基础设施之一。其由数据抽取器(Data extractor)、消息队列(Message Queue)、数据分发器(Data dispatcher)、分发调度器(Dispatch scheduler)、服务集线器(Service hub)和 过程跟踪管理工具(Process tracking manager)6个组件构成。 展开 收起
Java
MIT
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/hbwhypw/raptor-databus.git
git@gitee.com:hbwhypw/raptor-databus.git
hbwhypw
raptor-databus
raptor-databus
master

搜索帮助