×

如果您需要从 Knative 中未包含的事件生产者或从发出非CloudEvent格式事件的生产者引入事件,您可以通过创建自定义事件源来实现。您可以使用以下方法之一创建自定义事件源

  • 通过创建接收器绑定,使用PodSpecable对象作为事件源。

  • 通过创建容器源,使用容器作为事件源。

接收器绑定

SinkBinding对象支持将事件生产与传递寻址解耦。接收器绑定用于将事件生产者连接到事件消费者或接收器。事件生产者是一个嵌入PodSpec模板并产生事件的 Kubernetes 资源。接收器是可以接收事件的可寻址 Kubernetes 对象。

SinkBinding对象将环境变量注入接收器的PodTemplateSpec中,这意味着应用程序代码无需直接与 Kubernetes API 交互即可找到事件目标。这些环境变量如下所示

K_SINK

已解析接收器的 URL。

K_CE_OVERRIDES

指定出站事件覆盖的 JSON 对象。

SinkBinding对象当前不支持服务的自定义修订名称。

使用 YAML 创建接收器绑定

使用 YAML 文件创建 Knative 资源使用声明式 API,使您可以声明性和可重复地描述事件源。要使用 YAML 创建接收器绑定,必须创建一个定义SinkBinding对象的 YAML 文件,然后使用oc apply命令应用它。

先决条件
  • 在集群上安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。

  • 安装 OpenShift CLI (oc)。

  • 您已创建项目或有权访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。

步骤
  1. 要检查接收器绑定是否已正确设置,请创建一个 Knative 事件显示服务或事件接收器,该服务或接收器将其传入的消息转储到其日志中。

    1. 创建一个服务 YAML 文件

      示例服务 YAML 文件
      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/showcase
    2. 创建服务

      $ oc apply -f <filename>
  2. 创建一个将事件定向到服务的接收器绑定实例。

    1. 创建一个接收器绑定 YAML 文件

      示例服务 YAML 文件
      apiVersion: sources.knative.dev/v1alpha1
      kind: SinkBinding
      metadata:
        name: bind-heartbeat
      spec:
        subject:
          apiVersion: batch/v1
          kind: Job (1)
          selector:
            matchLabels:
              app: heartbeat-cron
      
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display
      1 在此示例中,任何具有标签app: heartbeat-cron的作业都将绑定到事件接收器。
    2. 创建接收器绑定

      $ oc apply -f <filename>
  3. 创建一个CronJob对象。

    1. 创建一个 cron 作业 YAML 文件

      示例 cron 作业 YAML 文件
      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      要使用接收器绑定,必须手动将bindings.knative.dev/include=true标签添加到您的 Knative 资源。

      例如,要将此标签添加到CronJob资源,请将以下几行添加到Job资源 YAML 定义中

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. 创建 cron 作业

      $ oc apply -f <filename>
  4. 通过输入以下命令并检查输出,检查控制器是否已正确映射

    $ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml
    示例输出
    spec:
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default
      subject:
        apiVersion: batch/v1
        kind: Job
        namespace: default
        selector:
          matchLabels:
            app: heartbeat-cron
验证

您可以通过查看消息转储函数日志来验证 Kubernetes 事件是否已发送到 Knative 事件接收器。

  1. 输入命令

    $ oc get pods
  2. 输入命令

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container
    示例输出
    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.k8s.ac.cn/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

使用 Knative CLI 创建接收器绑定

您可以使用kn source binding create命令使用 Knative (kn) CLI 创建接收器绑定。使用 Knative CLI 创建事件源比直接修改 YAML 文件提供更简化和直观的用户界面。

先决条件
  • 在集群上安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。

  • 您已创建项目或有权访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。

  • 安装 Knative (kn) CLI。

  • 安装 OpenShift CLI (oc)。

以下步骤要求您创建 YAML 文件。

如果更改 YAML 文件的名称(与示例中使用的名称不同),则必须确保也更新相应的 CLI 命令。

步骤
  1. 要检查接收器绑定是否已正确设置,请创建一个 Knative 事件显示服务(或事件接收器),该服务将其传入的消息转储到其日志中。

    $ kn service create event-display --image quay.io/openshift-knative/showcase
  2. 创建一个接收器绑定实例,将事件定向到该服务。

    $ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-display
  3. 创建一个CronJob对象。

    1. 创建一个 cron 作业 YAML 文件

      示例 cron 作业 YAML 文件
      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      要使用接收器绑定,必须手动将bindings.knative.dev/include=true标签添加到您的 Knative CR 中。

      例如,要将此标签添加到CronJob CR,请将以下几行添加到Job CR YAML 定义中。

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. 创建 cron 作业

      $ oc apply -f <filename>
  4. 通过输入以下命令并检查输出,检查控制器是否已正确映射

    $ kn source binding describe bind-heartbeat
    示例输出
    Name:         bind-heartbeat
    Namespace:    demo-2
    Annotations:  sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ...
    Age:          2m
    Subject:
      Resource:   job (batch/v1)
      Selector:
        app:      heartbeat-cron
    Sink:
      Name:       event-display
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE     AGE REASON
      ++ Ready     2m
验证

您可以通过查看消息转储函数日志来验证 Kubernetes 事件是否已发送到 Knative 事件接收器。

  • 通过输入以下命令来查看消息转储函数日志。

    $ oc get pods
    $ oc logs $(oc get pod -o name | grep event-display) -c user-container
    示例输出
    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.k8s.ac.cn/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

Knative CLI 接收器标志

使用 Knative (kn) CLI 创建事件源时,您可以使用--sink标志指定一个接收器,事件将从该资源发送到该接收器。接收器可以是任何可寻址或可调用的资源,可以接收来自其他资源的传入事件。

以下示例创建一个接收器绑定,该绑定使用服务http://event-display.svc.cluster.local作为接收器。

使用接收器标志的示例命令
$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ (1)
  --ce-override "sink=bound"
1 http://event-display.svc.cluster.local中的svc确定接收器是 Knative 服务。其他默认接收器前缀包括channelbroker

使用 Web 控制台创建接收器绑定

在集群上安装 Knative Eventing 后,您可以使用 Web 控制台创建接收器绑定。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。

先决条件
  • 您已登录到 OpenShift Container Platform Web 控制台。

  • OpenShift Serverless Operator、Knative Serving 和 Knative Eventing 已安装在您的 OpenShift Container Platform 集群上。

  • 您已创建项目或有权访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。

步骤
  1. 创建一个用作接收器的 Knative 服务。

    1. 在**开发者**视角中,导航到**+添加** → **YAML**。

    2. 复制示例 YAML。

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/showcase
    3. 单击**创建**。

  2. 创建一个用作事件源的CronJob资源,并每分钟发送一个事件。

    1. 在**开发者**视角中,导航到**+添加** → **YAML**。

    2. 复制示例 YAML。

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "*/1 * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: true (1)
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats
                    args:
                    - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace
      1 确保包含bindings.knative.dev/include: true标签。OpenShift Serverless 的默认命名空间选择行为使用包含模式。
    3. 单击**创建**。

  3. 在与上一步中创建的服务相同的命名空间中创建接收器绑定,或任何您想要发送事件到的其他接收器。

    1. 在**开发者**视角中,导航到**+添加** → **事件源**。将显示**事件源**页面。

    2. 可选:如果您有多个事件源提供程序,请从**提供程序**列表中选择所需的提供程序,以筛选来自提供程序的可用事件源。

    3. 选择**接收器绑定**,然后单击**创建事件源**。将显示**创建事件源**页面。

      您可以使用**表单视图**或**YAML 视图**配置**接收器绑定**设置,并且可以在视图之间切换。在视图之间切换时,数据会持久保存。

    4. 在**apiVersion**字段中输入batch/v1

    5. 在**Kind**字段中输入Job

      CronJob类型不受 OpenShift Serverless 接收器绑定直接支持,因此**Kind**字段必须定位 cron 作业创建的Job对象,而不是 cron 作业对象本身。

    6. 在**目标**部分,选择您的事件接收器。这可以是**资源**或**URI**。

      1. 选择**资源**以使用通道、代理或服务作为事件源的事件接收器。在此示例中,上一步中创建的event-display服务用作目标**资源**。

      2. 选择**URI**以指定事件路由到的统一资源标识符 (URI)。

    7. 在**匹配标签**部分

      1. 在**名称**字段中输入app

      2. 在**值**字段中输入heartbeat-cron

        使用带有接收器绑定的 cron 作业时,需要标签选择器,而不是资源名称。这是因为 cron 作业创建的作业没有可预测的名称,并且在其名称中包含随机生成的字符串。例如,hearthbeat-cron-1cc23f

    8. 单击**创建**。

验证

您可以通过查看**拓扑**页面和 pod 日志来验证接收器绑定、接收器和 cron 作业是否已创建并正常工作。

  1. 在**开发者**视角中,导航到**拓扑**。

  2. 查看接收器绑定、接收器和心跳 cron 作业。

    View the sink binding and service in the Topology view
  3. 观察到添加接收器绑定后,cron 作业会注册成功的作业。这意味着接收器绑定已成功重新配置 cron 作业创建的作业。

  4. 浏览event-display服务以查看心跳 cron 作业生成的事件。

    View the events in the web UI

接收器绑定参考

您可以通过创建接收器绑定来使用PodSpecable对象作为事件源。创建SinkBinding对象时,您可以配置多个参数。

SinkBinding对象支持以下参数

字段 描述 必需或可选

apiVersion

指定 API 版本,例如sources.knative.dev/v1

必需

kind

将此资源对象标识为SinkBinding对象。

必需

metadata

指定唯一标识SinkBinding对象的元数据。例如,一个name

必需

spec

指定此SinkBinding对象的配置信息。

必需

spec.sink

对解析为 URI 的对象的引用,用作接收器。

必需

spec.subject

引用其运行时契约通过绑定实现进行增强的资源。

必需

spec.ceOverrides

定义覆盖以控制输出格式和对发送到接收器的事件的修改。

可选

主题参数

Subject参数引用其运行时契约通过绑定实现进行增强的资源。您可以为Subject定义配置多个字段。

Subject定义支持以下字段

字段 描述 必需或可选

apiVersion

引用的 API 版本。

必需

kind

引用的类型。

必需

namespace

引用的命名空间。如果省略,则默认为对象的命名空间。

可选

name

引用的名称。

如果您配置了selector,请不要使用。

selector

引用的选择器。

如果您配置了name,请不要使用。

selector.matchExpressions

标签选择器要求列表。

只使用matchExpressionsmatchLabels中的一个。

selector.matchExpressions.key

选择器应用到的标签键。

如果使用matchExpressions,则为必需。

selector.matchExpressions.operator

表示键与一组值的关系。有效运算符为InNotInExistsDoesNotExist

如果使用matchExpressions,则为必需。

selector.matchExpressions.values

字符串值数组。如果operator参数值为InNotIn,则values数组必须非空。如果operator参数值为ExistsDoesNotExist,则values数组必须为空。此数组在策略性合并修补程序期间会被替换。

如果使用matchExpressions,则为必需。

selector.matchLabels

一个键值对映射。matchLabels映射中的每个键值对等效于matchExpressions的一个元素,其中键字段为matchLabels.<key>operatorIn,而values数组仅包含matchLabels.<value>

只使用matchExpressionsmatchLabels中的一个。

主题参数示例

给定以下YAML,则选择名为mysubject位于default命名空间中的Deployment对象。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    namespace: default
    name: mysubject
  ...

给定以下YAML,则选择任何在default命名空间中具有标签working=exampleJob对象。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: batch/v1
    kind: Job
    namespace: default
    selector:
      matchLabels:
        working: example
  ...

给定以下YAML,则选择任何在default命名空间中具有标签working=exampleworking=samplePod对象。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: v1
    kind: Pod
    namespace: default
    selector:
      - matchExpression:
        key: working
        operator: In
        values:
          - example
          - sample
  ...

CloudEvent 覆盖

ceOverrides定义提供了控制发送到接收器的CloudEvent输出格式和修改的覆盖。您可以为ceOverrides定义配置多个字段。

ceOverrides定义支持以下字段

字段 描述 必需或可选

扩展

指定在出站事件上添加或覆盖哪些属性。每个extensions键值对都作为属性扩展独立设置在事件上。

可选

仅允许有效的CloudEvent属性名称作为扩展。您不能从扩展覆盖配置中设置spec定义的属性。例如,您不能修改type属性。

CloudEvent 覆盖示例
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

这会在subject上设置K_CE_OVERRIDES环境变量。

示例输出
{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

include 标签

要使用接收器绑定,您需要将bindings.knative.dev/include: "true"标签分配给资源或包含该资源的命名空间。如果资源定义不包含该标签,集群管理员可以通过运行以下命令将其附加到命名空间。

$ oc label namespace <namespace> bindings.knative.dev/include=true

将服务网格与接收器绑定集成

先决条件
  • 您已将服务网格与OpenShift Serverless集成。

步骤
  1. 在属于ServiceMeshMemberRoll的命名空间中创建一个Service

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: <namespace> (1)
    spec:
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "true" (2)
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
          - image: quay.io/openshift-knative/showcase
    1 属于ServiceMeshMemberRoll的命名空间。
    2 将服务网格 sidecar 注入到 Knative 服务 pod 中。
  2. 应用Service资源。

    $ oc apply -f <filename>
  3. 创建一个SinkBinding资源。

    apiVersion: sources.knative.dev/v1
    kind: SinkBinding
    metadata:
      name: bind-heartbeat
      namespace: <namespace> (1)
    spec:
      subject:
        apiVersion: batch/v1
        kind: Job (2)
        selector:
          matchLabels:
            app: heartbeat-cron
    
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    1 属于ServiceMeshMemberRoll的命名空间。
    2 在此示例中,任何具有标签app: heartbeat-cron的Job都绑定到事件接收器。
  4. 应用SinkBinding资源。

    $ oc apply -f <filename>
  5. 创建一个CronJob

    apiVersion: batch/v1
    kind: CronJob
    metadata:
      name: heartbeat-cron
      namespace: <namespace> (1)
    spec:
      # Run every minute
      schedule: "* * * * *"
      jobTemplate:
        metadata:
          labels:
            app: heartbeat-cron
            bindings.knative.dev/include: "true"
        spec:
          template:
            metadata:
              annotations:
                sidecar.istio.io/inject: "true" (2)
                sidecar.istio.io/rewriteAppHTTPProbers: "true"
            spec:
              restartPolicy: Never
              containers:
                - name: single-heartbeat
                  image: quay.io/openshift-knative/heartbeats:latest
                  args:
                    - --period=1
                  env:
                    - name: ONE_SHOT
                      value: "true"
                    - name: POD_NAME
                      valueFrom:
                        fieldRef:
                          fieldPath: metadata.name
                    - name: POD_NAMESPACE
                      valueFrom:
                        fieldRef:
                          fieldPath: metadata.namespace
    1 属于ServiceMeshMemberRoll的命名空间。
    2 将服务网格 sidecar 注入到CronJob pod 中。
  6. 应用CronJob资源。

    $ oc apply -f <filename>
验证

要验证事件是否已发送到 Knative 事件接收器,请查看消息转储函数日志。

  1. 输入以下命令

    $ oc get pods
  2. 输入以下命令

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container
    示例输出
    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.k8s.ac.cn/eventing/test/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

容器源

容器源创建一个生成事件并将事件发送到接收器的容器镜像。您可以通过创建容器镜像和使用您的镜像 URI 的ContainerSource对象来使用容器源创建自定义事件源。

创建容器镜像的指南

容器源控制器注入两个环境变量:K_SINKK_CE_OVERRIDES。这些变量分别从sinkceOverrides规范中解析。事件将发送到K_SINK环境变量中指定的接收器 URI。消息必须使用CloudEvent HTTP 格式作为POST发送。

容器镜像示例

以下是一个心跳容器镜像示例。

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	duckv1 "knative.dev/pkg/apis/duck/v1"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/kelseyhightower/envconfig"
)

type Heartbeat struct {
	Sequence int    `json:"id"`
	Label    string `json:"label"`
}

var (
	eventSource string
	eventType   string
	sink        string
	label       string
	periodStr   string
)

func init() {
	flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)")
	flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)")
	flag.StringVar(&sink, "sink", "", "the host url to heartbeat to")
	flag.StringVar(&label, "label", "", "a special label")
	flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats")
}

type envConfig struct {
	// Sink URL where to send heartbeat cloud events
	Sink string `envconfig:"K_SINK"`

	// CEOverrides are the CloudEvents overrides to be applied to the outbound event.
	CEOverrides string `envconfig:"K_CE_OVERRIDES"`

	// Name of this pod.
	Name string `envconfig:"POD_NAME" required:"true"`

	// Namespace this pod exists in.
	Namespace string `envconfig:"POD_NAMESPACE" required:"true"`

	// Whether to run continuously or exit.
	OneShot bool `envconfig:"ONE_SHOT" default:"false"`
}

func main() {
	flag.Parse()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	if env.Sink != "" {
		sink = env.Sink
	}

	var ceOverrides *duckv1.CloudEventOverrides
	if len(env.CEOverrides) > 0 {
		overrides := duckv1.CloudEventOverrides{}
		err := json.Unmarshal([]byte(env.CEOverrides), &overrides)
		if err != nil {
			log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err)
			os.Exit(1)
		}
		ceOverrides = &overrides
	}

	p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink))
	if err != nil {
		log.Fatalf("failed to create http protocol: %s", err.Error())
	}

	c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow())
	if err != nil {
		log.Fatalf("failed to create client: %s", err.Error())
	}

	var period time.Duration
	if p, err := strconv.Atoi(periodStr); err != nil {
		period = time.Duration(5) * time.Second
	} else {
		period = time.Duration(p) * time.Second
	}

	if eventSource == "" {
		eventSource = fmt.Sprintf("https://knative.k8s.ac.cn/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name)
		log.Printf("Heartbeats Source: %s", eventSource)
	}

	if len(label) > 0 && label[0] == '"' {
		label, _ = strconv.Unquote(label)
	}
	hb := &Heartbeat{
		Sequence: 0,
		Label:    label,
	}
	ticker := time.NewTicker(period)
	for {
		hb.Sequence++

		event := cloudevents.NewEvent("1.0")
		event.SetType(eventType)
		event.SetSource(eventSource)
		event.SetExtension("the", 42)
		event.SetExtension("heart", "yes")
		event.SetExtension("beats", true)

		if ceOverrides != nil && ceOverrides.Extensions != nil {
			for n, v := range ceOverrides.Extensions {
				event.SetExtension(n, v)
			}
		}

		if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil {
			log.Printf("failed to set cloudevents data: %s", err.Error())
		}

		log.Printf("sending cloudevent to %s", sink)
		if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) {
			log.Printf("failed to send cloudevent: %v", res)
		}

		if env.OneShot {
			return
		}

		// Wait for next tick
		<-ticker.C
	}
}

以下是一个引用前面心跳容器镜像的容器源示例。

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        # This corresponds to a heartbeats image URI that you have built and published
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "example-pod"
            - name: POD_NAMESPACE
              value: "event-test"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: showcase
...

使用 Knative CLI 创建和管理容器源

您可以使用kn source container命令使用 Knative (kn) CLI 创建和管理容器源。使用 Knative CLI 创建事件源比直接修改 YAML 文件提供更简化和直观的用户界面。

创建一个容器源
$ kn source container create <container_source_name> --image <image_uri> --sink <sink>
删除一个容器源
$ kn source container delete <container_source_name>
描述一个容器源
$ kn source container describe <container_source_name>
列出现有容器源
$ kn source container list
以 YAML 格式列出现有容器源
$ kn source container list -o yaml
更新一个容器源

此命令更新现有容器源的镜像 URI。

$ kn source container update <container_source_name> --image <image_uri>

使用 Web 控制台创建容器源

在集群上安装 Knative Eventing 后,您可以使用 Web 控制台创建容器源。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。

先决条件
  • 您已登录到 OpenShift Container Platform Web 控制台。

  • OpenShift Serverless Operator、Knative Serving 和 Knative Eventing 已安装在您的 OpenShift Container Platform 集群上。

  • 您已创建项目或有权访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。

步骤
  1. 在**开发者**视角中,导航到**+添加** → **事件源**。将显示**事件源**页面。

  2. 选择**容器源**,然后单击**创建事件源**。将显示**创建事件源**页面。

  3. 使用**表单视图**或**YAML 视图**配置**容器源**设置。

    您可以在**表单视图**和**YAML 视图**之间切换。在视图之间切换时,数据会保留。

    1. 在**镜像**字段中,输入要在容器源创建的容器中运行的镜像的 URI。

    2. 在**名称**字段中,输入镜像的名称。

    3. 可选:在**参数**字段中,输入要传递给容器的任何参数。

    4. 可选:在**环境变量**字段中,添加要在容器中设置的任何环境变量。

    5. 在**目标**部分,选择您的事件接收器。这可以是**资源**或**URI**。

      1. 选择**资源**以使用通道、代理或服务作为事件源的事件接收器。

      2. 选择**URI**以指定事件路由到的统一资源标识符 (URI)。

  4. 完成容器源的配置后,单击**创建**。

容器源参考

您可以通过创建ContainerSource对象来使用容器作为事件源。创建ContainerSource对象时,您可以配置多个参数。

ContainerSource对象支持以下字段

字段 描述 必需或可选

apiVersion

指定 API 版本,例如sources.knative.dev/v1

必需

kind

将此资源对象标识为ContainerSource对象。

必需

metadata

指定唯一标识ContainerSource对象的元数据。例如,一个name

必需

spec

指定此ContainerSource对象的配置信息。

必需

spec.sink

对解析为 URI 的对象的引用,用作接收器。

必需

spec.template

ContainerSource对象的template规范。

必需

spec.ceOverrides

定义覆盖以控制输出格式和对发送到接收器的事件的修改。

可选

模板参数示例
apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        - image: quay.io/openshift-knative/heartbeats:latest
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "mypod"
            - name: POD_NAMESPACE
              value: "event-test"
  ...

CloudEvent 覆盖

ceOverrides定义提供了控制发送到接收器的CloudEvent输出格式和修改的覆盖。您可以为ceOverrides定义配置多个字段。

ceOverrides定义支持以下字段

字段 描述 必需或可选

扩展

指定在出站事件上添加或覆盖哪些属性。每个extensions键值对都作为属性扩展独立设置在事件上。

可选

仅允许有效的CloudEvent属性名称作为扩展。您不能从扩展覆盖配置中设置spec定义的属性。例如,您不能修改type属性。

CloudEvent 覆盖示例
apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

这会在subject上设置K_CE_OVERRIDES环境变量。

示例输出
{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

将服务网格与 ContainerSource 集成

先决条件
  • 您已将服务网格与OpenShift Serverless集成。

步骤
  1. 在属于ServiceMeshMemberRoll的命名空间中创建一个Service

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: <namespace> (1)
    spec:
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "true" (2)
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
          - image: quay.io/openshift-knative/showcase
    1 属于ServiceMeshMemberRoll的命名空间。
    2 将服务网格 sidecar 注入到 Knative 服务 pod 中。
  2. 应用Service资源。

    $ oc apply -f <filename>
  3. 在属于ServiceMeshMemberRoll的命名空间中创建一个ContainerSource对象,并将接收器设置为event-display

    apiVersion: sources.knative.dev/v1
    kind: ContainerSource
    metadata:
      name: test-heartbeats
      namespace: <namespace> (1)
    spec:
      template:
        metadata: (2)
          annotations:
            sidecar.istio.io/inject: "true"
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
            - image: quay.io/openshift-knative/heartbeats:latest
              name: heartbeats
              args:
                - --period=1s
              env:
                - name: POD_NAME
                  value: "example-pod"
                - name: POD_NAMESPACE
                  value: "event-test"
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    1 命名空间是ServiceMeshMemberRoll的一部分。
    2 启用ContainerSource对象的Service Mesh集成。
  4. 应用ContainerSource资源。

    $ oc apply -f <filename>
验证

要验证事件是否已发送到 Knative 事件接收器,请查看消息转储函数日志。

  1. 输入以下命令

    $ oc get pods
  2. 输入以下命令

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container
    示例输出
    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.k8s.ac.cn/eventing/test/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }