OpenFunction Events
1 - Introduction
Overview
OpenFunction Events is OpenFunction’s event management framework. It provides the following core features:
- Support for triggering target functions by synchronous and asynchronous calls
- User-defined trigger judgment logic
- The components of OpenFunction Events can be driven by OpenFunction itself
Architecture
The following diagram illustrates the architecture of OpenFunction Events.
Concepts
EventSource
EventSource defines the producer of an event, such as a Kafka service, an object storage service, and even a function. It contains descriptions of these event producers and information about where to send these events.
EventSource supports the following types of event source servers:
- Kafka
- Cron (scheduler)
- Redis
EventBus (ClusterEventBus)
EventBus is responsible for aggregating events and making them persistent. It contains descriptions of an event bus broker that usually is a message queue (such as NATS Streaming and Kafka), and provides these configurations for EventSource and Trigger.
EventBus handles event bus adaptation for namespace scope by default. For cluster scope, ClusterEventBus is available as an event bus adapter and takes effect when other components cannot find an EventBus under a namespace.
EventBus supports the following event bus broker:
- NATS Streaming
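The lookup order described above (a namespaced EventBus first, then a ClusterEventBus) can be sketched as a small shell helper. This is only an illustration, not the controller's code: the function name resolve_event_bus is hypothetical, it assumes kubectl is configured for a cluster with the OpenFunction Events CRDs installed, and it assumes the fallback matches buses by name.

```shell
# Hypothetical helper: report which bus a component in namespace $2
# would end up using for the bus name $1.
# Assumption: the fallback is by name, namespaced EventBus first.
resolve_event_bus() {
  bus_name="$1"
  namespace="$2"
  if kubectl get eventbus.events.openfunction.io "$bus_name" -n "$namespace" >/dev/null 2>&1; then
    echo "EventBus/$namespace/$bus_name"
  elif kubectl get clustereventbus.events.openfunction.io "$bus_name" >/dev/null 2>&1; then
    echo "ClusterEventBus/$bus_name"
  else
    echo "no event bus named $bus_name" >&2
    return 1
  fi
}

# Example call (needs cluster access):
#   resolve_event_bus default default
```

The helper only reads resources, so it is safe to run at any point in the walkthroughs that follow.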
Trigger
Trigger is an abstraction of the purpose of an event, such as what needs to be done when a message is received. It contains the purpose of an event as defined by you: it tells the trigger which EventSource to fetch events from, and the trigger then determines whether to trigger the target function according to the given conditions.
Reference
For more information, see EventSource Specifications and EventBus Specifications.
2 - Use EventSource
This document gives an example of how to use an event source to trigger a synchronous function.
In this example, an EventSource is defined for synchronous invocation to use the event source (a Kafka server) as an input binding of a function (a Knative service). When the event source generates an event, it invokes the function and gets a synchronous return through the spec.sink configuration.
Create a Function
Use the following content to create a function as the EventSource Sink. For more information about how to create a function, see Create sync functions.
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: sink
spec:
  version: "v1.0.0"
  image: "openfunction/sink-sample:latest"
  serving:
    template:
      containers:
        - name: function
          imagePullPolicy: Always
    triggers:
      http:
        port: 8080
After the function is created, run the following command to get the URL of the function.
Note
In the URL of the function, openfunction is the name of the Kubernetes Service and io is the namespace where the Kubernetes Service runs. For more information, see Namespaces of Services.
$ kubectl get functions.core.openfunction.io
NAME   BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                    AGE
sink   Skipped      Running                  serving-4x5wh   https://openfunction.io/default/sink   13s
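The relationship between this URL and the cluster-local address used later as the sink uri can be sketched as follows. The helper name to_sink_uri is hypothetical; it assumes the URL host has the form service.namespace, as the note above describes, and that the sink is addressed over plain HTTP inside the cluster, as in the EventSource examples in this document.

```shell
# Hypothetical helper: derive the cluster-local sink URI from a function URL.
# Assumption: the URL host is "<service>.<namespace>".
to_sink_uri() {
  url="$1"
  rest="${url#*://}"    # drop the scheme, e.g. openfunction.io/default/sink
  host="${rest%%/*}"    # <service>.<namespace>, e.g. openfunction.io
  path="${rest#*/}"     # function path, e.g. default/sink
  printf 'http://%s.svc.cluster.local/%s\n' "$host" "$path"
}

to_sink_uri "https://openfunction.io/default/sink"
# prints http://openfunction.io.svc.cluster.local/default/sink
```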
Create a Kafka Cluster
Run the following commands to install strimzi-kafka-operator in the default namespace.
helm repo add strimzi https://strimzi.io/charts/
helm install kafka-operator -n default strimzi/strimzi-kafka-operator
Use the following content to create a file kafka.yaml.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-server
  namespace: default
spec:
  kafka:
    version: 3.3.1
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.1"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: events-sample
  namespace: default
  labels:
    strimzi.io/cluster: kafka-server
spec:
  partitions: 10
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
Run the following command to deploy a 1-replica Kafka server named kafka-server and a 1-replica Kafka topic named events-sample in the default namespace. The Kafka and Zookeeper clusters created by this command have a storage type of ephemeral and use emptyDir for demonstration purposes.
kubectl apply -f kafka.yaml
Run the following command to check pod status and wait for Kafka and Zookeeper to be up and running.
$ kubectl get po
NAME                                            READY   STATUS    RESTARTS   AGE
kafka-server-entity-operator-568957ff84-nmtlw   3/3     Running   0          8m42s
kafka-server-kafka-0                            1/1     Running   0          9m13s
kafka-server-zookeeper-0                        1/1     Running   0          9m46s
strimzi-cluster-operator-687fdd6f77-cwmgm       1/1     Running   0          11m
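Instead of polling the pod list by hand, the wait can be scripted with kubectl wait. The sketch below is an optional convenience, not part of the original procedure: the function name wait_for_kafka and the 300s default timeout are assumptions, and it waits for every pod in the default namespace, so an unrelated pod that never becomes ready makes it time out.

```shell
# Hypothetical helper: block until all pods in the default namespace are Ready.
# $1 is an optional timeout (assumed default: 300s).
wait_for_kafka() {
  kubectl wait pod --all --namespace default \
    --for=condition=Ready --timeout="${1:-300s}"
}

# Example call (needs cluster access):
#   wait_for_kafka 600s
```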
Run the following command to view the metadata of the Kafka cluster.
kafkacat -L -b kafka-server-kafka-brokers:9092
Trigger a Synchronous Function
Create an EventSource
Use the following content to create an EventSource configuration file (for example, eventsource-sink.yaml).
Note
- The following example defines an event source named my-eventsource and marks the events generated by the specified Kafka server as sample-one events.
- spec.sink references the target function (Knative service) created in the prerequisites.
apiVersion: events.openfunction.io/v1alpha1
kind: EventSource
metadata:
  name: my-eventsource
spec:
  logLevel: "2"
  kafka:
    sample-one:
      brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
      topic: "events-sample"
      authRequired: false
  sink:
    uri: "http://openfunction.io.svc.cluster.local/default/sink"
Run the following command to apply the configuration file.
kubectl apply -f eventsource-sink.yaml
Run the following commands to check the results.
$ kubectl get eventsources.events.openfunction.io
NAME             EVENTBUS   SINK   STATUS
my-eventsource                     Ready
$ kubectl get components
NAME                                                      AGE
serving-8f6md-component-esc-kafka-sample-one-r527t        68m
serving-8f6md-component-ts-my-eventsource-default-wz8jt   68m
$ kubectl get deployments.apps
NAME                                  READY   UP-TO-DATE   AVAILABLE   AGE
serving-8f6md-deployment-v100-pg9sd   1/1     1            1           68m
Note
In this example of triggering a synchronous function, the workflow of the EventSource controller is described as follows:
- Create an EventSource custom resource named my-eventsource.
- Create a Dapr component named serving-xxxxx-component-esc-kafka-sample-one-xxxxx to enable the EventSource to associate with the event source.
- Create a Dapr component named serving-xxxxx-component-ts-my-eventsource-default-xxxxx to enable the EventSource to associate with the sink function.
- Create a Deployment named serving-xxxxx-deployment-v100-xxxxx-xxxxxxxxxx-xxxxx for processing events.
Create an event producer
To start the target function, you need to create some events to trigger the function.
Use the following content to create an event producer configuration file (for example, events-producer.yaml).
apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: events-producer
spec:
  version: "v1.0.0"
  image: openfunctiondev/v1beta1-bindings:latest
  serving:
    template:
      containers:
        - name: function
          imagePullPolicy: Always
    runtime: "async"
    inputs:
      - name: cron
        component: cron
    outputs:
      - name: target
        component: kafka-server
        operation: "create"
    bindings:
      cron:
        type: bindings.cron
        version: v1
        metadata:
          - name: schedule
            value: "@every 2s"
      kafka-server:
        type: bindings.kafka
        version: v1
        metadata:
          - name: brokers
            value: "kafka-server-kafka-brokers:9092"
          - name: topics
            value: "events-sample"
          - name: consumerGroup
            value: "bindings-with-output"
          - name: publishTopic
            value: "events-sample"
          - name: authRequired
            value: "false"
Run the following command to apply the configuration file.
kubectl apply -f events-producer.yaml
Run the following command to check the results in real time.
$ kubectl get po --watch
NAME                                                        READY   STATUS              RESTARTS   AGE
serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         0/2     ContainerCreating   0          1s
serving-8f6md-deployment-v100-pg9sd-6666c5577f-4rpdg        2/2     Running             0          23m
serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         0/2     ContainerCreating   0          1s
serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         1/2     Running             0          5s
serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         2/2     Running             0          8s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     Pending             0          0s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     Pending             0          0s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     ContainerCreating   0          0s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     ContainerCreating   0          2s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   1/2     Running             0          4s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   1/2     Running             0          4s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   2/2     Running             0          4s
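If the sink pods do not appear, it can help to confirm that the producer is actually writing to the topic. The following sketch wraps kafkacat in consumer mode, reusing the broker address and topic from this example; the function name tail_events is hypothetical, and it assumes kafkacat runs somewhere that can resolve the broker Service name, as the earlier kafkacat -L command does.

```shell
# Hypothetical helper: print new messages arriving on the sample topic.
# $1 = broker address, $2 = topic (defaults taken from this example).
tail_events() {
  kafkacat -C \
    -b "${1:-kafka-server-kafka-brokers:9092}" \
    -t "${2:-events-sample}" \
    -o end
}

# Example call (needs access to the Kafka Service):
#   tail_events
```

With the cron binding set to @every 2s, a healthy producer should yield a new message roughly every two seconds.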
3 - Use EventBus and Trigger
This document gives an example of how to use EventBus and Trigger.
Prerequisites
- You need to create a function as the target function to be triggered. Please refer to Create a function for more details.
- You need to create a Kafka cluster. Please refer to Create a Kafka cluster for more details.
Deploy a NATS streaming server
Run the following commands to deploy a NATS streaming server. This document uses nats://nats.default:4222 as the access address of the NATS streaming server and stan as the cluster ID. For more information, see NATS Streaming (STAN).
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats
helm install stan nats/stan --set stan.nats.url=nats://nats:4222
Create an OpenFuncAsync Runtime Function
Use the following content to create a configuration file (for example, openfuncasync-function.yaml) for the target function, which is triggered by the Trigger CRD and prints the received message.
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: trigger-target
spec:
  version: "v1.0.0"
  image: openfunctiondev/v1beta1-trigger-target:latest
  serving:
    scaleOptions:
      keda:
        scaledObject:
          pollingInterval: 15
          minReplicaCount: 0
          maxReplicaCount: 10
          cooldownPeriod: 30
        triggers:
          - type: stan
            metadata:
              natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222"
              queueGroup: "grp1"
              durableName: "ImDurable"
              subject: "metrics"
              lagThreshold: "10"
    triggers:
      dapr:
        - name: eventbus
          topic: metrics
    pubsub:
      eventbus:
        type: pubsub.natsstreaming
        version: v1
        metadata:
          - name: natsURL
            value: "nats://nats.default:4222"
          - name: natsStreamingClusterID
            value: "stan"
          - name: subscriptionType
            value: "queue"
          - name: durableSubscriptionName
            value: "ImDurable"
          - name: consumerID
            value: "grp1"
Run the following command to apply the configuration file.
kubectl apply -f openfuncasync-function.yaml
Create an EventBus and an EventSource
Use the following content to create a configuration file (for example, eventbus.yaml) for an EventBus.
apiVersion: events.openfunction.io/v1alpha1
kind: EventBus
metadata:
  name: default
spec:
  natsStreaming:
    natsURL: "nats://nats.default:4222"
    natsStreamingClusterID: "stan"
    subscriptionType: "queue"
    durableSubscriptionName: "ImDurable"
Use the following content to create a configuration file (for example, eventsource.yaml) for an EventSource.
Note
Set the name of the event bus through spec.eventBus.
apiVersion: events.openfunction.io/v1alpha1
kind: EventSource
metadata:
  name: my-eventsource
spec:
  logLevel: "2"
  eventBus: "default"
  kafka:
    sample-two:
      brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
      topic: "events-sample"
      authRequired: false
Run the following commands to apply these configuration files.
kubectl apply -f eventbus.yaml
kubectl apply -f eventsource.yaml
Run the following commands to check the results.
$ kubectl get eventsources.events.openfunction.io
NAME             EVENTBUS   SINK   STATUS
my-eventsource   default           Ready
$ kubectl get eventbus.events.openfunction.io
NAME      AGE
default   6m53s
$ kubectl get components
NAME                                                 AGE
serving-6r5dl-component-eventbus-jlpqf               11m
serving-9689d-component-ebfes-my-eventsource-cmcbw   6m57s
serving-9689d-component-esc-kafka-sample-two-l99cg   6m57s
serving-k6zw8-component-cron-9x8hl                   61m
serving-k6zw8-component-kafka-server-sjrzs           61m
$ kubectl get deployments.apps
NAME                                  READY   UP-TO-DATE   AVAILABLE   AGE
serving-6r5dl-deployment-v100-m7nq2   0/0     0            0           12m
serving-9689d-deployment-v100-5qdvk   1/1     1            1           7m17s
Note
In the case of using the event bus, the workflow of the EventSource controller is described as follows:
- Create an EventSource custom resource named my-eventsource.
- Retrieve and reorganize the configuration of the EventBus, including the EventBus name (default in this example) and the name of the Dapr component associated with the EventBus.
- Create a Dapr component named serving-xxxxx-component-ebfes-my-eventsource-xxxxx to enable the EventSource to associate with the event bus.
- Create a Dapr component named serving-xxxxx-component-esc-kafka-sample-two-xxxxx to enable the EventSource to associate with the event source.
- Create a Deployment named serving-xxxxx-deployment-v100-xxxxx for processing events.
Create a Trigger
Use the following content to create a configuration file (for example, trigger.yaml) for a Trigger.
Note
- Set the event bus associated with the Trigger through spec.eventBus.
- Set the event input source through spec.inputs.
- This is a simple trigger that collects events from the EventBus named default. When it retrieves a sample-two event from the EventSource my-eventsource, it triggers a Knative service named function-sample-serving-qrdx8-ksvc-fwml8 and sends the event to the topic metrics of the event bus at the same time.
apiVersion: events.openfunction.io/v1alpha1
kind: Trigger
metadata:
  name: my-trigger
spec:
  logLevel: "2"
  eventBus: "default"
  inputs:
    inputDemo:
      eventSource: "my-eventsource"
      event: "sample-two"
  subscribers:
    - condition: inputDemo
      topic: "metrics"
Run the following command to apply the configuration file.
kubectl apply -f trigger.yaml
Run the following commands to check the results.
$ kubectl get triggers.events.openfunction.io
NAME         EVENTBUS   STATUS
my-trigger   default    Ready
$ kubectl get eventbus.events.openfunction.io
NAME      AGE
default   62m
$ kubectl get components
NAME                                                 AGE
serving-9689d-component-ebfes-my-eventsource-cmcbw   46m
serving-9689d-component-esc-kafka-sample-two-l99cg   46m
serving-dxrhd-component-eventbus-t65q7               13m
serving-zwlj4-component-ebft-my-trigger-4925n        100s
Note
In the case of using the event bus, the workflow of the Trigger controller is as follows:
- Create a Trigger custom resource named my-trigger.
- Retrieve and reorganize the configuration of the EventBus, including the EventBus name (default in this example) and the name of the Dapr component associated with the EventBus.
- Create a Dapr component named serving-xxxxx-component-ebft-my-trigger-xxxxx to enable the Trigger to associate with the event bus.
- Create a Deployment named serving-xxxxx-deployment-v100-xxxxx for processing trigger tasks.
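The component names listed by kubectl get components follow the patterns given in the controller notes of this and the previous section. As a reading aid, the sketch below maps a component name to the role those notes assign to it. The function name component_role and the wording of the role labels are illustrative assumptions; only the name infixes (esc, ts, ebfes, ebft) come from the examples.

```shell
# Hypothetical helper: classify a Dapr component created by the
# EventSource or Trigger controller, based on its name infix.
component_role() {
  case "$1" in
    serving-*-component-esc-*)   echo "EventSource to event source" ;;
    serving-*-component-ts-*)    echo "EventSource to sink function" ;;
    serving-*-component-ebfes-*) echo "EventSource to event bus" ;;
    serving-*-component-ebft-*)  echo "Trigger to event bus" ;;
    *)                           echo "other" ;;
  esac
}

component_role "serving-zwlj4-component-ebft-my-trigger-4925n"
# prints: Trigger to event bus

# Possible use against a live cluster:
#   kubectl get components --no-headers | while read -r name _; do
#     printf '%s\t%s\n' "$name" "$(component_role "$name")"
#   done
```

Components created for functions themselves, such as serving-dxrhd-component-eventbus-t65q7, fall into the other category.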
Create an Event Producer
Use the following content to create an event producer configuration file (for example, events-producer.yaml).
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: events-producer
spec:
  version: "v1.0.0"
  image: openfunctiondev/v1beta1-bindings:latest
  serving:
    template:
      containers:
        - name: function
          imagePullPolicy: Always
    triggers:
      dapr:
        - name: cron
          type: bindings.cron
    outputs:
      - dapr:
          name: kafka-server
          operation: "create"
    bindings:
      cron:
        type: bindings.cron
        version: v1
        metadata:
          - name: schedule
            value: "@every 2s"
      kafka-server:
        type: bindings.kafka
        version: v1
        metadata:
          - name: brokers
            value: "kafka-server-kafka-brokers:9092"
          - name: topics
            value: "events-sample"
          - name: consumerGroup
            value: "bindings-with-output"
          - name: publishTopic
            value: "events-sample"
          - name: authRequired
            value: "false"
Run the following command to apply the configuration file.
kubectl apply -f events-producer.yaml
Run the following commands to observe changes of the target asynchronous function.
$ kubectl get functions.core.openfunction.io
NAME             BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL   AGE
trigger-target   Skipped      Running                  serving-dxrhd         20m
$ kubectl get po --watch
NAME                                                  READY   STATUS              RESTARTS   AGE
serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   0/2     Pending             0          0s
serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   0/2     Pending             0          0s
serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   0/2     ContainerCreating   0          0s
serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   0/2     ContainerCreating   0          2s
serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   1/2     Running             0          4s
serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   1/2     Running             0          4s
serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   2/2     Running             0          4s
4 - Use Multiple Sources in One EventSource
This document describes how to use multiple sources in one EventSource.
Prerequisites
- You need to create a function as the target function to be triggered. Please refer to Create a function for more details.
- You need to create a Kafka cluster. Please refer to Create a Kafka cluster for more details.
Use Multiple Sources in One EventSource
Use the following content to create an EventSource configuration file (for example, eventsource-multi.yaml).
Note
- The following example defines an event source named my-eventsource and marks the events generated by the specified Kafka server as sample-three.
- spec.sink references the target function (Knative service).
- The configuration of spec.cron triggers the function defined in spec.sink every 5 seconds.
apiVersion: events.openfunction.io/v1alpha1
kind: EventSource
metadata:
  name: my-eventsource
spec:
  logLevel: "2"
  kafka:
    sample-three:
      brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
      topic: "events-sample"
      authRequired: false
  cron:
    sample-three:
      schedule: "@every 5s"
  sink:
    uri: "http://openfunction.io.svc.cluster.local/default/sink"
Run the following command to apply the configuration file.
kubectl apply -f eventsource-multi.yaml
Run the following commands to observe changes.
$ kubectl get eventsources.events.openfunction.io
NAME             EVENTBUS   SINK   STATUS
my-eventsource                     Ready
$ kubectl get components
NAME                                                      AGE
serving-vqfk5-component-esc-cron-sample-three-dzcpv       35s
serving-vqfk5-component-esc-kafka-sample-one-nr9pq        35s
serving-vqfk5-component-ts-my-eventsource-default-q6g6m   35s
$ kubectl get deployments.apps
NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
serving-4x5wh-ksvc-wxbf2-v100-deployment   1/1     1            1           3h14m
serving-vqfk5-deployment-v100-vdmvj        1/1     1            1           48s
5 - Use ClusterEventBus
This document describes how to use a ClusterEventBus.
Prerequisites
You have finished the steps described in Use EventBus and Trigger.
Use a ClusterEventBus
Use the following content to create a ClusterEventBus configuration file (for example, clustereventbus.yaml).
apiVersion: events.openfunction.io/v1alpha1
kind: ClusterEventBus
metadata:
  name: default
spec:
  natsStreaming:
    natsURL: "nats://nats.default:4222"
    natsStreamingClusterID: "stan"
    subscriptionType: "queue"
    durableSubscriptionName: "ImDurable"
Run the following command to delete the EventBus named default.
kubectl delete eventbus.events.openfunction.io default
Run the following command to apply the configuration file.
kubectl apply -f clustereventbus.yaml
Run the following commands to check the results.
$ kubectl get eventbus.events.openfunction.io
No resources found in default namespace.
$ kubectl get clustereventbus.events.openfunction.io
NAME      AGE
default   21s
6 - Use Trigger Conditions
This document describes how to use Trigger conditions.
Prerequisites
You have finished the steps described in Use EventBus and Trigger.
Use Trigger Conditions
Create two event sources
Use the following content to create an EventSource configuration file (for example, eventsource-a.yaml).
apiVersion: events.openfunction.io/v1alpha1
kind: EventSource
metadata:
  name: eventsource-a
spec:
  logLevel: "2"
  eventBus: "default"
  kafka:
    sample-five:
      brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
      topic: "events-sample"
      authRequired: false
Use the following content to create another EventSource configuration file (for example, eventsource-b.yaml).
apiVersion: events.openfunction.io/v1alpha1
kind: EventSource
metadata:
  name: eventsource-b
spec:
  logLevel: "2"
  eventBus: "default"
  cron:
    sample-five:
      schedule: "@every 5s"
Run the following commands to apply these two configuration files.
kubectl apply -f eventsource-a.yaml
kubectl apply -f eventsource-b.yaml
Create a trigger with condition
Use the following content to create a configuration file (for example, condition-trigger.yaml) for a Trigger with condition.
apiVersion: events.openfunction.io/v1alpha1
kind: Trigger
metadata:
  name: condition-trigger
spec:
  logLevel: "2"
  eventBus: "default"
  inputs:
    eventA:
      eventSource: "eventsource-a"
      event: "sample-five"
    eventB:
      eventSource: "eventsource-b"
      event: "sample-five"
  subscribers:
    - condition: eventB
      sink:
        uri: "http://openfunction.io.svc.cluster.local/default/sink"
    - condition: eventA && eventB
      topic: "metrics"
Note
In this example, two input sources and two subscribers are defined, and their triggering relationship is described as follows:
- When input eventB is received, the input event is sent to the Knative service.
- When input eventB and input eventA are received, the input event is sent to the metrics topic of the event bus and sent to the Knative service at the same time.
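The triggering relationship above can be modelled with a few lines of shell. This is a simplified sketch, not the Trigger controller's implementation: the function names are hypothetical, only input names joined by && are handled (the only forms used in this example), and the subscriber list is hard-coded from condition-trigger.yaml.

```shell
# condition_met CONDITION RECEIVED...
# Succeeds when every input named in CONDITION (joined by &&)
# appears in the list of received inputs.
condition_met() {
  condition="$1"
  shift
  received=" $* "
  for input in $(printf '%s\n' "$condition" | sed 's/&&/ /g'); do
    case "$received" in
      *" $input "*) ;;
      *) return 1 ;;
    esac
  done
  return 0
}

# fired_subscribers RECEIVED...
# Prints the destination of each subscriber whose condition is met.
fired_subscribers() {
  for subscriber in "eventB|sink" "eventA && eventB|topic metrics"; do
    cond="${subscriber%%|*}"
    dest="${subscriber#*|}"
    if condition_met "$cond" "$@"; then
      echo "$dest"
    fi
  done
}

fired_subscribers eventB          # prints: sink
fired_subscribers eventA eventB   # prints: sink, then: topic metrics
```

Receiving only eventA prints nothing, because neither subscriber's condition is satisfied without eventB.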
Run the following command to apply the configuration file.
kubectl apply -f condition-trigger.yaml
Run the following commands to check the results.
$ kubectl get eventsources.events.openfunction.io
NAME            EVENTBUS   SINK   STATUS
eventsource-a   default           Ready
eventsource-b   default           Ready
$ kubectl get triggers.events.openfunction.io
NAME                EVENTBUS   STATUS
condition-trigger   default    Ready
$ kubectl get eventbus.events.openfunction.io
NAME      AGE
default   12s
Run the following commands and you can see from the output that the eventB condition in the Trigger is matched and the Knative service is triggered because the event source eventsource-b is a cron task.
$ kubectl get functions.core.openfunction.io
NAME   BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                    AGE
sink   Skipped      Running                  serving-4x5wh   https://openfunction.io/default/sink   3h25m
$ kubectl get po
NAME                                                        READY   STATUS    RESTARTS   AGE
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-k2jdg   2/2     Running   0          46s
Create an event producer by referring to Create an Event Producer.
Run the following commands and you can see from the output that the eventA && eventB condition in the Trigger is matched and the event is sent to the metrics topic of the event bus at the same time. The OpenFuncAsync function is triggered.
$ kubectl get functions.core.openfunction.io
NAME             BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL   AGE
trigger-target   Skipped      Running                  serving-7hghp         103s
$ kubectl get po
NAME                                                  READY   STATUS    RESTARTS   AGE
serving-7hghp-deployment-v100-z8wrf-946b4854d-svf55   2/2     Running   0          18s