总线默认占用**15555**端口, http://localhost:15555 可以直接进入监控,注意zbus因为原生兼容HTTP协议所以监控与消息队列使用同一个端口
高可用模式启动总线分别启动ZbusServer与TrackServer,无顺序之分,默认ZbusServer占用15555端口,TrackServer占用16666端口。
//1)创建Broker代表 SingleBrokerConfig config = new SingleBrokerConfig(); config.setBrokerAddress("127.0.0.1:15555"); Broker broker = new SingleBroker(config); //2) 创建生产者 Producer producer = new Producer(broker, "MyMQ"); Message msg = new Message(); msg.setBody("hello world"); producer.send(msg, new ResultCallback() { @Override public void onCompleted(Message result) { System.out.println(result); } });
//1)创建Broker代表 SingleBrokerConfig brokerConfig = new SingleBrokerConfig(); brokerConfig.setBrokerAddress("127.0.0.1:15555"); Broker broker = new SingleBroker(brokerConfig); MqConfig config = new MqConfig(); config.setBroker(broker); config.setMq("MyMQ"); //2) 创建消费者 Consumer c = new Consumer(config); while(true){ Message msg = c.recv(10000); if(msg == null) continue; System.out.println(msg); }
参考源码test目下的rpc部分
SingleBrokerConfig config = new SingleBrokerConfig(); config.setBrokerAddress("127.0.0.1:15555"); Broker broker = new SingleBroker(config); RpcConfig rpcConfig = new RpcConfig(); rpcConfig.setBroker(broker); rpcConfig.setMq("MyRpc"); //动态代理处Interface通过zbus调用的动态实现类 Interface hello = RpcProxy.getService (Interface.class, rpcConfig); Object[] res = hello.objectArray(); for (Object obj : res) { System.out.println(obj); } Object[] array = new Object[] { getUser("rushmore"), "hong", true, 1, String.class }; int saved = hello.saveObjectArray(array); System.out.println(saved); Class<?> ret = hello.classTest(String.class); System.out.println(ret);
ZBUS协议继承于HTTP协议格式,主体采用HTTP头部协议扩展完成,HTTP协议由HTTP头部和HTTP包体组成, ZBUS协议在HTTP头部KV值做了扩展,支持浏览器方式直接访问,但是ZBUS链接机制采用保持长连接方式。 原则上, 编写客户端SDK只需要遵循下面ZBUS协议扩展即可。
ZBUS扩展头部主要是完成
扩展头部Key-Value解释
命令标识,决定Broker(ZbusServer|TrackServer)的处理
cmd: produce | consume | request | heartbeat | admin(默认值)
mq: 消息目标队列
mq-reply: 消息回复队列
msgid: 消息唯一UUID
msgid-raw: 原始消息唯一UUID, 消息消费路由后ID发生变化,该字段保留最原始的消息ID
token: 访问控制码,不填默认空
broker: 消息经过Broker的地址
topic: 消息主题,发布订阅时使用
ack: 是否需要对当前消息ACK,不填默认true
encoding: 消息体的编码格式
sub_cmd: 管理命令的二级命令
请求:GET|POST URI
应答:200 OK
URI做扩展Key-Value的字符串理解
请求格式
应答格式(在启用ack的时候才有应答)
请求格式
应答格式
请求格式
应答格式(在启用ack的时候才有应答)
请求格式
应答格式
URI = /
监控首页 = /?cmd=admin&&method=index
URI = /MyMQ
第一个?之前理解为消息队列 mq=MyMQ
第一个?之后理解为Key-Value, URI的KV优先级低于头部扩展