转载

聊聊artemis ClientConsumer的handleRegularMessage

本文主要研究一下artemis ClientConsumer的handleRegularMessage

handleRegularMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

   //......

   // Received messages waiting to be consumed, kept in a priority list created with NUM_PRIORITIES levels.
   private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);

   // The single Runner instance that queueExecutor() submits to the session executor.
   private final Runner runner = new Runner();

   // Message listener; volatile, and checked for null before it is used.
   private volatile MessageHandler handler;

   //......

   /**
    * Accepts a regular message for this consumer: fills in a missing address, adds the message to the
    * priority buffer, and then either schedules the Runner (when a handler is set and the consumer is
    * not stopped) or calls {@code notify()} on this consumer (when no handler is set).
    *
    * NOTE(review): {@code notify()} requires the calling thread to own this object's monitor. The
    * locking is not visible in this excerpt; presumably the caller is synchronized on the consumer -
    * confirm against the caller.
    *
    * @param message the message to buffer for delivery
    */
   private void handleRegularMessage(ClientMessageInternal message) {
      // Fall back to the address from queueInfo when the message carries none
      if (message.getAddress() == null) {
         message.setAddress(queueInfo.getAddress());
      }

      message.onReceipt(this);

      // NOTE(review): 4 is presumably the default message priority - confirm. Forced-delivery marker
      // messages never switch the consumer to individual acknowledgement.
      if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
         // We have messages of different priorities so we need to ack them individually since the order
         // of them in the ServerConsumerImpl delivery list might not be the same as the order they are
         // consumed in, which means that acking all up to won't work
         ackIndividually = true;
      }

      // Add it to the buffer
      buffer.addTail(message, message.getPriority());

      if (handler != null) {
         // Execute using executor
         // When stopped, the message only stays in the buffer; no Runner is scheduled here
         if (!stopped) {
            queueExecutor();
         }
      } else {
         // No handler registered: signal one thread waiting on this consumer's monitor
         notify();
      }
   }

   /**
    * Submits the shared {@code runner} to {@code sessionExecutor}, emitting a trace line first when
    * trace logging is enabled.
    */
   private void queueExecutor() {
      final boolean traceEnabled = logger.isTraceEnabled();

      if (traceEnabled) {
         logger.trace(this + "::Adding Runner on Executor for delivery");
      }

      sessionExecutor.execute(runner);
   }

   //......
}
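  • ClientConsumerImpl的handleRegularMessage方法在message的address为null时,先用queueInfo.getAddress()补全,并调用message.onReceipt(this);随后执行buffer.addTail(message, message.getPriority())将消息按优先级放入buffer;之后若handler不为null且consumer未stopped,则执行queueExecutor(),若handler为null则执行notify();queueExecutor方法通过sessionExecutor.execute(runner)来执行runner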

Runner

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

   //......

   /**
    * Task executed on the session executor. Each run makes one {@code callOnMessage()} attempt; any
    * exception is reported through {@code ActiveMQClientLogger} and then stored in
    * {@code lastException} instead of being rethrown.
    */
   private class Runner implements Runnable {

      @Override
      public void run() {
         try {
            callOnMessage();
         } catch (Exception deliveryFailure) {
            // Log first, then remember the failure on the enclosing consumer
            ActiveMQClientLogger.LOGGER.onMessageError(deliveryFailure);

            lastException = deliveryFailure;
         }
      }
   }

   /**
    * Delivers at most one buffered message to the registered handler.
    *
    * Returns immediately when the consumer is closing or stopped. Otherwise it records work on the
    * session, and, if a handler is set, applies the optional rate limiter, polls one message from the
    * buffer and:
    * - drops it silently if it carries the FORCED_DELIVERY_MESSAGE property (before any flow control);
    * - otherwise applies flow control, then either invokes the handler (message not expired) or
    *   passes the message to {@code session.expire} (message expired);
    * - finally calls {@code startSlowConsumer()} when {@code clientWindowSize} is 0.
    *
    * @throws Exception anything thrown by the calls made here, including the handler itself
    */
   private void callOnMessage() throws Exception {
      if (closing || stopped) {
         return;
      }

      session.workDone();

      // We pull the message from the buffer from inside the Runnable so we can ensure priority
      // ordering. If we just added a Runnable with the message to the executor immediately as we get it
      // we could not do that

      ClientMessageInternal message;

      // Must store handler in local variable since might get set to null
      // otherwise while this is executing and give NPE when calling onMessage
      MessageHandler theHandler = handler;

      if (theHandler != null) {
         // The rate limiter is optional and is applied before a message is taken from the buffer
         if (rateLimiter != null) {
            rateLimiter.limit();
         }

         failedOver = false;

         // Only the poll itself runs under this consumer's monitor
         synchronized (this) {
            message = buffer.poll();
         }

         if (message != null) {
            if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
               //Ignore, this could be a relic from a previous receiveImmediate();
               return;
            }

            // Expiry is sampled once, before flow control is applied
            boolean expired = message.isExpired();

            // Flow control is applied for expired and non-expired messages alike
            flowControlBeforeConsumption(message);

            if (!expired) {
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Calling handler.onMessage");
               }
               // Install the contextClassLoader field on the current thread for the duration of the
               // callback, remembering the loader that was set before
               final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                  @Override
                  public ClassLoader run() {
                     ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();

                     Thread.currentThread().setContextClassLoader(contextClassLoader);

                     return originalLoader;
                  }
               });

               // Record which thread is currently inside the handler callback
               onMessageThread = Thread.currentThread();
               try {
                  theHandler.onMessage(message);
               } finally {
                  // Always restore the previous loader and clear onMessageThread, even if the handler
                  // throws; a failure while restoring is logged rather than propagated
                  try {
                     AccessController.doPrivileged(new PrivilegedAction<Object>() {
                        @Override
                        public Object run() {
                           Thread.currentThread().setContextClassLoader(originalLoader);
                           return null;
                        }
                     });
                  } catch (Exception e) {
                     ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
                  }

                  onMessageThread = null;
               }

               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Handler.onMessage done");
               }

               // The body of a large message is discarded once the handler has returned normally
               if (message.isLargeMessage()) {
                  message.discardBody();
               }
            } else {
               // Expired messages are never shown to the handler
               session.expire(this, message);
            }

            // If slow consumer, we need to send 1 credit to make sure we get another message
            if (clientWindowSize == 0) {
               startSlowConsumer();
            }
         }
      }
   }

   /**
    * Runs flow control for a message that is about to be consumed, unless the message reports a
    * flow-control size of 0.
    *
    * @param message the message about to be consumed
    * @throws ActiveMQException if {@code flowControl} fails
    */
   private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException {
      // Chunk messages will execute the flow control while receiving the chunks
      if (message.getFlowControlSize() == 0) {
         return;
      }

      final int flowControlSize = message.getFlowControlSize();

      // on large messages we should discount 1 on the first packets as we need continuity until the last packet
      final boolean discountSlowConsumer = !message.isLargeMessage();

      flowControl(flowControlSize, discountSlowConsumer);
   }

   /**
    * Accumulates {@code messageBytes} into {@code creditsToSend} and, once the accumulated amount
    * reaches {@code clientWindowSize}, resets the counter and sends the credits.
    *
    * Does nothing when {@code clientWindowSize} is negative. When the window size is 0 and
    * {@code discountSlowConsumer} is true, one credit is withheld from the amount sent. Credits are
    * only sent when the resulting amount is greater than 0.
    *
    * @param messageBytes         number of bytes to add to the pending credits
    * @param discountSlowConsumer whether to send one credit less when the window size is 0
    * @throws ActiveMQException if sending the credits fails
    */
   public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException {
      if (clientWindowSize < 0) {
         return;
      }

      creditsToSend += messageBytes;

      // Keep accumulating until the window size has been reached
      if (creditsToSend < clientWindowSize) {
         return;
      }

      final int credits;

      if (clientWindowSize == 0 && discountSlowConsumer) {
         if (logger.isTraceEnabled()) {
            logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
         }

         // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
         // always buffering one after received the first message
         credits = creditsToSend - 1;
      } else {
         // NOTE(review): this logs messageBytes, while the amount actually sent is the accumulated
         // creditsToSend - confirm whether that is intended
         if (logger.isDebugEnabled()) {
            logger.debug("Sending " + messageBytes + " from flow-control");
         }

         credits = creditsToSend;
      }

      creditsToSend = 0;

      if (credits > 0) {
         sendCredits(credits);
      }
   }


   //......
}
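  • Runner实现了Runnable接口,其run方法执行callOnMessage(),出现异常时记录日志并保存到lastException;callOnMessage方法在closing或stopped时直接返回,否则在handler不为null的情况下,若rateLimiter不为null则先执行rateLimiter.limit(),之后执行buffer.poll()获取ClientMessageInternal;若message不为null且不包含FORCED_DELIVERY_MESSAGE属性(包含该属性的消息会被直接忽略),则执行flowControlBeforeConsumption(message),对于非expired的消息执行theHandler.onMessage(message),对于expired的消息执行session.expire(this, message);最后对于clientWindowSize为0的执行startSlowConsumer();flowControlBeforeConsumption方法在message.getFlowControlSize()不为0时执行flowControl方法,该方法先累加creditsToSend,当其达到clientWindowSize时计算credits并将creditsToSend重置为0,在credits大于0时执行sendCredits(credits)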

小结

ClientConsumerImpl的handleRegularMessage方法先执行buffer.addTail(message, message.getPriority()),之后若handler不为null且未stopped则执行queueExecutor(),若handler为null则执行notify();queueExecutor方法通过sessionExecutor执行runner;Runner实现了Runnable接口,其run方法执行callOnMessage();该方法在rateLimiter不为null时执行rateLimiter.limit(),之后执行buffer.poll()获取ClientMessageInternal,经过流控(flowControlBeforeConsumption)后交给handler处理,已过期的消息则交给session.expire处理

doc

  • ClientConsumerImpl
原文  https://segmentfault.com/a/1190000021613438
正文到此结束
Loading...