This action will force synchronization from 开放金融技术/zbus, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
##ZBUS = MQ + RPC + PROXY
zbus-dist选择zbus.sh或者zbus.bat直接执行
总线默认占用 15555 端口, http://localhost:15555 可以直接进入监控,注意zbus因为原生兼容HTTP协议所以监控与消息队列使用同一个端口
ZBUS项目不依赖其他第三方库,消息通讯基于NIO完成(NET子项目)。NET包对NIO做了简洁的封装,相对Netty而言,学习成本低几个量级,模型简单,但不失扩展性。
框架结构保持 Dispatcher + N SelectorThread + IoAdaptor
Dispatcher 负责管理N个Selector线程
SelectorThread 负责NIO读写事件分发
IoAdaptor 个性化读写事件
基于NET的服务器程序基本只要关心IoAdaptor的个性化,比如ZBUS入口就是MqAdaptor
<dependency>
<groupId>org.zbus</groupId>
<artifactId>zbus</artifactId>
<version>6.2.0-SNAPSHOT</version>
</dependency>
//创建Broker代理
BrokerConfig config = new BrokerConfig();
config.setBrokerAddress("127.0.0.1:15555");
final Broker broker = new SingleBroker(config);
Producer producer = new Producer(broker, "MyMQ");
producer.createMQ(); // 如果已经确定存在,不需要创建
//创建消息,消息体可以是任意binary,应用协议交给使用者
Message msg = new Message();
msg.setBody("hello world");
producer.sendSync(msg);
//销毁Broker
broker.close();
//创建Broker代表
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(brokerConfig);
MqConfig config = new MqConfig();
config.setBroker(broker);
config.setMq("MyMQ");
//创建消费者
@SuppressWarnings("resource")
Consumer c = new Consumer(config);
c.onMessage(new MessageHandler() {
@Override
public void handle(Message msg, Session sess) throws IOException {
System.out.println(msg);
}
});
//启动消费线程
c.start();
参考源码test目下的rpc部分
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(brokerConfig);
RpcProxy proxy = new RpcProxy(broker);
RpcConfig config = new RpcConfig();
config.setMq("MyRpc");
config.setTimeout(10000);
Interface hello = proxy.getService(Interface.class, config);
Object[] res = hello.objectArray();
for (Object obj : res) {
System.out.println(obj);
}
Object[] array = new Object[] { getUser("rushmore"), "hong", true, 1,
String.class };
int saved = hello.saveObjectArray(array);
System.out.println(saved);
Class<?> ret = hello.classTest(String.class);
System.out.println(ret);
broker.close();
无任何代码侵入使得你已有的业务接口接入到zbus,获得跨平台和多语言支持
<!-- 暴露的的接口实现示例 -->
<bean id="interface" class="org.zbus.rpc.biz.InterfaceImpl"></bean>
<bean id="serviceHandler" class="org.zbus.rpc.RpcServiceHandler">
<constructor-arg>
<list>
<!-- 放入你需要的暴露的的接口 -->
<ref bean="interface"/>
</list>
</constructor-arg>
</bean>
<bean id="broker" class="org.zbus.mq.SingleBroker">
<constructor-arg>
<bean class="org.zbus.mq.BrokerConfig">
<property name="brokerAddress" value="127.0.0.1:15555" />
<property name="maxTotal" value="20"/>
<!-- 这里可以增加连接池参数配置,不配置使用默认值(参考commons-pool2) -->
</bean>
</constructor-arg>
</bean>
<!-- 默认调用了start方法,由Spring容器直接带起来注册到zbus总线上 -->
<bean id="zbusService" class="org.zbus.rpc.service.Service" init-method="start">
<constructor-arg>
<bean class="org.zbus.rpc.service.ServiceConfig">
<!-- 支持多总线注册 -->
<constructor-arg>
<list>
<ref bean="broker"/>
</list>
</constructor-arg>
<property name="mq" value="MyRpc"/>
<property name="consumerCount" value="2"/>
<property name="threadCount" value="20"/>
<property name="serviceHandler" ref="serviceHandler"/>
</bean>
</constructor-arg>
</bean>
<bean id="broker" class="org.zbus.mq.SingleBroker">
<constructor-arg>
<bean class="org.zbus.mq.BrokerConfig">
<property name="brokerAddress" value="127.0.0.1:15555" />
<property name="maxTotal" value="20"/>
<!-- 这里可以增加连接池参数配置,不配置使用默认值(参考commons-pool2) -->
</bean>
</constructor-arg>
</bean>
<bean id="rpcProxy" class="org.zbus.rpc.RpcProxy">
<constructor-arg> <ref bean="broker"/> </constructor-arg>
</bean>
<!-- 动态代理由RpcProxy的getService生成,需要知道对应的MQ配置信息(第二个参数) -->
<bean id="interface" factory-bean="rpcProxy" factory-method="getService">
<constructor-arg type="java.lang.Class" value="org.zbus.rpc.biz.Interface"/>
<constructor-arg>
<bean class="org.zbus.rpc.RpcConfig">
<property name="mq" value="MyRpc"/>
</bean>
</constructor-arg>
</bean>
Spring完成zbus代理透明化,zbus设施从你的应用逻辑中彻底消失
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("zbusSpringClient.xml");
Interface intf = (Interface) context.getBean("interface");
System.out.println(intf.listMap());
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。