本文主要研究一下RocketMQTransactionAnnotationProcessor
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
public class RocketMQTransactionAnnotationProcessor implements BeanPostProcessor, Ordered, ApplicationContextAware { private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class); private ApplicationContext applicationContext; private final Set<Class<?>> nonProcessedClasses = Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64)); private TransactionHandlerRegistry transactionHandlerRegistry; public RocketMQTransactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) { this.transactionHandlerRegistry = transactionHandlerRegistry; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (!this.nonProcessedClasses.contains(bean.getClass())) { Class<?> targetClass = AopUtils.getTargetClass(bean); RocketMQTransactionListener listener = AnnotationUtils.findAnnotation(targetClass, RocketMQTransactionListener.class); this.nonProcessedClasses.add(bean.getClass()); if (listener == null) { // for quick search log.trace("No @RocketMQTransactionListener annotations found on bean type: {}", bean.getClass()); } else { try { processTransactionListenerAnnotation(listener, bean); } catch (MQClientException e) { log.error("Failed to process annotation " + listener, e); throw new BeanCreationException("Failed to process annotation " + listener, e); } } } return bean; } private void processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean) throws MQClientException { if (transactionHandlerRegistry == null) { throw new MQClientException("Bad usage of @RocketMQTransactionListener, " + "the class must work with RocketMQTemplate", null); } if 
(!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) { throw new MQClientException("Bad usage of @RocketMQTransactionListener, " + "the class must implement interface RocketMQLocalTransactionListener", null); } TransactionHandler transactionHandler = new TransactionHandler(); transactionHandler.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory()); transactionHandler.setName(listener.txProducerGroup()); transactionHandler.setBeanName(bean.getClass().getName()); transactionHandler.setListener((RocketMQLocalTransactionListener) bean); transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(), listener.keepAliveTime(), listener.blockingQueueSize()); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), listener.accessKey(), listener.secretKey()); if (Objects.nonNull(rpcHook)) { transactionHandler.setRpcHook(rpcHook); } else { log.debug("Access-key or secret-key not configure in " + listener + "."); } transactionHandlerRegistry.registerTransactionHandler(transactionHandler); } @Override public int getOrder() { return LOWEST_PRECEDENCE; } }
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandler.java
/**
 * Mutable holder for everything needed to register one transaction listener: a name, the
 * listener's bean name, the listener itself, a bean factory, a check executor and an optional
 * RPC hook.
 */
class TransactionHandler {

    private String name;
    private String beanName;
    private RocketMQLocalTransactionListener listener;
    private BeanFactory beanFactory;
    private ThreadPoolExecutor checkExecutor;
    private RPCHook rpcHook;

    /** @return the handler name */
    public String getName() {
        return name;
    }

    /** @param name the handler name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the bean name recorded for the listener */
    public String getBeanName() {
        return beanName;
    }

    /** @param beanName the bean name to record for the listener */
    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    /** @return the transaction listener held by this handler */
    public RocketMQLocalTransactionListener getListener() {
        return this.listener;
    }

    /** @param listener the transaction listener to hold */
    public void setListener(RocketMQLocalTransactionListener listener) {
        this.listener = listener;
    }

    /** @return the bean factory associated with this handler */
    public BeanFactory getBeanFactory() {
        return beanFactory;
    }

    /** @param beanFactory the bean factory to associate with this handler */
    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    /** @return the RPC hook, or {@code null} when none was set */
    public RPCHook getRpcHook() {
        return rpcHook;
    }

    /** @param rpcHook the RPC hook to use */
    public void setRpcHook(RPCHook rpcHook) {
        this.rpcHook = rpcHook;
    }

    /** @return the executor created by {@link #setCheckExecutor(int, int, long, int)} */
    public ThreadPoolExecutor getCheckExecutor() {
        return checkExecutor;
    }

    /**
     * Creates a new thread pool backed by a bounded deque and stores it as the check executor,
     * replacing any executor stored earlier.
     *
     * @param corePoolSize      core number of threads
     * @param maxPoolSize       maximum number of threads
     * @param keepAliveTime     keep-alive time for idle threads, in milliseconds
     * @param blockingQueueSize capacity of the work queue
     */
    public void setCheckExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, int blockingQueueSize) {
        LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(blockingQueueSize);
        this.checkExecutor =
            new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, workQueue);
    }
}
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
/**
 * Keeps track of the transaction handler names that have been registered and, for each new
 * name, asks the {@code RocketMQTemplate} to create and start the matching transaction producer.
 */
public class TransactionHandlerRegistry implements DisposableBean {

    private RocketMQTemplate rocketMQTemplate;

    /** Names of the handlers registered so far. */
    private final Set<String> listenerContainers = new ConcurrentSet<>();

    /**
     * @param template template used to create and start a transaction producer per handler
     */
    public TransactionHandlerRegistry(RocketMQTemplate template) {
        this.rocketMQTemplate = template;
    }

    /** Forgets all registered handler names. */
    @Override
    public void destroy() throws Exception {
        listenerContainers.clear();
    }

    /**
     * Registers a handler under its name and starts a transaction producer for it.
     *
     * @param handler the handler to register; its name must not have been registered before
     * @throws MQClientException with code {@code -1} when the handler's name is already registered
     */
    public void registerTransactionHandler(TransactionHandler handler) throws MQClientException {
        // Set.add returns false when the element is already present, so the duplicate check and
        // the reservation of the name are a single call rather than a contains()/add() pair that
        // two concurrent registrations could both get past.
        // NOTE(review): assumes ConcurrentSet.add is atomic - confirm against its implementation.
        if (!listenerContainers.add(handler.getName())) {
            throw new MQClientException(-1,
                String
                    .format("The transaction name [%s] has been defined in TransactionListener [%s]", handler.getName(),
                        handler.getBeanName()));
        }

        rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(),
            handler.getCheckExecutor(), handler.getRpcHook());
    }
}