同步操作将从 lance/raptor-databus 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
##1.1 江苏测试环境 ###1.1.1 主机信息
主机地址 | 用户名 | 密码 | 备注 |
---|---|---|---|
132.252.4.56 | databus | databus123 | 抽取和分发用户 |
132.252.4.56 | amq | amq1234 | 消息队列用户 |
###1.1.2 数据库信息 |
主机地址 | 端口 | 用户名 | 密码 | SID | 备注 |
---|---|---|---|---|---|
132.252.5.64 | 1621 | databus_mq1 | S17F*RvT | CRMTEST02 | 存放普通业务 |
132.252.5.64 | 1621 | databus_mq2 | K3Co!RCf | CRMTEST02 | 存放停复机业务 |
132.252.5.64 | 1621 | databus_mq3 | QO6ad!!L | CRMTEST02 | 批量业务 |
132.252.5.64 | 1621 | databus_mq4 | BGu4Nh#k | CRMTEST02 | 业务中心之间事件 |
132.252.5.64 | 1621 | databus | pYA#Zi8A | CRMTEST02 | 抽取和分发用户 |
###1.1.3 应用部署说明 | |||||
应用名称 | 部署路径 | 主机地址 | 服务端口 | 控制台端口 | 应用说明 |
----------------- | ------------------------------- | ------------ | ----- | ----- | -------------- |
nm-extractor | /home/databus/nm-extractor | 132.252.4.56 | 9001 | 普通业务抽取器 | |
dispatcher-server | /home/databus/dispatcher-server | 132.252.4.56 | 9000 | 数据分发器 | |
amq-1 | /home/amq/amq-1 | 132.252.4.56 | 61616 | 8161 | 消息队列(存放普通业务) |
amq-2 | /home/amq/amq-2 | 132.252.4.56 | 61617 | 8162 | 消息队列(存放停复机业务) |
amq-3 | /home/amq/amq-3 | 132.252.4.56 | 61618 | 8163 | 消息队列(批量业务) |
amq-4 | /home/amq/amq-4 | 132.252.4.56 | 61619 | 8164 | 消息队列(业务中心之间事件) |
###1.1.4 其他组件说明
应用名称 | 部署路径 | 主机地址 | 服务端口 |
---|---|---|---|
zookeeper | /home/databus/zookeeper | 132.252.4.56 | 2181 |
redis | /home/databus/redis | 132.252.4.56 | 6379 |
##1.2 四川测试环境 ###1.2.1 主机信息
主机地址 | 用户名 | 密码 | 备注 |
---|---|---|---|
132.252.4.56 | databus | databus123 | 抽取和分发用户 |
132.252.4.56 | amq | amq1234 | 消息队列用户 |
###1.2.2 数据库信息 | |||
主机地址 | 端口 | 用户名 | 密码 |
------------- | ---- | ----------- | ---- |
133.37.135.40 | 1521 | databus_mq1 | bss3 |
133.37.135.40 | 1521 | databus_mq2 | bss3 |
133.37.135.40 | 1521 | databus_mq3 | bss3 |
133.37.135.40 | 1521 | databus_mq4 | bss3 |
133.37.135.40 | 1521 | databus | bss3 |
###1.2.3 应用部署说明 | |||
应用名称 | 部署路径 | 主机地址 | 服务端口 |
----------------- | ------------------------------- | ------------- | ----- |
nm-extractor | /home/databus/nm-extractor | 133.37.135.38 | 9001 |
dispatcher-server | /home/databus/dispatcher-server | 133.37.135.38 | 9000 |
amq-1 | /home/amq/amq-1 | 133.37.135.38 | 61616 |
amq-2 | /home/amq/amq-2 | 133.37.135.38 | 61617 |
amq-3 | /home/amq/amq-3 | 133.37.135.38 | 61618 |
amq-4 | /home/amq/amq-4 | 133.37.135.38 | 61619 |
###1.2.4 其他组件说明
应用名称 | 部署路径 | 主机地址 | 服务端口 |
---|---|---|---|
zookeeper | /home/databus/zookeeper | 133.37.135.38 | 2181 |
redis | /home/databus/redis | 133.37.135.38 | 6379 |
#二 抽取说明
https://git.oschina.net/f150/raptor-databus
- extractor-agent:抽取器代理,监控抽取器运行时状态
- extractor-common:公共模块,存放常量类,工具类
- extractor-segment:抽取器核心代码模块,负责普通业务、停复机业务的报文获取、拼装、发送等功能
- nm-extractor-server:普通业务模块,负责普通业务抽取器的启动和初始化配置
- sas-extractor-server:停服级业务模块,负责停复机业务抽取器的启动和初始化配置
抽取器启动时,从远程配置中心获取application.properties、extractor.properties两个配置文件。
配置名称 | 配置说明 |
---|---|
spring.application.name | 应用服务器名称 |
spring.redis.database | Redis数据库索引(默认为0) |
spring.redis.host | Redis服务器地址 |
spring.redis.port | Redis服务器连接端口 |
spring.redis.password | Redis服务器连接密码(默认为空) |
spring.redis.pool.max-active | 连接池最大连接数(使用负值表示没有限制) |
spring.redis.pool.max-wait | 连接池最大阻塞等待时间(使用负值表示没有限制) |
spring.redis.pool.max-idle | 连接池中的最大空闲连接 |
spring.redis.pool.min-idle | 连接池中的最小空闲连接 |
spring.redis.timeout | 连接超时时间(毫秒) |
参考配置样例:
spring.application.name = nm-extractor-server
spring.redis.database = 0
spring.redis.host = 127.0.01
spring.redis.port = 6379
spring.redis.password =
spring.redis.pool.max-active = 8
spring.redis.pool.max-wait = -1
spring.redis.pool.max-idle = 8
spring.redis.pool.min-idle = 0
spring.redis.timeout = 0
配置名称 | 配置说明 |
---|---|
extractor.datasource.jdbcUrl | 数据库连接地址 |
extractor.datasource.type | 连接池类 |
extractor.datasource.username | 数据库用户名 |
extractor.datasource.password | 数据库密码 |
extractor.datasource.driver-class-name | 驱动包 |
extractor.datasource.hikari.connection-timeout | 连接超时设置 |
extractor.datasource.hikari.auto-commit | 自动提交 |
extractor.datasource.hikari.maximum-pool-size | 最大连接数 |
配置参考样例:
extractor.datasource.jdbcUrl = jdbc:oracle:thin:@133.37.135.40:1521/crmso
extractor.datasource.type = com.zaxxer.hikari.HikariDataSource
extractor.datasource.username = databus
extractor.datasource.password = bss3
extractor.datasource.driver-class-name = oracle.jdbc.OracleDriver
extractor.datasource.hikari.connection-timeout = 60000
extractor.datasource.hikari.auto-commit = false
extractor.datasource.hikari.maximum-pool-size = 5
配置名称 | 配置说明 |
---|---|
extractor.http.maxTotal | 连接池连接数 |
extractor.http.defaultMaxPerRoute | 每个服务地址连接数 |
extractor.http.socketTimeout | socket超时时间 |
extractor.http.connectTimeout | 连接超时时间 |
extractor.http.connectionRequestTimeout | 请求超时时间 |
配置参考样例:
extractor.http.maxTotal = 10
extractor.http.defaultMaxPerRoute = 10
extractor.http.socketTimeout = 3000
extractor.http.connectTimeout = 5000
extractor.http.connectionRequestTimeout = 5000
配置名称 | 配置说明 |
---|---|
extractor.mq.eventKindId | databus队列,受理中心event事件队列kindId |
配置参考样例:
extractor.mq.eventKindId[0] = 1008
extractor.mq.eventKindId[1] = 1009
配置名称 | 配置说明 |
---|---|
extractor.zk.servers | zk服务地址 |
extractor.zk.namespace | zk节点名称 |
extractor.zk.digest | zk访问节点控制 |
配置参考样例:
extractor.zk.servers = 127.0.0.1:2181
extractor.zk.namespace = extractor
extractor.zk.digest =
配置名称 | 配置说明 |
---|---|
extractor.queue.eventQueueSize | 存放受理中心event事件队列大小 |
extractor.queue.eventDataQueueSize | 存放各节点报文队列大小 |
extractor.queue.buildJsonQueueSize | event元信息+全量报文 队列大小 |
配置参考样例:
extractor.queue.eventQueueSize = 100
extractor.queue.eventDataQueueSize = 100
extractor.queue.buildJsonQueueSize = 100
配置名称 | 配置说明 |
---|---|
extractor.thread.overTime | 获取某一事件的超时时间(单位:毫秒) |
配置参考样例:
extractor.thread.overTime = 100000
表名 | 用途 |
---|---|
EVENT_META | 事件元信息表,存储受理中心Event对象信息 |
EVENT_SOURCE_CONFIG | 业务事件表,配置各程序对应的业务事件 |
EVENT_NODE_SERVER | 节点报文服务表,配置各节点报文服务路径 |
EVENT_PARAMS | 参数表,存储参数信息,比如cust_id","acct_id","prod_id","offer_id","offer_prod_id" |
CREATE TABLE EVENT_META(
"META_ID" NUMBER(19,0) not null,
"EVENT_KIND_ID" NUMBER(5,0) not null,
"DELIVER_TIME" date not null,
"OBJECT_KEY" VARCHAR2(20) not null,
"EVENT_TYPE" NUMBER(1,0) not null,
"REGION_ID" NUMBER(10,0),
"BEGIN_TIME" TIMESTAMP default CURRENT_TIMESTAMP,
"FINISH_TIME" TIMESTAMP default CURRENT_TIMESTAMP,
"RETRY_TIMES" NUMBER(2,0) default 0,
"STATUS" VARCHAR2(4) not null,
"ORDER_TYPE_ID" VARCHAR2(1000),
"OFFER_IDS" VARCHAR2(1000),
"PROD_IDS" VARCHAR2(1000),
"SERVICE_OFFER_IDS" VARCHAR2(1000),
"OFFER_SPEC_ATTR" VARCHAR2(100),
"SERVICE_SPEC_ATTR" VARCHAR2(100),
"REMARK" VARCHAR2(1024)
);
comment on column "EVENT_META"."EVENT_KIND_ID" is '事件来源id';
comment on column "EVENT_META"."DELIVER_TIME" is '事件信息投递时间';
comment on column "EVENT_META"."OBJECT_KEY" is '归档组id or 订单id';
comment on column "EVENT_META"."EVENT_TYPE" is '事件类型,1:停复机,0:普通业务';
comment on column "EVENT_META"."ORDER_TYPE_ID" is '一个归档组竣工消息事件的业务动作集,多个业务动作,用英文逗号分隔';
comment on column "EVENT_META"."RETRY_TIMES" is '抽取报文次数';
comment on column "EVENT_META"."BEGIN_TIME" is '开始抽取报文时间';
comment on column "EVENT_META"."FINISH_TIME" is '抽取报文完成时间';
comment on column "EVENT_META"."STATUS" is '事件状态:W(初始状态) P(处理中) C(抽取成功) F(抽取失败,超时或者系统异常)';
comment on table "EVENT_META" is '事件元信息表';
CREATE TABLE EVENT_SOURCE_CONFIG
(
"EVENT_KIND_ID" NUMBER(10,0) not null,
"EVENT_NAME" VARCHAR2(100) not null,
"EVENT_SOURCE" VARCHAR2(50) not null,
"EVENT_SORT" NUMBER (1,0) not null,
"DATABUS_KIND_ID" NUMBER(10,0) not null
);
comment on column "EVENT_SOURCE_CONFIG"."EVENT_KIND_ID" is '事件序号';
comment on column "EVENT_SOURCE_CONFIG"."EVENT_NAME" is '业务事件名称';
comment on column "EVENT_SOURCE_CONFIG"."EVENT_SOURCE" is '业务中心名称';
comment on column "EVENT_SOURCE_CONFIG"."EVENT_SORT" is '事件是否需要按照获取顺寻进行发送,1:需要,0:不需要';
comment on column "EVENT_SOURCE_CONFIG"."DATABUS_KIND_ID" is 'databus内部队列kindid';
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1002,'so_order_committing','Extractor_NRM',0,8001);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1008,'so_order_construct','Extractor_NRM',0,8002);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1007,'so_order_orderitemfinish','Extractor_NRM',0,8003);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1009,'so_order_archgrpfinish','Extractor_NRM',1,8004);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1003,'so_order_complete','Extractor_NRM',0,8005);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1010,'stopAndRestart','Extractor_NRM',0,8006);
CREATE TABLE EVENT_NODE_SERVER
(
"EVENT_KIND_ID" NUMBER(10,0) not null,
"NODE_NAME" VARCHAR2(100) not null,
"SERVER_URL" VARCHAR2(1000) not null
);
comment on column "EVENT_NODE_SERVER"."EVENT_KIND_ID" is '事件序号';
comment on column "EVENT_NODE_SERVER"."NODE_NAME" is '节点名称';
comment on column "EVENT_NODE_SERVER"."SERVER_URL" is '服务url';
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'customerOrder', 'http://133.37.135.36:9003/so-service/service/so_order_queryCustomerOrderDetailsByCodId');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'customerDetails', 'http://133.37.135.36:9002/cust-service/service/cust_cust_qryCustomerDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'accountDetails', 'http://133.37.135.36:9001/acct-service/service/cust_acct_qryAccountListForMulti');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'offerProdInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryAccProdInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'offerInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryOfferInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'eventMsg', '');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'customerOrders', 'http://133.37.135.36:9003/so-service/service/so_order_queryCustomerOrdersByOrderItemIds');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'customerDetails', 'http://133.37.135.36:9002/cust-service/service/cust_cust_qryCustomerDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'accountDetails', 'http://133.37.135.36:9001/acct-service/service/cust_acct_qryAccountListForMulti');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'offerProdInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryAccProdInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'offerInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryOfferInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'eventMsg', '');
CREATE TABLE EVENT_PARAMS
(
"META_ID" NUMBER(19) not null,
"PARAM_NAME" VARCHAR2(16) not null,
"PARAM_VALUE" VARCHAR2(32) not null
);
comment on table "EVENT_PARAMS" is '事件参数表';
comment on column "EVENT_PARAMS"."META_ID" is '事件元信息表主键';
comment on column "EVENT_PARAMS"."PARAM_NAME" is '参数名称';
comment on column "EVENT_PARAMS"."PARAM_VALUE" is '参数值';
应用名称 | 应用用途 zookpeer | 生成全局序列 redis | 缓存受理中心event报文 cmdb | 配置中心,存放抽取程序配置文件:application.properties、extractor.properties
修改文件(nm-extractor-server\src\main\resources\cmdb-env.properties)如下:
env=dev
dev.meta=http://133.37.135.38:7001
配置名称 | 配置说明 |
---|---|
env | 环境用户 |
dev.meta | 服务地址 |
修改文件如下: |
app.id=8001
找到抽取器代码根目录,例如:...\raptor-databus\data-extractor,执行如下命令:
mvn clean install -Dmaven.test.skip=true
就会生成对应war包,nm-extractor war包如下:...\raptor-databus\data-extractor\nm-extractor-server\target,名称为:nm-extractor-server-1.0-SNAPSHOT.war
/home/databus/nm-extractor/webapp 下面
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1002,'so_order_committing','Extractor_NRM',0,8001);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1008,'so_order_construct','Extractor_NRM',0,8002);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1007,'so_order_orderitemfinish','Extractor_NRM',0,8003);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1009,'so_order_archgrpfinish','Extractor_NRM',1,8004);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1003,'so_order_complete','Extractor_NRM',0,8005);
INSERT INTO EVENT_SOURCE_CONFIG (EVENT_KIND_ID, EVENT_NAME, EVENT_SOURCE, EVENT_SORT, DATABUS_KIND_ID) VALUES (1010,'stopAndRestart','Extractor_NRM',0,8006);
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'customerOrder', 'http://133.37.135.36:9003/so-service/service/so_order_queryCustomerOrderDetailsByCodId');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'customerDetails', 'http://133.37.135.36:9002/cust-service/service/cust_cust_qryCustomerDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'accountDetails', 'http://133.37.135.36:9001/acct-service/service/cust_acct_qryAccountListForMulti');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'offerProdInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryAccProdInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'offerInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryOfferInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1008,'eventMsg', '');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'customerOrders', 'http://133.37.135.36:9003/so-service/service/so_order_queryCustomerOrdersByOrderItemIds');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'customerDetails', 'http://133.37.135.36:9002/cust-service/service/cust_cust_qryCustomerDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'accountDetails', 'http://133.37.135.36:9001/acct-service/service/cust_acct_qryAccountListForMulti');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'offerProdInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryAccProdInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'offerInstDetails', 'http://133.37.135.36:9010/inst-service/service/cust_inst_qryOfferInstDetail');
INSERT INTO EVENT_NODE_SERVER (EVENT_KIND_ID, NODE_NAME, SERVER_URL) VALUES (1009,'eventMsg', '');
到服务器路径:/home/databus/nm-extractor/bin,执行命令:
sh startup.sh
启动程序。对应的程序启动日志路径: /home/databus/nm-extractor/bin/catalina.log
到服务器路径:/home/databus/nm-extractor/bin,执行命令:
sh shutdown.sh
停止程序。对应的程序停止日志路径: /home/databus/nm-extractor/bin/catalina.log
#三 分发说明(data-dispatcher)
https://git.oschina.net/f150/raptor-databus
- dispatcher-agent:分发器代理,监控分发器运行时状态
- dispatcher-common:公共模块
- dispatcher-scheduler:分发调度器模块,负责调度分发器运行哪些分发任务
- dispatcher-server:分发器启动模块,负责配置加载、消息的获取、幂等性校验、排序以及发送等操作
1. 文件目录:data-diapatcher/dispatcher-server/src/main/resources
2. 文件说明
- app.properties:配置应用的appid
app.id=7001
- cmdb-env.properties:配置应用中心地址
env=dev #配置环境类型
dev.meta =http://133.37.135.38:7001 #配置中心地址
分发器运行时配置从远程配置中心获取,文件名为application.properties
属性名 | 备注 | 示例 |
---|---|---|
spring.application.name | 应用名称 | dispatcher-server |
spring.profiles.active | 分发运行时环境配置,江苏普通业务:jiangsu,ord_biz;江苏停复机:jiangsu,sar_biz,四川普通业务:sichuan,ord_biz,四川停复机:sichuan,sar_biz | sichuang,ord_biz |
server.port | 服务端口 | 9000 |
dispatcher.datasource.jdbcUrl | jdbcurl | jdbc:oracle:thin:@192.168.199.24:1521:orcla |
dispatcher.datasource.type | 数据源类型 | com.zaxxer.hikari.HikariDataSource |
dispatcher.datasource.driver-class-name | 驱动类型 | oracle.jdbc.OracleDriver |
dispatcher.datasource.username | 数据库用户名 | crm |
dispatcher.datasource.password | 数据库密码 | crm123 |
dispatcher.datasource.hikari.connection-timeout | 数据源连接最大超时 | 60000 |
dispatcher.datasource.hikari.auto-commit | 自动提交 | false |
dispatcher.datasource.hikari.maximum-pool-size | 数据源连接池最大数 | 50 |
spring.redis.host | redis主机地址 | 127.0.0.1 |
spring.redis.port | redis端口 | 6379 |
spring.redis.password | redis密码 | |
spring.redis.pool.max-active | redis连接池最大连接 | 100 |
spring.redis.pool.max-wait | redis连接池最大等待数量 | 1 |
spring.redis.pool.max-idle | redis连接池最大空闲数 | 100 |
spring.redis.pool.min-idle | redis连接池最小空闲数 | 1 |
spring.redis.timeout | redis超时时间 | |
zk.servers | 127.0.0.1:2181 | zookeeper地址 |
zk.namespace | dispatcher | 分发器zk命名空间 |
zk.digest | zk节点权限 | |
logging.config | 日志配置文件 | classpath:logback.xml |
mq.bizKey | 消息队列中应用名称 | Dispatcher_NRM |
mq.receiveTimeout | mq接受消息最大超时时间 | 10000 |
thread.pool.coreSize | 线程池核心线程数量 | 100 |
thread.pool.capacity | 线程池队列最大数量 | 100000 |
thread.pool.maxSize | 线程池最大数量 | 200 |
根据部署的环境和分发的消息不同,修改分发工程的appid和配置中心地址
打包脚本目录:data-dispatcher/scripts 执行脚本:sh build.sh
建表脚本目录:data-dispatcher/scripts/sql
事件分发跟踪表: EVENT_TRACE
订阅关系配置表:SUB_BASE_ITEM、SUB_CHANNEL、SUB_CONFIG、SUB_RELATION
表配置说明及表字段说明见建表脚本
#四 配置中心使用说明
根据应用的appid和应用名称,找到对应的配置
以分发普通业务为例,appid:7001 应用名:Dispatcher_NRM
找对应的配置文件之后,在此文件中点击新增配置,添加配置项
配置提交后,需要发布才能生效
#五 接口分发开发说明
https://git.oschina.net/f150/raptor-databus
service-hub服务集线器,集成了所有服务的调用(包括crm内部以及外域系统)
在proxy.properties增加接口的配置
dbExample.code=1025214000000046
dbExample.name=dbExample
dbExample.kindId=8001
dbExample.level=1
dbExample.sort=true
dbExample.sender=studio.raptor.hutout.example.sender.DBExampleProxySender
dbExample.db.dataSourceName=usbDataSource
dbExample.db.sqls[0]=INSERT INTO teacher(sno,sname,sex,dept,birth,age) VALUES(?,?,?,?,?,?)
dbExample.db.sqls[1]=INSERT INTO student(sno,sname,sex,dept,birth,age) VALUES(?,?,?,?,?,?)
在connection.properties增加数据源配置
usb.datasource.jdbcUrl=jdbc:mysql://127.0.0.1:3306/ds_0
usb.datasource.username=root
usb.datasource.password=root
usb.datasource.maximumPoolSize=10
usb.datasource.minimumIdle=1
usb.datasource.autoCommit=false
usb.datasource.poolName=usbDataSourcePool
在DataSourceConfig.java文件中增加数据源的实例bean
//name必须与proxy文件中example.db.dataSourceName保持一致
@Bean(name = "usbDataSource")
//数据源属性前缀
@ConfigurationProperties(prefix = "usb.datasource")
public DataSource usbDataSource() {
return DataSourceBuilder.create().build();
}
创建接口的的java config,类的命名规则为接口名+ProxyConfig
DBExampleProxyConfig.java
@Configuration
@AutoConfigureAfter(value = {DataSourceConfig.class})
//创建接口配置条件,当配置文件中存在前缀为dbExample,且有属性code时才会初始化bean
@ConditionalOnProperty(prefix = "dbExample", name = "code")
public class DBExampleProxyConfig {
@Bean(name = "dbExample")
@ConfigurationProperties(prefix = "dbExample")
public ProxyProperties proxyConfig() {
return new ProxyProperties();
}
@Bean(name = "dbExampleTableConfig")
@ConfigurationProperties(prefix = "dbExample.db")
public TableProxyProperties tableProxyConfig() {
return new TableProxyProperties();
}
@Bean(name = "dbExampleDBMessageSender")
public DBMessageSender dbMessageSender(@Qualifier("usbDataSource") DataSource dataSource) {
return new DBMessageSender(dataSource);
}
创建接口的代理发送实现类,需要继承DefaultProxySender.java
@Component
//当容器中存在接口配置的时候才会初始化接口发送类
@ConditionalOnBean(DBExampleProxyConfig.class)
public class DBExampleProxySender extends DefaultProxySender {
private static final Logger log = LoggerFactory.getLogger(DBExampleProxySender.class);
@Autowired @Qualifier("dbExample") ProxyProperties proxyConfig;
@Autowired @Qualifier("dbExampleDBMessageSender") DBMessageSender dbMessageSender;
@Autowired @Qualifier("dbExampleTableConfig") TableProxyProperties tableProxyConfig;
//协议转换 @Override public TranslateResult translate(String archiveMsg) { log.debug("translate the message");
return null;
} //发送消息 @Override public Response doSend(TranslateResult message) { return dbMessageSender.sendByDB(message.getTableResults()); } //响应处理 @Override public void handleResponse(Response response) { log.debug("handle response"); }
}
在proxy.properties增加接口的配置
httpExample.code=1025214000000047
httpExample.name=httpExample
httpExample.kindId=sar_biz
httpExample.level=1
httpExample.sort=true
httpExample.sender=studio.raptor.hutout.example.sender.HttpExampleProxySender
//http接口地址
httpExample.http.url=http://127.0.0.1:8080
httpExample.http.headers[0]=Connection:Keep-Alive
httpExample.http.headers[1]=Accept-Encoding:gzip,deflate
httpExample.http.headers[2]=Content-Type:text/xml;charset=UTF-8
```
创建接口的的java config,类的命名规则为接口名+ProxyConfig
HttpExampleProxyConfig.java
@Configuration
@AutoConfigureAfter(value = {HttpConnectionConfig.class})
@ConditionalOnProperty(prefix = "httpExample", name = "code")
public class HttpExampleProxyConfig {
@Bean(name = "httpExample")
@ConfigurationProperties(prefix = "httpExample")
public ProxyProperties proxyConfig() {
return new ProxyProperties();
}
@Bean(name = "httpExampleConfig")
@ConfigurationProperties(prefix = "httpExample.http")
public HttpProxyProperties httpProxyConfig() {
return new HttpProxyProperties();
}
@Bean(name = "httpExampleMessageSender")
public HttpMessageSender httpMessageSender(HttpClient httpClient,
HttpProxyProperties httpProxyProperties) {
return new HttpMessageSender(httpClient, httpProxyProperties);
}
}
创建接口的代理发送实现类
@Component
@ConditionalOnBean(HttpExampleProxyConfig.class)
public class HttpExampleProxySender extends DefaultProxySender {
//注入当前接口的配置
@Autowired
@Qualifier("httpExample")
ProxyProperties proxyConfig;
//注入http发送代理
@Autowired
@Qualifier("httpExampleMessageSender")
HttpMessageSender httpMessageSender;
@Override
public TranslateResult translate(String archiveMsg) {
return null;
}
@Override
public Response doSend(TranslateResult message) {
return null;
}
@Override
public void handleResponse(Response response) {
}
}
在proxy.properties增加接口的配置
httpinvokerExample.code=1025214000000048
httpinvokerExample.name=httpinvokerExample
httpinvokerExample.kindId=8001
httpinvokerExample.level=1
httpinvokerExample.sort=true
httpinvokerExample.sender=studio.raptor.hutout.example.sender.HttpInvokerExampleSender
httpinvokerExample.httpinvoker.serviceUrl=http://localhost:9090/booking
httpinvokerExample.httpinvoker.serviceInterface=studio.raptor.httpinvoker.service.CabBookingService
创建接口的的java config,类的命名规则为接口名+ProxyConfig
HttpInvokerExampleProxyConfig.java
@Configuration
@ConditionalOnProperty(prefix = "httpinvokerExample", name = "code")
public class HttpInvokerExampleProxyConfig {
@Bean(name = "httpinvokerExample")
@ConfigurationProperties(prefix = "httpinvokerExample")
public ProxyProperties proxyConfig() {
return new ProxyProperties();
}
@Bean("httpinvokerExampleProxyFactoryBean")
public HttpInvokerProxyFactoryBean httpInvokerProxyFactoryBean() throws ClassNotFoundException {
HttpInvokerProxyFactoryBean invoker = new HttpInvokerProxyFactoryBean();
invoker.setServiceUrl(httpInvokerProxyProperties().getServiceUrl());
Class serviceInterface = Class.forName(httpInvokerProxyProperties().getServiceInterface());
invoker.setServiceInterface(serviceInterface);
return invoker;
}
@Bean("testHttpInvokerProxyProperties")
@ConfigurationProperties(prefix = "httpinvokerExample.httpinvoker")
public HttpInvokerProxyProperties httpInvokerProxyProperties() {
return new HttpInvokerProxyProperties();
}
}
@Component
@ConditionalOnBean(HttpInvokerExampleProxyConfig.class)
public class HttpInvokerExampleSender extends DefaultProxySender {
@Autowired
CabBookingService cabBookingService;
@Override
public TranslateResult translate(String archiveMsg) {
return null;
}
@Override
public Response doSend(TranslateResult message) {
Booking booking = cabBookingService.bookRide("HttpInvoker");
return null;
}
@Override
public void handleResponse(Response response) {
}
}
webSeviceExample.code=1025214000000049
webSeviceExample.name=webSeviceExample
webSeviceExample.kindId=8001
webSeviceExample.level=1
webSeviceExample.sender=studio.raptor.hutout.example.sender.WebServiceExampleSender
webSeviceExample.webservice.url=http://133.37.117.160:8000/VsopWebService/WorkSheetAcceptSV?wsdl
webSeviceExample.webservice.soapHeader=<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"><SOAP-ENV:Body><m:WorkListFKToVSOPReq xmlns:m="http://www.mbossvsop.com.cn/vsop"><request><![CDATA[
webSeviceExample.webservice.soapTail=]]></request></m:WorkListFKToVSOPReq></SOAP-ENV:Body></SOAP-ENV:Envelope>
WebServiceExampleProxyConfig.java
@Configuration
@AutoConfigureAfter(value = {HttpConnectionConfig.class})
@ConditionalOnProperty(prefix = "webSeviceExample", name = "code")
public class WebServiceExampleProxyConfig {
@Bean(name = "webSeviceExample")
@ConfigurationProperties(prefix = "webSeviceExample")
public ProxyProperties proxyConfig() {
return new ProxyProperties();
}
@Bean(name = "webServiceExampleConfig")
@ConfigurationProperties(prefix = "webSeviceExample.webservice")
public WebServiceProxyProperties webServiceProxyProperties() {
return new WebServiceProxyProperties();
}
@Bean(name = "webServiceExampleMessageSender")
public WebServiceMessageSender webserviceMessageSender(HttpClient httpClient,
WebServiceProxyProperties webServiceProxyProperties) {
return new WebServiceMessageSender(httpClient, webServiceProxyProperties);
}
}
创建接口的代理发送实现类
WebServiceExampleSender.java
@Component
@ConditionalOnBean(WebServiceExampleProxyConfig.class)
public class WebServiceExampleSender extends DefaultProxySender {
@Autowired
@Qualifier("webSeviceExample")
ProxyProperties proxyConfig;
@Autowired
@Qualifier("webServiceExampleMessageSender")
WebServiceMessageSender webServiceExampleMessageSender;
@Override
public TranslateResult translate(String archiveMsg) {
return null;
}
@Override
public Response doSend(TranslateResult message) {
return null;
}
@Override
public void handleResponse(Response response) {
}
}
在proxy.properties增加接口的配置
socketExample.code=1025214000000050
socketExample.name=socketExample
socketExample.kindId=8001
socketExample.level=1
socketExample.sender=studio.raptor.hutout.example.sender.SocketExampleSender
socketExample.socket.url=127.0.0.1:8080
创建接口的的java config,类的命名规则为接口名+ProxyConfig
SocketExampleProxyConfig.java
@Configuration
@ConditionalOnProperty(prefix = "socketExample", name = "code")
public class SocketExampleProxyConfig {
@Bean(name = "socketExample")
@ConfigurationProperties(prefix = "socketExample")
public ProxyProperties proxyConfig() {
return new ProxyProperties();
}
@Bean("socketExampleProxyProperties")
@ConfigurationProperties(prefix = "socketExample.socket")
public SocketProxyProperties socketProxyProperties() {
return new SocketProxyProperties();
}
@Bean(name="socketExampleMessageSender")
public SocketMessageSender socketMessageSender(SocketProxyProperties socketProxyProperties) {
return new SocketMessageSender(socketProxyProperties);
}
}
创建接口的代理发送实现类
SocketExampleSender.java
@Component
@ConditionalOnBean(SocketExampleProxyConfig.class)
public class SocketExampleSender extends DefaultProxySender {
@Autowired
@Qualifier("socketExample")
ProxyProperties proxyConfig;
@Autowired
@Qualifier("socketExampleMessageSender")
SocketMessageSender socketMessageSender;
@Override
public TranslateResult translate(String archiveMsg) {
return null;
}
@Override
public Response doSend(TranslateResult message) {
return null;
}
@Override
public void handleResponse(Response response) {
}
}
在proxy.properties增加接口的配置
mqExample.code=1025214000000050
mqExample.name=socketExample
mqExample.kindId=8001
mqExample.level=1
mqExample.sender=studio.raptor.hutout.example.sender.MQExampleSender
mqExample.mq.url=127.0.0.1:8080
mqExample.mq.type=kakfa
mqExample.mq.topic=test
创建接口的的java config,类的命名规则为接口名+ProxyConfig
MQExampleProxyConfig.java
@Configuration
@ConditionalOnProperty(prefix = "mqExample", name = "dynamic")
@AutoConfigureAfter(value = {MQConnectionConfig.class})
public class MQExampleProxyConfig {
@Bean(name = "mqExample")
@ConfigurationProperties(prefix = "mqExample")
public ProxyProperties proxyConfig() {
return new ProxyProperties();
}
@Bean(name = "mqExampleProxyProperties")
@ConfigurationProperties(prefix = "mqExample.mq")
public MQProxyProperties socketProxyProperties() {
return new MQProxyProperties();
}
@Bean(name = "mqExampleMessageSender")
public MQMessageSender mqMessageSender(MQProxyProperties mqProxyProperties) {
return new MQMessageSender(mqProxyProperties);
}
}
```
创建接口的代理发送实现类
MQExampleSender.java
@Component
@ConditionalOnBean(MQExampleProxyConfig.class)
public class MQExampleSender extends DefaultProxySender {
@Autowired
@Qualifier("mqExample")
ProxyProperties proxyConfig;
@Autowired
@Qualifier("mqExampleMessageSender")
MQMessageSender mqMessageSender;
@Override
public TranslateResult translate(String archiveMsg) {
return null;
}
@Override
public Response doSend(TranslateResult message) {
return null;
}
@Override
public void handleResponse(Response response) {
}
}
##附录 ###1.接口配置(proxy.properties)
接口配置文件,包括接口属性,发送信息等信息。属性名以xxxx.key形式定义,xxxx表示接口名称,同一个接口的属性前缀必须一致。
属性名 | 属性名 | 示例 | 是否必填 |
---|---|---|---|
xxxx.code | 接口编码 | 10000 | 是 |
xxxx.name | 接口名称 | billing | 是 |
xxxx.kindId | 接口消费的事件种类 | Extractor_NRM | 是 |
xxxx.level | 接口消息处理界别,用数字定义,越大,需要处理的数据量越大 | 是 | 是 |
xxxx.sender | 接口的代理发送实现类 | studio.raptor.hubout.sc.sender.ExampleProxySender | 是 |
xxxx.sort | 接口消息是否需要排序 | true | 是 |
xxxx.name | 接口名称,全局唯一 | billing | 是 |
xxxx.kindId | 接口消费的事件种类 | 是 | |
xxxx.level | 接口消息处理界别,用数字定义,越大,需要处理的数据量越大 | 1 | 是 |
xxxx.sender | 接口的代理发送实现类 | studio.raptor.hubout.sc.sender.ExampleProxySender | 是 |
xxxx.db.dataSourceName | 表接口数据源 | usbDataSource | 否 |
xxxx.db.sqls[i] | 表接口写入语句,多个语句用数组表示 | example.db.sqls[0]=INSERT INTO student(id,name,age) VALUES(?,?,?) example.db.sqls[1]=INSERT INTO teacher(id,name,address,age) VALUES(?,?,?,?) | 否 |
xxxx.http.url | http接口地址 | 否 | |
xxxx.http.headers[i] | http请求头信息 | xxxx.http.headers[0]=Content-Type:text/xml;charset=UTF-8 xxxx.http.headers[1]=Connection:Keep-Alive key和value使用冒号分隔,可不填,默认值如上 | 否 |
xxxx.httpinvoker.serviceUrl | httpinvoker接口地址 | 否 | |
xxxx.httpinvoker.serviceInterface | httpinvoker服务接口类 | 否 | |
xxxx.webservice.url | webservice接口地址 | 否 | |
xxxx.socket.url | socket接口地址 | 127.0.0.1:8080 | 否 |
xxxx.mq.url | mq接口 | 否 | |
xxxx.mq.topic | 消息主题 | 否 | |
xxxx.mq.type | mq类型 | kafaka,rocketmq,ibmmq | 否 |
|
###2.接口连接配置(connection.properties) 接口连接配置文件,包括数据源连接、http连接属性、webservice连接属性信息。数据源连接对应具体的一个数据库,以xxx.datasource为前缀,xxx表示数据源名称。http和webservice的连接池信息不与具体某个连接关联,因此属于全局的配置,用http和webservice作为前缀
属性名 | 属性名 | 示例 | 是否必填 |
---|---|---|---|
xxx.datasource.jdbcUrl | jdbc地址 | 是 | |
xxx.datasource.username | 数据库用户名 | 是 | |
xxx.datasource.password | 数据库密码 | 是 | |
xxx.datasource.maximumPoolSize | 数据库连接池最大连接数 | 是 | |
xxx.datasource.minimumIdle | 数据库连接池最小连接数 | 是 | |
xxx.datasource.autoCommit | 是否自动提交 | 是 | |
xxx.datasource.poolName | 连接池名称 | 是 | |
http.maxTotal | http连接池最大连接数 | 100 | 是 |
http.defaultMaxPerRoute | 最大路由数 | 10 | 是 |
http.socketTimeout | 3000 | 是 | |
http.connectTimeout | 50000 | 是 | |
http.connectionRequestTimeout | 50000 | 是 |
###3.配置中心APPID规划
业务中心中文名称 | 业务中心应用名称 | 业务中心应用英文名 | appid |
---|---|---|---|
分发 | 普通业务分发 | Dispatcher_NRM | 7001 |
分发 | 停复机业务分发 | Dispatcher_SAR | 7002 |
抽取 | 普通业务抽取 | Extractor_NRM | 8001 |
抽取 | 停复机业务抽取 | Extractor_SAR | 8002 |
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。