"events": [
{
"name": "noisyEvent",
"source": "",
"type": "noisy",
"dataOnly" : "false"
},
{
"name": "silentEvent",
"source": "",
"type": "silent"
}
]
OpenShift Serverless Logic 使开发人员能够定义声明式工作流模型,这些模型可以协调事件驱动的无服务器应用程序。
您可以使用 YAML 或 JSON 格式编写工作流模型,这非常适合在云或容器环境中开发和部署无服务器应用程序。
要将工作流部署到 OpenShift Container Platform,您可以使用 OpenShift Serverless Logic Operator。
以下部分概述了各种 OpenShift Serverless Logic 概念。
事件状态由一个或多个事件定义组成。事件定义组合在一起以指定事件状态侦听的CloudEvent
类型。您可以使用事件状态在接收到指定的CloudEvent
时启动新的工作流实例,或者暂停现有工作流实例的执行,直到接收到指定的CloudEvent
。
在事件状态定义中,onEvents
属性用于分组可能触发同一组actions
的CloudEvent
类型。事件定义中的exclusive
属性指示如何计算事件匹配。如果exclusive
属性的值为false
,则必须接收eventRefs
数组中的所有CloudEvent
类型才能发生匹配。否则,接收任何引用的CloudEvent
类型都视为匹配。
以下示例显示事件定义,包括两个 CloudEvent 类型,包括noisy
和silent
"events": [
{
"name": "noisyEvent",
"source": "",
"type": "noisy",
"dataOnly" : "false"
},
{
"name": "silentEvent",
"source": "",
"type": "silent"
}
]
为了表明当接收到noisy
和silent
CloudEvent 类型时发生事件匹配,并对这两种 CloudEvent 类型执行不同的操作,请定义一个事件状态,其中包含onEvent
项中的两个事件定义,并将exclusive
属性设置为false。
onEvent
项的事件状态定义示例{
"name": "waitForEvent",
"type": "event",
"onEvents": [
{
"eventRefs": [
"noisyEvent"
],
"actions": [
{
"functionRef": "letsGetLoud"
}
]
},
{
"eventRefs": [
"silentEvent"
],
"actions": [
{
"functionRef": "beQuiet"
}
]
}
]
,
"exclusive": false
}
回调状态执行一个操作并等待由此操作产生的事件,然后再恢复工作流。回调状态执行的操作是异步外部服务调用。因此,回调状态适用于执行fire&wait-for-result
操作。
从工作流的角度来看,异步服务表示控制立即返回给调用者,而无需等待操作完成。操作完成后,将发布CloudEvent
以恢复工作流。
{
"name": "CheckCredit",
"type": "callback",
"action": {
"functionRef": {
"refName": "callCreditCheckMicroservice",
"arguments": {
"customer": "${ .customer }"
}
}
},
"eventRef": "CreditCheckCompletedEvent",
"timeouts": {
"stateExecTimeout": "PT15M"
},
"transition": "EvaluateDecision"
}
name: CheckCredit
type: callback
action:
functionRef:
refName: callCreditCheckMicroservice
arguments:
customer: "${ .customer }"
eventRef: CreditCheckCompletedEvent
timeouts:
stateExecTimeout: PT15M
transition: EvaluateDecision
action
属性定义一个函数调用,该调用触发外部活动或服务。操作执行后,回调状态将等待CloudEvent
,该事件指示被调用服务的手动决策完成。
接收到完成回调事件后,回调状态将完成其执行并转换到下一个定义的工作流状态,如果它是结束状态,则完成工作流执行。
每个工作流实例都关联一个数据模型。无论工作流文件包含的是YAML
还是JSON
,数据模型都由一个JSON
对象构成。JSON对象的初始内容取决于工作流的启动方式。如果使用CloudEvent
创建工作流,则工作流内容取自data
属性。如果通过HTTP
POST
请求启动工作流,则工作流内容取自请求体。
JQ表达式用于与数据模型交互。支持的表达式语言包括JsonPath和JQ。JQ表达式语言是默认语言。您可以使用expressionLang
属性将表达式语言更改为JsonPath。
{
"name": "max",
"type": "expression",
"operation": "{max: .numbers | max_by(.x), min: .numbers | min_by(.y)}"
}
OpenShift Serverless Logic允许您定义显式
错误处理。您可以在工作流模型中定义发生错误时应采取的操作,而不是一些通用的错误处理实体。显式错误处理使您可以处理工作流与外部系统交互过程中可能发生的错误。当发生错误时,它会改变常规的工作流顺序。在这种情况下,工作流状态会转换到一个可以处理错误的替代状态,而不是转换到预定义的状态。
每个工作流状态都可以定义错误处理,这仅与在其执行过程中可能出现的错误相关。在一个状态中定义的错误处理不能用于处理工作流执行过程中另一个状态执行期间发生的错误。
在工作流状态执行期间可能出现的未处理的未知错误,应由运行时实现报告并停止工作流执行。
工作流中的错误定义由name
和code
参数组成。name
是对错误的简短自然语言描述,例如参数错误
。code
参数帮助实现识别错误。
code
参数是必需的,引擎使用不同的策略将提供的值映射到运行时遇到的异常。可用的策略包括FQCN、错误消息和状态代码。
在工作流执行期间,必须在工作流顶级errors
属性中处理已知的工作流错误。此属性可以是string
类型,这意味着它可以引用包含错误定义的可重用JSON
或YAML
定义文件,或者它可以具有array
类型,您可以在其中内联定义这些已检查的错误在您的工作流定义中。
以下示例显示了两种类型的定义
{
"errors": "file://documents/reusable/errors.json"
}
errors: file://documents/reusable/errors.json
{
"errors": [
{
"name": "Service not found error",
"code": "404",
"description": "Server has not found anything matching the provided service endpoint information"
}
]
}
errors:
- name: Service not found error
code: '404'
description: Server has not found anything matching the provided service endpoint
information
OpenShift Serverless Logic支持两种类型的模式定义:输入模式定义和输出模式定义。
dataInputSchema
参数根据定义的JSON
模式验证工作流数据输入。提供dataInputSchema
非常重要,因为它用于在执行任何工作流状态之前验证提供的工 作流数据输入是否正确。
您可以按如下方式定义dataInputSchema
dataInputSchema
定义示例"dataInputSchema": {
"schema": "URL_to_json_schema",
"failOnValidationErrors": false
}
schema属性是一个URI,它保存用于验证工作流数据输入的JSON模式的路径。URI可以是类路径URI、文件或HTTP URL。如果指定了类路径URI,则JSON模式文件必须放在项目的resources部分或类路径中包含的任何其他目录中。
failOnValidationErrors
是一个可选标志,指示当输入数据与指定的JSON模式不匹配时采用的行为。如果未指定或设置为true
,则会抛出异常并导致流程执行失败。如果设置为false
,则流程将执行,并打印包含验证错误的WARN级别日志。
输出模式定义应用于工作流执行后,以验证输出模型是否具有预期的格式。它也适用于Swagger生成。
与输入模式定义类似,您必须使用outputSchema
指定JSON模式的URL,如下所示
outputSchema
定义示例"extensions" : [ {
"extensionid": "workflow-output-schema",
"outputSchema": {
"schema" : "URL_to_json_schema",
"failOnValidationErrors": false
}
} ]
dataInputSchema
中描述的相同规则适用于schema
和failOnValidationErrors
。唯一的区别是后者标志应用于工作流执行之后。
OpenShift Serverless Logic支持custom
函数类型,这使实现能够扩展函数定义能力。结合operation
字符串,您可以使用预定义函数类型的列表。
自定义函数类型可能无法跨其他运行时实现移植。 |
您可以使用sysout
函数进行日志记录,如下例所示
sysout
函数定义示例{
"functions": [
{
"name": "logInfo",
"type": "custom",
"operation": "sysout:INFO"
}
]
}
:
后面的字符串是可选的,用于指示日志级别。可能的值为TRACE
、DEBUG
、INFO
、WARN
和ERROR
。如果该值不存在,则默认为INFO
。
在state
定义中,您可以调用相同的sysout
函数,如下例所示
state
中引用sysout
函数的示例{
"states": [
{
"name": "myState",
"type": "operation",
"actions": [
{
"name": "printAction",
"functionRef": {
"refName": "logInfo",
"arguments": {
"message": "\"Workflow model is \\(.)\""
}
}
}
]
}
]
}
在前面的示例中,message
参数可以是jq表达式或使用插值的jq字符串。
OpenShift Serverless Logic支持Apache Maven项目中的java
函数,您在其中定义工作流服务。
以下示例显示了java
函数的声明
java
函数声明示例{
"functions": [
{
"name": "myFunction", (1)
"type": "custom", (2)
"operation": "service:java:com.acme.MyInterfaceOrClass::myMethod" (3)
}
]
}
1 | myFunction 是函数名。 |
2 | custom 是函数类型。 |
3 | service:java:com.acme.MyInterfaceOrClass::myMethod 是自定义操作定义。在自定义操作定义中,service 是保留的操作关键字,后跟 java 关键字。com.acme.MyInterfaceOrClass 是接口或实现类的完全限定类名 (FQCN),后跟方法名 myMethod 。 |
OpenShift Serverless Logic 通过 knative-serving
插件提供自定义函数的实现来调用 Knative 服务。它允许您拥有一个静态 URI,定义一个 Knative 服务,用于执行 HTTP 请求。URI 中定义的 Knative 服务在当前 Knative 集群中查询,并转换为有效的 URL。
以下示例使用已部署的 Knative 服务
$ kn service list
NAME URL LATEST AGE CONDITIONS READY REASON
custom-function-knative-service http://custom-function-knative-service.default.10.109.169.193.sslip.io custom-function-knative-service-00001 3h16m 3 OK / 3 True
您可以使用 Knative 服务名称声明 OpenShift Serverless Logic 自定义函数,如下例所示
"functions": [
{
"name": "greet", (1)
"type": "custom", (2)
"operation": "knative:services.v1.serving.knative.dev/custom-function-knative-service?path=/plainJsonFunction", (3)
}
]
1 | greet 是函数名。 |
2 | custom 是函数类型。 |
3 | 在 operation 中,您设置 Knative 服务的坐标。 |
此函数发送 |
OpenShift Serverless Logic 提供 REST
自定义类型作为快捷方式。使用自定义 rest 时,在函数定义中,您指定要调用的 HTTP URI 和要使用的 HTTP 方法 (get、post、patch 或 put)。这是通过使用 operation
字符串完成的。调用函数时,您像使用 OpenAPI 函数一样传递请求参数。
以下示例显示了 rest
函数的声明
{
"functions": [
{
"name": "multiplyAllByAndSum", (1)
"type": "custom", (2)
"operation": "rest:post:/numbers/{multiplier}/multiplyByAndSum" (3)
}
]
}
1 | multiplyAllAndSum 是函数名。 |
2 | custom 是函数类型。 |
3 | rest:post:/numbers/{multiplier}/multiplyByAndSum 是自定义操作定义。在自定义操作定义中,rest 是指示这是 REST 调用的保留操作关键字,post 是 HTTP 方法,/numbers/{multiplier}/multiplyByAndSum 是相对端点。 |
使用相对端点时,必须将主机指定为属性。主机属性的格式为 kogito.sw.functions.<function_name>
.host。在此示例中,kogito.sw.functions.multiplyAllByAndSum.host
是主机属性键。如果需要,您可以通过指定 kogito.sw.functions.multiplyAllAndSum.port
属性来覆盖默认端口 (80)。
此端点期望主体为一个 JSON 对象,其字段 numbers
是一个整数数组,将数组中的每个项目乘以 multiplier
并返回所有已乘项目的总和。
OpenShift Serverless Logic 定义了几种超时配置,您可以使用这些配置来配置不同场景下工作流执行的最大时间。您可以配置工作流在给定状态下等待事件到达的最长时间,或者工作流的最大执行时间。
无论超时定义在哪里,都必须将其配置为一段时间或持续时间,该时间从引用的工作流元素变为活动状态时开始。超时使用ISO 8601 数据和时间标准
来指定一段时间,并遵循格式PnDTnHnMn.nS
,其中天数被认为恰好是 24 小时。例如,PT15M
配置 15 分钟,P2DT3H4M
定义 2 天、3 小时和 4 分钟。
基于月份的超时,例如 |
要配置工作流在被取消之前可以运行的最大时间量,您可以使用工作流超时。取消后,工作流被视为已完成,并且无法再通过 GET 请求访问。因此,它的行为就像默认情况下中断为 true
一样。
工作流超时使用顶级 timeouts
属性定义。它可以具有两种类型,string
和 object
。string
类型定义一个指向包含工作流超时定义的 JSON 或 YAML 文件的 URI。object
类型用于内联定义超时定义。
例如,要在执行一小时后取消工作流,请使用以下配置
{
"id": "workflow_timeouts",
"version": "1.0",
"name": "Workflow Timeouts",
"description": "Simple workflow to show the workflowExecTimeout working",
"start": "PrintStartMessage",
"timeouts": {
"workflowExecTimeout": "PT1H"
} ...
}
在工作流中定义状态时,您可以使用 timeouts
属性来配置完成此状态的最大时间。当该时间超时时,状态被视为超时,引擎从该状态继续执行。执行流程取决于状态类型,例如,可能会执行到下一个状态的转换。
基于事件的状态可以使用子属性 eventTimeout
来配置等待事件到达的最大时间。这是当前实现中支持的唯一属性。
事件超时支持回调状态超时、切换状态超时和事件状态超时。
当您必须执行一个操作来调用外部服务并以事件的形式等待异步响应时,可以使用 Callback
状态。
一旦消耗了响应事件,工作流将继续执行,通常会转到转换属性中定义的下一个状态。
由于 Callback
状态会暂停执行直到事件被消耗,您可以为其配置 eventTimeout
,如果事件在配置的持续时间内没有到达,则工作流将继续执行,并转到转换中定义的下一个状态。
Callback
状态示例{
"name": "CallbackState",
"type": "callback",
"action": {
"name": "callbackAction",
"functionRef": {
"refName": "callbackFunction",
"arguments": {
"input": "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has executed the callbackFunction.\"}"
}
}
},
"eventRef": "callbackEvent",
"transition": "CheckEventArrival",
"onErrors": [
{
"errorRef": "callbackError",
"transition": "FinalizeWithError"
}
],
"timeouts": {
"eventTimeout": "PT30S"
}
}
当您需要根据某些条件采取行动时,可以使用 Switch
状态。这些条件可以基于工作流数据 dataConditions
或事件 eventConditions
。
当您使用 eventConditions
时,工作流执行将等待做出决定,直到任何配置的事件到达并匹配条件。在这种情况下,您可以配置一个事件超时,它控制等待事件匹配条件的最大时间。
如果此时间过期,工作流将移动到 defaultCondition
属性中定义的状态。
{
"name": "ChooseOnEvent",
"type": "switch",
"eventConditions": [
{
"eventRef": "visaApprovedEvent",
"transition": "ApprovedVisa"
},
{
"eventRef": "visaDeniedEvent",
"transition": "DeniedVisa"
}
],
"defaultCondition": {
"transition": "HandleNoVisaDecision"
},
"timeouts": {
"eventTimeout": "PT5S"
}
}
Event
状态用于等待工作流接收一个或多个事件,执行一组操作,然后继续执行。如果 Event 状态是起始状态,则会创建一个新的工作流实例。
此状态使用 timeouts
属性来配置工作流应等待配置的事件到达的最大时间。
如果超过此时间并且未收到事件,则工作流将移动到转换属性中定义的状态,或者在结束状态的情况下结束工作流实例,而无需执行任何操作。
{
"name": "WaitForEvent",
"type": "event",
"onEvents": [
{
"eventRefs": [
"event1"
],
"eventDataFilter": {
"data": "${ \"The event1 was received.\" }",
"toStateData": "${ .exitMessage }"
},
"actions": [
{
"name": "printAfterEvent1",
"functionRef": {
"refName": "systemOut",
"arguments": {
"message": "${\"event-state-timeouts: \" + $WORKFLOW.instanceId + \" executing actions for event1.\"}"
}
}
}
]
},
{
"eventRefs": [
"event2"
],
"eventDataFilter": {
"data": "${ \"The event2 was received.\" }",
"toStateData": "${ .exitMessage }"
},
"actions": [
{
"name": "printAfterEvent2",
"functionRef": {
"refName": "systemOut",
"arguments": {
"message": "${\"event-state-timeouts: \" + $WORKFLOW.instanceId + \" executing actions for event2.\"}"
}
}
}
]
}
],
"timeouts": {
"eventTimeout": "PT30S"
},
"transition": "PrintExitMessage"
}
OpenShift Serverless Logic 对并行任务的执行进行序列化。parallel
这个词并不表示同时执行,而是意味着分支的执行之间没有逻辑依赖关系。如果活动分支暂停其执行,则非活动分支可以在不等待活动分支完成的情况下启动或恢复任务的执行。例如,活动分支在等待事件接收时可能会暂停其执行。
并行状态是一种将当前工作流实例执行路径拆分为多个路径的状态,每个分支对应一个路径。这些执行路径并行执行,并根据定义的completionType
参数值重新加入当前执行路径。
{
"name":"ParallelExec",
"type":"parallel",
"completionType": "allOf",
"branches": [
{
"name": "Branch1",
"actions": [
{
"functionRef": {
"refName": "functionNameOne",
"arguments": {
"order": "${ .someParam }"
}
}
}
]
},
{
"name": "Branch2",
"actions": [
{
"functionRef": {
"refName": "functionNameTwo",
"arguments": {
"order": "${ .someParam }"
}
}
}
]
}
],
"end": true
}
name: ParallelExec
type: parallel
completionType: allOf
branches:
- name: Branch1
actions:
- functionRef:
refName: functionNameOne
arguments:
order: "${ .someParam }"
- name: Branch2
actions:
- functionRef:
refName: functionNameTwo
arguments:
order: "${ .someParam }"
end: true
在之前的示例中,allOf
定义了所有分支必须完成执行,状态才能转换或结束。如果未设置此参数,则这是默认值。