代码拉取完成,页面将自动刷新
同步操作将从 皮鞋铮亮/zbus 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
##ZBUS = MQ + RPC + PROXY
zbus-dist选择zbus.sh或者zbus.bat直接执行
总线默认占用 15555 端口, http://localhost:15555 可以直接进入监控,注意zbus因为原生兼容HTTP协议所以监控与消息队列使用同一个端口
ZBUS项目不依赖其他第三方库,消息通讯基于NIO完成(NET子项目)。NET包对NIO做了简洁的封装,相对Netty而言,学习成本低几个量级,模型简单,但不失扩展性。
框架结构保持 Dispatcher + N SelectorThread + IoAdaptor
Dispatcher 负责管理N个Selector线程
SelectorThread 负责NIO读写事件分发
IoAdaptor 个性化读写事件
基于NET的服务器程序基本只要关心IoAdaptor的个性化,比如ZBUS入口就是MqAdaptor
<dependency>
<groupId>org.zbus</groupId>
<artifactId>zbus</artifactId>
<version>6.2.0-SNAPSHOT</version>
</dependency>
//创建Broker代理
BrokerConfig config = new BrokerConfig();
config.setBrokerAddress("127.0.0.1:15555");
final Broker broker = new SingleBroker(config);
Producer producer = new Producer(broker, "MyMQ");
producer.createMQ(); // 如果已经确定存在,不需要创建
//创建消息,消息体可以是任意binary,应用协议交给使用者
Message msg = new Message();
msg.setBody("hello world");
producer.sendSync(msg);
//销毁Broker
broker.close();
//创建Broker代表
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(brokerConfig);
MqConfig config = new MqConfig();
config.setBroker(broker);
config.setMq("MyMQ");
//创建消费者
@SuppressWarnings("resource")
Consumer c = new Consumer(config);
c.onMessage(new MessageHandler() {
@Override
public void handle(Message msg, Session sess) throws IOException {
System.out.println(msg);
}
});
//启动消费线程
c.start();
参考源码test目下的rpc部分
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(brokerConfig);
RpcProxy proxy = new RpcProxy(broker);
RpcConfig config = new RpcConfig();
config.setMq("MyRpc");
config.setTimeout(10000);
Interface hello = proxy.getService(Interface.class, config);
Object[] res = hello.objectArray();
for (Object obj : res) {
System.out.println(obj);
}
Object[] array = new Object[] { getUser("rushmore"), "hong", true, 1,
String.class };
int saved = hello.saveObjectArray(array);
System.out.println(saved);
Class<?> ret = hello.classTest(String.class);
System.out.println(ret);
broker.close();
无任何代码侵入使得你已有的业务接口接入到zbus,获得跨平台和多语言支持
<!-- 暴露的的接口实现示例 -->
<bean id="interface" class="org.zbus.rpc.biz.InterfaceImpl"></bean>
<bean id="serviceHandler" class="org.zbus.rpc.RpcServiceHandler">
<constructor-arg>
<list>
<!-- 放入你需要的暴露的的接口 -->
<ref bean="interface"/>
</list>
</constructor-arg>
</bean>
<bean id="broker" class="org.zbus.mq.SingleBroker">
<constructor-arg>
<bean class="org.zbus.mq.BrokerConfig">
<property name="brokerAddress" value="127.0.0.1:15555" />
<property name="maxTotal" value="20"/>
<!-- 这里可以增加连接池参数配置,不配置使用默认值(参考commons-pool2) -->
</bean>
</constructor-arg>
</bean>
<!-- 默认调用了start方法,由Spring容器直接带起来注册到zbus总线上 -->
<bean id="zbusService" class="org.zbus.rpc.service.Service" init-method="start">
<constructor-arg>
<bean class="org.zbus.rpc.service.ServiceConfig">
<!-- 支持多总线注册 -->
<constructor-arg>
<list>
<ref bean="broker"/>
</list>
</constructor-arg>
<property name="mq" value="MyRpc"/>
<property name="consumerCount" value="2"/>
<property name="threadCount" value="20"/>
<property name="serviceHandler" ref="serviceHandler"/>
</bean>
</constructor-arg>
</bean>
<bean id="broker" class="org.zbus.mq.SingleBroker">
<constructor-arg>
<bean class="org.zbus.mq.BrokerConfig">
<property name="brokerAddress" value="127.0.0.1:15555" />
<property name="maxTotal" value="20"/>
<!-- 这里可以增加连接池参数配置,不配置使用默认值(参考commons-pool2) -->
</bean>
</constructor-arg>
</bean>
<bean id="rpcProxy" class="org.zbus.rpc.RpcProxy">
<constructor-arg> <ref bean="broker"/> </constructor-arg>
</bean>
<!-- 动态代理由RpcProxy的getService生成,需要知道对应的MQ配置信息(第二个参数) -->
<bean id="interface" factory-bean="rpcProxy" factory-method="getService">
<constructor-arg type="java.lang.Class" value="org.zbus.rpc.biz.Interface"/>
<constructor-arg>
<bean class="org.zbus.rpc.RpcConfig">
<property name="mq" value="MyRpc"/>
</bean>
</constructor-arg>
</bean>
Spring完成zbus代理透明化,zbus设施从你的应用逻辑中彻底消失
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("zbusSpringClient.xml");
Interface intf = (Interface) context.getBean("interface");
System.out.println(intf.listMap());
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。