BPEL流程自动管理和BPM人工工作流是两种流程,前者主要将现有的服务按照流程定义规则进行调度组合协同,是纯粹的机器之间的协同工作,而BPM代表的工作流是有人工参与的协同工作。
BPEL是一种基于XML的流程规范语言,主要关注自动化;BPMN 最初 是一个纯粹的图形化业务流程表示法,是BPM的符号表示法。BPMN和BPEL经常结合使用BPMN用于以业务用户为中心的视角,BPEL用于机器之间的技术规范。
到了BPMN 版本2.0 ,BPMN标准中添加了自己的XML格式。因此,BPEL在BPMN环境中变得不那么重要了,因为BPMN现在满足了大部分业务和IT需求,目前,BPMN 2.0 XML格式可以说是在系统之间传输过程模型的最流行的标准,包括了BPEL的一部分。
Activiti是一个流程引擎核心,能够接受流程定义,运行时记录流程状态,可以作为整个流程的记录器,或状态机,Spring-Intergration是Spring的集成框架,能够实现服务之间的集成调度,这两者结合可以实现流程的人机交互协同工作。
本文翻译自Spring 官方文档 :
Activiti是一个 业务流程引擎 。它基本上是一个有节点(状态)的有向图,用于模拟复杂业务流程的状态。它跟踪业务流程中描述的工作进度。它描述了系统中的自动和基于人的角色。它还支持询问业务流程引擎,询问有关正在进行的流程实例的问题:有多少存在,哪些存在停滞等。业务流程管理系统(BPMS)提供了许多优势,其中一些优点是:
最后一点值得关注:像Activiti这样的优秀BPM引擎支持可视化建模业务流程。UML支持使用活动(步骤)和泳道(参与满足这些步骤所涉及的代理)直观地描述流程。当然,UML只是一种建模工具。它没有运行时语义。业务流程管理的圣杯一直是有一个可供业务分析师 和 应用程序开发人员使用的建模符号。 BPMN 2 就像我们实现这一目标一样接近。
例如,这是一个 非常 简单的业务流程的可视化模型。
这是为支持该模型而创建的标准XML标记。这个XML具有执行语义,而 不仅仅是 建模语义。
<?xml version="1.0" encoding="UTF-8"?> <definitions id="definitions" xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:activiti="http://activiti.org/bpmn" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/bpmn2.0"> <process id="asyncProcess"> <startEvent id="start"/> <sequenceFlow id="flow1" sourceRef="start" targetRef="sigw"/> <serviceTask id="sigw" name="Delegate to Spring Integration" activiti:delegateExpression="#{gateway}"/> <sequenceFlow id="scriptFlow" sourceRef="sigw" targetRef="confirmMovementTask"/> <scriptTask id="confirmMovementTask" name="Execute script" scriptFormat="groovy"> <script> println 'Moving on..' </script> </scriptTask> <sequenceFlow id="flow2" sourceRef="confirmMovementTask" targetRef="theEnd"/> <endEvent id="theEnd"/> </process> </definitions>
大多数分析师不会手工编写这个XML。相反,他们将使用像 Alfresco的Activiti Cloud 这样的建模工具。然而,XML工件是可循环访问的:它可以由开发人员注释,然后在建模工具中进行修改。
但是,在检查时,您会发现大部分内容并不复杂。该过程有四种状态:
订单由sequenceFlow连接节点的各种元素明确说明。
Activiti是跟踪业务流程状态的好方法,但它不是一个特别强大的组件模型或集成技术。为此,我们需要一个像Spring Integration这样的集成框架。
Spring Integration支持跨多个不兼容的系统集成服务和数据。从概念上讲,编写集成流类似于在UNIX OS上使用stdin和组合管道和过滤器流stdout:
cat input.txt | grep ERROR | wc -l > output.txt
在该示例中,我们从源(文件input.txt)获取数据,将其传递给grep命令以过滤结果并仅保留包含令牌的行,ERROR然后将其传送到wc实用程序,我们将计算它有多少行。最后,最终计数被写入输出文件,output.txt。这些组件 - cat,grep和wc- 彼此都不知道。它们并不是为了彼此而设计的。相反,他们只知道如何阅读stdin和写作stdout。这种数据标准化使得从简单原子组成复杂解决方案变得非常容易。在该示例中,cat文件的行为将数据转换为任何stdin可识别的进程可读取的数据。它 适应 入站数据为规范化格式,字符串行。最后,redirect(>)操作符将规范化数据(字符串行)转换为文件系统上的数据。它 适应 它。pipe(|)字符用于表示一个组件的输出应该流向另一个组件的输入。
Spring Integration流程的工作方式相同:数据被规范化为Message<T>实例。每个Message<T>都有一个有效负载和头 - 关于有效负载的元数据Map<K,V>- 它们是不同消息传递组件的输入和输出。这些消息传递组件通常由Spring Integration提供,但您可以轻松编写和使用自己的组件。各种消息传递组件都支持所有 企业应用程序集成模式 (过滤器,路由器,变换器,适配器,网关等)。Spring框架MessageChannel是一个命名管道,通过它Message<T>在消息传递组件之间流动。它们是管道,默认情况下,它们的工作方式类似于java.util.Queue。数据输入,数据输出。
Spring集成入站适配器 将 来自外部系统(如RabbitMQ,Apache Kafka和JMS,电子邮件,Twitter,文件系统挂载,物联网设备和其他许多系统的消息队列)的数据调整为Message<T>s。Spring Integration出站适配器以相反的方式执行相同的操作,Message<T>并将其写入外部系统(例如,作为电子邮件,Tweet或Apache Kafka消息)。
Spring Integration支持 事件驱动的体系结构, 因为它可以帮助检测并响应外部世界中的事件。例如,您可以使用Spring Integration每10秒轮询一次文件系统,并Message<T>在出现新文件时发布。您可以使用Spring Integration充当传递给Apache Kafka主题的消息的侦听器。适配器处理响应外部事件并使您免于担心发起消息,并让您在消息到达后专注于处理消息。它是依赖注入的集成等价物!
依赖注入使组件代码不再担心资源初始化和获取,并使其可以专注于编写具有这些依赖关系的代码。该javax.sql.DataSource领域来自哪里?谁在乎!Spring将它连接起来,它可能是从测试中的Mock,经典应用程序服务器中的JNDI或配置的Spring Boot bean获得的。组件代码仍然不知道这些细节。大约15年前,当我们第一次开始谈论依赖注入时,我们会谈到“好莱坞校长:”“不要打电话给我,我会打电话给你!”这更适用于Spring Integration!
入站网关接收来自外部系统的传入请求,将其作为Message<T>s处理,并发送回复。出站网关采用Message<T>s,将它们转发到外部系统,并等待来自该系统的响应。它们支持请求和回复交互。
Activiti可用于描述记录,可审计和可预测状态的复杂,长期运行的过程,Spring Integration可用于 集成 !Spring Integration是我们保存有趣Java代码的地方,Activiti会跟踪总体状态。这个技巧在20年前很有用,今天它在大规模分布的微服务世界中也很有用,其中单个请求的处理可能跨越多个服务和节点。那么, Spring Boot,Spring Integration和Activiti可以很好地协同工作 !
一个常见的用例是使用Activiti启动BPM流程,然后在进入等待状态时,将该状态的处理委托给Spring Integration,当然,这可以将工作分散到其他系统。这是一个说明流程的简单图表。
BPM流程状态通常可以涉及人工代理。例如,工作流引擎可能具有将文档发送给人员以供批准的状态,但是审阅者正在度假并且将不会回来数周。保持线程开放是一种浪费,更不用说危险了,期望继续处理所需的任何确认都将在几毫秒或几秒内恢复。
Activiti足够聪明,可以暂停执行,在等待状态期间将执行状态持久保存到数据库,并且仅在 发出 流程执行 信号 后才恢复。发信号通知流程引擎可重新水化流程并恢复执行。一个简单的示例可能是新的用户注册业务流程,该流程委托Spring Integration发送带有注册确认链接的电子邮件。用户可能需要数天才能单击确认链接,但在单击时,系统应继续进行注册业务流程。
在这篇文章中,我们将讨论如何启动进入等待状态的BPM流程,然后委托Spring Integration进行某种自定义处理,然后在遥远的未来,继续执行流程。
我们将设置两个Spring Integration流程:一个用于处理来自Activiti的请求到Spring Integration,另一个用于处理最终的回复并触发恢复流程。
我们需要一些东西来启动我们的流程,所以这里是一个简单的REST端点(http://localhost:8080/start),每次启动一个新流程:
@RestController class ProcessStartingRestController { @Autowired private ProcessEngine processEngine; @RequestMapping(method = RequestMethod.GET, value = "/start") Map<String, String> launch() { ProcessInstance asyncProcess = this.processEngine.getRuntimeService() .startProcessInstanceByKey("asyncProcess"); return Collections.singletonMap("executionId", asyncProcess.getId()); } }
消息将跨越MesssageChannel我们将在@Configuration类中创建的两个通道:requests和replies。
@Configuration class MessageChannels { @Bean DirectChannel requests() { return new DirectChannel(); } @Bean DirectChannel replies() { return new DirectChannel(); } }
这两个通道类似一种队列通道,可以使用消息系统的队列实现,request是用来实现前面图中Activiti到Spring集成的请求,Activit可以通过这个通道发送需要Spring干的事情,而replies则是Spring干完事情的反馈结果。
使用@Configuration类的好处是它本身就是一个Spring组件,可以注入到任何地方。我们可以通过@Bean直接调用提供者的方法来取消引用通道。另一种方法是@Qualifier每次我们为其中一个通道注入引用时使用,例如:
public static final String REPLIES_CHANNEL_NAME = "replies"; @Autowired @Qualifier(REPLIES_CHANNEL_NAME) private MessageChannel replies;
我们的BPMN非常简单,但我们将使用一个特定于Activiti的命名空间属性activiti:delegateExpression="#{gateway}"来告诉Activiti需要调用一个名为gateway,它是在Spring中注册的bean 。Activiti知道这样做是因为这个应用程序是使用Spring引导的自动配置,其中包括将Spring管理的bean暴露给Activiti表达式语言。这个gateway是一种基于Activiti的bean类型ReceiveTaskActivityBehavior。Spring Boot具有Spring Integration和Activiti的自动配置,因此90%的繁琐设置都会消失。
让我们看一下我们的简单gateway组件,一个Activiti ActivityBehavior接口的实现,它充当回调函数,我们可以用其发送Message<T>到requests通道并启动Spring Integration流程。这里重要的是我们已经捕获了executionId,我们稍后需要 恢复 或 发出信号 的过程。这里等于输入stdin;
@Bean ActivityBehavior gateway(MessageChannels channels) { return new ReceiveTaskActivityBehavior() { @Override public void execute(ActivityExecution execution) throws Exception { Message<?> executionMessage = MessageBuilder .withPayload(execution) .setHeader("executionId", execution.getId()) .build(); channels.requests().send(executionMessage); } }; }
该Message<T>会触发通道requests MessageChannel另一边的处理。在一个复杂的例子中,将请求转换为有意义的消息,例如,将其转发到其他系统(如电子邮件)是小case。在这里,我们只打印出标题,以便我们可以记录executionId并稍后使用它。这里等于输出stdout;这里是请求通道,Activiti将命令Spring干的事情通过这个通道发给Spring,下面代码是Spring接受指令后干的事情,只是打印而已。
@Bean IntegrationFlow requestsFlow(MessageChannels channels) { return IntegrationFlows.from(channels.requests()) .handle(msg -> msg.getHeaders().entrySet() .forEach(e -> log.info(e.getKey() + '=' + e.getValue()))) .get(); }
此时,工作流定义将保持不变,并且没有活动的流程实例。我们需要以某种方式异步地发出信号。我们实现一个REST端点来向Activiti发出信号:http://localhost:8080/resume/{executionId}。REST很容易理解,但实际上我们可以使用Spring Integration发送到任何外部系统中的一个事件来实现这种效果。唯一要确保的是,无论外部事件如何,我们都能以某种方式发送executionId,就像我们在这里通过在URI中捕获它一样。下面代码就是Spring向Activiti发出处理结果。
@RestController class ProcessResumingRestController { @Autowired private MessageChannels messageChannels; @RequestMapping(method = RequestMethod.GET, value = "/resume/{executionId}") void resume(@PathVariable String executionId) { Message<String> build = MessageBuilder.withPayload(executionId) .setHeader("executionId", executionId) .build(); this.messageChannels.replies().send(build); } }
当Message<T>流过replies MessageChannel,它会触发另一端处理,注意这里是响应通道,Spring干完事情后的结果通过这个通道返回给Activiti。在这里,我们将使用另一个Spring Integration流程来接收传入Message<T>并发出恢复流程的信号。执行此流程后,您将看到流程中的下一步scriptTask,评估和打印到控制台上的“Moving on!”字样。
@Bean IntegrationFlow repliesFlow(MessageChannels channels, ProcessEngine engine) { return IntegrationFlows.from(channels.replies()) .handle(msg -> engine.getRuntimeService().signal( String.class.cast(msg.getHeaders().get("executionId")))) .get(); }
真正的强大功能是使用BPM来编排复杂的处理逻辑:想象一下在BPM流程中保存流程的状态,然后调用Spring Batch作业,或者RestTemplate在Spring Cloud中使用Ribbon负载平衡调用REST服务,或者将其转发Message<T>到Spring Cloud中数据流流程。Spring Cloud Data Flow是我最喜欢的数据处理方法之一,因为它建立在Spring Cloud Stream的基础之上,而Spring Cloud Stream又建立在Spring Integration上:它MessageChannel一直在最底层!
深入理解Activiti工作流
Spring Batch
Spring Cloud
Spring Boot
SOA