艳杰。擅长 Python 与 JAVA , 现任饿了么物流团队资深 Python 工程师,负责分流核心链路, 专注于系统业务分析及稳定性建设。
上次我们分享了我们团队 Java应用Docker化部署GC变长的踩坑经历 ,发现还真的帮助很多同学解决了他们项目中同样的问题。这对我们来说真的是很大的一个激励,所以我们决定后面会不定期分享一些我们团队的踩坑经历,今天分享的是 Spring-RabbitMQ consumer 的两个坑,希望能对大家有所帮助。
spring-rabbit 版本变更至 1.6.2.RELEASE
consumer 数量正常,mq 控制面板的 prefetch
参数始终是1, 消息无法正常 ack, 队列处于假死状态, 系统报异常 org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException
[2018-09-09 10:31:27.27]RuntimeException-org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException [ ERROR] [ Elog ] Execution of Rabbit message listener failed. org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) at java.lang.Thread.run(Thread.java:748) Caused by: org.springframework.amqp.AmqpIllegalStateException: No default listener method specified: Either specify a non-null value for the 'defaultListenerMethod' property or override the 'getListenerMethodName' method. at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:291) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ... 10 more 复制代码
ListenerExecutionFailedException
根据官方文档说明,是 consumer 在消息消费时发生了异常, 默认情况下,该消息会被 reject, 并重新回到队列中, 但如果异常发生在到达用户代码之前的异常,此消息将会一直投递。
org.springframework.amqp.AmqpIllegalStateException
该异常抛出,RabbitMQ无法收到消息回应,将一直处于等待状态。
No default listener method specified: Either specify a non-null value ...
找不到 consumer OnMessage
的方法, 猜测是类加载错误,于是重新开始检查 consumer 类定义,最终发现 rabbit:listener
的 ID 和 真实 consumer 的 ID 冲突,导致真实的 consumer Bean 无效,找不到接受消息的方法,而 rabbitmq 在与 client 端通信过程中发生异常,会停止消费。
<bean id="consumer_1" class="me.ele.Consumer1"/> <rabbit:listener-container connection-factory="galaxyConnectionFactory" acknowledge="manual" concurrency="16" > <rabbit:listener queues="queue_1" ref="consumer_1" id="consumer_1" /> </rabbit:listener-container> 复制代码
删除 rabbit:listener
的 ID 属性
原服务有 6 个队列,每个队列起 10 个 consumer, task-executor
线程池设置为 90, 后因数据量增加,发现消费能力不足,决定增加 consumer 数量,调整 listener-container 的 concurrency 为 20,重启服务器。
部分队列 consumer 数量不足,缺失项始终为 xml 中声明在后的队列
回退 concurrency 参数为 10 异常消失,观察异常现场,发现 consumer 消失队列有先后顺序之分,且 consumer 数量存在上限值为 90,猜测是收参数限制,检查配置参数如下:
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" id="taskExecutor"> <!--核心线程数 --> <property name="corePoolSize" value="16"/> <!--最大线程数 --> <property name="maxPoolSize" value="16"/> <property name="queueCapacity" value="500"/> <!--线程池维护线程所允许的空闲时间 --> <property name="keepAliveSeconds" value="60"/> <!--线程池对拒绝任务(无线程可用)的处理策略 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> </property> <property name="WaitForTasksToCompleteOnShutdown" value="true"/> </bean> <rabbit:listener-container connection-factory="monitorConnectionFactory" acknowledge="manual" task-executor="taskExecutor" prefetch="10" concurrency="20"> <rabbit:listener queues="queue_1" ref="consumer_1"/> <rabbit:listener queues="queue_2" ref="consumer_2"/> <rabbit:listener queues="queue_3" ref="consumer_3"/> <rabbit:listener queues="queue_4" ref="consumer_4"/> <rabbit:listener queues="queue_5" ref="consumer_5"/> <rabbit:listener queues="queue_6" ref="consumer_6"/> </rabbit:listener-container> 复制代码
经排查对比,发现上限值与 task-executor ThreadPoolTaskExecutor 参数 corePoolSize
、 maxPoolSize
极为接近,查询相关资料发现,多个 queue 的 consumer 会共用 taskExecutor 的线程池数量,如果线程池数量不足,consumer 无法创建。 后发现官方文档已有明确说明.
增大 task-executor corePoolSize
和 maxPoolSize
的值为 200,重启服务,解决。