openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java
public class PullConsumerApp { public static void main(String[] args) throws OMSResourceNotExistException { //Load and start the vendor implementation from a specific OMS driver URL. final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); messagingAccessPoint.startup(); //Fetch a ResourceManager to create Queue resource. ResourceManager resourceManager = messagingAccessPoint.resourceManager(); resourceManager.createQueue( "NS://HELLO_QUEUE", OMS.newKeyValue()); //Start a PullConsumer to receive messages from the specific queue. final PullConsumer pullConsumer = messagingAccessPoint.createPullConsumer(); pullConsumer.attachQueue("NS://HELLO_QUEUE"); pullConsumer.startup(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { pullConsumer.shutdown(); messagingAccessPoint.shutdown(); } })); //Receive one message from queue. Message message = pullConsumer.receive(); //Acknowledge the consumed message pullConsumer.ack(message.sysHeaders().getString(Message.BuiltinKeys.RECEIPT_HANDLE)); } } 复制代码
openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java
public class PushConsumerApp { public static void main(String[] args) throws OMSResourceNotExistException { //Load and start the vendor implementation from a specific OMS driver URL. final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://localhost:10911/us-east"); messagingAccessPoint.startup(); //Fetch a ResourceManager to create Queue resource. ResourceManager resourceManager = messagingAccessPoint.resourceManager(); final PushConsumer consumer = messagingAccessPoint.createPushConsumer(); consumer.startup(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { consumer.shutdown(); messagingAccessPoint.shutdown(); } })); //Consume messages from a simple queue. String simpleQueue = "NS://HELLO_QUEUE"; resourceManager.createQueue( simpleQueue, OMS.newKeyValue()); //This queue doesn't has a source queue, so only the message delivered to the queue directly can //be consumed by this consumer. consumer.attachQueue(simpleQueue, new MessageListener() { @Override public void onReceived(Message message, Context context) { System.out.println("Received one message: " + message); context.ack(); } }); } } 复制代码
openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/StreamingConsumerApp.java
public class StreamingConsumerApp { public static void main(String[] args) throws OMSResourceNotExistException { //Load and start the vendor implementation from a specific OMS driver URL. final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); messagingAccessPoint.startup(); //Fetch a ResourceManager to create Queue resource. String targetQueue = "NS://HELLO_QUEUE"; ResourceManager resourceManager = messagingAccessPoint.resourceManager(); resourceManager.createQueue(targetQueue, OMS.newKeyValue()); //Fetch the streams of the target queue. List<String> streams = resourceManager.listStreams(targetQueue); //Start a StreamingConsumer to iterate messages from the specific stream. final StreamingConsumer streamingConsumer = messagingAccessPoint.createStreamingConsumer(); streamingConsumer.startup(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { streamingConsumer.shutdown(); messagingAccessPoint.shutdown(); } })); assert streams.size() != 0; StreamingIterator streamingIterator = streamingConsumer.seekToBeginning(streams.get(0)); while (streamingIterator.hasNext()) { Message message = streamingIterator.next(); System.out.println("Received one message: " + message); } //All the messages in the stream has been consumed. //Now consume the messages in reverse order while (streamingIterator.hasPrevious()) { Message message = streamingIterator.previous(); System.out.println("Received one message again: " + message); } } } 复制代码
openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java
public class ProducerApp { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); final Producer producer = messagingAccessPoint.createProducer(); messagingAccessPoint.startup(); producer.startup(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { producer.shutdown(); messagingAccessPoint.shutdown(); } })); //Sends a message to the specified destination synchronously. { SendResult sendResult = producer.send(producer.createBytesMessage( "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.println("Send sync message OK, message id is: " + sendResult.messageId()); } //Sends a message to the specified destination asynchronously. //And get the result through Future { final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage( "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); final SendResult sendResult = result.get(3000L); System.out.println("Send async message OK, message id is: " + sendResult.messageId()); } //Sends a message to the specified destination asynchronously. //And retrieve the result through FutureListener { final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage( "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); result.addListener(new FutureListener<SendResult>() { @Override public void operationComplete(Future<SendResult> future) { if (future.isDone() && null == future.getThrowable()) { System.out.println("Send async message OK, message id is: " + future.get().messageId()); } else { System.out.println("Send async message Failed, cause is: " + future.getThrowable().getMessage()); } } }); } //Sends a message to the specific queue in OneWay manner. { //There is no {@code Future} related or {@code RuntimeException} thrown. The calling thread doesn't //care about the send result and also have no context to get the result. producer.sendOneway(producer.createBytesMessage( "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); } } } 复制代码
openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java
public class TransactionProducerApp { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); final Producer producer = messagingAccessPoint.createProducer(); messagingAccessPoint.startup(); producer.startup(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { producer.shutdown(); messagingAccessPoint.shutdown(); } })); Message message = producer.createBytesMessage( "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); //Sends a transaction message to the specified destination synchronously. SendResult sendResult = producer.send(message, new LocalTransactionExecutor() { @Override public void execute(final Message message, final ExecutionContext context) { //Do some local transaction //Then commit this transaction and the message will be delivered. context.commit(); } @Override public void check(final Message message, final CheckContext context) { //The server may lookup the transaction status forwardly associated the specified message context.commit(); } }, OMS.newKeyValue()); System.out.println("Send transaction message OK, message id is: " + sendResult.messageId()); } } 复制代码
openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java
public class RoutingApp { public static void main(String[] args) throws OMSResourceNotExistException { //Load and start the vendor implementation from a specific OMS driver URL. final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); messagingAccessPoint.startup(); String destinationQueue = "NS://DESTINATION_QUEUE"; String sourceQueue = "NS://SOURCE_QUEUE"; //Fetch a ResourceManager to create source Queue, destination Queue, and the Routing instance. ResourceManager resourceManager = messagingAccessPoint.resourceManager(); //Create the destination queue. resourceManager.createQueue(destinationQueue, OMS.newKeyValue()); //Create the source queue. resourceManager.createQueue(sourceQueue, OMS.newKeyValue()); KeyValue routingAttr = OMS.newKeyValue(); routingAttr.put(OMSBuiltinKeys.ROUTING_SOURCE, sourceQueue) .put(OMSBuiltinKeys.ROUTING_DESTINATION, destinationQueue) .put(OMSBuiltinKeys.ROUTING_EXPRESSION, "color = 'red'"); resourceManager.createRouting("NS://HELLO_ROUTING", routingAttr); //Send messages to the source queue ahead of the routing final Producer producer = messagingAccessPoint.createProducer(); producer.startup(); producer.send(producer.createBytesMessage(sourceQueue, "RED_COLOR".getBytes()) .putUserHeaders("color", "red")); producer.send(producer.createBytesMessage(sourceQueue, "GREEN_COLOR".getBytes()) .putUserHeaders("color", "green")); //Consume messages from the queue behind the routing. final PushConsumer pushConsumer = messagingAccessPoint.createPushConsumer(); pushConsumer.startup(); pushConsumer.attachQueue(destinationQueue, new MessageListener() { @Override public void onReceived(Message message, Context context) { //The message sent to the sourceQueue will be delivered to anotherConsumer by the routing rule //In this case, the push consumer will only receive the message with red color. System.out.println("Received a red message: " + message); context.ack(); } }); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { producer.shutdown(); pushConsumer.shutdown(); messagingAccessPoint.shutdown(); } })); } } 复制代码