1 - 创建一个基于 Knative 的函数以通过 Dapr 组件与中间件交互

了解如何创建一个基于 Knative 的函数,通过 Dapr 组件与中间件交互。

本文档描述了如何创建一个基于 Knative 的函数,通过 Dapr 组件与中间件交互。

概述

与异步函数类似,基于 Knative 运行时的函数可以通过 Dapr 组件与中间件交互。本文档使用两个函数,function-frontkafka-input,进行演示。

以下图表说明了这些函数之间的关系。

先决条件

创建 Kafka 服务器和主题

  1. 运行以下命令,在默认命名空间中安装 strimzi-kafka-operator

    helm repo add strimzi https://strimzi.io/charts/
    helm install kafka-operator -n default strimzi/strimzi-kafka-operator
    
  2. 使用以下内容创建一个文件 kafka.yaml

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: kafka-server
      namespace: default
    spec:
      kafka:
        version: 3.3.1
        replicas: 1
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
          - name: tls
            port: 9093
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.1"
        storage:
          type: ephemeral
      zookeeper:
        replicas: 1
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
        userOperator: {}
    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: sample-topic
      namespace: default
      labels:
        strimzi.io/cluster: kafka-server
    spec:
      partitions: 10
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    
  3. 运行以下命令,在默认命名空间中部署一个名为 kafka-server 的 1 副本 Kafka 服务器和一个名为 sample-topic 的 1 副本 Kafka 主题。

    kubectl apply -f kafka.yaml
    
  4. 运行以下命令,检查 pod 状态,等待 Kafka 和 Zookeeper 启动并运行。

    $ kubectl get po
    NAME                                              READY   STATUS        RESTARTS   AGE
    kafka-server-entity-operator-568957ff84-nmtlw     3/3     Running       0          8m42s
    kafka-server-kafka-0                              1/1     Running       0          9m13s
    kafka-server-zookeeper-0                          1/1     Running       0          9m46s
    strimzi-cluster-operator-687fdd6f77-cwmgm         1/1     Running       0          11m
    
  5. 运行以下命令,查看 Kafka 集群的元数据。

    # 启动一个实用工具 pod。
    $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
    # 检查 Kafka 集群的元数据。
    $ kafkacat -L -b kafka-server-kafka-brokers:9092
    

创建函数

  1. 使用以下示例 YAML 文件创建一个清单 kafka-input.yaml,并修改 spec.image 的值以设置您自己的镜像仓库地址。字段 spec.serving.inputs 定义了一个指向 Kafka 服务器的 Dapr 组件的输入源。这意味着 kafka-input 函数将由 Kafka 服务器的主题 sample-topic 中的事件驱动。

    apiVersion: core.openfunction.io/v1beta2
    kind: Function
    metadata:
      name: kafka-input
    spec:
      version: "v1.0.0"
      image: <your registry name>/kafka-input:latest
      imageCredentials:
        name: push-secret
      build:
        builder: openfunction/builder-go:latest
        env:
          FUNC_NAME: "HandleKafkaInput"
          FUNC_CLEAR_SOURCE: "true"
        srcRepo:
          url: "https://github.com/OpenFunction/samples.git"
          sourceSubPath: "functions/async/bindings/kafka-input"
          revision: "main"
      serving:
        scaleOptions:
          minReplicas: 0
          maxReplicas: 10
          keda:
            triggers:
            - type: kafka
              metadata:
                topic: sample-topic
                bootstrapServers: kafka-server-kafka-brokers.default.svc:9092
                consumerGroup: kafka-input
                lagThreshold: "20"
            scaledObject:
              pollingInterval: 15
              cooldownPeriod: 60
              advanced:
                horizontalPodAutoscalerConfig:
                  behavior:
                    scaleDown:
                      stabilizationWindowSeconds: 45
                      policies:
                      - type: Percent
                        value: 50
                        periodSeconds: 15
                    scaleUp:
                      stabilizationWindowSeconds: 0
    
        triggers:
          dapr:
          - name: target-topic
            type: bindings.kafka
        bindings:
          target-topic:
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-server-kafka-brokers:9092"
              - name: topics
                value: "sample-topic"
              - name: consumerGroup
                value: "kafka-input"
              - name: publishTopic
                value: "sample-topic"
              - name: authRequired
                value: "false"
        template:
          containers:
            - name: function
              imagePullPolicy: Always
    
  2. 运行以下命令创建函数 kafka-input

    kubectl apply -f kafka-input.yaml
    
  3. 使用以下示例 YAML 文件创建一个清单 function-front.yaml,并修改 spec.image 的值以设置您自己的镜像仓库地址。

       apiVersion: core.openfunction.io/v1beta2
       kind: Function
       metadata:
         name: function-front
       spec:
         version: "v1.0.0"
         image: "<your registry name>/sample-knative-dapr:latest"
         imageCredentials:
           name: push-secret
         build:
           builder: openfunction/builder-go:latest
           env:
             FUNC_NAME: "ForwardToKafka"
             FUNC_CLEAR_SOURCE: "true"
           srcRepo:
             url: "https://github.com/OpenFunction/samples.git"
             sourceSubPath: "functions/knative/with-output-binding"
             revision: "main"
         serving:
            hooks:
              pre:
                - plugin-custom
                - plugin-example
              post:
                - plugin-example
                - plugin-custom
           scaleOptions:
             minReplicas: 0
             maxReplicas: 5
           outputs:
             - dapr:
                 name: kafka-server
                 operation: "create"
           bindings:
             kafka-server:
               type: bindings.kafka
               version: v1
               metadata:
                 - name: brokers
                   value: "kafka-server-kafka-brokers:9092"
                 - name: authRequired
                   value: "false"
                 - name: publishTopic
                   value: "sample-topic"
                 - name: topics
                   value: "sample-topic"
                 - name: consumerGroup
                   value: "function-front"
           template:
             containers:
               - name: function
                 imagePullPolicy: Always
    
  4. 在清单中,spec.serving.outputs 定义了一个指向 Kafka 服务器的 Dapr 组件的输出。这使您可以在 function-front 函数中向输出 target 发送自定义内容。

    func Sender(ctx ofctx.Context, in []byte) (ofctx.Out, error) {
      ...
     _, err := ctx.Send("target", greeting)
     ...
    }
    
  5. 运行以下命令创建函数 function-front

    kubectl apply -f function-front.yaml
    

检查结果

  1. 运行以下命令查看函数的状态。

    $ kubectl get functions.core.openfunction.io
    
    NAME             BUILDSTATE   SERVINGSTATE   BUILDER         SERVING         URL                                             AGE
    function-front   Succeeded    Running        builder-bhbtk   serving-vc6jw   https://openfunction.io/default/function-front   2m41s
    kafka-input      Succeeded    Running        builder-dprfd   serving-75vrt                                                   2m21s
    
  2. 运行以下命令在集群中创建一个用于访问函数的 pod。

    kubectl run curl --image=radial/busyboxplus:curl -i --tty --rm
    
  3. 运行以下命令通过 URL 访问函数。

    [ root@curl:/ ]$ curl -d '{"message":"Awesome OpenFunction!"}' -H "Content-Type: application/json" -X POST http://openfunction.io.svc.cluster.local/default/function-front
    
  4. 运行以下命令查看 function-front 的日志。

    kubectl logs -f \
      $(kubectl get po -l \
      openfunction.io/serving=$(kubectl get functions function-front -o jsonpath='{.status.serving.resourceRef}') \
      -o jsonpath='{.items[0].metadata.name}') \
      function
    

    输出如下所示。

    dapr client initializing for: 127.0.0.1:50001
    I0125 06:51:55.584973       1 framework.go:107] Plugins for pre-hook stage:
    I0125 06:51:55.585044       1 framework.go:110] - plugin-custom
    I0125 06:51:55.585052       1 framework.go:110] - plugin-example
    I0125 06:51:55.585057       1 framework.go:115] Plugins for post-hook stage:
    I0125 06:51:55.585062       1 framework.go:118] - plugin-custom
    I0125 06:51:55.585067       1 framework.go:118] - plugin-example
    I0125 06:51:55.585179       1 knative.go:46] Knative Function serving http: listening on port 8080
    2022/01/25 06:52:02 http - Data: {"message":"Awesome OpenFunction!"}
    I0125 06:52:02.246450       1 plugin-example.go:83] the sum is: 2
    
  5. 运行以下命令查看 kafka-input 的日志。

    kubectl logs -f \
      $(kubectl get po -l \
      openfunction.io/serving=$(kubectl get functions kafka-input -o jsonpath='{.status.serving.resourceRef}') \
      -o jsonpath='{.items[0].metadata.name}') \
      function
    

    输出如下所示。

    dapr client initializing for: 127.0.0.1:50001
    I0125 06:35:28.332381       1 framework.go:107] Plugins for pre-hook stage:
    I0125 06:35:28.332863       1 framework.go:115] Plugins for post-hook stage:
    I0125 06:35:28.333749       1 async.go:39] Async Function serving grpc: listening on port 8080
    message from Kafka '{Awesome OpenFunction!}'
    

2 - 使用 SkyWalking 为 OpenFunction 提供可观测能力

本文介绍了如何使用 SkyWalking 为 OpenFunction 构建可观测性解决方案。

功能概览

尽管 FaaS 允许开发者专注于他们的业务代码而不用担心底层的实现,但对函数服务进行功能调试和故障排除是很困难的。因此,OpenFunction 设法引入了可观测性能力来提高其可用性和稳定性。

SkyWalking 提供了在许多不同场景下观测和监控分布式系统的解决方案。OpenFunction 将 go2sky(SkyWalking 的 Go 语言代理)捆绑在 OpenFunction 追踪器选项中,以提供分布式追踪、函数性能统计和函数依赖关系图。

先决条件

追踪功能的可配置参数

下表描述了 OpenFunction 中目前可用的追踪参数。

名称描述示例
enabled使能追踪功能,默认为 falsetrue, false
provider.name可以设置为 skywalkingopentelemetry(待定)skywalking
provider.oapServerSkyWalking OAP 服务器地址skywalking-opa:11800
tags一组键值对,用于追踪中的追踪所使用的 Span 自定义标签
tags.func函数的名称,该值将被自动填充function-a
tags.layer表示被追踪的服务类型,当你使用该函数时,它应该被设置为 faasfaas
baggage一个键值对的集合,存在于追踪中,也需要跨进程边界传输

下面是一个 JSON 格式的配置参考,您可以基于此了解追踪配置的大致数据格式。

{
  "enabled": true,
  "provider": {
    "name": "skywalking",
    "oapServer": "skywalking-oap:11800"
  },
  "tags": {
    "func": "function-a",
    "layer": "faas",
    "tag1": "value1",
    "tag2": "value2"
  },
  "baggage": {
    "key": "key1",
    "value": "value1"
  }
}

启用 OpenFunction 的追踪功能

选项一:全局配置

下文使用 skywalking-oap.default:11800 作为集群中 skywalking-oap 服务的样例地址。

  1. 运行下面的命令,修改 openfunction 命名空间中的 ConfigMap openfunction-config

    kubectl edit configmap openfunction-config -n openfunction
    
  2. 参照下面的例子修改 data.plugins.tracing 中的内容,并保存这个修改。

    data:
      plugins.tracing: |
        enabled: true
        provider:
          name: "skywalking"
          oapServer: "skywalking-oap:11800"
        tags:
          func: tracing-function
          layer: faas
          tag1: value1
          tag2: value2
        baggage:
          key: "key1"
          value: "value1"    
    

选项二:函数级别配置

要在函数级别启用跟踪配置,请在函数清单的 metadata.annotations 下添加 plugins.tracing 字段,如下所示。

metadata:
  name: tracing-function
  annotations:
    plugins.tracing: |
      enabled: true
      provider:
        name: "skywalking"
        oapServer: "skywalking-oap:11800"
      tags:
        func: tracing-function
        layer: faas
        tag1: value1
        tag2: value2
      baggage:
        key: "key1"
        value: "value1"      

建议您使用全局跟踪配置,否则您必须为您创建的每个函数逐一添加函数级跟踪配置。

使用 SkyWalking 作为分布式追踪解决方案

  1. 通过参考 本文档 创建函数。

  2. 然后,您可以在 SkyWalking UI 界面上观察整个函数调用的链路。

  3. 您还可以比较 Knative 运行时函数(function-front)在运行状态和冷启动时的响应时间。

    在冷启动时:

    在运行状态下:

3 - Elastic 日志告警

学习如何创建一个异步函数来查找错误日志。

本文档描述了如何创建一个异步函数来查找错误日志。

概述

本文档使用一个异步函数来分析 Kafka 中的日志流,以找出错误日志。异步函数将然后向 Slack 发送告警。以下图表说明了整个工作流程。

先决条件

创建 Kafka 服务器和主题

  1. 运行以下命令在默认命名空间中安装 strimzi-kafka-operator

    helm repo add strimzi https://strimzi.io/charts/
    helm install kafka-operator -n default strimzi/strimzi-kafka-operator
    
  2. 使用以下内容创建一个文件 kafka.yaml

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: kafka-logs-receiver
      namespace: default
    spec:
      kafka:
        version: 3.3.1
        replicas: 1
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
          - name: tls
            port: 9093
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.1"
        storage:
          type: ephemeral
      zookeeper:
        replicas: 1
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
        userOperator: {}
    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: logs
      namespace: default
      labels:
        strimzi.io/cluster: kafka-logs-receiver
    spec:
      partitions: 10
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    
  3. 运行以下命令在默认命名空间中部署一个名为 kafka-logs-receiver 的 1 副本 Kafka 服务器和一个名为 logs 的 1 副本 Kafka 主题。

    kubectl apply -f kafka.yaml
    
  4. 运行以下命令检查 pod 状态,等待 Kafka 和 Zookeeper 启动并运行。

    $ kubectl get po
    NAME                                                     READY   STATUS        RESTARTS   AGE
    kafka-logs-receiver-entity-operator-57dc457ccc-tlqqs     3/3     Running       0          8m42s
    kafka-logs-receiver-kafka-0                              1/1     Running       0          9m13s
    kafka-logs-receiver-zookeeper-0                          1/1     Running       0          9m46s
    strimzi-cluster-operator-687fdd6f77-cwmgm                1/1     Running       0          11m
    
  5. 运行以下命令查看 Kafka 集群的元数据。

    # 启动一个实用工具 pod。
    $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
    # 检查 Kafka 集群的元数据。
    $ kafkacat -L -b kafka-logs-receiver-kafka-brokers:9092
    

创建日志处理函数

  1. 使用以下示例 YAML 文件创建一个清单 logs-handler-function.yaml,并修改 spec.image 的值以设置您自己的镜像仓库地址。
   apiVersion: core.openfunction.io/v1beta2
   kind: Function
   metadata:
     name: logs-async-handler
     namespace: default
   spec:
     build:
       builder: openfunction/builder-go:latest
       env:
         FUNC_CLEAR_SOURCE: "true"
         FUNC_NAME: LogsHandler
       srcRepo:
         revision: main
         sourceSubPath: functions/async/logs-handler-function/
         url: https://github.com/OpenFunction/samples.git
     image: openfunctiondev/logs-async-handler:v1
     imageCredentials:
       name: push-secret
     serving:
       bindings:
         kafka-receiver:
           metadata:
             - name: brokers
               value: kafka-server-kafka-brokers:9092
             - name: authRequired
               value: "false"
             - name: publishTopic
               value: logs
             - name: topics
               value: logs
             - name: consumerGroup
               value: logs-handler
           type: bindings.kafka
           version: v1
         notification-manager:
           metadata:
             - name: url
               value: http://notification-manager-svc.kubesphere-monitoring-system.svc.cluster.local:19093/api/v2/alerts
           type: bindings.http
           version: v1
       outputs:
         - dapr:
             name: notification-manager
             operation: post
             type: bindings.http
       scaleOptions:
         keda:
           scaledObject:
             advanced:
               horizontalPodAutoscalerConfig:
                 behavior:
                   scaleDown:
                     policies:
                       - periodSeconds: 15
                         type: Percent
                         value: 50
                     stabilizationWindowSeconds: 45
                   scaleUp:
                     stabilizationWindowSeconds: 0
             cooldownPeriod: 60
             pollingInterval: 15
           triggers:
             - metadata:
                 bootstrapServers: kafka-server-kafka-brokers.default.svc.cluster.local:9092
                 consumerGroup: logs-handler
                 lagThreshold: "20"
                 topic: logs
               type: kafka
         maxReplicas: 10
         minReplicas: 0
       template:
         containers:
           - imagePullPolicy: IfNotPresent
             name: function
       triggers:
         dapr:
           - name: kafka-receiver
             type: bindings.kafka
       workloadType: Deployment
     version: v2.0.0
     workloadRuntime: OCIContainer
  1. 运行以下命令创建函数 logs-async-handler

    kubectl apply -f logs-handler-function.yaml
    
  2. 日志处理函数将由 Kafka 中 logs 主题的消息触发。