×

您可以创建一个 Apache Kafka 源,它从 Apache Kafka 集群读取事件并将这些事件传递到接收器。您可以使用 OpenShift Container Platform Web 控制台、Knative (kn) CLI 或通过直接创建 KafkaSource 对象(作为 YAML 文件)并使用 OpenShift CLI (oc) 应用它来创建 Kafka 源。

使用 Web 控制台创建 Apache Kafka 事件源

在您的集群上安装用于 Apache Kafka 的 Knative 代理实现后,您可以使用 Web 控制台创建 Apache Kafka 源。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建 Kafka 源。

先决条件
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在您的集群上。

  • 您已登录到 Web 控制台。

  • 您可以访问一个 Red Hat AMQ Streams (Kafka) 集群,该集群会生成您要导入的 Kafka 消息。

  • 您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。

步骤
  1. 在**开发者**视角中,导航到**+添加**页面并选择**事件源**。

  2. 在**事件源**页面中,在**类型**部分选择**Kafka 源**。

  3. 配置**Kafka 源**设置

    1. 添加**引导服务器**的逗号分隔列表。

    2. 添加**主题**的逗号分隔列表。

    3. 添加**消费者组**。

    4. 为创建的服务帐户选择**服务帐户名称**。

    5. 在**目标**部分,选择您的事件接收器。这可以是**资源**或**URI**

      1. 选择**资源**以使用通道、代理或服务作为事件源的事件接收器。

      2. 选择**URI**以指定将事件路由到的统一资源标识符 (URI)。

    6. 输入 Kafka 事件源的**名称**。

  4. 单击**创建**。

验证

您可以通过查看**拓扑**页面来验证 Kafka 事件源是否已创建并连接到接收器。

  1. 在**开发者**视角中,导航到**拓扑**。

  2. 查看 Kafka 事件源和接收器。

    View the Kafka source and service in the Topology view

使用 Knative CLI 创建 Apache Kafka 事件源

您可以使用 kn source kafka create 命令使用 Knative (kn) CLI 创建 Kafka 源。与直接修改 YAML 文件相比,使用 Knative CLI 创建事件源提供了更简化和直观的界面。

先决条件
  • OpenShift Serverless Operator、Knative Eventing、Knative Serving 和 KnativeKafka 自定义资源 (CR) 已安装在您的集群上。

  • 您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。

  • 您可以访问一个 Red Hat AMQ Streams (Kafka) 集群,该集群会生成您要导入的 Kafka 消息。

  • 您已安装 Knative (kn) CLI。

  • 可选:如果您想使用此过程中的验证步骤,则已安装 OpenShift CLI (oc)。

步骤
  1. 要验证 Kafka 事件源是否正常工作,请创建一个 Knative 服务,该服务将传入事件转储到服务日志中

    $ kn service create event-display \
        --image quay.io/openshift-knative/showcase
  2. 创建 KafkaSource CR

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display

    请将此命令中的占位符值替换为您的源名称、Bootstrap 服务器和主题的值。

    --servers--topics--consumergroup 选项指定与 Kafka 集群的连接参数。--consumergroup 选项是可选的。

  3. 可选:查看您创建的 KafkaSource CR 的详细信息

    $ kn source kafka describe <kafka_source_name>
    示例输出
    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h
验证步骤
  1. 触发 Kafka 实例向主题发送消息

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    在提示符中输入消息。此命令假设:

    • Kafka 集群安装在 kafka 命名空间中。

    • KafkaSource 对象已配置为使用 my-topic 主题。

  2. 通过查看日志验证消息是否已到达

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container
    示例输出
    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!

Knative CLI sink 标志

使用 Knative (kn) CLI 创建事件源时,您可以使用 --sink 标志指定一个接收来自该资源的事件的 sink。sink 可以是任何可寻址或可调用的资源,可以接收来自其他资源的传入事件。

以下示例创建一个 sink 绑定,该绑定使用服务 http://event-display.svc.cluster.local 作为 sink

使用 sink 标志的示例命令
$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ (1)
  --ce-override "sink=bound"
1 http://event-display.svc.cluster.local 中的 svc 确定 sink 是一个 Knative 服务。其他默认 sink 前缀包括 channelbroker

使用 YAML 创建 Apache Kafka 事件源

使用 YAML 文件创建 Knative 资源使用声明式 API,使您可以声明式且可重复地描述应用程序。要使用 YAML 创建 Kafka 源,您必须创建一个定义 KafkaSource 对象的 YAML 文件,然后使用 oc apply 命令应用它。

先决条件
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在您的集群上。

  • 您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。

  • 您可以访问一个 Red Hat AMQ Streams (Kafka) 集群,该集群会生成您要导入的 Kafka 消息。

  • 安装 OpenShift CLI (oc)。

步骤
  1. 创建一个作为 YAML 文件的 KafkaSource 对象

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: <source_name>
    spec:
      consumerGroup: <group_name> (1)
      bootstrapServers:
      - <list_of_bootstrap_servers>
      topics:
      - <list_of_topics> (2)
      sink:
      - <list_of_sinks> (3)
    1 消费者组是一组使用相同组 ID 并从主题消费数据的消费者。
    2 主题为数据存储提供了一个目标。每个主题都分成一个或多个分区。
    3 sink 指定事件从源发送到的位置。

    OpenShift Serverless 上的 KafkaSource 对象仅支持 API 的 v1beta1 版本。请勿使用此 API 的 v1alpha1 版本,因为此版本现已弃用。

    示例 KafkaSource 对象
    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
  2. 应用 KafkaSource YAML 文件

    $ oc apply -f <filename>
验证
  • 通过输入以下命令验证 Kafka 事件源是否已创建

    $ oc get pods
    示例输出
    NAME                                    READY     STATUS    RESTARTS   AGE
    kafkasource-kafka-source-5ca0248f-...   1/1       Running   0          13m

为 Apache Kafka 源配置 SASL 身份验证

Apache Kafka 使用简单身份验证和安全层 (SASL) 进行身份验证。如果您在集群上使用 SASL 身份验证,则用户必须向 Knative 提供凭据才能与 Kafka 集群通信;否则无法生成或使用事件。

先决条件
  • 您在 OpenShift Container Platform 上拥有集群或专用管理员权限。

  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在您的 OpenShift Container Platform 集群上。

  • 您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。

  • 您拥有 Kafka 集群的用户名和密码。

  • 您已选择要使用的 SASL 机制,例如 PLAINSCRAM-SHA-256SCRAM-SHA-512

  • 如果启用了 TLS,您还需要 Kafka 集群的 ca.crt 证书文件。

  • 您已安装 OpenShift (oc) CLI。

步骤
  1. 在您选择的命名空间中创建证书文件作为密钥。

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \ (1)
      --from-literal=user="my-sasl-user"
    1 SASL 类型可以是 PLAINSCRAM-SHA-256SCRAM-SHA-512
  2. 创建或修改您的 Kafka 源,使其包含以下 spec 配置

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: user
          password:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: password
          type:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: saslType
        tls:
          enable: true
          caCert: (1)
            secretKeyRef:
              name: <kafka_auth_secret>
              key: ca.crt
    ...
    1 如果您使用的是公共云 Kafka 服务,则不需要 caCert spec。

为 KafkaSource 配置 KEDA 自动缩放

您可以将 Apache Kafka (KafkaSource) 的 Knative Eventing 源配置为使用自定义指标自动缩放运算符进行自动缩放,该运算符基于 Kubernetes 事件驱动自动缩放器 (KEDA)。

为 KafkaSource 配置 KEDA 自动缩放仅是技术预览功能。技术预览功能不受 Red Hat 生产服务级别协议 (SLA) 支持,并且可能功能不完整。Red Hat 不建议在生产环境中使用它们。这些功能可让您抢先体验即将推出的产品功能,从而使客户能够在开发过程中测试功能并提供反馈。

有关 Red Hat 技术预览功能的支持范围的更多信息,请参见 技术预览功能支持范围

先决条件
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在您的集群上。

步骤
  1. KnativeKafka 自定义资源中,启用 KEDA 缩放

    示例 YAML
    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      name: knative-kafka
      namespace: knative-eventing
    spec:
      config:
        kafka-features:
          controller-autoscaler-keda: enabled
  2. 应用 KnativeKafka YAML 文件

    $ oc apply -f <filename>