转载

Spring Cloud Stream 错误处理详解

点击上方 "IT牧场" ,选择 "设为星标" 技术干货每日送达!

TIPS

本文基于Spring Cloud Greenwich SR1,理论支持Finchley及更高版本。

本节详细探讨Spring Cloud Stream的错误处理。

应用处理

局部处理(通用)

配置:

spring:

cloud:

stream:

bindings:

input:

destination: my-destination

group: my-group

output:

destination: my-destination

代码:

@Slf4j

@SpringBootApplication

@EnableBinding({Processor.class})

@EnableScheduling

public class ConsumerApplication {

public static void main(String[] args) {

SpringApplication.run(ConsumerApplication.class, args);

}


@StreamListener(value = Processor.INPUT)

public void handle(String body) {

throw new RuntimeException("x");

}


@ServiceActivator(inputChannel = "my-destination.my-group.errors")

public void handleError(ErrorMessage message) {

Throwable throwable = message.getPayload();

log.error("截获异常", throwable);


Message<?> originalMessage = message.getOriginalMessage();

assert originalMessage != null;


log.info("原始消息体 = {}", new String((byte[]) originalMessage.getPayload()));

}


@Bean

@InboundChannelAdapter(value = Processor.OUTPUT,

poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))

public MessageSource<String> test() {

return () -> new GenericMessage<>("adfdfdsafdsfa");

}

}

全局处理(通用)

@StreamListener(value = Processor.INPUT)

public void handle(String body) {

throw new RuntimeException("x");

}


@StreamListener("errorChannel")

public void error(Message<?> message) {

ErrorMessage errorMessage = (ErrorMessage) message;

System.out.println("Handling ERROR: " + errorMessage);

}

系统处理

系统处理方式,因消息中间件不同而异。如果应用没有配置错误处理,那么error将会被传播给binder,binder将error回传给消息中间件。消息中间件可以丢弃消息、requeue(重新排队,从而重新处理)或将失败的消息发送给DLQ(死信队列)。

丢弃

默认情况下,错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产。

DLQ(RabbitMQ)

TIPS

虽然RocketMQ也支持DLQ,但目前RocketMQ控制台并不支持在界面上操作,将死信放回消息队列,让客户端重新处理。所以使用很不方便,而且用法也和本节有一些差异。 如使用RocketMQ, 建议参考上面 应用处理 一节的用法,也可额外订阅这个Topic  %DLQ%+consumerGroup 个人给RocketMQ控制台提的Issue: https://github.com/apache/rocketmq/issues/1334

配置:

spring:

cloud:

stream:

bindings:

input:

destination: my-destination

group: my-group

output:

destination: my-destination

rabbit:

bindings:

input:

consumer:

auto-bind-dlq: true

代码:

@StreamListener(value = Processor.INPUT)

public void handle(String body) {

throw new RuntimeException("x");

}


@Bean

@InboundChannelAdapter(value = Processor.OUTPUT,

poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))

public MessageSource<String> test() {

return () -> new GenericMessage<>("adfdfdsafdsfa");

}

这样,消息消费失败后,就会放入死信队列。在控制台操作一下,即可将死信放回消息队列,这样,客户端就可以重新处理。

如果想获取原始错误的异常堆栈,可添加如下配置:

spring:

cloud:

stream:

rabbit:

bindings:

input:

consumer:

republish-to-dlq: true

requeue(RabbitMQ)

Rabbit/Kafka的binder依赖RetryTemplate实现重试,从而提升消息处理的成功率。然而,如果设置了 spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那么RetryTemplate则不再重试。此时可通过requeue方式处理异常。

添加如下配置:

# 默认是3,设为1则禁用重试

spring.cloud.stream.bindings.<input channel名称>.consumer.max-attempts=1

# 表示是否要requeue被拒绝的消息(即:requeue处理失败的消息)

spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出 AmqpRejectAndDontRequeueException 异常为止。

RetryTemplate(通用)

配置方式

RetryTemplate重试也是错误处理的一种手段。

spring:

cloud:

stream:

bindings:

<input channel名称>:

consumer:

# 最多尝试处理几次,默认3

maxAttempts: 3

# 重试时初始避退间隔,单位毫秒,默认1000

backOffInitialInterval: 1000

# 重试时最大避退间隔,单位毫秒,默认10000

backOffMaxInterval: 10000

# 避退乘数,默认2.0

backOffMultiplier: 2.0

# 当listen抛出retryableExceptions未列出的异常时,是否要重试

defaultRetryable: true

# 异常是否允许重试的map映射

retryableExceptions:

java.lang.RuntimeException: true

java.lang.IllegalStateException: false

测试代码:

@StreamListener(value = Processor.INPUT)

public void handle(String body) {

throw new RuntimeException(body);

}


private AtomicInteger count = new AtomicInteger(0);


@Bean

@InboundChannelAdapter(value = Processor.OUTPUT,

poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))

public MessageSource<String> test() {

return () -> new GenericMessage<>(count.getAndAdd(1) + "");

}

编码方式

多数场景下,使用配置方式定制重试行为都是可以满足需求的,但配置方式可能无法满足一些复杂需求。此时可使用编码方式配置RetryTemplate:

@Configuration

class RetryConfiguration {

@StreamRetryTemplate

public RetryTemplate sinkConsumerRetryTemplate() {

RetryTemplate retryTemplate = new RetryTemplate();

retryTemplate.setRetryPolicy(retryPolicy());

retryTemplate.setBackOffPolicy(backOffPolicy());


return retryTemplate;

}


private ExceptionClassifierRetryPolicy retryPolicy() {

BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(

Collections.singletonList(IllegalAccessException.class

));

keepRetryingClassifier.setTraverseCauses(true);


SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);

AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();


ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();

retryPolicy.setExceptionClassifier(

classifiable -> keepRetryingClassifier.classify(classifiable) ?

alwaysRetryPolicy : simpleRetryPolicy);


return retryPolicy;

}


private FixedBackOffPolicy backOffPolicy() {

final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();

backOffPolicy.setBackOffPeriod(2);


return backOffPolicy;

}

}

然后添加配置:

spring.cloud.stream.bindings.<input channel名称>.consumer.retry-template-name=myRetryTemplate

注意:

Spring Cloud Stream 2.2才支持设置retry-template-name

干货分享

最近将个人学习笔记整理成册,使用PDF分享。关注我,回复如下代码,即可获得百度盘地址,无套路领取!

001:《Java并发与高并发解决方案》学习笔记; 002:《深入JVM内核——原理、诊断与优化》学习笔记; 003:《Java面试宝典》 004:《Docker开源书》 005:《Kubernetes开源书》 006:《DDD速成(领域驱动设计速成)》 007: 全部 008: 加技术讨论群

近期热文

想知道更多?长按/扫码关注我吧↓↓↓ Spring Cloud Stream 错误处理详解 >>>技术讨论群<<< 喜欢就点个 "在看" 呗^_^

原文  http://mp.weixin.qq.com/s?__biz=MzI4ODQ3NjE2OA==&mid=2247485390&idx=2&sn=42c84dc59b8ec044a3ede5c1ffd68897
正文到此结束
Loading...