本文主要研究一下artemis的SessionConsumerFlowCreditMessage
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionConsumerFlowCreditMessage.java
public class SessionConsumerFlowCreditMessage extends PacketImpl { private long consumerID; private int credits; public SessionConsumerFlowCreditMessage(final long consumerID, final int credits) { super(SESS_FLOWTOKEN); this.consumerID = consumerID; this.credits = credits; } public SessionConsumerFlowCreditMessage() { super(SESS_FLOWTOKEN); } // Public -------------------------------------------------------- public long getConsumerID() { return consumerID; } public int getCredits() { return credits; } @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(consumerID); buffer.writeInt(credits); } @Override public void decodeRest(final ActiveMQBuffer buffer) { consumerID = buffer.readLong(); credits = buffer.readInt(); } @Override public String toString() { return getParentString() + ", consumerID=" + consumerID + ", credits=" + credits + "]"; } @Override public int hashCode() { final int prime = 31; int result = super.hashCode(); result = prime * result + (int) (consumerID ^ (consumerID >>> 32)); result = prime * result + credits; return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (!super.equals(obj)) return false; if (!(obj instanceof SessionConsumerFlowCreditMessage)) return false; SessionConsumerFlowCreditMessage other = (SessionConsumerFlowCreditMessage) obj; if (consumerID != other.consumerID) return false; if (credits != other.credits) return false; return true; } }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
public class ServerSessionPacketHandler implements ChannelHandler { //...... private volatile AtomicInteger availableCredits = new AtomicInteger(0); //...... private void onMessagePacket(final Packet packet) { if (logger.isTraceEnabled()) { logger.trace("ServerSessionPacketHandler::handlePacket," + packet); } final byte type = packet.getType(); switch (type) { case SESS_SEND: { onSessionSend(packet); break; } case SESS_ACKNOWLEDGE: { onSessionAcknowledge(packet); break; } case SESS_PRODUCER_REQUEST_CREDITS: { onSessionRequestProducerCredits(packet); break; } case SESS_FLOWTOKEN: { onSessionConsumerFlowCredit(packet); break; } default: // separating a method for everything else as JIT was faster this way slowPacketHandler(packet); break; } } private void onSessionConsumerFlowCredit(Packet packet) { this.storageManager.setContext(session.getSessionContext()); try { Packet response = null; boolean requiresResponse = false; try { SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet; session.receiveConsumerCredits(message.getConsumerID(), message.getCredits()); } catch (ActiveMQIOErrorException e) { response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQException e) { response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (Throwable t) { response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { this.storageManager.clearContext(); } } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
public class ServerSessionImpl implements ServerSession, FailureListener { //...... public void receiveConsumerCredits(final long consumerID, final int credits) throws Exception { ServerConsumer consumer = locateConsumer(consumerID); if (consumer == null) { logger.debug("There is no consumer with id " + consumerID); return; } consumer.receiveCredits(credits); } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
public class ServerConsumerImpl implements ServerConsumer, ReadyListener { //...... public void receiveCredits(final int credits) { if (credits == -1) { if (logger.isDebugEnabled()) { logger.debug(this + ":: FlowControl::Received disable flow control message"); } // No flow control availableCredits = null; // There may be messages already in the queue promptDelivery(); } else if (credits == 0) { // reset, used on slow consumers logger.debug(this + ":: FlowControl::Received reset flow control message"); availableCredits.set(0); } else { int previous = availableCredits.getAndAdd(credits); if (logger.isDebugEnabled()) { logger.debug(this + "::FlowControl::Received " + credits + " credits, previous value = " + previous + " currentValue = " + availableCredits.get()); } if (previous <= 0 && previous + credits > 0) { if (logger.isTraceEnabled()) { logger.trace(this + "::calling promptDelivery from receiving credits"); } promptDelivery(); } } } public void promptDelivery() { // largeMessageDeliverer is always set inside a lock // if we don't acquire a lock, we will have NPE eventually if (largeMessageDeliverer != null) { resumeLargeMessage(); } else { forceDelivery(); } } private void forceDelivery() { if (browseOnly) { messageQueue.getExecutor().execute(browserDeliverer); } else { messageQueue.deliverAsync(); } } //...... }
SessionConsumerFlowCreditMessage继承了PacketImpl,其type为SESS_FLOWTOKEN;ServerSessionPacketHandler的onMessagePacket方法在type为SESS_FLOWTOKEN时执行onSessionConsumerFlowCredit方法;该方法执行的是session.receiveConsumerCredits以及sendResponse方法;receiveConsumerCredits方法在receiveCredits方法在credits为-1时设置availableCredits为null,然后执行promptDelivery方法;在credits为0时设置availableCredits为0;其他情况执行availableCredits.getAndAdd(credits)