使用 EventBus 和 Trigger

本文档提供了一个示例,说明如何使用 EventBus 和 Trigger。

先决条件

  • 您需要创建一个作为目标函数的函数以被触发。有关更多详细信息,请参见 创建函数
  • 您需要创建一个 Kafka 集群。有关更多详细信息,请参见 创建 Kafka 集群

部署 NATS 流服务器

运行以下命令部署 NATS 流服务器。本文档使用 nats://nats.default:4222 作为 NATS 流服务器的访问地址,stan 作为集群 ID。有关更多信息,请参见 NATS Streaming (STAN)

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats
helm install stan nats/stan --set stan.nats.url=nats://nats:4222

创建 OpenFuncAsync 运行时函数

  1. 使用以下内容创建目标函数的配置文件(例如,openfuncasync-function.yaml),该函数由 Trigger CRD 触发并打印接收到的消息。

    apiVersion: core.openfunction.io/v1beta2
    kind: Function
    metadata:
      name: trigger-target
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-trigger-target:latest
      serving:
        scaleOptions:
          keda:
            scaledObject:
              pollingInterval: 15
              minReplicaCount: 0
              maxReplicaCount: 10
              cooldownPeriod: 30
            triggers:
              - type: stan
                metadata:
                  natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222"
                  queueGroup: "grp1"
                  durableName: "ImDurable"
                  subject: "metrics"
                  lagThreshold: "10"
        triggers:
          dapr:
            - name: eventbus
              topic: metrics
        pubsub:
          eventbus:
            type: pubsub.natsstreaming
            version: v1
            metadata:
              - name: natsURL
                value: "nats://nats.default:4222"
              - name: natsStreamingClusterID
                value: "stan"
              - name: subscriptionType
                value: "queue"
              - name: durableSubscriptionName
                value: "ImDurable"
              - name: consumerID
                value: "grp1"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f openfuncasync-function.yaml
    

创建 EventBus 和 EventSource

  1. 使用以下内容创建 EventBus 的配置文件(例如,eventbus.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      natsStreaming:
        natsURL: "nats://nats.default:4222"
        natsStreamingClusterID: "stan"
        subscriptionType: "queue"
        durableSubscriptionName: "ImDurable"
    
  2. 使用以下内容创建 EventSource 的配置文件(例如,eventsource.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: my-eventsource
    spec:
      logLevel: "2"
      eventBus: "default"
      kafka:
        sample-two:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
    
  3. 运行以下命令应用这些配置文件。

    kubectl apply -f eventbus.yaml
    kubectl apply -f eventsource.yaml
    
  4. 运行以下命令检查结果。

    $ kubectl get eventsources.events.openfunction.io
    NAME             EVENTBUS   SINK   STATUS
    my-eventsource   default           Ready
    
    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   62m
    
    $ kubectl get components
    NAME                                                 AGE
    serving-9689d-component-ebfes-my-eventsource-cmcbw   46m
    serving-9689d-component-esc-kafka-sample-two-l99cg   46m
    serving-dxrhd-component-eventbus-t65q7               13m
    serving-zwlj4-component-ebft-my-trigger-4925n        100s
    

创建 Trigger

  1. 使用以下内容创建 Trigger 的配置文件(例如,trigger.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: Trigger
    metadata:
      name: my-trigger
    spec:
      logLevel: "2"
      eventBus: "default"
      inputs:
        inputDemo:
          eventSource: "my-eventsource"
          event: "sample-two"
      subscribers:
        - condition: inputDemo
          topic: "metrics"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f trigger.yaml
    
  3. 运行以下命令检查结果。

    $ kubectl get triggers.events.openfunction.io
    NAME         EVENTBUS   STATUS
    my-trigger   default    Ready
    
    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   62m
    
    $ kubectl get components
    NAME                                                 AGE
    serving-9689d-component-ebfes-my-eventsource-cmcbw   46m
    serving-9689d-component-esc-kafka-sample-two-l99cg   46m
    serving-dxrhd-component-eventbus-t65q7               13m
    serving-zwlj4-component-ebft-my-trigger-4925n        100s
    

创建事件生产者

  1. 使用以下内容创建事件生产者配置文件(例如,events-producer.yaml)。

    apiVersion: core.openfunction.io/v1beta2
    kind: Function
    metadata:
      name: events-producer
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-bindings:latest
      serving:
        template:
          containers:
            - name: function
              imagePullPolicy: Always
        triggers:
          dapr:
            - name: cron
              type: bindings.cron
        outputs:
          - dapr:
              name: kafka-server
              operation: "create"
        bindings:
          cron:
            type: bindings.cron
            version: v1
            metadata:
              - name: schedule
                value: "@every 2s"
          kafka-server:
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-server-kafka-brokers:9092"
              - name: topics
                value: "events-sample"
              - name: consumerGroup
                value: "bindings-with-output"
              - name: publishTopic
                value: "events-sample"
              - name: authRequired
                value: "false"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f events-producer.yaml
    
  3. 运行以下命令观察目标异步函数的变化。

    $ kubectl get functions.core.openfunction.io
    NAME                                  BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
    trigger-target                        Skipped      Running                  serving-dxrhd                                         20m
    
    $ kubectl get po --watch
    NAME                                                     READY   STATUS              RESTARTS   AGE
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     Pending             0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     Pending             0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     ContainerCreating   0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     ContainerCreating   0          2s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      1/2     Running             0          4s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      1/2     Running             0          4s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      2/2     Running             0          4s