<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.0</version> </dependency> <!--一个好用的工具包,可以不引入--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.3.0</version> </dependency>
不管是生产者还是消费者,都有很多参数可以配置,rocketmq命名比较好,基本可以从参数名上判断具体作用,还有注释可以看。
下面例子中只给出了常用的一些参数设置。更多参数可自行探索。
注意:
1、NamesrvAddr参数在多个节点时,用英文分号分隔,例: 192.168.9.58:9876;192.168.9.59:9876
import cn.hutool.core.util.RandomUtil; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("127.0.0.1:9876"); //发送超时时间,默认3000 单位ms producer.setSendMsgTimeout(5000); producer.start(); try { Message msg = new Message("TestTopic",// topic "177", // tag 可以为空,用以简单的筛选。 RandomUtil.randomString(8), // key 可以为空,可用以查询。 ("test" + RandomUtil.randomString(8)).getBytes()); // body ,我常将对象转json再获取byte[] 进行传输。 SendResult send = producer.send(msg); if (send.getSendStatus().equals(SendStatus.SEND_OK)) { //发送成功处理 }else { //发送失败处理 } } catch (Exception e) { //发送失败处理 e.printStackTrace(); } //正式环境不要发完就就shutdown,要在应用退出时再shutdown。 producer.shutdown(); } }
注意:
1、批量发送时,topic必须为同一个,否则发送会报异常。
2、批量发送相较于单条发送速度提升很大。
import cn.hutool.core.util.RandomUtil; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("127.0.0.1:9876"); //发送超时时间,默认3000 单位ms producer.setSendMsgTimeout(5000); producer.start(); int threadCount = 20; int forCount = 100000; CountDownLatch latch = new CountDownLatch(threadCount); long start = System.currentTimeMillis(); for (int i = 0; i < threadCount; i++) { new Thread(() -> { try { List<Message> list = new ArrayList<>(); for (int j = 0; j < forCount; j++) { try { Message msg = new Message("TestTopic",// topic "177", // tag RandomUtil.randomString(8), // key ("test" + RandomUtil.randomString(8)).getBytes()); // body list.add(msg); if (list.size() >= 100) { SendResult send = producer.send(list); if (send.getSendStatus().equals(SendStatus.SEND_OK)) { //发送成功处理 list.clear(); }else { //发送失败处理 } } } catch (Exception e) { //发送失败处理 e.printStackTrace(); } } if (list.size() > 0) { SendResult send = producer.send(list); if (!send.getSendStatus().equals(SendStatus.SEND_OK)) { System.out.println(send); } list.clear(); } } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } }).start(); } latch.await(); long hs = System.currentTimeMillis() - start; System.out.println(hs); long speed = (threadCount * forCount) / (hs >= 0 ? 1 : hs / 1000); System.out.println("速度" + speed); //正式环境不要发完就就shutdown,要在应用退出时再shutdown。 producer.shutdown(); } }
import cn.hutool.core.lang.Console; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; public class PushConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumerGroupName"); consumer.setNamesrvAddr("127.0.0.1:9876"); //一个GroupName第一次消费时的位置 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(20); //要消费的topic,可使用tag进行简单过滤 consumer.subscribe("TestTopic", "*"); //一次最大消费的条数 consumer.setConsumeMessageBatchMaxSize(100); //消费模式,广播或者集群,默认集群。 consumer.setMessageModel(MessageModel.CLUSTERING); //在同一jvm中 需要启动两个同一GroupName的情况需要这个参数不一样。 consumer.setInstanceName("InstanceName"); //配置消息监听 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try { //业务处理 msgs.forEach(msg -> { Console.log(msg); }); } catch (Exception e) { System.err.println("接收异常" + e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("Consumer Started."); } }
从4.6之后,提供了DefaultLitePullConsumer 大大简化了pull的操作。以下为新实现,4.6之前的版本不支持。
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Console; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class PullConsumer { private static boolean runFlag = true; public static void main(String[] args) throws Exception { DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("PullConsumerGroupName"); consumer.setNamesrvAddr("127.0.0.1:9876"); //要消费的topic,可使用tag进行简单过滤 consumer.subscribe("TestTopic", "*"); //一次最大消费的条数 consumer.setPullBatchSize(100); //无消息时,最大阻塞时间。默认5000 单位ms consumer.setPollTimeoutMillis(5000); consumer.start(); while (runFlag){ try { //拉取消息,无消息时会阻塞 List<MessageExt> msgs = consumer.poll(); if (CollUtil.isEmpty(msgs)){ continue; } //业务处理 msgs.forEach(msg-> Console.log(new String(msg.getBody()))); //同步消费位置。不执行该方法,应用重启会存在重复消费。 consumer.commitSync(); }catch (Exception e){ e.printStackTrace(); } } consumer.shutdown(); } }