本文主要研究一下artemis的ClientProducerCreditManager
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
public interface ClientProducerCreditManager { ClientProducerCredits getCredits(SimpleString address, boolean anon, SessionContext context); void returnCredits(SimpleString address); void receiveCredits(SimpleString address, int credits); void receiveFailCredits(SimpleString address, int credits); void reset(); void close(); int creditsMapSize(); int unReferencedCreditsSize(); /** This will determine the flow control as asynchronous, * no actual block should happen instead a callback will be sent whenever blockages change */ void setCallback(ClientProducerFlowCallback callback); }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
public class ClientProducerCreditManagerImpl implements ClientProducerCreditManager { public static final int MAX_UNREFERENCED_CREDITS_CACHE_SIZE = 1000; private final Map<SimpleString, ClientProducerCredits> producerCredits = new LinkedHashMap<>(); private final Map<SimpleString, ClientProducerCredits> unReferencedCredits = new LinkedHashMap<>(); private final ClientSessionInternal session; private int windowSize; private ClientProducerFlowCallback callback; public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) { this.session = session; this.windowSize = windowSize; } /** This will determine the flow control as asynchronous, * no actual block should happen instead a callback will be sent whenever blockages change */ @Override public void setCallback(ClientProducerFlowCallback callback) { this.callback = callback; } @Override public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon, SessionContext context) { if (windowSize == -1) { return ClientProducerCreditsNoFlowControl.instance; } else { boolean needInit = false; ClientProducerCredits credits; synchronized (this) { credits = producerCredits.get(address); if (credits == null) { // Doesn't need to be fair since session is single threaded credits = build(address); needInit = true; producerCredits.put(address, credits); } if (!anon) { credits.incrementRefCount(); // Remove from anon credits (if there) unReferencedCredits.remove(address); } else { addToUnReferencedCache(address, credits); } } // The init is done outside of the lock // otherwise packages may arrive with flow control // while this is still sending requests causing a dead lock if (needInit) { credits.init(context); } return credits; } } private ClientProducerCredits build(SimpleString address) { if (callback != null) { return new AsynchronousProducerCreditsImpl(session, address, windowSize, callback); } else { return new ClientProducerCreditsImpl(session, address, windowSize); } } @Override public synchronized void returnCredits(final SimpleString address) { ClientProducerCredits credits = producerCredits.get(address); if (credits != null && credits.decrementRefCount() == 0) { addToUnReferencedCache(address, credits); } } @Override public synchronized void receiveCredits(final SimpleString address, final int credits) { ClientProducerCredits cr = producerCredits.get(address); if (cr != null) { cr.receiveCredits(credits); } } @Override public synchronized void receiveFailCredits(final SimpleString address, int credits) { ClientProducerCredits cr = producerCredits.get(address); if (cr != null) { cr.receiveFailCredits(credits); } } @Override public synchronized void reset() { for (ClientProducerCredits credits : producerCredits.values()) { credits.reset(); } } @Override public synchronized void close() { windowSize = -1; for (ClientProducerCredits credits : producerCredits.values()) { credits.close(); } producerCredits.clear(); unReferencedCredits.clear(); } @Override public synchronized int creditsMapSize() { return producerCredits.size(); } @Override public synchronized int unReferencedCreditsSize() { return unReferencedCredits.size(); } private void addToUnReferencedCache(final SimpleString address, final ClientProducerCredits credits) { unReferencedCredits.put(address, credits); if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) { // Remove the oldest entry Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator(); Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next(); iter.remove(); removeEntry(oldest.getKey(), oldest.getValue()); } } private void removeEntry(final SimpleString address, final ClientProducerCredits credits) { producerCredits.remove(address); credits.releaseOutstanding(); credits.close(); } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits { static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl(); @Override public void acquireCredits(int credits) { } @Override public void receiveCredits(int credits) { } @Override public void receiveFailCredits(int credits) { } @Override public boolean isBlocked() { return false; } @Override public void init(SessionContext ctx) { } @Override public void reset() { } @Override public void close() { } @Override public void incrementRefCount() { } @Override public int decrementRefCount() { return 1; } @Override public void releaseOutstanding() { } @Override public SimpleString getAddress() { return SimpleString.toSimpleString(""); } }
ClientProducerCreditManager接口定义了getCredits、returnCredits、receiveCredits、receiveFailCredits、reset、close、creditsMapSize、unReferencedCreditsSize、setCallback方法