本文主要研究一下apache gossip的ActiveGossiper
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
/** * The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner */ public abstract class AbstractActiveGossiper { protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class); protected final GossipManager gossipManager; protected final GossipCore gossipCore; private final Histogram sharedDataHistogram; private final Histogram sendPerNodeDataHistogram; private final Histogram sendMembershipHistogram; private final Random random; private final GossipSettings gossipSettings; public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { this.gossipManager = gossipManager; this.gossipCore = gossipCore; sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time")); sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time")); sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time")); random = new Random(); gossipSettings = gossipManager.getSettings(); } public void init() { } public void shutdown() { } public final void sendShutdownMessage(LocalMember me, LocalMember target){ if (target == null){ return; } ShutdownMessage m = new ShutdownMessage(); m.setNodeId(me.getId()); m.setShutdownAtNanos(gossipManager.getClock().nanoTime()); gossipCore.sendOneWay(m, target.getUri()); } //...... /** * Performs the sending of the membership list, after we have incremented our own heartbeat. 
*/ protected void sendMembershipList(LocalMember me, LocalMember member) { if (member == null){ return; } long startTime = System.currentTimeMillis(); me.setHeartbeat(System.nanoTime()); UdpActiveGossipMessage message = new UdpActiveGossipMessage(); message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); message.setUuid(UUID.randomUUID().toString()); message.getMembers().add(convert(me)); for (LocalMember other : gossipManager.getMembers().keySet()) { message.getMembers().add(convert(other)); } Response r = gossipCore.send(message, member.getUri()); if (r instanceof ActiveGossipOk){ //maybe count metrics here } else { LOGGER.debug("Message " + message + " generated response " + r); } sendMembershipHistogram.update(System.currentTimeMillis() - startTime); } protected final Member convert(LocalMember member){ Member gm = new Member(); gm.setCluster(member.getClusterName()); gm.setHeartbeat(member.getHeartbeat()); gm.setUri(member.getUri().toASCIIString()); gm.setId(member.getId()); gm.setProperties(member.getProperties()); return gm; } /** * * @param memberList * An immutable list * @return The chosen LocalGossipMember to gossip with. */ protected LocalMember selectPartner(List<LocalMember> memberList) { LocalMember member = null; if (memberList.size() > 0) { int randomNeighborIndex = random.nextInt(memberList.size()); member = memberList.get(randomNeighborIndex); } return member; } }
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
public class ActiveGossipMessageHandler implements MessageHandler { /** * @param gossipCore context. * @param gossipManager context. * @param base message reference. * @return boolean indicating success. */ @Override public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { List<Member> remoteGossipMembers = new ArrayList<>(); RemoteMember senderMember = null; UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base; for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) { URI u; try { u = new URI(activeGossipMessage.getMembers().get(i).getUri()); } catch (URISyntaxException e) { GossipCore.LOGGER.debug("Gossip message with faulty URI", e); continue; } RemoteMember member = new RemoteMember( activeGossipMessage.getMembers().get(i).getCluster(), u, activeGossipMessage.getMembers().get(i).getId(), activeGossipMessage.getMembers().get(i).getHeartbeat(), activeGossipMessage.getMembers().get(i).getProperties()); if (i == 0) { senderMember = member; } if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) { UdpNotAMemberFault f = new UdpNotAMemberFault(); f.setException("Not a member of this cluster " + i); f.setUriFrom(activeGossipMessage.getUriFrom()); f.setUuid(activeGossipMessage.getUuid()); GossipCore.LOGGER.warn(f); gossipCore.sendOneWay(f, member.getUri()); continue; } remoteGossipMembers.add(member); } UdpActiveGossipOk o = new UdpActiveGossipOk(); o.setUriFrom(activeGossipMessage.getUriFrom()); o.setUuid(activeGossipMessage.getUuid()); gossipCore.sendOneWay(o, senderMember.getUri()); gossipCore.mergeLists(senderMember, remoteGossipMembers); return true; } }
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
public class GossipCore implements GossipCoreConstants { class LatchAndBase { private final CountDownLatch latch; private volatile Base base; LatchAndBase(){ latch = new CountDownLatch(1); } } public static final Logger LOGGER = Logger.getLogger(GossipCore.class); private final GossipManager gossipManager; private ConcurrentHashMap<String, LatchAndBase> requests; private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData; private final ConcurrentHashMap<String, SharedDataMessage> sharedData; private final Meter messageSerdeException; private final Meter transmissionException; private final Meter transmissionSuccess; private final DataEventManager eventManager; public GossipCore(GossipManager manager, MetricRegistry metrics){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); perNodeData = new ConcurrentHashMap<>(); sharedData = new ConcurrentHashMap<>(); eventManager = new DataEventManager(metrics); metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size()); metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size()); metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size()); messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); transmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); transmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); } public void receive(Base base) { if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) { LOGGER.warn("received message can not be handled"); } } /** * Sends a blocking message. * todo: move functionality to TransportManager layer. 
* @param message * @param uri * @throws RuntimeException if data can not be serialized or in transmission error */ private void sendInternal(Base message, URI uri) { byte[] json_bytes; try { json_bytes = gossipManager.getProtocolManager().write(message); } catch (IOException e) { messageSerdeException.mark(); throw new RuntimeException(e); } try { gossipManager.getTransportManager().send(uri, json_bytes); transmissionSuccess.mark(); } catch (IOException e) { transmissionException.mark(); throw new RuntimeException(e); } } public Response send(Base message, URI uri){ if (LOGGER.isDebugEnabled()){ LOGGER.debug("Sending " + message); LOGGER.debug("Current request queue " + requests); } final Trackable t; LatchAndBase latchAndBase = null; if (message instanceof Trackable){ t = (Trackable) message; latchAndBase = new LatchAndBase(); requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase); } else { t = null; } sendInternal(message, uri); if (latchAndBase == null){ return null; } try { boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS); if (complete){ return (Response) latchAndBase.base; } else{ return null; } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { if (latchAndBase != null){ requests.remove(t.getUuid() + "/" + t.getUriFrom()); } } } /** * Sends a message across the network while blocking. Catches and ignores IOException in transmission. 
Used * when the protocol for the message is not to wait for a response * @param message the message to send * @param u the uri to send it to */ public void sendOneWay(Base message, URI u) { try { sendInternal(message, u); } catch (RuntimeException ex) { LOGGER.debug("Send one way failed", ex); } } public void handleResponse(String k, Base v) { LatchAndBase latch = requests.get(k); latch.base = v; latch.latch.countDown(); } /** * Merge lists from remote members and update heartbeats * * @param senderMember * @param remoteList * */ public void mergeLists(RemoteMember senderMember, List<Member> remoteList) { if (LOGGER.isDebugEnabled()){ debugState(senderMember, remoteList); } for (LocalMember i : gossipManager.getDeadMembers()) { if (i.getId().equals(senderMember.getId())) { LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); i.recordHeartbeat(senderMember.getHeartbeat()); i.setHeartbeat(senderMember.getHeartbeat()); //TODO consider forcing an UP here } } for (Member remoteMember : remoteList) { if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { continue; } LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(), remoteMember.getUri(), remoteMember.getId(), remoteMember.getHeartbeat(), remoteMember.getProperties(), gossipManager.getSettings().getWindowSize(), gossipManager.getSettings().getMinimumSamples(), gossipManager.getSettings().getDistribution()); aNewMember.recordHeartbeat(remoteMember.getHeartbeat()); Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP); if (result != null){ for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){ if (localMember.getKey().getId().equals(remoteMember.getId())){ localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat()); localMember.getKey().setHeartbeat(remoteMember.getHeartbeat()); localMember.getKey().setProperties(remoteMember.getProperties()); } } } } if 
(LOGGER.isDebugEnabled()){ debugState(senderMember, remoteList); } } //...... }
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
public abstract class GossipManager { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); // this mapper is used for ring and user-data persistence only. NOT messages. public static final ObjectMapper metdataObjectMapper = new ObjectMapper() { private static final long serialVersionUID = 1L; { enableDefaultTyping(); configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false); }}; private final ConcurrentSkipListMap<LocalMember, GossipState> members; private final LocalMember me; private final GossipSettings settings; private final AtomicBoolean gossipServiceRunning; private TransportManager transportManager; private ProtocolManager protocolManager; private final GossipCore gossipCore; private final DataReaper dataReaper; private final Clock clock; private final ScheduledExecutorService scheduledServiced; private final MetricRegistry registry; private final RingStatePersister ringState; private final UserDataPersister userDataState; private final GossipMemberStateRefresher memberStateRefresher; private final MessageHandler messageHandler; private final LockManager lockManager; public GossipManager(String cluster, URI uri, String id, Map<String, String> properties, GossipSettings settings, List<Member> gossipMembers, GossipListener listener, MetricRegistry registry, MessageHandler messageHandler) { this.settings = settings; this.messageHandler = messageHandler; clock = new SystemClock(); me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties, settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); gossipCore = new GossipCore(this, registry); this.lockManager = new LockManager(this, settings.getLockManagerSettings(), registry); dataReaper = new DataReaper(gossipCore, clock); members = new ConcurrentSkipListMap<>(); for (Member startupMember : gossipMembers) { if (!startupMember.equals(me)) { LocalMember member = new LocalMember(startupMember.getClusterName(), startupMember.getUri(), 
startupMember.getId(), clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); //TODO should members start in down state? members.put(member, GossipState.DOWN); } } gossipServiceRunning = new AtomicBoolean(true); this.scheduledServiced = Executors.newScheduledThreadPool(1); this.registry = registry; this.ringState = new RingStatePersister(GossipManager.buildRingStatePath(this), this); this.userDataState = new UserDataPersister( gossipCore, GossipManager.buildPerNodeDataPath(this), GossipManager.buildSharedDataPath(this)); this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData); readSavedRingState(); readSavedDataState(); } /** * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip * thread and start the receiver thread. */ public void init() { // protocol manager and transport managers are specified in settings. // construct them here via reflection. protocolManager = ReflectionUtils.constructWithReflection( settings.getProtocolManagerClass(), new Class<?>[] { GossipSettings.class, String.class, MetricRegistry.class }, new Object[] { settings, me.getId(), this.getRegistry() } ); transportManager = ReflectionUtils.constructWithReflection( settings.getTransportManagerClass(), new Class<?>[] { GossipManager.class, GossipCore.class}, new Object[] { this, gossipCore } ); // start processing gossip messages. transportManager.startEndpoint(); transportManager.startActiveGossiper(); dataReaper.init(); if (settings.isPersistRingState()) { scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS); } if (settings.isPersistDataState()) { scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS); } memberStateRefresher.init(); LOGGER.debug("The GossipManager is started."); } /** * Shutdown the gossip service. 
*/ public void shutdown() { gossipServiceRunning.set(false); lockManager.shutdown(); gossipCore.shutdown(); transportManager.shutdown(); dataReaper.close(); memberStateRefresher.shutdown(); scheduledServiced.shutdown(); try { scheduledServiced.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException e) { LOGGER.error(e); } scheduledServiced.shutdownNow(); } //...... }
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
/**
 * Periodically re-evaluates the UP/DOWN state of every known member and notifies registered
 * {@link GossipListener}s, asynchronously, when a member's state changes.
 */
public class GossipMemberStateRefresher {
  public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);

  private final Map<LocalMember, GossipState> members;
  private final GossipSettings settings;
  private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
  private final Clock clock;
  private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
  private final ExecutorService listenerExecutor;
  private final ScheduledExecutorService scheduledExecutor;
  // Bounded queue for listener callbacks; with DiscardOldestPolicy the oldest pending
  // notification is dropped when it is full.
  private final BlockingQueue<Runnable> workQueue;

  public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
      GossipListener listener,
      BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
    this.members = members;
    this.settings = settings;
    // A null listener would only surface later as an NPE on a listener-executor thread.
    if (listener != null) {
      listeners.add(listener);
    }
    this.findPerNodeGossipData = findPerNodeGossipData;
    clock = new SystemClock();
    workQueue = new ArrayBlockingQueue<>(1024);
    listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
        new ThreadPoolExecutor.DiscardOldestPolicy());
    scheduledExecutor = Executors.newScheduledThreadPool(1);
  }

  /** Starts re-evaluating member states every 100 ms. */
  public void init() {
    scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
  }

  /** Runs one refresh pass, logging (rather than propagating) any RuntimeException. */
  public void run() {
    try {
      runOnce();
    } catch (RuntimeException ex) {
      LOGGER.warn("scheduled state had exception", ex);
    }
  }

  /**
   * Recomputes each member's required state. Members forced down by a shutdown message are
   * skipped. Otherwise the phi-accrual measure is used when available, falling back to the
   * cleanup-interval check. Changed states are stored and listeners are notified asynchronously.
   */
  public void runOnce() {
    for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
      boolean userDown = processOptimisticShutdown(entry);
      if (userDown) {
        continue;
      }

      Double phiMeasure = entry.getKey().detect(clock.nanoTime());
      GossipState requiredState;

      if (phiMeasure != null) {
        requiredState = calcRequiredState(phiMeasure);
      } else {
        requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue());
      }

      if (entry.getValue() != requiredState) {
        members.put(entry.getKey(), requiredState);
        /* Call listeners asynchronously */
        for (GossipListener listener : listeners) {
          listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
        }
      }
    }
  }

  /**
   * @param phiMeasure the phi-accrual suspicion level of a member
   * @return {@code DOWN} when it exceeds the configured convict threshold, otherwise {@code UP}
   */
  public GossipState calcRequiredState(Double phiMeasure) {
    if (phiMeasure > settings.getConvictThreshold()) {
      return GossipState.DOWN;
    }
    return GossipState.UP;
  }

  /**
   * Marks a member DOWN when its last heartbeat is older than the cleanup interval.
   *
   * <p>Heartbeats are {@code nanoTime()} values, so both sides are converted to milliseconds before
   * comparing against the (millisecond) cleanup interval. Previously a millisecond "now" was compared
   * with a nanosecond heartbeat. NOTE(review): heartbeats received from other nodes come from their
   * own nanoTime origin, so this check is only meaningful for locally stamped heartbeats.
   *
   * @param member the member to check
   * @param state its current state, returned unchanged when the heartbeat is recent enough
   */
  public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) {
    long nowInMillis = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime());
    long heartbeatInMillis = TimeUnit.NANOSECONDS.toMillis(member.getHeartbeat());
    if (nowInMillis - settings.getCleanupInterval() > heartbeatInMillis) {
      return GossipState.DOWN;
    }
    return state;
  }

  /**
   * If we have a special key the per-node data that means that the node has sent us
   * a pre-emptive shutdown message. We process this so node is seen down sooner
   *
   * @param l member to consider
   * @return true if node forced down
   */
  public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {
    PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
    if (m == null) {
      return false;
    }
    ShutdownMessage s = (ShutdownMessage) m.getPayload();
    if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
      members.put(l.getKey(), GossipState.DOWN);
      if (l.getValue() == GossipState.UP) {
        for (GossipListener listener : listeners) {
          listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
        }
      }
      return true;
    }
    return false;
  }

  /** Adds a listener for state changes; {@code null} is ignored. */
  public void register(GossipListener listener) {
    if (listener != null) {
      listeners.add(listener);
    }
  }

  /**
   * Stops the refresh schedule and the listener executor, waiting up to 5 seconds for each. If
   * interrupted, the interrupt flag is restored and both executors are stopped immediately.
   */
  public void shutdown() {
    scheduledExecutor.shutdown();
    try {
      scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
      Thread.currentThread().interrupt();
    }
    scheduledExecutor.shutdownNow();
    listenerExecutor.shutdown();
    try {
      listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
      Thread.currentThread().interrupt();
    }
    listenerExecutor.shutdownNow();
  }
}
（注：GossipMemberStateRefresher的init方法注册的定时任务每隔100ms执行一次run方法；convictThreshold默认为10。）
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
/** * Manage the protcol threads (active and passive gossipers). */ public abstract class AbstractTransportManager implements TransportManager { public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class); private final ExecutorService gossipThreadExecutor; private final AbstractActiveGossiper activeGossipThread; protected final GossipManager gossipManager; protected final GossipCore gossipCore; public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) { this.gossipManager = gossipManager; this.gossipCore = gossipCore; gossipThreadExecutor = Executors.newCachedThreadPool(); activeGossipThread = ReflectionUtils.constructWithReflection( gossipManager.getSettings().getActiveGossipClass(), new Class<?>[]{ GossipManager.class, GossipCore.class, MetricRegistry.class }, new Object[]{ gossipManager, gossipCore, gossipManager.getRegistry() }); } // shut down threads etc. @Override public void shutdown() { gossipThreadExecutor.shutdown(); if (activeGossipThread != null) { activeGossipThread.shutdown(); } try { boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS); if (!result) { // common when blocking patterns are used to read data from a socket. LOGGER.warn("executor shutdown timed out"); } } catch (InterruptedException e) { LOGGER.error(e); } gossipThreadExecutor.shutdownNow(); } @Override public void startActiveGossiper() { activeGossipThread.init(); } @Override public abstract void startEndpoint(); }
incubator-retired-gossip/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java
/** * Base implementation gossips randomly to live nodes periodically gossips to dead ones * */ public class SimpleActiveGossiper extends AbstractActiveGossiper { private ScheduledExecutorService scheduledExecutorService; private final BlockingQueue<Runnable> workQueue; private ThreadPoolExecutor threadService; public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { super(gossipManager, gossipCore, registry); scheduledExecutorService = Executors.newScheduledThreadPool(2); workQueue = new ArrayBlockingQueue<Runnable>(1024); threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); } @Override public void init() { super.init(); scheduledExecutorService.scheduleAtFixedRate(() -> { threadService.execute(() -> { sendToALiveMember(); }); }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); scheduledExecutorService.scheduleAtFixedRate(() -> { sendToDeadMember(); }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); scheduledExecutorService.scheduleAtFixedRate( () -> sendPerNodeData(gossipManager.getMyself(), selectPartner(gossipManager.getLiveMembers())), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); scheduledExecutorService.scheduleAtFixedRate( () -> sendSharedData(gossipManager.getMyself(), selectPartner(gossipManager.getLiveMembers())), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); } @Override public void shutdown() { super.shutdown(); scheduledExecutorService.shutdown(); try { scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { LOGGER.debug("Issue during shutdown", e); } sendShutdownMessage(); threadService.shutdown(); try { threadService.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { LOGGER.debug("Issue during shutdown", e); } } protected void sendToALiveMember(){ 
LocalMember member = selectPartner(gossipManager.getLiveMembers()); sendMembershipList(gossipManager.getMyself(), member); } protected void sendToDeadMember(){ LocalMember member = selectPartner(gossipManager.getDeadMembers()); sendMembershipList(gossipManager.getMyself(), member); } /** * sends an optimistic shutdown message to several clusters nodes */ protected void sendShutdownMessage(){ List<LocalMember> l = gossipManager.getLiveMembers(); int sendTo = l.size() < 3 ? 1 : l.size() / 2; for (int i = 0; i < sendTo; i++) { threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); } } }
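SimpleActiveGossiper的init方法注册了4个定时任务，均每隔gossipInterval执行一次，分别是sendToALiveMember（提交到threadService异步执行）、sendToDeadMember、sendPerNodeData和sendSharedData，其中sendToALiveMember与sendToDeadMember都会调用sendMembershipList；GossipMemberStateRefresher的init方法则注册了一个每隔100ms执行的定时任务，用于调用runOnce方法。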
runOnce方法遍历GossipManager传入的members（已通过processOptimisticShutdown判定为主动下线的节点直接跳过），逐个调用LocalMember的detect方法计算phiMeasure：如果该值不为null，则执行calcRequiredState，否则执行calcRequiredStateCleanupInterval来计算requiredState；如果state发生变更，则更新members，然后异步回调各个GossipListener的gossipEvent方法。calcRequiredState方法判断phiMeasure是否大于convictThreshold（默认为10），大于则返回GossipState.DOWN，否则返回GossipState.UP；calcRequiredStateCleanupInterval方法则判断当前时间是否大于cleanupInterval+member.getHeartbeat()，大于则返回GossipState.DOWN，否则返回原有的state。另外，sendMembershipList每次都会全量发送成员列表，在memberList非常多的情况下可能会有效率方面的问题。