1 Star 0 Fork 904

jinlin / zbus

forked from 开放金融技术 / zbus 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

ZBUS--轻量级MQ、RPC、服务总线

##ZBUS = MQ + RPC

  • 支持消息队列, 发布订阅, RPC, 代理(TCP/DMZ)
  • 亿级消息堆积能力、支持HA高可用
  • 单个Jar包无依赖 ~300K
  • 服务代理 -- 适配改造已有业务系统,使之具备跨平台与语言
  • 丰富的API--JAVA/C/C++/C#/Python/Node.JS多语言接入
  • 支持JVM进程内API本地化优化(适合进程内持久化队列优化)

QQ群: 467741880, 如果喜欢该项目,请右上角star,让更多人参与改进

ZBUS 启动与监控

zbus-dist选择zbus.sh或者zbus.bat直接执行

简单监控

总线默认占用 15555 端口, http://localhost:15555 可以直接进入监控,注意zbus因为原生兼容HTTP协议所以监控与消息队列使用同一个端口

ZBUS 角色概要

zbus-arch

ZBUS 消息通讯基础(NET模块)

ZBUS项目不依赖其他第三方库,消息通讯基于NIO完成(NET子项目)。NET包对NIO做了简洁的封装,相对Netty而言,学习成本低几个量级,模型简单,但不失扩展性。

znet-arch

框架结构保持 Dispatcher + N SelectorThread + IoAdaptor

Dispatcher 负责管理N个Selector线程

SelectorThread 负责NIO读写事件分发

IoAdaptor 个性化读写事件

基于NET的服务器程序基本只要关心IoAdaptor的个性化,比如ZBUS入口就是MqAdaptor

ZBUS API

ZBUS PROXY

ZBUS 示例

Java Maven 依赖

<dependency>
	<groupId>org.zbus</groupId>
	<artifactId>zbus</artifactId>
	<version>6.2.9</version>
</dependency>

生产者

public static void main(String[] args) throws Exception { 
	//创建Broker代理
	BrokerConfig config = new BrokerConfig();
	config.setServerAddress("127.0.0.1:15555");
	final Broker broker = new SingleBroker(config);

	Producer producer = new Producer(broker, "MyMQ");
	producer.createMQ(); // 如果已经确定存在,不需要创建

	//创建消息,消息体可以是任意binary,应用协议交给使用者
	Message msg = new Message();
	msg.setBody("hello world");
	producer.sendSync(msg);  
	
	broker.close();
}

消费者

public static void main(String[] args) throws Exception{  
	//创建Broker代表
	BrokerConfig brokerConfig = new BrokerConfig();
	brokerConfig.setServerAddress("127.0.0.1:15555");
	Broker broker = new SingleBroker(brokerConfig);
	
	MqConfig config = new MqConfig(); 
	config.setBroker(broker);
	config.setMq("MyMQ");
	
	//创建消费者
	@SuppressWarnings("resource")
	Consumer c = new Consumer(config);  
	
	c.onMessage(new MessageHandler() { 
		@Override
		public void handle(Message msg, Session sess) throws IOException {
			System.out.println(msg);
		}
	});

	//启动消费线程
	c.start();   
	
}  

RPC动态代理【各类复杂类型】

参考源码test目下的rpc部分

	//1)创建Broker代表(可用高可用替代)
	BrokerConfig config = new BrokerConfig();
	config.setServerAddress("127.0.0.1:15555");
	Broker broker = new SingleBroker(config);
	 
	//2)创建基于MQ的Invoker以及Rpc工厂,指定RPC采用的MQ为MyRpc
	MqInvoker invoker = new MqInvoker(broker, "MyRpc"); 
	RpcFactory factory = new RpcFactory(invoker); 
	
	//3) 动态代理出实现类
	Interface hello = factory.getService(Interface.class);
	
	test(hello);  
	
	broker.close();

Spring集成--服务端(RPC示例)

无任何代码侵入使得你已有的业务接口接入到zbus,获得跨平台和多语言支持

<!-- 暴露的的接口实现示例 -->
<bean id="interface" class="org.zbus.rpc.biz.InterfaceImpl"></bean>

<bean id="serviceProcessor" class="org.zbus.rpc.RpcProcessor">
	<constructor-arg>
		<list>
			<!-- 放入你需要的暴露的的接口 -->
			<ref bean="interface"/>
		</list>
	</constructor-arg>
</bean>
 
<bean id="broker" class="org.zbus.broker.SingleBroker">
	<constructor-arg>
		<bean class="org.zbus.broker.BrokerConfig">
			<property name="serverAddress" value="127.0.0.1:15555" />
			<property name="maxTotal" value="20"/>
			<!-- 这里可以增加连接池参数配置,不配置使用默认值(参考commons-pool2) -->
		</bean>
	</constructor-arg>
</bean>

<!-- 默认调用了start方法,由Spring容器直接带起来注册到zbus总线上 -->
<bean id="myrpcService" class="org.zbus.rpc.mq.Service" init-method="start">
	<constructor-arg>  
		<bean class="org.zbus.rpc.mq.ServiceConfig">
		    <!-- 支持多总线注册 -->
			<constructor-arg> 
				<list>
					<ref bean="broker"/> 
				</list>
			</constructor-arg>  
			<property name="mq" value="MyRpc"/>
			<property name="consumerCount" value="2"/> 
			<property name="messageProcessor" ref="serviceProcessor"/>
		</bean>
	</constructor-arg>
</bean>

Spring集成--客户端

<bean id="broker" class="org.zbus.broker.SingleBroker">
	<constructor-arg>
		<bean class="org.zbus.broker.BrokerConfig">
			<property name="serverAddress" value="127.0.0.1:15555" /> 
		</bean>
	</constructor-arg>
</bean>

<bean id="myrpc" class="org.zbus.rpc.RpcFactory">
	<constructor-arg> 
		<bean class="org.zbus.rpc.mq.MqInvoker"> 
			<constructor-arg ref="broker"/>
			<constructor-arg value="MyRpc"/> 
		</bean>
	</constructor-arg>
</bean>


<bean id="interface" factory-bean="myrpc" factory-method="getService">
	<constructor-arg type="java.lang.Class" value="org.zbus.rpc.biz.Interface"/> 
</bean> 

Spring完成zbus代理透明化,zbus设施从你的应用逻辑中彻底消失

public static void main(String[] args) { 
	ApplicationContext context = new ClassPathXmlApplicationContext("SpringRpcClient.xml");
	 
	Interface intf = (Interface) context.getBean("interface"); 
	for(int i=0;i<100;i++){
		System.out.println(intf.listMap());
	} 
} 

ZBUS消息协议

空文件

简介

ZBUS=MQ+RPC 服务总线 1)支持消息队列, 发布订阅, RPC, 交易系统队列适配 2)亿级消息堆积能力、支持HA高可用 3)无依赖单个Jar包 ~300K 4)丰富的API--JAVA/C/C++/C#/Python/Node.JS多语言接入,支持HTTP等协议长连接入 展开 收起
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
1
https://gitee.com/kimjx/zbus.git
git@gitee.com:kimjx/zbus.git
kimjx
zbus
zbus
master

搜索帮助