本文主要研究一下CarreraProducer的sendDelay
DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/ProducerInterface.java
/**
 * Operations offered by a Carrera producer: lifecycle control, immediate
 * sends (plain, keyed, hash-routed, partition-routed, batch) and the
 * add/cancel operations for delayed messages.
 *
 * <p>Immediate sends report a {@code Result}; delay operations report a
 * {@code DelayResult}.
 */
public interface ProducerInterface {

    // ------------------------------------------------------------------
    // Lifecycle
    // ------------------------------------------------------------------

    void start() throws Exception;

    void shutdown();

    // ------------------------------------------------------------------
    // Immediate sends
    // ------------------------------------------------------------------

    Result sendMessage(Message message);

    Result send(String topic, byte[] body);

    Result send(String topic, String body);

    Result sendByCharset(String topic, String body, String charsetName);

    Result send(String topic, String body, String key, String... tags);

    Result send(String topic, byte[] body, String key, String... tags);

    Result sendByCharset(String topic, String body, String charsetName, String key, String... tags);

    // ------------------------------------------------------------------
    // Sends carrying an explicit hash id
    // ------------------------------------------------------------------

    Result sendWithHashId(String topic, long hashId, String body, String key, String... tags);

    Result sendWithHashId(String topic, long hashId, byte[] body, String key, String... tags);

    // Note: this overload takes tags as an array, not as varargs.
    Result sendWithHashIdByCharset(String topic, long hashId, String body, String charsetName, String key, String[] tags);

    // ------------------------------------------------------------------
    // Sends carrying an explicit partition id and hash id
    // ------------------------------------------------------------------

    Result sendWithPartition(String topic, int partitionId, long hashId, byte[] body, String key, String... tags);

    Result sendWithPartition(String topic, int partitionId, long hashId, String body, String key, String... tags);

    // Note: this overload takes tags as an array, not as varargs.
    Result sendWithPartitionByCharset(String topic, int partitionId, long hashId, String body, String charsetName, String key, String[] tags);

    // ------------------------------------------------------------------
    // Batch sends
    // ------------------------------------------------------------------

    Result sendBatchConcurrently(List<Message> messages);

    Result sendBatchOrderly(List<Message> messages);

    // ------------------------------------------------------------------
    // Delayed messages: add
    // ------------------------------------------------------------------

    DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta);

    DelayResult sendDelay(String topic, String body, DelayMeta delayMeta);

    DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName);

    DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags);

    DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... tags);

    DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... tags);

    // ------------------------------------------------------------------
    // Delayed messages: cancel (addressed by uniqDelayMsgId)
    // ------------------------------------------------------------------

    DelayResult cancelDelay(String topic, String uniqDelayMsgId);

    DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags);

    // ------------------------------------------------------------------
    // Synchronous batch send
    // ------------------------------------------------------------------

    Result sendBatchSync(List<Message> messages);
}
DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/CarreraProducer.java
public class CarreraProducer implements ProducerInterface { private ProducerInterface producer; private CarreraConfig config; public CarreraProducer(CarreraConfig config) { producer = new LocalCarreraProducer(config); this.config = config; } public static CarreraProducer newCarreraProducer(CarreraConfig config) throws Exception { return new CarreraProducer(config); } public MessageBuilder messageBuilder() { return new MessageBuilder(this); } public AddDelayMessageBuilder addDelayMessageBuilder() { return new AddDelayMessageBuilder(this); } public CancelDelayMessageBuilder cancelDelayMessageBuilder() { return new CancelDelayMessageBuilder(this); } public AddTxMonitorMessageBuilder addTxMonitorMessageBuilder(AddDelayMessageBuilder addDelayMessageBuilder) { return new AddTxMonitorMessageBuilder(addDelayMessageBuilder); } public CancelTxMonitorMessageBuilder cancelTxMonitorMessageBuilder(CancelDelayMessageBuilder cancelDelayMessageBuilder) { return new CancelTxMonitorMessageBuilder(cancelDelayMessageBuilder); } public TxBusinessMessageBuilder txBusinessMessageBuilder(MessageBuilder messageBuilder) { return new TxBusinessMessageBuilder(messageBuilder); } //...... @Override public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta) { return producer.sendDelay(topic, body, delayMeta); } @Override public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta) { return producer.sendDelay(topic, body, delayMeta); } @Override public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName) { return producer.sendDelayByCharset(topic, body, delayMeta, charsetName); } @Override public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags) { return producer.sendDelay(topic, body, delayMeta, tags); } @Override public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... 
tags) { return producer.sendDelay(topic, body, delayMeta, tags); } @Override public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... tags) { return producer.sendDelayByCharset(topic, body, delayMeta, charsetName, tags); } @Override public DelayResult cancelDelay(String topic, String uniqDelayMsgId) { return producer.cancelDelay(topic, uniqDelayMsgId); } @Override public DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags) { return producer.cancelDelay(topic, uniqDelayMsgId, tags); } //...... }
DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/LocalCarreraProducer.java
/**
 * {@link CarreraProducerBase} variant whose node manager is created through
 * {@code NodeManager.newLocalNodeManager} from the proxy list held in the
 * configuration.
 */
public class LocalCarreraProducer extends CarreraProducerBase implements ProducerInterface {

    /**
     * @param config configuration passed straight to the base class
     */
    public LocalCarreraProducer(CarreraConfig config) {
        super(config);
    }

    /**
     * Creates the node manager from the configured proxy list and then
     * initializes its connection pool.
     *
     * @throws Exception propagated from node-manager creation or pool initialization
     */
    @Override
    protected void initNodeMgr() throws Exception {
        // The field is assigned before the pool is initialized, so it stays set
        // even if initConnectionPool() throws.
        this.nodeMgr = NodeManager.newLocalNodeManager(this.config, this.config.getCarreraProxyList());
        this.nodeMgr.initConnectionPool();
    }
}
DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/CarreraProducerBase.java
public abstract class CarreraProducerBase implements ProducerInterface { private static final Logger LOGGER = LoggerFactory.getLogger(CarreraProducerBase.class); private static final Logger DROP_LOGGER = LoggerFactory.getLogger("DROP_LOG"); private static final int DELAY_ACTIONS_ADD = 1; private static final int DELAY_ACTIONS_CANCEL = 2; private static final String TAGS_SEPARATOR = "||"; private volatile boolean isRunning = false; protected NodeManager nodeMgr; protected CarreraConfig config; private ExecutorService executor; public CarreraProducerBase(CarreraConfig config) { this.config = config; } //...... public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta) { return sendDelayMessage(buildDelayMessage4Add(topic, body, delayMeta, randomKey())); } public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta) { return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(), delayMeta, randomKey())); } public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName) { try { return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(charsetName), delayMeta)); } catch (UnsupportedEncodingException e) { return new DelayResult(CHARSET_ENCODING_EXCEPTION, e.getMessage(), ""); } } public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags) { return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(), delayMeta, tags)); } public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... tags) { return sendDelayMessage(buildDelayMessage4Add(topic, body, delayMeta, tags)); } public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... 
tags) { try { return sendDelayMessage(buildDelayMessage4Add(topic, body.getBytes(charsetName), delayMeta, tags)); } catch (UnsupportedEncodingException e) { return new DelayResult(CHARSET_ENCODING_EXCEPTION, e.getMessage(), ""); } } public DelayResult cancelDelay(String topic, String uniqDelayMsgId) { return sendDelayMessage(buildDelayMessage4Cancel(topic, uniqDelayMsgId, randomKey())); } public DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags) { return sendDelayMessage(buildDelayMessage4Cancel(topic, uniqDelayMsgId, tags)); } private DelayMessage buildDelayMessage4Add(String topic, byte[] body, DelayMeta delayMeta, String... tags) { DelayMessage delayMessage = new DelayMessage(); delayMessage.setTopic(topic); delayMessage.setBody(body); delayMessage.setAction(DELAY_ACTIONS_ADD); delayMessage.setTimestamp(delayMeta.getTimestamp()); delayMessage.setDmsgtype(delayMeta.getDmsgtype()); delayMessage.setInterval(delayMeta.getInterval()); delayMessage.setExpire(delayMeta.getExpire()); delayMessage.setTimes(delayMeta.getTimes()); delayMessage.setUuid(new UUID().toString()); delayMessage.setVersion(VersionUtils.getVersion()); if (null != delayMeta.getProperties() && delayMeta.getProperties().size() > 0) { delayMessage.setProperties(delayMeta.getProperties()); } if (ArrayUtils.isNotEmpty(tags)) { delayMessage.setTags(StringUtils.join(tags, TAGS_SEPARATOR)); } return delayMessage; } private DelayMessage buildDelayMessage4Cancel(String topic, String uniqDelayMsgId, String... 
tags) { DelayMessage delayMessage = new DelayMessage(); delayMessage.setTopic(topic); delayMessage.setUniqDelayMsgId(uniqDelayMsgId); delayMessage.setAction(DELAY_ACTIONS_CANCEL); delayMessage.setVersion(VersionUtils.getVersion()); delayMessage.setBody("c".getBytes()); // if body is null, new String(message.getBody()) will throw NullPointerException if (ArrayUtils.isNotEmpty(tags)) { delayMessage.setTags(StringUtils.join(tags, TAGS_SEPARATOR)); } return delayMessage; } private DelayResult sendDelayMessage(DelayMessage message) { DelayResult result = new DelayResult(UNKNOWN_EXCEPTION, "unknown exception", ""); if (!isRunning) { result.setCode(CLIENT_EXCEPTION); result.setMsg("please execute the start() method before sending the message"); return result; } int retryCnt = 0; long start, used = 0; long begin = TimeUtils.getCurTime(); String proxyAddress = null; do { CarreraConnection connection = null; try { connection = nodeMgr.borrowConnection(config.getCarreraClientTimeout()); if (connection == null) { if (result.getCode() == UNKNOWN_EXCEPTION) { result.setCode(NO_MORE_HEALTHY_NODE); result.setMsg("no more healthy node"); } delay(config.getCarreraClientTimeout()); continue; } proxyAddress = connection.getNode().toString(); start = TimeUtils.getCurTime(); result = connection.sendDelay(message, this.config.getCarreraProxyTimeout()); used = TimeUtils.getElapseTime(start); if (result.getCode() > OK) { switch (result.getCode()) { case FAIL_ILLEGAL_MSG: case FAIL_TOPIC_NOT_ALLOWED: case FAIL_TOPIC_NOT_EXIST: case FAIL_TIMEOUT: case FAIL_REFUSED_BY_RATE_LIMITER: delay(Math.max(this.config.getCarreraClientTimeout() - TimeUtils.getElapseTime(start), 0)); break; default: nodeMgr.unhealthyNode(connection.getNode()); delay(Math.max(this.config.getCarreraClientTimeout() - TimeUtils.getElapseTime(start), 0)); break; //break switch } } else { break; //break loop } } catch (Exception e) { LOGGER.warn("sendMessage failed, retry count:" + retryCnt + ", topic:" + message.topic + ", 
key:" + message.uniqDelayMsgId, e); result.setCode(CLIENT_EXCEPTION); result.setMsg(e.toString()); } finally { if (connection != null) { nodeMgr.returnConnection(connection); } } } while (retryCnt++ < this.config.getCarreraClientRetry()); if (result.getCode() > OK) { LOGGER.error("send delay msg result:{}; msg[ip:{},topic:{},uuid:{},uniqDelayMsgId:{},len:{},used:{},retryCount:{},ret.Code:{},ret.Msg:{}]", resultToString(result), proxyAddress, message.getTopic(), message.getUuid(), message.getUniqDelayMsgId(), StringUtils.length(new String(message.getBody())), TimeUtils.getElapseTime(begin), retryCnt, result.getCode(), result.getMsg()); } else { if (LOGGER.isDebugEnabled()) { LOGGER.debug("send delay msg result:{}; msg[ip:{},topic:{},uniqDelayMsgId:{},len:{},used:{},retryCount:{}]", resultToString(result), proxyAddress, message.getTopic(), result.getUniqDelayMsgId(), StringUtils.length(new String(message.getBody())), used, retryCnt); } } return result; } }
ProducerInterface定义了几个sendDelay及cancelDelay方法;CarreraProducer实现了ProducerInterface接口,其sendDelay、cancelDelay方法委托给了LocalCarreraProducer;LocalCarreraProducer继承了CarreraProducerBase,实现了ProducerInterface接口;CarreraProducerBase的sendDelay通过buildDelayMessage4Add构造DelayMessage,而cancelDelay通过buildDelayMessage4Cancel构造DelayMessage,最后通过sendDelayMessage方法发送消息