OpenFunction 事件
1 - 简介
概述
OpenFunction 事件是 OpenFunction 的事件管理框架。它提供以下核心特性:
- 支持通过同步和异步调用触发目标函数
- 用户定义的触发判断逻辑
- OpenFunction 事件的组件可以由 OpenFunction 本身驱动
架构
以下图示说明了 OpenFunction 事件的架构。
概念
EventSource
EventSource 定义了事件的生产者,例如 Kafka 服务,对象存储服务,甚至是函数。它包含了这些事件生产者的描述和发送这些事件的信息。
EventSource 支持以下类型的事件源服务器:
- Kafka
- Cron(调度器)
- Redis
EventBus(ClusterEventBus)
EventBus 负责聚合事件并使其持久化。它包含了一个通常是消息队列(如 NATS Streaming 和 Kafka)的事件总线 broker 的描述,并为 EventSource 和 Trigger 提供这些配置。
EventBus 默认处理命名空间范围内的事件总线适配。对于集群范围,ClusterEventBus 可作为事件总线适配器,并在其他组件无法找到命名空间下的 EventBus 时生效。
EventBus 支持以下事件总线 broker:
- NATS Streaming
Trigger
Trigger 是事件目的的抽象,例如接收到消息时需要做什么。它包含了由您定义的事件的目的,这告诉触发器它应该从哪个 EventSource 获取事件,并根据给定的条件决定是否触发目标函数。
参考
有关更多信息,请参阅 EventSource 规范 和 EventBus 规范。
2 - 使用 EventSource
本文档提供了一个示例,说明如何使用事件源触发同步函数。
在此示例中,定义了一个 EventSource,用于同步调用,使用事件源(一个 Kafka 服务器)作为函数(一个 Knative 服务)的输入绑定。当事件源生成事件时,它将通过 spec.sink
配置调用函数并获取同步返回。
创建函数
使用以下内容创建一个函数作为 EventSource Sink。有关如何创建函数的更多信息,请参见 创建同步函数。
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: sink
spec:
version: "v1.0.0"
image: "openfunction/sink-sample:latest"
serving:
template:
containers:
- name: function
imagePullPolicy: Always
triggers:
http:
port: 8080
创建函数后,运行以下命令获取函数的 URL。
$ kubectl get functions.core.openfunction.io
NAME BUILDSTATE SERVINGSTATE BUILDER SERVING URL AGE
sink Skipped Running serving-4x5wh https://openfunction.io/default/sink 13s
创建 Kafka 集群
运行以下命令在默认命名空间中安装 strimzi-kafka-operator。
helm repo add strimzi https://strimzi.io/charts/ helm install kafka-operator -n default strimzi/strimzi-kafka-operator
使用以下内容创建一个文件
kafka.yaml
。apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: kafka-server namespace: default spec: kafka: version: 3.3.1 replicas: 1 listeners: - name: plain port: 9092 type: internal tls: false - name: tls port: 9093 type: internal tls: true config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 inter.broker.protocol.version: "3.1" storage: type: ephemeral zookeeper: replicas: 1 storage: type: ephemeral entityOperator: topicOperator: {} userOperator: {} --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: events-sample namespace: default labels: strimzi.io/cluster: kafka-server spec: partitions: 10 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824
运行以下命令在默认命名空间中部署一个名为
kafka-server
的 1-replica Kafka 服务器和一个名为events-sample
的 1-replica Kafka 主题。此命令创建的 Kafka 和 Zookeeper 集群的存储类型为 ephemeral,并使用 emptyDir 进行演示。kubectl apply -f kafka.yaml
运行以下命令检查 pod 状态,并等待 Kafka 和 Zookeeper 启动并运行。
$ kubectl get po NAME READY STATUS RESTARTS AGE kafka-server-entity-operator-568957ff84-nmtlw 3/3 Running 0 8m42s kafka-server-kafka-0 1/1 Running 0 9m13s kafka-server-zookeeper-0 1/1 Running 0 9m46s strimzi-cluster-operator-687fdd6f77-cwmgm 1/1 Running 0 11m
运行以下命令查看 Kafka 集群的元数据。
kafkacat -L -b kafka-server-kafka-brokers:9092
触发同步函数
创建 EventSource
使用以下内容创建一个 EventSource 配置文件(例如,
eventsource-sink.yaml
)。注意
- 以下示例定义了一个名为
my-eventsource
的事件源,并将指定 Kafka 服务器生成的事件标记为sample-one
事件。 spec.sink
引用了在先决条件中创建的目标函数(Knative 服务)。
apiVersion: events.openfunction.io/v1alpha1 kind: EventSource metadata: name: my-eventsource spec: logLevel: "2" kafka: sample-one: brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092" topic: "events-sample" authRequired: false sink: uri: "http://openfunction.io.svc.cluster.local/default/sink"
- 以下示例定义了一个名为
运行以下命令应用配置文件。
kubectl apply -f eventsource-sink.yaml
运行以下命令检查结果。
$ kubectl get eventsources.events.openfunction.io NAME EVENTBUS SINK STATUS my-eventsource Ready $ kubectl get components NAME AGE serving-8f6md-component-esc-kafka-sample-one-r527t 68m serving-8f6md-component-ts-my-eventsource-default-wz8jt 68m $ kubectl get deployments.apps NAME READY UP-TO-DATE AVAILABLE AGE serving-8f6md-deployment-v100-pg9sd 1/1 1 1 68m
注意
在此示例中触发同步函数,EventSource 控制器的工作流程描述如下:
- 创建一个名为
my-eventsource
的 EventSource 自定义资源。 - 创建一个名为
serving-xxxxx-component-esc-kafka-sample-one-xxxxx
的 Dapr 组件,使 EventSource 能够与事件源关联。 - 创建一个名为
serving-xxxxx-component-ts-my-eventsource-default-xxxxx
的 Dapr 组件,使 EventSource 能够与 sink 函数关联。 - 创建一个名为
serving-xxxxx-deployment-v100-xxxxx-xxxxxxxxxx-xxxxx
的 Deployment,用于处理事件。
- 创建一个名为
创建事件生产者
要启动目标函数,需要创建一些事件来触发函数。
使用以下内容创建一个事件生产者配置文件(例如,
events-producer.yaml
)。apiVersion: core.openfunction.io/v1beta1 kind: Function metadata: name: events-producer spec: version: "v1.0.0" image: openfunctiondev/v1beta1-bindings:latest serving: template: containers: - name: function imagePullPolicy: Always runtime: "async" inputs: - name: cron component: cron outputs: - name: target component: kafka-server operation: "create" bindings: cron: type: bindings.cron version: v1 metadata: - name: schedule value: "@every 2s" kafka-server: type: bindings.kafka version: v1 metadata: - name: brokers value: "kafka-server-kafka-brokers:9092" - name: topics value: "events-sample" - name: consumerGroup value: "bindings-with-output" - name: publishTopic value: "events-sample" - name: authRequired value: "false"
运行以下命令应用配置文件。
kubectl apply -f events-producer.yaml
运行以下命令实时检查结果。
$ kubectl get po --watch NAME READY STATUS RESTARTS AGE serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh 0/2 ContainerCreating 0 1s serving-8f6md-deployment-v100-pg9sd-6666c5577f-4rpdg 2/2 Running 0 23m serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh 0/2 ContainerCreating 0 1s serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh 1/2 Running 0 5s serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh 2/2 Running 0 8s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 0/2 Pending 0 0s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 0/2 Pending 0 0s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 0/2 ContainerCreating 0 0s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 0/2 ContainerCreating 0 2s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 1/2 Running 0 4s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 1/2 Running 0 4s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 2/2 Running 0 4s
3 - 使用 EventBus 和 Trigger
本文档提供了一个示例,说明如何使用 EventBus 和 Trigger。
先决条件
- 您需要创建一个作为目标函数的函数以被触发。有关更多详细信息,请参见 创建函数。
- 您需要创建一个 Kafka 集群。有关更多详细信息,请参见 创建 Kafka 集群。
部署 NATS 流服务器
运行以下命令部署 NATS 流服务器。本文档使用 nats://nats.default:4222
作为 NATS 流服务器的访问地址,stan
作为集群 ID。有关更多信息,请参见 NATS Streaming (STAN)。
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats
helm install stan nats/stan --set stan.nats.url=nats://nats:4222
创建 OpenFuncAsync 运行时函数
使用以下内容创建目标函数的配置文件(例如,
openfuncasync-function.yaml
),该函数由 Trigger CRD 触发并打印接收到的消息。apiVersion: core.openfunction.io/v1beta2 kind: Function metadata: name: trigger-target spec: version: "v1.0.0" image: openfunctiondev/v1beta1-trigger-target:latest serving: scaleOptions: keda: scaledObject: pollingInterval: 15 minReplicaCount: 0 maxReplicaCount: 10 cooldownPeriod: 30 triggers: - type: stan metadata: natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222" queueGroup: "grp1" durableName: "ImDurable" subject: "metrics" lagThreshold: "10" triggers: dapr: - name: eventbus topic: metrics pubsub: eventbus: type: pubsub.natsstreaming version: v1 metadata: - name: natsURL value: "nats://nats.default:4222" - name: natsStreamingClusterID value: "stan" - name: subscriptionType value: "queue" - name: durableSubscriptionName value: "ImDurable" - name: consumerID value: "grp1"
运行以下命令应用配置文件。
kubectl apply -f openfuncasync-function.yaml
创建 EventBus 和 EventSource
使用以下内容创建 EventBus 的配置文件(例如,
eventbus.yaml
)。apiVersion: events.openfunction.io/v1alpha1 kind: EventBus metadata: name: default spec: natsStreaming: natsURL: "nats://nats.default:4222" natsStreamingClusterID: "stan" subscriptionType: "queue" durableSubscriptionName: "ImDurable"
使用以下内容创建 EventSource 的配置文件(例如,
eventsource.yaml
)。注意
通过spec.eventBus
设置事件总线的名称。apiVersion: events.openfunction.io/v1alpha1 kind: EventSource metadata: name: my-eventsource spec: logLevel: "2" eventBus: "default" kafka: sample-two: brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092" topic: "events-sample" authRequired: false
运行以下命令应用这些配置文件。
kubectl apply -f eventbus.yaml kubectl apply -f eventsource.yaml
运行以下命令检查结果。
$ kubectl get eventsources.events.openfunction.io NAME EVENTBUS SINK STATUS my-eventsource default Ready $ kubectl get eventbus.events.openfunction.io NAME AGE default 62m $ kubectl get components NAME AGE serving-9689d-component-ebfes-my-eventsource-cmcbw 46m serving-9689d-component-esc-kafka-sample-two-l99cg 46m serving-dxrhd-component-eventbus-t65q7 13m serving-zwlj4-component-ebft-my-trigger-4925n 100s
注意
在使用事件总线的情况下,EventSource 控制器的工作流程描述如下:
- 创建一个名为
my-eventsource
的 EventSource 自定义资源。 - 检索并重新组织 EventBus 的配置,包括 EventBus 名称(在此示例中为
default
)和与 EventBus 关联的 Dapr 组件的名称。 - 创建一个名为
serving-xxxxx-component-ebfes-my-eventsource-xxxxx
的 Dapr 组件,使 EventSource 能够与事件总线关联。 - 创建一个名为
serving-xxxxx-component-esc-kafka-sample-two-xxxxx
的 Dapr 组件,使 EventSource 能够与事件源关联。 - 创建一个名为
serving-xxxxx-deployment-v100-xxxxx
的 Deployment,用于处理事件。
- 创建一个名为
创建 Trigger
使用以下内容创建 Trigger 的配置文件(例如,
trigger.yaml
)。注意
- 通过
spec.eventBus
设置与 Trigger 关联的事件总线。 - 通过
spec.inputs
设置事件输入源。 - 这是一个简单的触发器,它从名为
default
的 EventBus 中收集事件。当它从 EventSourcemy-eventsource
中检索到一个sample-two
事件时,它触发一个名为function-sample-serving-qrdx8-ksvc-fwml8
的 Knative 服务,并同时将事件发送到事件总线的metrics
主题。
apiVersion: events.openfunction.io/v1alpha1 kind: Trigger metadata: name: my-trigger spec: logLevel: "2" eventBus: "default" inputs: inputDemo: eventSource: "my-eventsource" event: "sample-two" subscribers: - condition: inputDemo topic: "metrics"
- 通过
运行以下命令应用配置文件。
kubectl apply -f trigger.yaml
运行以下命令检查结果。
$ kubectl get triggers.events.openfunction.io NAME EVENTBUS STATUS my-trigger default Ready $ kubectl get eventbus.events.openfunction.io NAME AGE default 62m $ kubectl get components NAME AGE serving-9689d-component-ebfes-my-eventsource-cmcbw 46m serving-9689d-component-esc-kafka-sample-two-l99cg 46m serving-dxrhd-component-eventbus-t65q7 13m serving-zwlj4-component-ebft-my-trigger-4925n 100s
注意
在使用事件总线的情况下,Trigger 控制器的工作流程如下:
- 创建一个名为
my-trigger
的 Trigger 自定义资源。 - 检索并重新组织 EventBus 的配置,包括 EventBus 名称(在此示例中为
default
)和与 EventBus 关联的 Dapr 组件的名称。 - 创建一个名为
serving-xxxxx-component-ebft-my-trigger-xxxxx
的 Dapr 组件,使 Trigger 能够与事件总线关联。 - 创建一个名为
serving-xxxxx-deployment-v100-xxxxx
的 Deployment,用于处理触发任务。
- 创建一个名为
创建事件生产者
使用以下内容创建事件生产者配置文件(例如,
events-producer.yaml
)。apiVersion: core.openfunction.io/v1beta2 kind: Function metadata: name: events-producer spec: version: "v1.0.0" image: openfunctiondev/v1beta1-bindings:latest serving: template: containers: - name: function imagePullPolicy: Always triggers: dapr: - name: cron type: bindings.cron outputs: - dapr: name: kafka-server operation: "create" bindings: cron: type: bindings.cron version: v1 metadata: - name: schedule value: "@every 2s" kafka-server: type: bindings.kafka version: v1 metadata: - name: brokers value: "kafka-server-kafka-brokers:9092" - name: topics value: "events-sample" - name: consumerGroup value: "bindings-with-output" - name: publishTopic value: "events-sample" - name: authRequired value: "false"
运行以下命令应用配置文件。
kubectl apply -f events-producer.yaml
运行以下命令观察目标异步函数的变化。
$ kubectl get functions.core.openfunction.io NAME BUILDSTATE SERVINGSTATE BUILDER SERVING URL AGE trigger-target Skipped Running serving-dxrhd 20m $ kubectl get po --watch NAME READY STATUS RESTARTS AGE serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 Pending 0 0s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 Pending 0 0s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 ContainerCreating 0 0s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 ContainerCreating 0 2s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 1/2 Running 0 4s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 1/2 Running 0 4s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 2/2 Running 0 4s
4 - 在一个 EventSource 中使用多个源
本文档描述了如何在一个 EventSource 中使用多个源。
先决条件
- 您需要创建一个作为目标函数的函数以被触发。有关更多详细信息,请参见 创建函数。
- 您需要创建一个 Kafka 集群。有关更多详细信息,请参见 创建 Kafka 集群。
在一个 EventSource 中使用多个源
使用以下内容创建一个 EventSource 配置文件(例如,
eventsource-multi.yaml
)。注意
- 以下示例定义了一个名为
my-eventsource
的事件源,并将指定 Kafka 服务器生成的事件标记为sample-three
事件。 spec.sink
引用了目标函数(Knative 服务)。spec.cron
的配置是每5秒触发一次在spec.sink
中定义的函数。
apiVersion: events.openfunction.io/v1alpha1 kind: EventSource metadata: name: my-eventsource spec: logLevel: "2" kafka: sample-three: brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092" topic: "events-sample" authRequired: false cron: sample-three: schedule: "@every 5s" sink: uri: "http://openfunction.io.svc.cluster.local/default/sink"
- 以下示例定义了一个名为
运行以下命令应用配置文件。
kubectl apply -f eventsource-multi.yaml
运行以下命令观察变化。
$ kubectl get eventsources.events.openfunction.io NAME EVENTBUS SINK STATUS my-eventsource Ready $ kubectl get components NAME AGE serving-vqfk5-component-esc-cron-sample-three-dzcpv 35s serving-vqfk5-component-esc-kafka-sample-one-nr9pq 35s serving-vqfk5-component-ts-my-eventsource-default-q6g6m 35s $ kubectl get deployments.apps NAME READY UP-TO-DATE AVAILABLE AGE serving-4x5wh-ksvc-wxbf2-v100-deployment 1/1 1 1 3h14m serving-vqfk5-deployment-v100-vdmvj 1/1 1 1 48s
5 - 使用 ClusterEventBus
本文档描述了如何使用 ClusterEventBus。
先决条件
您已完成 使用 EventBus 和 Trigger 中描述的步骤。
使用 ClusterEventBus
使用以下内容创建一个 ClusterEventBus 配置文件(例如,
clustereventbus.yaml
)。apiVersion: events.openfunction.io/v1alpha1 kind: ClusterEventBus metadata: name: default spec: natsStreaming: natsURL: "nats://nats.default:4222" natsStreamingClusterID: "stan" subscriptionType: "queue" durableSubscriptionName: "ImDurable"
运行以下命令删除 EventBus。
kubectl delete eventbus.events.openfunction.io default
运行以下命令应用配置文件。
kubectl apply -f clustereventbus.yaml
运行以下命令检查结果。
$ kubectl get eventbus.events.openfunction.io No resources found in default namespace. $ kubectl get clustereventbus.events.openfunction.io NAME AGE default 21s
6 - 使用带有条件的 Trigger
本文档描述了如何使用带有条件的触发器。
先决条件
您已完成 使用 EventBus 和 Trigger 中描述的步骤。
使用带有条件的触发器
创建两个事件源
使用以下内容创建一个 EventSource 配置文件(例如,
eventsource-a.yaml
)。apiVersion: events.openfunction.io/v1alpha1 kind: EventSource metadata: name: eventsource-a spec: logLevel: "2" eventBus: "default" kafka: sample-five: brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092" topic: "events-sample" authRequired: false
使用以下内容创建另一个 EventSource 配置文件(例如,
eventsource-b.yaml
)。apiVersion: events.openfunction.io/v1alpha1 kind: EventSource metadata: name: eventsource-b spec: logLevel: "2" eventBus: "default" cron: sample-five: schedule: "@every 5s"
运行以下命令应用这两个配置文件。
kubectl apply -f eventsource-a.yaml kubectl apply -f eventsource-b.yaml
创建带有条件的触发器
使用以下内容创建一个带有
condition
的触发器配置文件(例如,condition-trigger.yaml
)。apiVersion: events.openfunction.io/v1alpha1 kind: Trigger metadata: name: condition-trigger spec: logLevel: "2" eventBus: "default" inputs: eventA: eventSource: "eventsource-a" event: "sample-five" eventB: eventSource: "eventsource-b" event: "sample-five" subscribers: - condition: eventB sink: uri: "http://openfunction.io.svc.cluster.local/default/sink" - condition: eventA && eventB topic: "metrics"
注意
在这个例子中,定义了两个输入源和两个订阅者,它们的触发关系描述如下:
- 当接收到输入
eventB
时,将输入事件发送到 Knative 服务。 - 当接收到输入
eventB
和输入eventA
时,将输入事件同时发送到事件总线的metrics
主题和 Knative 服务。
- 当接收到输入
运行以下命令应用配置文件。
kubectl apply -f condition-trigger.yaml
运行以下命令检查结果。
$ kubectl get eventsources.events.openfunction.io NAME EVENTBUS SINK STATUS eventsource-a default Ready eventsource-b default Ready $ kubectl get triggers.events.openfunction.io NAME EVENTBUS STATUS condition-trigger default Ready $ kubectl get eventbus.events.openfunction.io NAME AGE default 12s
运行以下命令,您可以从输出中看到,由于事件源
eventsource-b
是一个 cron 任务,所以触发器中的eventB
条件匹配,触发了 Knative 服务。$ kubectl get functions.core.openfunction.io NAME BUILDSTATE SERVINGSTATE BUILDER SERVING URL AGE sink Skipped Running serving-4x5wh https://openfunction.io/default/sink 3h25m $ kubectl get po NAME READY STATUS RESTARTS AGE serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-k2jdg 2/2 Running 0 46s
参考 创建事件生产者 创建一个事件生产者。
运行以下命令,您可以从输出中看到,触发器中的
eventA && eventB
条件匹配,事件同时发送到事件总线的metrics
主题。触发了 OpenFuncAsync 函数。$ kubectl get functions.core.openfunction.io NAME BUILDSTATE SERVINGSTATE BUILDER SERVING URL AGE trigger-target Skipped Running serving-7hghp 103s $ kubectl get po NAME READY STATUS RESTARTS AGE serving-7hghp-deployment-v100-z8wrf-946b4854d-svf55 2/2 Running 0 18s