我们在Jet上使用F#并且从一开始就是这样,这就是为什么在评估构建DSL( 领域特定语言 )的选项时,F#是一个领先者。当我们决定构建DSL时,我们需要确定DSL有哪些重要的特征:
我们最终确定的工作流DSL主要围绕链/组合步骤的能力。每一步都只是一个函数,它给出一个输入和状态产生一个输出/副作用(Input → State → Output/SideEffect)。这些步骤可以组合或链接在一起以表示复杂的业务流程。
流程的可视化表示是工程师和业务用户可以一起设计和理解,一旦确定了流的可视化表示,开发人员就可以使用DSL来定义工作流,使用以下函数表示:
workflow : (name : string) -> (triggers : Trigger list) -> (metadata : WorkflowMetadata) -> (step : WorkflowStep) -> Workflow
比如一个示例工作流程:创建订单,预留库存,发送订单,然后最终向客户收费。这可以使用上面的工作流程函数编写:
workflow <font>"SampleWorkflow"</font><font> [Trigger.Stream (</font><font>"kafka://jetkafka/mock-input"</font><font>, TaskIdType.FromPath(Path.JsonPath(</font><font>"$.orderId"</font><font>)), Path.JsonPath(</font><font>"$.orderId"</font><font>))] metadata (step(</font><font>"CreateOrder"</font><font>, </font><font>"CreateOrder"</font><font>) => step(</font><font>"ReserveInventory"</font><font>, </font><font>"ReserveInventory"</font><font>) => step(</font><font>"ShipOrder"</font><font>, </font><font>"ShipOrder"</font><font>) =>> [ step(</font><font>"ChargeCustomer"</font><font>, </font><font>"ChargeCustomer"</font><font>) =?> [ cond(</font><font>"WriteChargeSuccess"</font><font>, </font><font>"WriteChargeSuccess"</font><font>, Condition.Simple(Qualifier.State, </font><font>"$.transactionSuccess"</font><font>, </font><font>"true"</font><font>)) cond(</font><font>"WriteChargeFailure"</font><font>, </font><font>"WriteChargeFailure"</font><font>, Condition.Simple(Qualifier.State, </font><font>"$.transactionSuccess"</font><font>, </font><font>"false"</font><font>)) ] step(</font><font>"UpdateOrderHistory"</font><font>, </font><font>"UpdateOrderHistory"</font><font>) ] ) </font>
我将在下面更详细地讨论这个DSL的一些元素。但是,我想强调该workflow功能的主要特点:
触发器
工作流可以有一个或多个不同的触发器。这些触发器用于配置WorkflowTriggers服务从哪里使用触发器消息。这种消费与我们的微服务中使用的消费机制相同,这在以前的帖子 F#Microservice Patterns @ Jet.com中 有详细 介绍 。的触发的类型是可区分联合(代数型):
type Trigger = | Simple of string * TaskIdType | Stream of string * TaskIdType * PrimaryKeyPath ...
注意:我不打算在这篇文章中详细介绍所有这些不同类型的触发器,我只讨论Simple和Stream。我们目前支持其他几个在处理包含多种不同消息类型的触发器流时非常有用的方法。
Stream上面示例工作流程中使用的触发器有三个参数:
Simple触发类型是消费为每个输入一个新的密钥指定的流定义的触发器。工作流输入的工作流实例ID设置为新GUID。当传入数据没有与之关联的唯一标识符时,将使用此触发器。
步骤
步骤表示工作流需要采取的操作。步骤是任何工作流程的基础,也是定义其功能的基础。目前有三种不同类型的步骤:
谓词逻辑
谓词逻辑又名条件,可以在任一使用cond或option步骤或在触发流用于条件过滤。通过提取数据的工作条件规定的Qualifier基础上一些JSON路径,然后他们比较这与提取基于该预期值数据Comparison和OperandType 。支持的不同类型的条件是:
type Condition = | True | False | Simple of Qualifier * path:string * expected:string | Match of Qualifier * path:string * expectedRegx:string | Compare of qualifier:Qualifier * typ:OperandsType * path:string * comparison:Comparison * expected:string | Count of qualifier:Qualifier * path:string * comparison:Comparison * expected:string | Exists of qualifier:Qualifier * path:string | Not of Condition | And of Condition List | OR of Condition List
这些基本条件解释如下:
type Comparison = | GreaterThan | GreaterThanOrEqual | LesserThan | LesserThanOrEqual | NotEqual | Equal
OperandsType是常见的数据类型,目前支持的类型是:
type OperandsType = | Int | Long | Double | Decimal | Boolean | DateTime
let cond = Condition.Count(Qualifier.Input, <font>"$.a"</font><font>, Comparison.Equal, </font><font>"10"</font><font>) let cond2 = Condition.Simple (Qualifier.Aggregate, </font><font>"$.bs"</font><font>, </font><font>"sting"</font><font>) let cond3 = Condition.Simple (Qualifier.State, </font><font>"$.a"</font><font>, </font><font>"20"</font><font>) let condORAnd = (cond OR (cond2 AND cond3)) |> Condition.Not </font>
步骤组合
step(“CreateOrder”, “CreateOrder”) => step(“PreDealOrder”, “PreDealOrder”)
我们使用合成运算符组合步骤:
运行
F#DSL允许我们轻松设计和实现工作流程,但为了便于跨服务执行DSL,我们创建了一个不同的内部工作流表示,以便在我们的后端服务中使用。我们将DSL转换为DAG(有向无 环图 ),以将我们的规范DSL与运行时环境分离。我们的服务使用此图表来实际执行工作流程。此图表很容易表示为F#类型:
type Name = string type EdgeData = string type VertexData<'V> = Name * 'V <font><i>//'V can be the representation of a step</i></font><font> type Vertex<'V> = { data : VertexData<'V> outEdges : Edge<'V> List } and Edge<'V> = { data : EdgeData tail : Vertex<'V> } type Graph<'V> = { name : Name vertices : Vertex<'V> List } </font>
我们的运行时环境使用evaluate函数将我们的F#DSL转换为Graph : evaluate : workflow:Workflow -> WorkflowEvaluation. 工作流评估只是我们图形的一个类型包装器,它允许我们轻松查找用于哪个工作流的图形:
type WorkflowEvaluation = { name : string model : Graph<WorkflowStep> }
该图允许我们的服务轻松遍历和理解任何给定工作流的路径,以及将我们的服务运行时与DSL表示分离,这使我们可能拥有可以编译到运行时的多个不同的DSL。DAG允许我们今天使用DSL,然后适应任何其他可能的DSL或语言,而无需更改我们的后端。这也使我们能够开发出一种设计语言,然后可以将其映射回DAG。可以通过图形UI指定此设计语言,以允许业务用户使用预定义的块轻松开发工作流。
除了允许我们的后端服务具有自己的运行时表示之外,它还允许我们通过将图形转换为 点图形 来轻松地在UI中可视化我们的工作流程, 点图形 是用于可视化图形的通常可接受的格式。
结论
所有这些不同元素的组合构成了我们工作流程方案的基础。虽然工作流DSL( Netflix Conductor , Apache Airflow 等)允许在运行时定义工作流,但我们发现在编译时定义工作流并通过源控制流程时,开发人员的实践会得到改进。
上述F#工作流模型允许通过F#签名检查进行工作流验证,以及能够运行和预提交工作流步骤和DSL的测试,以确保在部署之前的正确性。通过将我们的工作流DSL与后端执行层分离,我们已经能够演示以其他语言(javascript)和原始文本表达的工作流定义。