×

OpenShift Serverless Logic 使开发人员能够定义声明式工作流模型,这些模型可以协调事件驱动的无服务器应用程序。

您可以使用 YAML 或 JSON 格式编写工作流模型,这非常适合在云或容器环境中开发和部署无服务器应用程序。

要将工作流部署到 OpenShift Container Platform,您可以使用 OpenShift Serverless Logic Operator。

以下部分概述了各种 OpenShift Serverless Logic 概念。

事件

事件状态由一个或多个事件定义组成。事件定义组合在一起以指定事件状态侦听的CloudEvent类型。您可以使用事件状态在接收到指定的CloudEvent时启动新的工作流实例,或者暂停现有工作流实例的执行,直到接收到指定的CloudEvent

在事件状态定义中,onEvents属性用于分组可能触发同一组actionsCloudEvent类型。事件定义中的exclusive属性指示如何计算事件匹配。如果exclusive属性的值为false,则必须接收eventRefs数组中的所有CloudEvent类型才能发生匹配。否则,接收任何引用的CloudEvent类型都视为匹配。

以下示例显示事件定义,包括两个 CloudEvent 类型,包括noisysilent

事件定义示例
"events": [
    {
      "name": "noisyEvent",
      "source": "",
      "type": "noisy",
      "dataOnly" : "false"
    },
    {
      "name": "silentEvent",
      "source": "",
      "type": "silent"
    }
  ]

为了表明当接收到noisysilent CloudEvent 类型时发生事件匹配,并对这两种 CloudEvent 类型执行不同的操作,请定义一个事件状态,其中包含onEvent项中的两个事件定义,并将exclusive属性设置为false。

包含多个onEvent项的事件状态定义示例
{
    "name": "waitForEvent",
    "type": "event",
    "onEvents": [
      {
        "eventRefs": [
          "noisyEvent"
         ],
         "actions": [
           {
             "functionRef": "letsGetLoud"
           }
         ]
      },
      {
        "eventRefs": [
           "silentEvent"
        ],
        "actions": [
          {
            "functionRef": "beQuiet"
          }
        ]
      }
    ]
    ,
    "exclusive": false
  }

回调

回调状态执行一个操作并等待由此操作产生的事件,然后再恢复工作流。回调状态执行的操作是异步外部服务调用。因此,回调状态适用于执行fire&wait-for-result操作。

从工作流的角度来看,异步服务表示控制立即返回给调用者,而无需等待操作完成。操作完成后,将发布CloudEvent以恢复工作流。

JSON 格式的回调状态示例
{
        "name": "CheckCredit",
        "type": "callback",
        "action": {
            "functionRef": {
                "refName": "callCreditCheckMicroservice",
                "arguments": {
                    "customer": "${ .customer }"
                }
            }
        },
        "eventRef": "CreditCheckCompletedEvent",
        "timeouts": {
          "stateExecTimeout": "PT15M"
        },
        "transition": "EvaluateDecision"
}
YAML 格式的回调状态示例
name: CheckCredit
type: callback
action:
  functionRef:
    refName: callCreditCheckMicroservice
    arguments:
      customer: "${ .customer }"
eventRef: CreditCheckCompletedEvent
timeouts:
  stateExecTimeout: PT15M
transition: EvaluateDecision

action属性定义一个函数调用,该调用触发外部活动或服务。操作执行后,回调状态将等待CloudEvent,该事件指示被调用服务的手动决策完成。

接收到完成回调事件后,回调状态将完成其执行并转换到下一个定义的工作流状态,如果它是结束状态,则完成工作流执行。

JQ表达式

每个工作流实例都关联一个数据模型。无论工作流文件包含的是YAML还是JSON,数据模型都由一个JSON对象构成。JSON对象的初始内容取决于工作流的启动方式。如果使用CloudEvent创建工作流,则工作流内容取自data属性。如果通过HTTP POST请求启动工作流,则工作流内容取自请求体。

JQ表达式用于与数据模型交互。支持的表达式语言包括JsonPath和JQ。JQ表达式语言是默认语言。您可以使用expressionLang属性将表达式语言更改为JsonPath。

函数中JQ表达式的示例
{
      "name": "max",
      "type": "expression",
      "operation": "{max: .numbers | max_by(.x), min: .numbers | min_by(.y)}"
    }

错误处理

OpenShift Serverless Logic允许您定义显式错误处理。您可以在工作流模型中定义发生错误时应采取的操作,而不是一些通用的错误处理实体。显式错误处理使您可以处理工作流与外部系统交互过程中可能发生的错误。当发生错误时,它会改变常规的工作流顺序。在这种情况下,工作流状态会转换到一个可以处理错误的替代状态,而不是转换到预定义的状态。

每个工作流状态都可以定义错误处理,这仅与在其执行过程中可能出现的错误相关。在一个状态中定义的错误处理不能用于处理工作流执行过程中另一个状态执行期间发生的错误。

在工作流状态执行期间可能出现的未处理的未知错误,应由运行时实现报告并停止工作流执行。

错误定义

工作流中的错误定义由namecode参数组成。name是对错误的简短自然语言描述,例如参数错误code参数帮助实现识别错误。

code参数是必需的,引擎使用不同的策略将提供的值映射到运行时遇到的异常。可用的策略包括FQCN、错误消息和状态代码。

在工作流执行期间,必须在工作流顶级errors属性中处理已知的工作流错误。此属性可以是string类型,这意味着它可以引用包含错误定义的可重用JSONYAML定义文件,或者它可以具有array类型,您可以在其中内联定义这些已检查的错误在您的工作流定义中。

以下示例显示了两种类型的定义

引用可重用JSON错误定义文件的示例
{
"errors": "file://documents/reusable/errors.json"
}
引用可重用YAML错误定义文件的示例
errors: file://documents/reusable/errors.json
使用JSON文件内联定义工作流错误的示例
{
"errors": [
  {
    "name": "Service not found error",
    "code": "404",
    "description": "Server has not found anything matching the provided service endpoint information"
  }
]
}
使用YAML文件内联定义工作流错误的示例
errors:
  - name: Service not found error
    code: '404'
    description: Server has not found anything matching the provided service endpoint
      information

模式定义

OpenShift Serverless Logic支持两种类型的模式定义:输入模式定义和输出模式定义。

输入模式定义

dataInputSchema参数根据定义的JSON模式验证工作流数据输入。提供dataInputSchema非常重要,因为它用于在执行任何工作流状态之前验证提供的工 作流数据输入是否正确。

您可以按如下方式定义dataInputSchema

dataInputSchema定义示例
"dataInputSchema": {
   "schema": "URL_to_json_schema",
   "failOnValidationErrors": false
}

schema属性是一个URI,它保存用于验证工作流数据输入的JSON模式的路径。URI可以是类路径URI、文件或HTTP URL。如果指定了类路径URI,则JSON模式文件必须放在项目的resources部分或类路径中包含的任何其他目录中。

failOnValidationErrors是一个可选标志,指示当输入数据与指定的JSON模式不匹配时采用的行为。如果未指定或设置为true,则会抛出异常并导致流程执行失败。如果设置为false,则流程将执行,并打印包含验证错误的WARN级别日志。

输出模式定义

输出模式定义应用于工作流执行后,以验证输出模型是否具有预期的格式。它也适用于Swagger生成。

与输入模式定义类似,您必须使用outputSchema指定JSON模式的URL,如下所示

outputSchema定义示例
"extensions" : [ {
      "extensionid": "workflow-output-schema",
      "outputSchema": {
         "schema" : "URL_to_json_schema",
          "failOnValidationErrors": false
     }
  } ]

dataInputSchema中描述的相同规则适用于schemafailOnValidationErrors。唯一的区别是后者标志应用于工作流执行之后。

自定义函数

OpenShift Serverless Logic支持custom函数类型,这使实现能够扩展函数定义能力。结合operation字符串,您可以使用预定义函数类型的列表。

自定义函数类型可能无法跨其他运行时实现移植。

Sysout自定义函数

您可以使用sysout函数进行日志记录,如下例所示

sysout函数定义示例
{
  "functions": [
    {
      "name": "logInfo",
      "type": "custom",
      "operation": "sysout:INFO"
    }
  ]
}

:后面的字符串是可选的,用于指示日志级别。可能的值为TRACEDEBUGINFOWARNERROR。如果该值不存在,则默认为INFO

state定义中,您可以调用相同的sysout函数,如下例所示

state中引用sysout函数的示例
{
  "states": [
    {
      "name": "myState",
      "type": "operation",
      "actions": [
        {
          "name": "printAction",
          "functionRef": {
            "refName": "logInfo",
            "arguments": {
              "message": "\"Workflow model is \\(.)\""
            }
          }
        }
      ]
    }
  ]
}

在前面的示例中,message参数可以是jq表达式或使用插值的jq字符串。

Java自定义函数

OpenShift Serverless Logic支持Apache Maven项目中的java函数,您在其中定义工作流服务。

以下示例显示了java函数的声明

java函数声明示例
{
  "functions": [
    {
      "name": "myFunction", (1)
      "type": "custom", (2)
      "operation": "service:java:com.acme.MyInterfaceOrClass::myMethod" (3)
    }
  ]
}
1 myFunction是函数名。
2 custom是函数类型。
3 service:java:com.acme.MyInterfaceOrClass::myMethod 是自定义操作定义。在自定义操作定义中,service 是保留的操作关键字,后跟 java 关键字。com.acme.MyInterfaceOrClass 是接口或实现类的完全限定类名 (FQCN),后跟方法名 myMethod

Knative 自定义函数

OpenShift Serverless Logic 通过 knative-serving 插件提供自定义函数的实现来调用 Knative 服务。它允许您拥有一个静态 URI,定义一个 Knative 服务,用于执行 HTTP 请求。URI 中定义的 Knative 服务在当前 Knative 集群中查询,并转换为有效的 URL。

以下示例使用已部署的 Knative 服务

$ kn service list
NAME                              URL                                                                      LATEST                                  AGE     CONDITIONS   READY   REASON
custom-function-knative-service   http://custom-function-knative-service.default.10.109.169.193.sslip.io   custom-function-knative-service-00001   3h16m   3 OK / 3     True

您可以使用 Knative 服务名称声明 OpenShift Serverless Logic 自定义函数,如下例所示

  "functions": [
    {
      "name": "greet", (1)
      "type": "custom", (2)
      "operation": "knative:services.v1.serving.knative.dev/custom-function-knative-service?path=/plainJsonFunction", (3)
    }
  ]
1 greet 是函数名。
2 custom是函数类型。
3 operation 中,您设置 Knative 服务的坐标。

此函数发送 POST 请求。如果您没有指定路径,OpenShift Serverless Logic 使用根路径(/)。您也可以通过在操作中设置 method=GET 来发送 GET 请求。在这种情况下,参数将通过查询字符串转发。

REST 自定义函数

OpenShift Serverless Logic 提供 REST 自定义类型作为快捷方式。使用自定义 rest 时,在函数定义中,您指定要调用的 HTTP URI 和要使用的 HTTP 方法 (get、post、patch 或 put)。这是通过使用 operation 字符串完成的。调用函数时,您像使用 OpenAPI 函数一样传递请求参数。

以下示例显示了 rest 函数的声明

  {
  "functions": [
    {
      "name": "multiplyAllByAndSum", (1)
      "type": "custom", (2)
      "operation": "rest:post:/numbers/{multiplier}/multiplyByAndSum" (3)
    }
  ]
}
1 multiplyAllAndSum 是函数名。
2 custom是函数类型。
3 rest:post:/numbers/{multiplier}/multiplyByAndSum 是自定义操作定义。在自定义操作定义中,rest 是指示这是 REST 调用的保留操作关键字,post 是 HTTP 方法,/numbers/{multiplier}/multiplyByAndSum 是相对端点。

使用相对端点时,必须将主机指定为属性。主机属性的格式为 kogito.sw.functions.<function_name>.host。在此示例中,kogito.sw.functions.multiplyAllByAndSum.host 是主机属性键。如果需要,您可以通过指定 kogito.sw.functions.multiplyAllAndSum.port 属性来覆盖默认端口 (80)。

此端点期望主体为一个 JSON 对象,其字段 numbers 是一个整数数组,将数组中的每个项目乘以 multiplier 并返回所有已乘项目的总和。

超时

OpenShift Serverless Logic 定义了几种超时配置,您可以使用这些配置来配置不同场景下工作流执行的最大时间。您可以配置工作流在给定状态下等待事件到达的最长时间,或者工作流的最大执行时间。

无论超时定义在哪里,都必须将其配置为一段时间或持续时间,该时间从引用的工作流元素变为活动状态时开始。超时使用ISO 8601 数据和时间标准来指定一段时间,并遵循格式PnDTnHnMn.nS,其中天数被认为恰好是 24 小时。例如,PT15M 配置 15 分钟,P2DT3H4M 定义 2 天、3 小时和 4 分钟。

基于月份的超时,例如 P2M(两个月的时间段),是无效的,因为月份的持续时间可能会有所不同。在这种情况下,请改用 PT60D

工作流超时

要配置工作流在被取消之前可以运行的最大时间量,您可以使用工作流超时。取消后,工作流被视为已完成,并且无法再通过 GET 请求访问。因此,它的行为就像默认情况下中断为 true 一样。

工作流超时使用顶级 timeouts 属性定义。它可以具有两种类型,stringobjectstring 类型定义一个指向包含工作流超时定义的 JSON 或 YAML 文件的 URI。object 类型用于内联定义超时定义。

例如,要在执行一小时后取消工作流,请使用以下配置

工作流超时示例
  {
  "id": "workflow_timeouts",
  "version": "1.0",
  "name": "Workflow Timeouts",
  "description": "Simple workflow to show the workflowExecTimeout working",
  "start": "PrintStartMessage",
  "timeouts": {
    "workflowExecTimeout": "PT1H"
  } ...
}

事件超时

在工作流中定义状态时,您可以使用 timeouts 属性来配置完成此状态的最大时间。当该时间超时时,状态被视为超时,引擎从该状态继续执行。执行流程取决于状态类型,例如,可能会执行到下一个状态的转换。

基于事件的状态可以使用子属性 eventTimeout 来配置等待事件到达的最大时间。这是当前实现中支持的唯一属性。

事件超时支持回调状态超时、切换状态超时和事件状态超时。

回调状态超时

当您必须执行一个操作来调用外部服务并以事件的形式等待异步响应时,可以使用 Callback 状态。

一旦消耗了响应事件,工作流将继续执行,通常会转到转换属性中定义的下一个状态。

由于 Callback 状态会暂停执行直到事件被消耗,您可以为其配置 eventTimeout,如果事件在配置的持续时间内没有到达,则工作流将继续执行,并转到转换中定义的下一个状态。

带有超时的 Callback 状态示例
{
 "name": "CallbackState",
 "type": "callback",
 "action": {
   "name": "callbackAction",
   "functionRef": {
     "refName": "callbackFunction",
     "arguments": {
       "input": "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has executed the callbackFunction.\"}"
     }
   }
 },
 "eventRef": "callbackEvent",
 "transition": "CheckEventArrival",
 "onErrors": [
   {
     "errorRef": "callbackError",
     "transition": "FinalizeWithError"
   }
 ],
 "timeouts": {
   "eventTimeout": "PT30S"
 }
}

切换状态超时

当您需要根据某些条件采取行动时,可以使用 Switch 状态。这些条件可以基于工作流数据 dataConditions 或事件 eventConditions

当您使用 eventConditions 时,工作流执行将等待做出决定,直到任何配置的事件到达并匹配条件。在这种情况下,您可以配置一个事件超时,它控制等待事件匹配条件的最大时间。

如果此时间过期,工作流将移动到 defaultCondition 属性中定义的状态。

带有超时的 Switch 状态示例
{
    "name": "ChooseOnEvent",
    "type": "switch",
    "eventConditions": [
    {
        "eventRef": "visaApprovedEvent",
        "transition": "ApprovedVisa"
    },
    {
        "eventRef": "visaDeniedEvent",
        "transition": "DeniedVisa"
    }
    ],
        "defaultCondition": {
        "transition": "HandleNoVisaDecision"
    },
        "timeouts": {
        "eventTimeout": "PT5S"
    }
}

事件状态超时

Event 状态用于等待工作流接收一个或多个事件,执行一组操作,然后继续执行。如果 Event 状态是起始状态,则会创建一个新的工作流实例。

此状态使用 timeouts 属性来配置工作流应等待配置的事件到达的最大时间。

如果超过此时间并且未收到事件,则工作流将移动到转换属性中定义的状态,或者在结束状态的情况下结束工作流实例,而无需执行任何操作。

带有超时的 Event 状态示例
{
  "name": "WaitForEvent",
  "type": "event",
  "onEvents": [
    {
      "eventRefs": [
        "event1"
      ],
      "eventDataFilter": {
        "data": "${ \"The event1 was received.\" }",
        "toStateData": "${ .exitMessage }"
      },
      "actions": [
        {
          "name": "printAfterEvent1",
          "functionRef": {
            "refName": "systemOut",
            "arguments": {
              "message": "${\"event-state-timeouts: \" + $WORKFLOW.instanceId + \" executing actions for event1.\"}"
            }
          }
        }
      ]
    },
    {
      "eventRefs": [
        "event2"
      ],
      "eventDataFilter": {
        "data": "${ \"The event2 was received.\" }",
        "toStateData": "${ .exitMessage }"
      },
      "actions": [
        {
          "name": "printAfterEvent2",
          "functionRef": {
            "refName": "systemOut",
            "arguments": {
              "message": "${\"event-state-timeouts: \" + $WORKFLOW.instanceId + \" executing actions for event2.\"}"
            }
          }
        }
      ]
    }
  ],
  "timeouts": {
    "eventTimeout": "PT30S"
  },
  "transition": "PrintExitMessage"
}

并行性

OpenShift Serverless Logic 对并行任务的执行进行序列化。parallel 这个词并不表示同时执行,而是意味着分支的执行之间没有逻辑依赖关系。如果活动分支暂停其执行,则非活动分支可以在不等待活动分支完成的情况下启动或恢复任务的执行。例如,活动分支在等待事件接收时可能会暂停其执行。

并行状态是一种将当前工作流实例执行路径拆分为多个路径的状态,每个分支对应一个路径。这些执行路径并行执行,并根据定义的completionType参数值重新加入当前执行路径。

JSON格式的并行工作流示例
 {
     "name":"ParallelExec",
     "type":"parallel",
     "completionType": "allOf",
     "branches": [
        {
          "name": "Branch1",
          "actions": [
            {
                "functionRef": {
                    "refName": "functionNameOne",
                    "arguments": {
                        "order": "${ .someParam }"
                    }
                }
            }
        ]
        },
        {
          "name": "Branch2",
          "actions": [
              {
                  "functionRef": {
                      "refName": "functionNameTwo",
                      "arguments": {
                          "order": "${ .someParam }"
                      }
                  }
              }
          ]
        }
     ],
     "end": true
}
YAML格式的并行工作流示例
name: ParallelExec
type: parallel
completionType: allOf
branches:
- name: Branch1
  actions:
  - functionRef:
      refName: functionNameOne
      arguments:
        order: "${ .someParam }"
- name: Branch2
  actions:
  - functionRef:
      refName: functionNameTwo
      arguments:
        order: "${ .someParam }"
end: true

在之前的示例中,allOf 定义了所有分支必须完成执行,状态才能转换或结束。如果未设置此参数,则这是默认值。