本文主要研究一下 Artemis 的 SessionProducerCreditsMessage。
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionProducerCreditsMessage.java
public class SessionProducerCreditsMessage extends PacketImpl { private int credits; private SimpleString address; public SessionProducerCreditsMessage(final int credits, final SimpleString address) { super(SESS_PRODUCER_CREDITS); this.credits = credits; this.address = address; } public SessionProducerCreditsMessage() { super(SESS_PRODUCER_CREDITS); } public int getCredits() { return credits; } public SimpleString getAddress() { return address; } @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeInt(credits); buffer.writeSimpleString(address); } @Override public void decodeRest(final ActiveMQBuffer buffer) { credits = buffer.readInt(); address = buffer.readSimpleString(); } @Override public int hashCode() { final int prime = 31; int result = super.hashCode(); result = prime * result + ((address == null) ? 0 : address.hashCode()); result = prime * result + credits; return result; } @Override public String toString() { StringBuffer buff = new StringBuffer(getParentString()); buff.append(", address=" + address); buff.append(", credits=" + credits); buff.append("]"); return buff.toString(); } @Override public boolean equals(Object obj) { if (this == obj) return true; if (!super.equals(obj)) return false; if (!(obj instanceof SessionProducerCreditsMessage)) return false; SessionProducerCreditsMessage other = (SessionProducerCreditsMessage) obj; if (address == null) { if (other.address != null) return false; } else if (!address.equals(other.address)) return false; if (credits != other.credits) return false; return true; } }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
class ClientSessionPacketHandler implements ChannelHandler { @Override public void handlePacket(final Packet packet) { byte type = packet.getType(); try { switch (type) { case DISCONNECT_CONSUMER: { handleConsumerDisconnected((DisconnectConsumerMessage) packet); break; } case SESS_RECEIVE_CONTINUATION: { handleReceiveContinuation((SessionReceiveContinuationMessage) packet); break; } case SESS_RECEIVE_MSG: { handleReceivedMessagePacket((SessionReceiveMessage) packet); break; } case SESS_RECEIVE_LARGE_MSG: { handleReceiveLargeMessage((SessionReceiveLargeMessage) packet); break; } case PacketImpl.SESS_PRODUCER_CREDITS: { handleReceiveProducerCredits((SessionProducerCreditsMessage) packet); break; } case PacketImpl.SESS_PRODUCER_FAIL_CREDITS: { handleReceiveProducerFailCredits((SessionProducerCreditsFailMessage) packet); break; } case PacketImpl.DISCONNECT_CONSUMER_KILL: { handleReceiveSlowConsumerKillMessage((DisconnectConsumerWithKillMessage) packet); break; } case EXCEPTION: { // We can only log these exceptions // maybe we should cache it on SessionContext and throw an exception on any next calls ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) packet; ActiveMQClientLogger.LOGGER.receivedExceptionAsynchronously(mem.getException()); break; } default: { throw ActiveMQClientMessageBundle.BUNDLE.invalidPacket(type); } } } catch (Exception e) { throw ActiveMQClientMessageBundle.BUNDLE.failedToHandlePacket(e); } sessionChannel.confirm(packet); } }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
public class ActiveMQSessionContext extends SessionContext { //...... protected void handleReceiveProducerCredits(SessionProducerCreditsMessage message) { handleReceiveProducerCredits(message.getAddress(), message.getCredits()); } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
public abstract class SessionContext { protected ClientSessionInternal session; protected SendAcknowledgementHandler sendAckHandler; protected volatile RemotingConnection remotingConnection; protected final IDGenerator idGenerator = new SimpleIDGenerator(0); //...... protected void handleReceiveProducerCredits(SimpleString address, int credits) { ClientSessionInternal session = this.session; if (session != null) { session.handleReceiveProducerCredits(address, credits); } } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
public final class ClientSessionImpl implements ClientSessionInternal, FailureListener { //...... @Override public void handleReceiveProducerCredits(final SimpleString address, final int credits) { synchronized (producerCreditManager) { producerCreditManager.receiveCredits(address, credits); } } //...... }
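SessionProducerCreditsMessage 继承了 PacketImpl，其 type 为 SESS_PRODUCER_CREDITS，encodeRest/decodeRest 依次读写 credits（int）和 address（SimpleString）；ClientSessionPacketHandler 实现了 ChannelHandler 接口，其 handlePacket 根据 packet 的 type 做不同的处理，当 type 为 SESS_PRODUCER_CREDITS 时执行 ActiveMQSessionContext 的 handleReceiveProducerCredits(SessionProducerCreditsMessage) 方法；该方法取出 message 的 address 和 credits，调用父类 SessionContext 的 handleReceiveProducerCredits(SimpleString, int) 方法；SessionContext 的 handleReceiveProducerCredits 方法在 session 不为 null 时执行 ClientSessionInternal 的 handleReceiveProducerCredits 方法；ClientSessionImpl 的 handleReceiveProducerCredits 则在 synchronized (producerCreditManager) 块内调用 producerCreditManager.receiveCredits 方法。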