1 - 简介

概述

OpenFunction 事件是 OpenFunction 的事件管理框架。它提供以下核心特性:

  • 支持通过同步和异步调用触发目标函数
  • 用户定义的触发判断逻辑
  • OpenFunction 事件的组件可以由 OpenFunction 本身驱动

架构

以下图示说明了 OpenFunction 事件的架构。

openfunction-events

概念

EventSource

EventSource 定义了事件的生产者,例如 Kafka 服务,对象存储服务,甚至是函数。它包含了这些事件生产者的描述和发送这些事件的信息。

EventSource 支持以下类型的事件源服务器:

  • Kafka
  • Cron(调度器)
  • Redis

EventBus(ClusterEventBus)

EventBus 负责聚合事件并使其持久化。它包含了一个通常是消息队列(如 NATS Streaming 和 Kafka)的事件总线 broker 的描述,并为 EventSource 和 Trigger 提供这些配置。

EventBus 默认处理命名空间范围内的事件总线适配。对于集群范围,ClusterEventBus 可作为事件总线适配器,并在其他组件无法找到命名空间下的 EventBus 时生效。

EventBus 支持以下事件总线 broker:

  • NATS Streaming

Trigger

Trigger 是事件目的的抽象,例如接收到消息时需要做什么。它包含了由您定义的事件的目的,这告诉触发器它应该从哪个 EventSource 获取事件,并根据给定的条件决定是否触发目标函数。

参考

有关更多信息,请参阅 EventSource 规范EventBus 规范

2 - 使用 EventSource

本文档提供了一个示例,说明如何使用事件源触发同步函数。

在此示例中,定义了一个 EventSource,用于同步调用,使用事件源(一个 Kafka 服务器)作为函数(一个 Knative 服务)的输入绑定。当事件源生成事件时,它将通过 spec.sink 配置调用函数并获取同步返回。

创建函数

使用以下内容创建一个函数作为 EventSource Sink。有关如何创建函数的更多信息,请参见 创建同步函数

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: sink
spec:
  version: "v1.0.0"
  image: "openfunction/sink-sample:latest"
  serving:
    template:
      containers:
        - name: function
          imagePullPolicy: Always
    triggers:
      http:
        port: 8080

创建函数后,运行以下命令获取函数的 URL。

$ kubectl get functions.core.openfunction.io
NAME   BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
sink   Skipped      Running                  serving-4x5wh   https://openfunction.io/default/sink   13s

创建 Kafka 集群

  1. 运行以下命令在默认命名空间中安装 strimzi-kafka-operator

    helm repo add strimzi https://strimzi.io/charts/
    helm install kafka-operator -n default strimzi/strimzi-kafka-operator
    
  2. 使用以下内容创建一个文件 kafka.yaml

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: kafka-server
      namespace: default
    spec:
      kafka:
        version: 3.3.1
        replicas: 1
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
          - name: tls
            port: 9093
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.1"
        storage:
          type: ephemeral
      zookeeper:
        replicas: 1
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
        userOperator: {}
    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: events-sample
      namespace: default
      labels:
        strimzi.io/cluster: kafka-server
    spec:
      partitions: 10
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    
  3. 运行以下命令在默认命名空间中部署一个名为 kafka-server 的 1-replica Kafka 服务器和一个名为 events-sample 的 1-replica Kafka 主题。此命令创建的 Kafka 和 Zookeeper 集群的存储类型为 ephemeral,并使用 emptyDir 进行演示。

    kubectl apply -f kafka.yaml
    
  4. 运行以下命令检查 pod 状态,并等待 Kafka 和 Zookeeper 启动并运行。

    $ kubectl get po
    NAME                                              READY   STATUS        RESTARTS   AGE
    kafka-server-entity-operator-568957ff84-nmtlw     3/3     Running       0          8m42s
    kafka-server-kafka-0                              1/1     Running       0          9m13s
    kafka-server-zookeeper-0                          1/1     Running       0          9m46s
    strimzi-cluster-operator-687fdd6f77-cwmgm         1/1     Running       0          11m
    
  5. 运行以下命令查看 Kafka 集群的元数据。

    kafkacat -L -b kafka-server-kafka-brokers:9092
    

触发同步函数

创建 EventSource

  1. 使用以下内容创建一个 EventSource 配置文件(例如,eventsource-sink.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: my-eventsource
    spec:
      logLevel: "2"
      kafka:
        sample-one:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
      sink:
        uri: "http://openfunction.io.svc.cluster.local/default/sink"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f eventsource-sink.yaml
    
  3. 运行以下命令检查结果。

    $ kubectl get eventsources.events.openfunction.io
    NAME             EVENTBUS   SINK   STATUS
    my-eventsource                     Ready
    
    $ kubectl get components
    NAME                                                      AGE
    serving-8f6md-component-esc-kafka-sample-one-r527t        68m
    serving-8f6md-component-ts-my-eventsource-default-wz8jt   68m
    
    $ kubectl get deployments.apps
    NAME                                           READY   UP-TO-DATE   AVAILABLE   AGE
    serving-8f6md-deployment-v100-pg9sd            1/1     1            1           68m
    

创建事件生产者

要启动目标函数,需要创建一些事件来触发函数。

  1. 使用以下内容创建一个事件生产者配置文件(例如,events-producer.yaml)。

    apiVersion: core.openfunction.io/v1beta1
    kind: Function
    metadata:
      name: events-producer
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-bindings:latest
      serving:
        template:
          containers:
            - name: function
              imagePullPolicy: Always
        runtime: "async"
        inputs:
          - name: cron
            component: cron
        outputs:
          - name: target
            component: kafka-server
            operation: "create"
        bindings:
          cron:
            type: bindings.cron
            version: v1
            metadata:
              - name: schedule
                value: "@every 2s"
          kafka-server:
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-server-kafka-brokers:9092"
              - name: topics
                value: "events-sample"
              - name: consumerGroup
                value: "bindings-with-output"
              - name: publishTopic
                value: "events-sample"
              - name: authRequired
                value: "false"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f events-producer.yaml
    
  3. 运行以下命令实时检查结果。

    $ kubectl get po --watch
    NAME                                                           READY   STATUS              RESTARTS   AGE
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            0/2     ContainerCreating   0          1s
    serving-8f6md-deployment-v100-pg9sd-6666c5577f-4rpdg           2/2     Running             0          23m
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            0/2     ContainerCreating   0          1s
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            1/2     Running             0          5s
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            2/2     Running             0          8s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     Pending             0          0s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     Pending             0          0s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     ContainerCreating   0          0s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     ContainerCreating   0          2s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      1/2     Running             0          4s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      1/2     Running             0          4s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      2/2     Running             0          4s
    

3 - 使用 EventBus 和 Trigger

本文档提供了一个示例,说明如何使用 EventBus 和 Trigger。

先决条件

  • 您需要创建一个作为目标函数的函数以被触发。有关更多详细信息,请参见 创建函数
  • 您需要创建一个 Kafka 集群。有关更多详细信息,请参见 创建 Kafka 集群

部署 NATS 流服务器

运行以下命令部署 NATS 流服务器。本文档使用 nats://nats.default:4222 作为 NATS 流服务器的访问地址,stan 作为集群 ID。有关更多信息,请参见 NATS Streaming (STAN)

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats
helm install stan nats/stan --set stan.nats.url=nats://nats:4222

创建 OpenFuncAsync 运行时函数

  1. 使用以下内容创建目标函数的配置文件(例如,openfuncasync-function.yaml),该函数由 Trigger CRD 触发并打印接收到的消息。

    apiVersion: core.openfunction.io/v1beta2
    kind: Function
    metadata:
      name: trigger-target
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-trigger-target:latest
      serving:
        scaleOptions:
          keda:
            scaledObject:
              pollingInterval: 15
              minReplicaCount: 0
              maxReplicaCount: 10
              cooldownPeriod: 30
            triggers:
              - type: stan
                metadata:
                  natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222"
                  queueGroup: "grp1"
                  durableName: "ImDurable"
                  subject: "metrics"
                  lagThreshold: "10"
        triggers:
          dapr:
            - name: eventbus
              topic: metrics
        pubsub:
          eventbus:
            type: pubsub.natsstreaming
            version: v1
            metadata:
              - name: natsURL
                value: "nats://nats.default:4222"
              - name: natsStreamingClusterID
                value: "stan"
              - name: subscriptionType
                value: "queue"
              - name: durableSubscriptionName
                value: "ImDurable"
              - name: consumerID
                value: "grp1"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f openfuncasync-function.yaml
    

创建 EventBus 和 EventSource

  1. 使用以下内容创建 EventBus 的配置文件(例如,eventbus.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      natsStreaming:
        natsURL: "nats://nats.default:4222"
        natsStreamingClusterID: "stan"
        subscriptionType: "queue"
        durableSubscriptionName: "ImDurable"
    
  2. 使用以下内容创建 EventSource 的配置文件(例如,eventsource.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: my-eventsource
    spec:
      logLevel: "2"
      eventBus: "default"
      kafka:
        sample-two:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
    
  3. 运行以下命令应用这些配置文件。

    kubectl apply -f eventbus.yaml
    kubectl apply -f eventsource.yaml
    
  4. 运行以下命令检查结果。

    $ kubectl get eventsources.events.openfunction.io
    NAME             EVENTBUS   SINK   STATUS
    my-eventsource   default           Ready
    
    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   62m
    
    $ kubectl get components
    NAME                                                 AGE
    serving-9689d-component-ebfes-my-eventsource-cmcbw   46m
    serving-9689d-component-esc-kafka-sample-two-l99cg   46m
    serving-dxrhd-component-eventbus-t65q7               13m
    serving-zwlj4-component-ebft-my-trigger-4925n        100s
    

创建 Trigger

  1. 使用以下内容创建 Trigger 的配置文件(例如,trigger.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: Trigger
    metadata:
      name: my-trigger
    spec:
      logLevel: "2"
      eventBus: "default"
      inputs:
        inputDemo:
          eventSource: "my-eventsource"
          event: "sample-two"
      subscribers:
        - condition: inputDemo
          topic: "metrics"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f trigger.yaml
    
  3. 运行以下命令检查结果。

    $ kubectl get triggers.events.openfunction.io
    NAME         EVENTBUS   STATUS
    my-trigger   default    Ready
    
    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   62m
    
    $ kubectl get components
    NAME                                                 AGE
    serving-9689d-component-ebfes-my-eventsource-cmcbw   46m
    serving-9689d-component-esc-kafka-sample-two-l99cg   46m
    serving-dxrhd-component-eventbus-t65q7               13m
    serving-zwlj4-component-ebft-my-trigger-4925n        100s
    

创建事件生产者

  1. 使用以下内容创建事件生产者配置文件(例如,events-producer.yaml)。

    apiVersion: core.openfunction.io/v1beta2
    kind: Function
    metadata:
      name: events-producer
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-bindings:latest
      serving:
        template:
          containers:
            - name: function
              imagePullPolicy: Always
        triggers:
          dapr:
            - name: cron
              type: bindings.cron
        outputs:
          - dapr:
              name: kafka-server
              operation: "create"
        bindings:
          cron:
            type: bindings.cron
            version: v1
            metadata:
              - name: schedule
                value: "@every 2s"
          kafka-server:
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-server-kafka-brokers:9092"
              - name: topics
                value: "events-sample"
              - name: consumerGroup
                value: "bindings-with-output"
              - name: publishTopic
                value: "events-sample"
              - name: authRequired
                value: "false"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f events-producer.yaml
    
  3. 运行以下命令观察目标异步函数的变化。

    $ kubectl get functions.core.openfunction.io
    NAME                                  BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
    trigger-target                        Skipped      Running                  serving-dxrhd                                         20m
    
    $ kubectl get po --watch
    NAME                                                     READY   STATUS              RESTARTS   AGE
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     Pending             0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     Pending             0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     ContainerCreating   0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     ContainerCreating   0          2s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      1/2     Running             0          4s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      1/2     Running             0          4s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      2/2     Running             0          4s
    

4 - 在一个 EventSource 中使用多个源

本文档描述了如何在一个 EventSource 中使用多个源。

先决条件

  • 您需要创建一个作为目标函数的函数以被触发。有关更多详细信息,请参见 创建函数
  • 您需要创建一个 Kafka 集群。有关更多详细信息,请参见 创建 Kafka 集群

在一个 EventSource 中使用多个源

  1. 使用以下内容创建一个 EventSource 配置文件(例如,eventsource-multi.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: my-eventsource
    spec:
      logLevel: "2"
      kafka:
        sample-three:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
      cron:
        sample-three:
          schedule: "@every 5s"
      sink:
        uri: "http://openfunction.io.svc.cluster.local/default/sink"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f eventsource-multi.yaml
    
  3. 运行以下命令观察变化。

    $ kubectl get eventsources.events.openfunction.io
    NAME             EVENTBUS   SINK   STATUS
    my-eventsource                     Ready
    
    $ kubectl get components
    NAME                                                      AGE
    serving-vqfk5-component-esc-cron-sample-three-dzcpv       35s
    serving-vqfk5-component-esc-kafka-sample-one-nr9pq        35s
    serving-vqfk5-component-ts-my-eventsource-default-q6g6m   35s
    
    $ kubectl get deployments.apps
    NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
    serving-4x5wh-ksvc-wxbf2-v100-deployment   1/1     1            1           3h14m
    serving-vqfk5-deployment-v100-vdmvj        1/1     1            1           48s
    

5 - 使用 ClusterEventBus

本文档描述了如何使用 ClusterEventBus。

先决条件

您已完成 使用 EventBus 和 Trigger 中描述的步骤。

使用 ClusterEventBus

  1. 使用以下内容创建一个 ClusterEventBus 配置文件(例如,clustereventbus.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: ClusterEventBus
    metadata:
      name: default
    spec:
      natsStreaming:
        natsURL: "nats://nats.default:4222"
        natsStreamingClusterID: "stan"
        subscriptionType: "queue"
        durableSubscriptionName: "ImDurable"
    
  2. 运行以下命令删除 EventBus。

    kubectl delete eventbus.events.openfunction.io default
    
  3. 运行以下命令应用配置文件。

    kubectl apply -f clustereventbus.yaml
    
  4. 运行以下命令检查结果。

    $ kubectl get eventbus.events.openfunction.io
    No resources found in default namespace.
    
    $ kubectl get clustereventbus.events.openfunction.io
    NAME      AGE
    default   21s
    

6 - 使用带有条件的 Trigger

本文档描述了如何使用带有条件的触发器。

先决条件

您已完成 使用 EventBus 和 Trigger 中描述的步骤。

使用带有条件的触发器

创建两个事件源

  1. 使用以下内容创建一个 EventSource 配置文件(例如,eventsource-a.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: eventsource-a
    spec:
      logLevel: "2"
      eventBus: "default"
      kafka:
        sample-five:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
    
  2. 使用以下内容创建另一个 EventSource 配置文件(例如,eventsource-b.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: eventsource-b
    spec:
      logLevel: "2"
      eventBus: "default"
      cron:
        sample-five:
          schedule: "@every 5s"
    
  3. 运行以下命令应用这两个配置文件。

    kubectl apply -f eventsource-a.yaml
    kubectl apply -f eventsource-b.yaml
    

创建带有条件的触发器

  1. 使用以下内容创建一个带有 condition 的触发器配置文件(例如,condition-trigger.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: Trigger
    metadata:
      name: condition-trigger
    spec:
      logLevel: "2"
      eventBus: "default"
      inputs:
        eventA:
          eventSource: "eventsource-a"
          event: "sample-five"
        eventB:
          eventSource: "eventsource-b"
          event: "sample-five"
      subscribers:
      - condition: eventB
        sink:
          uri: "http://openfunction.io.svc.cluster.local/default/sink"
      - condition: eventA && eventB
        topic: "metrics"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f condition-trigger.yaml
    
  3. 运行以下命令检查结果。

    $ kubectl get eventsources.events.openfunction.io
    NAME            EVENTBUS   SINK   STATUS
    eventsource-a   default           Ready
    eventsource-b   default           Ready
    
    $ kubectl get triggers.events.openfunction.io
    NAME                EVENTBUS   STATUS
    condition-trigger   default    Ready
    
    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   12s
    
  4. 运行以下命令,您可以从输出中看到,由于事件源 eventsource-b 是一个 cron 任务,所以触发器中的 eventB 条件匹配,触发了 Knative 服务。

    $ kubectl get functions.core.openfunction.io
    NAME                                  BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
    sink                                  Skipped      Running                  serving-4x5wh   https://openfunction.io/default/sink   3h25m
    
    $ kubectl get po
    NAME                                                        READY   STATUS    RESTARTS   AGE
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-k2jdg   2/2     Running   0          46s
    
  5. 参考 创建事件生产者 创建一个事件生产者。

  6. 运行以下命令,您可以从输出中看到,触发器中的 eventA && eventB 条件匹配,事件同时发送到事件总线的 metrics 主题。触发了 OpenFuncAsync 函数。

    $ kubectl get functions.core.openfunction.io
    NAME                                  BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
    trigger-target                        Skipped      Running                  serving-7hghp                                         103s
    
    $ kubectl get po
    NAME                                                        READY   STATUS    RESTARTS   AGE
    serving-7hghp-deployment-v100-z8wrf-946b4854d-svf55         2/2     Running   0          18s