本文快速入门,RocketMQ消息系统的安装部署,发送,和接收消息,监控消息,的详细说明。
64位操作系统,建议使用Linux / Unix /
搭建 Apache RocketMQ 单机环境
www.ymq.io/2018/02/01/…
新建一个 maven 项目,这里就不详细操作了,大家都会的
不过也可以下载我的示例源码,下载地址如下
GitHub 源码: github.com/souyunku/sp…
在POM 中添加如下依赖
<!-- RocketMq客户端相关依赖 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.1.0-incubating</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.1.0-incubating</version> </dependency>
在配置文件 application.properties
添加一下内容
# 消费者的组名 apache.rocketmq.consumer.PushConsumer=PushConsumer # 生产者的组名 apache.rocketmq.producer.producerGroup=Producer # NameServer地址 apache.rocketmq.namesrvAddr=192.168.252.121:9876
@Component public class Producer { /** * 生产者的组名 */ @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; /** * NameServer 地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQProducer() { //生产者的组名 DefaultMQProducer producer = new DefaultMQProducer(producerGroup); //指定NameServer地址,多个地址以 ; 隔开 producer.setNamesrvAddr(namesrvAddr); try { /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可 * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); for (int i = 0; i < 100; i++) { String messageBody = "我是消息内容:" + i; String message = new String(messageBody.getBytes(), "utf-8"); //构建消息 Message msg = new Message("PushTopic" /* PushTopic */, "push"/* Tag */, "key_" + i /* Keys */, message.getBytes()); //发送消息 SendResult result = producer.send(msg); System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); } } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
@Component public class Consumer { /** * 消费者的组名 */ @Value("${apache.rocketmq.consumer.PushConsumer}") private String consumerGroup; /** * NameServer地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQPushConsumer() { //消费者的组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); //指定NameServer地址,多个地址以 ; 隔开 consumer.setNamesrvAddr(namesrvAddr); try { //订阅PushTopic下Tag为push的消息 consumer.subscribe("PushTopic", "push"); //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 //如果非第一次启动,那么按照上次消费的位置继续消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { try { for (MessageExt messageExt : list) { System.out.println("messageExt: " + messageExt);//输出消息内容 String messageBody = new String(messageExt.getBody(), "utf-8"); System.out.println("消费响应:Msg: " + messageExt.getMsgId() + ",msgBody: " + messageBody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功 } }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }
@SpringBootApplication public class SpringBootRocketmqApplication { public static void main(String[] args) { SpringApplication.run(SpringBootRocketmqApplication.class, args); } }
控制台会有响应
发送响应:MsgId:0AFF015E556818B4AAC208A0504F0063,发送状态:SEND_OK messageExt: MessageExt [queueId=0, storeSize=195, queueOffset=113824, sysFlag=0, bornTimestamp=1517559124047, bornHost=/192.168.252.1:62165, storeTimestamp=1517559135052, storeHost=/192.168.252.121:10911, msgId=C0A8FC7900002A9F00000000056F499C, commitLogOffset=91179420, bodyCRC=1687852546, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=113825, KEYS=key_99, CONSUME_START_TIME=1517559124049, UNIQ_KEY=0AFF015E556818B4AAC208A0504F0063, WAIT=true, TAGS=push}, body=21]] 消费响应:Msg: 0AFF015E556818B4AAC208A0504F0063,msgBody: 我是消息内容:99 ...
RocketMQ web界面监控RocketMQ-Console-Ng部署
github.com/apache/rock…
下载并且 maven 编译
git clone https://github.com/apache/rocketmq-externals.git rocketmq-externals/rocketmq-console/ mvn clean package -Dmaven.test.skip=true
rocketmq.config.namesrvAddr
NameServer
地址,默认启动端口8080
nohup java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=127.0.0.1:9876