您可以创建一个 Apache Kafka 源,它从 Apache Kafka 集群读取事件并将这些事件传递到接收器。您可以使用 OpenShift Container Platform Web 控制台、Knative (kn
) CLI 或通过直接创建 KafkaSource
对象(作为 YAML 文件)并使用 OpenShift CLI (oc
) 应用它来创建 Kafka 源。
请参阅有关安装用于 Apache Kafka 的 Knative 代理的文档。 |
在您的集群上安装用于 Apache Kafka 的 Knative 代理实现后,您可以使用 Web 控制台创建 Apache Kafka 源。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建 Kafka 源。
OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka
自定义资源已安装在您的集群上。
您已登录到 Web 控制台。
您可以访问一个 Red Hat AMQ Streams (Kafka) 集群,该集群会生成您要导入的 Kafka 消息。
您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。
在**开发者**视角中,导航到**+添加**页面并选择**事件源**。
在**事件源**页面中,在**类型**部分选择**Kafka 源**。
配置**Kafka 源**设置
添加**引导服务器**的逗号分隔列表。
添加**主题**的逗号分隔列表。
添加**消费者组**。
为创建的服务帐户选择**服务帐户名称**。
在**目标**部分,选择您的事件接收器。这可以是**资源**或**URI**
选择**资源**以使用通道、代理或服务作为事件源的事件接收器。
选择**URI**以指定将事件路由到的统一资源标识符 (URI)。
输入 Kafka 事件源的**名称**。
单击**创建**。
您可以通过查看**拓扑**页面来验证 Kafka 事件源是否已创建并连接到接收器。
在**开发者**视角中,导航到**拓扑**。
查看 Kafka 事件源和接收器。
您可以使用 kn source kafka create
命令使用 Knative (kn
) CLI 创建 Kafka 源。与直接修改 YAML 文件相比,使用 Knative CLI 创建事件源提供了更简化和直观的界面。
OpenShift Serverless Operator、Knative Eventing、Knative Serving 和 KnativeKafka
自定义资源 (CR) 已安装在您的集群上。
您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。
您可以访问一个 Red Hat AMQ Streams (Kafka) 集群,该集群会生成您要导入的 Kafka 消息。
您已安装 Knative (kn
) CLI。
可选:如果您想使用此过程中的验证步骤,则已安装 OpenShift CLI (oc
)。
要验证 Kafka 事件源是否正常工作,请创建一个 Knative 服务,该服务将传入事件转储到服务日志中
$ kn service create event-display \
--image quay.io/openshift-knative/showcase
创建 KafkaSource
CR
$ kn source kafka create <kafka_source_name> \
--servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
--topics <topic_name> --consumergroup my-consumer-group \
--sink event-display
请将此命令中的占位符值替换为您的源名称、Bootstrap 服务器和主题的值。 |
--servers
、--topics
和 --consumergroup
选项指定与 Kafka 集群的连接参数。--consumergroup
选项是可选的。
可选:查看您创建的 KafkaSource
CR 的详细信息
$ kn source kafka describe <kafka_source_name>
Name: example-kafka-source
Namespace: kafka
Age: 1h
BootstrapServers: example-cluster-kafka-bootstrap.kafka.svc:9092
Topics: example-topic
ConsumerGroup: example-consumer-group
Sink:
Name: event-display
Namespace: default
Resource: Service (serving.knative.dev/v1)
Conditions:
OK TYPE AGE REASON
++ Ready 1h
++ Deployed 1h
++ SinkProvided 1h
触发 Kafka 实例向主题发送消息
$ oc -n kafka run kafka-producer \
-ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
--restart=Never -- bin/kafka-console-producer.sh \
--broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
在提示符中输入消息。此命令假设:
Kafka 集群安装在 kafka
命名空间中。
KafkaSource
对象已配置为使用 my-topic
主题。
通过查看日志验证消息是否已到达
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
☁️ cloudevents.Event
Validation: valid
Context Attributes,
specversion: 1.0
type: dev.knative.kafka.event
source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
subject: partition:46#0
id: partition:46/offset:0
time: 2021-03-10T11:21:49.4Z
Extensions,
traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
Data,
Hello!
使用 Knative (kn
) CLI 创建事件源时,您可以使用 --sink
标志指定一个接收来自该资源的事件的 sink。sink 可以是任何可寻址或可调用的资源,可以接收来自其他资源的传入事件。
以下示例创建一个 sink 绑定,该绑定使用服务 http://event-display.svc.cluster.local
作为 sink
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ (1)
--ce-override "sink=bound"
1 | http://event-display.svc.cluster.local 中的 svc 确定 sink 是一个 Knative 服务。其他默认 sink 前缀包括 channel 和 broker 。 |
使用 YAML 文件创建 Knative 资源使用声明式 API,使您可以声明式且可重复地描述应用程序。要使用 YAML 创建 Kafka 源,您必须创建一个定义 KafkaSource
对象的 YAML 文件,然后使用 oc apply
命令应用它。
OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka
自定义资源已安装在您的集群上。
您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。
您可以访问一个 Red Hat AMQ Streams (Kafka) 集群,该集群会生成您要导入的 Kafka 消息。
安装 OpenShift CLI (oc
)。
创建一个作为 YAML 文件的 KafkaSource
对象
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: <source_name>
spec:
consumerGroup: <group_name> (1)
bootstrapServers:
- <list_of_bootstrap_servers>
topics:
- <list_of_topics> (2)
sink:
- <list_of_sinks> (3)
1 | 消费者组是一组使用相同组 ID 并从主题消费数据的消费者。 |
2 | 主题为数据存储提供了一个目标。每个主题都分成一个或多个分区。 |
3 | sink 指定事件从源发送到的位置。 |
OpenShift Serverless 上的 |
KafkaSource
对象apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
应用 KafkaSource
YAML 文件
$ oc apply -f <filename>
通过输入以下命令验证 Kafka 事件源是否已创建
$ oc get pods
NAME READY STATUS RESTARTS AGE
kafkasource-kafka-source-5ca0248f-... 1/1 Running 0 13m
Apache Kafka 使用简单身份验证和安全层 (SASL) 进行身份验证。如果您在集群上使用 SASL 身份验证,则用户必须向 Knative 提供凭据才能与 Kafka 集群通信;否则无法生成或使用事件。
您在 OpenShift Container Platform 上拥有集群或专用管理员权限。
OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka
CR 已安装在您的 OpenShift Container Platform 集群上。
您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。
您拥有 Kafka 集群的用户名和密码。
您已选择要使用的 SASL 机制,例如 PLAIN
、SCRAM-SHA-256
或 SCRAM-SHA-512
。
如果启用了 TLS,您还需要 Kafka 集群的 ca.crt
证书文件。
您已安装 OpenShift (oc
) CLI。
在您选择的命名空间中创建证书文件作为密钥。
$ oc create secret -n <namespace> generic <kafka_auth_secret> \
--from-file=ca.crt=caroot.pem \
--from-literal=password="SecretPassword" \
--from-literal=saslType="SCRAM-SHA-512" \ (1)
--from-literal=user="my-sasl-user"
1 | SASL 类型可以是 PLAIN 、SCRAM-SHA-256 或 SCRAM-SHA-512 。 |
创建或修改您的 Kafka 源,使其包含以下 spec
配置
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: example-source
spec:
...
net:
sasl:
enable: true
user:
secretKeyRef:
name: <kafka_auth_secret>
key: user
password:
secretKeyRef:
name: <kafka_auth_secret>
key: password
type:
secretKeyRef:
name: <kafka_auth_secret>
key: saslType
tls:
enable: true
caCert: (1)
secretKeyRef:
name: <kafka_auth_secret>
key: ca.crt
...
1 | 如果您使用的是公共云 Kafka 服务,则不需要 caCert spec。 |
您可以将 Apache Kafka (KafkaSource) 的 Knative Eventing 源配置为使用自定义指标自动缩放运算符进行自动缩放,该运算符基于 Kubernetes 事件驱动自动缩放器 (KEDA)。
为 KafkaSource 配置 KEDA 自动缩放仅是技术预览功能。技术预览功能不受 Red Hat 生产服务级别协议 (SLA) 支持,并且可能功能不完整。Red Hat 不建议在生产环境中使用它们。这些功能可让您抢先体验即将推出的产品功能,从而使客户能够在开发过程中测试功能并提供反馈。 有关 Red Hat 技术预览功能的支持范围的更多信息,请参见 技术预览功能支持范围。 |
OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka
自定义资源已安装在您的集群上。
在 KnativeKafka
自定义资源中,启用 KEDA 缩放
apiVersion: operator.serverless.openshift.io/v1alpha1
kind: KnativeKafka
metadata:
name: knative-kafka
namespace: knative-eventing
spec:
config:
kafka-features:
controller-autoscaler-keda: enabled
应用 KnativeKafka
YAML 文件
$ oc apply -f <filename>