×

Apache Kafka 接收器是一种事件接收器类型,如果集群管理员已在您的集群上启用 Apache Kafka,则可以使用。您可以使用 Kafka 接收器直接从事件源将事件发送到 Kafka 主题。

使用 YAML 创建 Apache Kafka 接收器

您可以创建一个 Kafka 接收器,将事件发送到 Kafka 主题。默认情况下,Kafka 接收器使用二进制内容模式,这比结构化模式更高效。要使用 YAML 创建 Kafka 接收器,必须创建一个定义KafkaSink对象的 YAML 文件,然后使用oc apply命令应用它。

先决条件
  • OpenShift Serverless Operator、Knative Eventing 和KnativeKafka自定义资源 (CR) 已安装在您的集群上。

  • 您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。

  • 您可以访问一个 Red Hat AMQ Streams (Kafka) 集群,该集群生成您要导入的 Kafka 消息。

  • 安装 OpenShift CLI (oc)。

步骤
  1. 创建一个KafkaSink对象定义作为 YAML 文件

    Kafka 接收器 YAML
    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: <sink-name>
      namespace: <namespace>
    spec:
      topic: <topic-name>
      bootstrapServers:
       - <bootstrap-server>
  2. 要创建 Kafka 接收器,请应用KafkaSinkYAML 文件

    $ oc apply -f <filename>
  3. 配置事件源,以便在其规范中指定接收器

    连接到 API 服务器源的 Kafka 接收器的示例
    apiVersion: sources.knative.dev/v1alpha2
    kind: ApiServerSource
    metadata:
      name: <source-name> (1)
      namespace: <namespace> (2)
    spec:
      serviceAccountName: <service-account-name> (3)
      mode: Resource
      resources:
      - apiVersion: v1
        kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1alpha1
          kind: KafkaSink
          name: <sink-name> (4)
    1 事件源的名称。
    2 事件源的命名空间。
    3 事件源的服务帐户。
    4 Kafka 接收器名称。

使用 OpenShift Container Platform Web 控制台创建 Apache Kafka 事件接收器

您可以使用 OpenShift Container Platform Web 控制台中的**开发者**视角创建一个 Kafka 接收器,将事件发送到 Kafka 主题。默认情况下,Kafka 接收器使用二进制内容模式,这比结构化模式更高效。

作为开发者,您可以创建一个事件接收器来接收来自特定源的事件并将其发送到 Kafka 主题。

先决条件
  • 您已从 OperatorHub 安装了 OpenShift Serverless Operator,其中包含 Knative Serving、Knative Eventing 和用于 Apache Kafka API 的 Knative 代理。

  • 您已在 Kafka 环境中创建了一个 Kafka 主题。

步骤
  1. 在**开发者**视角中,导航到**+添加**视图。

  2. 在**事件编排目录**中单击**事件接收器**。

  3. 在目录项中搜索KafkaSink并单击它。

  4. 单击**创建事件接收器**。

  5. 在表单视图中,键入引导服务器的 URL,它是主机名和端口的组合。

    create event sink
  6. 键入要发送事件数据的主题的名称。

  7. 键入事件接收器的名称。

  8. 单击**创建**。

验证
  1. 在**开发者**视角中,导航到**拓扑**视图。

  2. 单击已创建的事件接收器以在右侧面板中查看其详细信息。

配置 Apache Kafka 接收器的安全

Apache Kafka 客户端和服务器使用传输层安全 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。对于 Apache Kafka 的 Knative 代理实现,TLS 是唯一支持的流量加密方法。

Apache Kafka 使用简单身份验证和安全层 (SASL) 进行身份验证。如果您在集群上使用 SASL 身份验证,则用户必须向 Knative 提供凭据才能与 Kafka 集群通信;否则,将无法生成或使用事件。

先决条件
  • OpenShift Serverless Operator、Knative Eventing 和KnativeKafka自定义资源 (CR) 已安装在您的 OpenShift Container Platform 集群上。

  • KnativeKafka CR 中启用了 Kafka 接收器。

  • 您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。

  • 您有一个存储为.pem文件的Kafka集群CA证书。

  • 您有一个存储为.pem文件的Kafka集群客户端证书和密钥。

  • 您已安装OpenShift (oc) CLI。

  • 您已选择要使用的SASL机制,例如PLAINSCRAM-SHA-256SCRAM-SHA-512

步骤
  1. 在与您的KafkaSink对象相同的命名空间中,创建证书文件作为密钥。

    证书和密钥必须为PEM格式。

    • 对于使用SASL进行身份验证而无需加密的情况

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_PLAINTEXT \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-literal=user=<username> \
        --from-literal=password=<password>
    • 对于使用SASL进行身份验证并使用TLS进行加密的情况

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_SSL \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ (1)
        --from-literal=user=<username> \
        --from-literal=password=<password>
      1 如果您使用的是公共云托管的Kafka服务,则可以省略ca.crt以使用系统的根CA集。
    • 对于使用TLS进行身份验证和加密的情况

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SSL \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ (1)
        --from-file=user.crt=<my_cert.pem_file_path> \
        --from-file=user.key=<my_key.pem_file_path>
      1 如果您使用的是公共云托管的Kafka服务,则可以省略ca.crt以使用系统的根CA集。
  2. 创建或修改KafkaSink对象,并在auth规范中添加对您密钥的引用。

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
       name: <sink_name>
       namespace: <namespace>
    spec:
    ...
       auth:
         secret:
           ref:
             name: <secret_name>
    ...
  3. 应用KafkaSink对象。

    $ oc apply -f <filename>