当服务的多个实例可以调用指定的外部服务,在这些服务实例中都要定制断路策略很浪费,比如调用外部服务一段时间后进行关闭处理逻辑等。他们可以统一共享调用同一个外部服务的统计信息,这样一个调用失败以后,其他服务实例就不要再重试一遍,这是使用分布式断路器的地方。
由于找不到现有解决方案,我们决定自行尝试一下。简而言之,我们从ratelimitj的启发中快速构建了一个分布式断路器。通话统计信息是使用Redis共享的,它就像一个超级按钮。我们计划将其开源,但这将是另一篇文章的主题;-) 这里分享思路。
1. 共享的统计信息:指标,跟踪和监视
我们对指标的需求非常简单:对于每个队列和每种命令,我们都希望跟踪,并了解其中有多少成功了,或失败和重试或最终移入隔离区。这将使我们能够了解流量并在需要时调整参数。
鉴于我们在Java / Kotlin应用程序中使用的是Spring Boot,因此这里没有做任何决定:我们将照常使用Micrometer来发布带有适当标签的量规,然后在 Datadog中 遵循这些指标,这是一个(好)监控SAAS。
2. 可视化
尽管有日志,指标和警报是很明显的,但我们希望有一种方法可以随时可视化计划或隔离的任务及其尝试次数和任何可能的错误,以及采取行动来执行这些任务(删除它们) ,重新安排时间,等等)。
3.总体设计思路:
第一个设计决策是关于如何表示命令,执行请求和隔离的。以下是做出的主要决定和最终决定:
4.为指定服务配置新的命令队列
<font><i>//将请求定义为命令 放入队列中</i></font><font> <b>const</b> val MY_QUEUE_NAME: String = </font><font>"myServiceQueue"</font><font> @Configuration <b>class</b> MyServiceQueueConfiguration { @Bean(MY_QUEUE_NAME) fun myServiceQueue(commandExecutionQueueFactory: CommandExecutionQueueFactory) = commandExecutionQueueFactory.createQueue( MY_QUEUE_NAME, </font><font><i>// optionally redefine part or totality of the default policy</i></font><font> DEFAULT_EXECUTION_POLICY.copy( concurrency = 4, delayBeforeConsideringTask = Duration.ofSeconds(5), maxRetriesBeforeQuarantine = 10, </font><font><i>// optional, none by default</i></font><font> rateLimits = RateLimits( 2300 executionsOver Duration.ofMinutes(15), 4500 executionsOver Duration.ofMinutes(30), 8800 executionsOver Duration.ofHours(1) ), </font><font><i>// optional, none by default</i></font><font> circuitBreaking = CircuitBreaking( failureRateThreshold = 0.5, windowDuration = Duration.ofMinutes(10), </font><font><i>// will tell that some exceptions are to be considered as provider failures</i></font><font> considerExceptionAsFailureIf = someExceptionPredicate() ) ), </font><font><i>// optional, a probe that will be queried to know whether to pause task consumption</i></font><font> </font><font><i>// (may query a feature flag, a state defined via some UI, etc.)</i></font><font> somePauseProbe() ) } </font>
您可以 在此处 找到 所有可用的队列选项
将命令执行请求定义为一个简单的对象,其中包含要执行的命令的名称和一个(Java)参数映射。
5. 命令执行队列的核心逻辑
<b>class</b> CommandQueue(...) { <font><i>// ...</i></font><font> override fun schedule(command: CommandSpecification) { </font><font><i>// add task to queue, log details, emit metrics</i></font><font> schedule(ScheduledTask( command, queueName, clock, </font><font><i>// this is the important part for deduplication to work</i></font><font> scheduledExecutionDate = executionPolicy.computeNextExecutionDate(clock, tries = 0) ), command.deduplicate) } </font><font><i>// ...</i></font><font> override fun processCommands(): Boolean { val circuitBreaker = circuitBreaker() <b>if</b> (circuitBreaker.isOpen()) { <b>return</b> false } val task = taskRepository.tryLockingTaskWithEarliestScheduleOlderThan(queueName, LocalDateTime.now(clock)) ?: <b>return</b> false val command = commandRegistry.get(task.commandName) </font><font><i>// each case: decides what to do with task, log details, emit metrics</i></font><font> val executionResult = when { command == <b>null</b> -> commandNotFound(task) violatesRateLimit(task.weight) -> rateLimited(task) !running -> aborted(task) <b>else</b> -> executeCommand(command, task) } registerCall(circuitBreaker, executionResult) </font><font><i>// remove task, or move it to quarantine, or update number of tries</i></font><font> handleExecutionResult(executionResult) <b>return</b> executionResult.commandExecuted } </font><font><i>// ...</i></font><font> } </font>
6.缺陷
轮询PostgreSQL表也不是一个好主意。但是,每个“逻辑”队列每秒最多只能轮询一次。
谈到队列,它们都在同一张表中进行管理,考虑到更多的使用情况,这可能是性能问题。如果发生这种情况,我们可以将表专用于每个队列,而当要添加队列时,我们需要付出更多配置的代价(现在我们需要创建表)。
我们系统的一个更实际的限制是它目前仅处理同步操作,但是我们可以对其进行调整,以便以异步方式接收命令执行的结果。
更多点击标题见原文