apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: <sink-name>
namespace: <namespace>
spec:
topic: <topic-name>
bootstrapServers:
- <bootstrap-server>
您可以创建一个 Kafka 接收器,将事件发送到 Kafka 主题。默认情况下,Kafka 接收器使用二进制内容模式,这比结构化模式更高效。要使用 YAML 创建 Kafka 接收器,必须创建一个定义KafkaSink
对象的 YAML 文件,然后使用oc apply
命令应用它。
OpenShift Serverless Operator、Knative Eventing 和KnativeKafka
自定义资源 (CR) 已安装在您的集群上。
您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。
您可以访问一个 Red Hat AMQ Streams (Kafka) 集群,该集群生成您要导入的 Kafka 消息。
安装 OpenShift CLI (oc
)。
创建一个KafkaSink
对象定义作为 YAML 文件
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: <sink-name>
namespace: <namespace>
spec:
topic: <topic-name>
bootstrapServers:
- <bootstrap-server>
要创建 Kafka 接收器,请应用KafkaSink
YAML 文件
$ oc apply -f <filename>
配置事件源,以便在其规范中指定接收器
apiVersion: sources.knative.dev/v1alpha2
kind: ApiServerSource
metadata:
name: <source-name> (1)
namespace: <namespace> (2)
spec:
serviceAccountName: <service-account-name> (3)
mode: Resource
resources:
- apiVersion: v1
kind: Event
sink:
ref:
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
name: <sink-name> (4)
1 | 事件源的名称。 |
2 | 事件源的命名空间。 |
3 | 事件源的服务帐户。 |
4 | Kafka 接收器名称。 |
您可以使用 OpenShift Container Platform Web 控制台中的**开发者**视角创建一个 Kafka 接收器,将事件发送到 Kafka 主题。默认情况下,Kafka 接收器使用二进制内容模式,这比结构化模式更高效。
作为开发者,您可以创建一个事件接收器来接收来自特定源的事件并将其发送到 Kafka 主题。
您已从 OperatorHub 安装了 OpenShift Serverless Operator,其中包含 Knative Serving、Knative Eventing 和用于 Apache Kafka API 的 Knative 代理。
您已在 Kafka 环境中创建了一个 Kafka 主题。
在**开发者**视角中,导航到**+添加**视图。
在**事件编排目录**中单击**事件接收器**。
在目录项中搜索KafkaSink
并单击它。
单击**创建事件接收器**。
在表单视图中,键入引导服务器的 URL,它是主机名和端口的组合。
键入要发送事件数据的主题的名称。
键入事件接收器的名称。
单击**创建**。
在**开发者**视角中,导航到**拓扑**视图。
单击已创建的事件接收器以在右侧面板中查看其详细信息。
Apache Kafka 客户端和服务器使用传输层安全 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。对于 Apache Kafka 的 Knative 代理实现,TLS 是唯一支持的流量加密方法。
Apache Kafka 使用简单身份验证和安全层 (SASL) 进行身份验证。如果您在集群上使用 SASL 身份验证,则用户必须向 Knative 提供凭据才能与 Kafka 集群通信;否则,将无法生成或使用事件。
OpenShift Serverless Operator、Knative Eventing 和KnativeKafka
自定义资源 (CR) 已安装在您的 OpenShift Container Platform 集群上。
在KnativeKafka
CR 中启用了 Kafka 接收器。
您已创建了一个项目,或者可以访问具有在 OpenShift Container Platform 中创建应用程序和其他工作负载的适当角色和权限的项目。
您有一个存储为.pem
文件的Kafka集群CA证书。
您有一个存储为.pem
文件的Kafka集群客户端证书和密钥。
您已安装OpenShift (oc
) CLI。
您已选择要使用的SASL机制,例如PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。
在与您的KafkaSink
对象相同的命名空间中,创建证书文件作为密钥。
证书和密钥必须为PEM格式。 |
对于使用SASL进行身份验证而无需加密的情况
$ oc create secret -n <namespace> generic <secret_name> \
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<username> \
--from-literal=password=<password>
对于使用SASL进行身份验证并使用TLS进行加密的情况
$ oc create secret -n <namespace> generic <secret_name> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-file=ca.crt=<my_caroot.pem_file_path> \ (1)
--from-literal=user=<username> \
--from-literal=password=<password>
1 | 如果您使用的是公共云托管的Kafka服务,则可以省略ca.crt 以使用系统的根CA集。 |
对于使用TLS进行身份验证和加密的情况
$ oc create secret -n <namespace> generic <secret_name> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \ (1)
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
1 | 如果您使用的是公共云托管的Kafka服务,则可以省略ca.crt 以使用系统的根CA集。 |
创建或修改KafkaSink
对象,并在auth
规范中添加对您密钥的引用。
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: <sink_name>
namespace: <namespace>
spec:
...
auth:
secret:
ref:
name: <secret_name>
...
应用KafkaSink
对象。
$ oc apply -f <filename>