1 - 函数定义

函数

FunctionBuildServing的控制平面,也是用户使用OpenFunction的接口。用户无需单独创建BuildServing,因为Function是定义函数的BuildServing的唯一地方。

一旦创建了函数,它将控制BuildServing的生命周期,无需用户干预:

  • 如果在函数中定义了Build,则一旦部署函数,将创建一个builder自定义资源来构建函数的容器镜像。

  • 如果在函数中定义了Serving,则将创建一个serving自定义资源来控制函数的服务和自动缩放。

  • BuildServing可以一起定义,这意味着首先将构建函数镜像,然后将其用于服务。

  • 可以定义Build而不定义Serving,在这种情况下,函数仅用于构建镜像。

  • 可以定义Serving而不定义Build,函数将使用先前构建的函数镜像进行服务。

构建

OpenFunction使用ShipwrightCloud Native Buildpacks将函数源代码构建成容器镜像。

一旦创建了包含Build规范的函数,就会创建一个builder自定义资源,该资源将使用Shipwright管理构建工具和策略。然后,Shipwright将使用Tekton控制构建容器镜像的过程,包括获取源代码、生成镜像工件和发布镜像。

服务

一旦创建了包含Serving规范的函数,就会创建一个Serving自定义资源来控制函数的服务阶段。目前,OpenFunction Serving支持两种运行时:Knative同步运行时和OpenFunction异步运行时。

同步运行时

对于同步函数,OpenFunction目前支持使用 Knative Serving 和 KEDA http-addon 作为运行时。

异步运行时

OpenFunction的异步运行时是一个基于KEDADapr实现的事件驱动运行时。异步函数可以由各种事件类型触发,如消息队列、cronjob和MQTT等。

2 - 函数构建

目前,OpenFunction支持使用Cloud Native Buildpacks构建函数镜像,无需创建Dockerfile

同时,您也可以使用OpenFunction通过Dockerfile构建无服务器应用

通过定义构建部分来构建函数

您可以从git仓库的源代码或本地存储的源代码构建函数或应用。

从git仓库的源代码构建函数

您可以通过在Function定义中添加一个构建部分来构建函数镜像。如果还定义了服务部分,那么构建完成后将立即启动函数。

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: logs-async-handler
spec:
  version: "v2.0.0"
  image: openfunctiondev/logs-async-handler:v1
  imageCredentials:
    name: push-secret
  build:
    builder: openfunction/builder-go:latest
    env:
      FUNC_NAME: "LogsHandler"
      FUNC_CLEAR_SOURCE: "true"
      ## 自定义函数框架版本,目前仅对functions-framework-go有效
      ## 通常情况下,您不需要这样做,因为构建器会自带最新的函数框架
      # FUNC_FRAMEWORK_VERSION: "v0.4.0"
      ## 使用FUNC_GOPROXY设置goproxy
      # FUNC_GOPROXY: "https://goproxy.cn"
    srcRepo:
      url: "https://github.com/OpenFunction/samples.git"
      sourceSubPath: "functions/async/logs-handler-function/"
      revision: "main"

要将函数镜像推送到容器仓库,您需要创建一个包含仓库凭证的秘密,并将该秘密添加到imageCredentials。 您可以参考先决条件以获取更多信息。

从本地源代码构建函数

要从本地源代码构建函数或应用,您需要将本地源代码打包成容器镜像,并将此镜像推送到容器仓库。

假设您的源代码在samples目录中,您可以使用以下Dockerfile来构建源代码包镜像。

FROM scratch
WORKDIR /
COPY samples samples/

然后,您可以像这样构建源代码包镜像:

docker build -t <your registry name>/sample-source-code:latest -f </path/to/the/dockerfile> .
docker push <your registry name>/sample-source-code:latest

建议使用空镜像scratch作为构建源代码包镜像的基础镜像,非空基础镜像可能会导致源代码复制失败。
与为git仓库方法定义spec.build.srcRepo.url字段不同,您需要定义spec.build.srcRepo.bundleContainer.image字段。

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: logs-async-handler
spec:
  build:
    srcRepo:
      bundleContainer:
        image: openfunctiondev/sample-source-code:latest
      sourceSubPath: "/samples/functions/async/logs-handler-function/"

sourceSubPath是源代码包镜像中源代码的绝对路径。

使用pack CLI构建函数

通常,直接从本地源代码构建函数镜像是必要的,特别是出于调试目的或用于离线环境。您可以使用pack CLI来实现这一点。

Pack是由Cloud Native Buildpacks项目维护的一个工具,用于支持使用buildpacks。它启用了以下功能:

  • 使用buildpacks构建应用。
  • 重新基准使用buildpacks创建的应用镜像。
  • 创建生态系统内使用的各种组件。

按照这里的说明安装pack CLI工具。 您可以在这里找到更多关于如何使用pack CLI的详细信息。

要从本地源代码构建OpenFunction函数镜像,您可以按照以下步骤操作:

下载函数示例

git clone https://github.com/OpenFunction/samples.git
cd samples/functions/knative/hello-world-go

使用pack CLI构建函数镜像

pack build func-helloworld-go --builder openfunction/builder-go:v2.4.0-1.17 --env FUNC_NAME="HelloWorld"  --env FUNC_CLEAR_SOURCE=true

本地启动函数镜像

docker run --rm --env="FUNC_CONTEXT={\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}" --env="CONTEXT_MODE=self-host" --name func-helloworld-go -p 8080:8080 func-helloworld-go

访问函数

curl http://localhost:8080

输出示例:

hello, world!

OpenFunction构建器

要使用Cloud Native Buildpacks构建函数镜像,需要一个构建器镜像。

在这里,您可以找到OpenFunction社区维护的流行语言的构建器:

构建器
Goopenfunction/builder-go:v2.4.0 (openfunction/builder-go:latest)
Nodejsopenfunction/builder-node:v2-16.15 (openfunction/builder-node:latest)
Javaopenfunction/builder-java:v2-11, openfunction/builder-java:v2-16, openfunction/builder-java:v2-17, openfunction/builder-java:v2-18
Pythonopenfunction/gcp-builder:v1
DotNetopenfunction/gcp-builder:v1

3 - 构建策略

构建策略用于控制构建过程。有两种类型的策略,ClusterBuildStrategyBuildStrategy。 这两种策略都定义了一组步骤,这些步骤是控制应用程序构建过程所必需的。

ClusterBuildStrategy 是集群范围的,而 BuildStrategy 是命名空间范围的。

在 OpenFunction 中有 4 种内置的 ClusterBuildStrategy,你可以在以下各节中找到更多详细信息。

openfunction

openfunction ClusterBuildStrategy 使用 Buildpacks 来构建函数镜像,这是默认的构建策略。

以下是 openfunction ClusterBuildStrategy 的参数:

名称类型描述
RUN_IMAGEstring使用的运行镜像的引用
CACHE_IMAGEstring缓存镜像是一种在不同构建之间保留缓存层的方式,当构建具有大量依赖项的函数或应用程序(如 Java 函数)时,可以提高构建性能。
BASH_IMAGEstring策略使用的 bash 镜像。
ENV_VARSstring构建时 设置的环境变量。格式为 key1=value1,key2=value2

用户可以像这样设置这些参数:

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: logs-async-handler
spec:
  build:
    shipwright:
      params:
        RUN_IMAGE: ""
        ENV_VARS: ""

buildah

buildah ClusterBuildStrategy 使用 buildah 来构建应用程序镜像。

要使用 buildah ClusterBuildStrategy,你可以定义一个 Function,如下所示:

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: sample-go-app
  namespace: default
spec:
  build:
    builder: openfunction/buildah:v1.23.1
    dockerfile: Dockerfile
    shipwright:
      strategy:
        kind: ClusterBuildStrategy
        name: buildah

以下是 buildah ClusterBuildStrategy 的参数:

名称类型描述默认值
registry-searchstring用于搜索短名称镜像(如 golang:latest)的镜像仓库,用逗号分隔。docker.io,quay.io
registry-insecurestring不安全镜像仓库的全名。不安全的镜像仓库是没有有效 SSL 证书或只支持 HTTP 的镜像仓库。
registry-blockstring需要阻止拉取访问的镜像仓库。""

kaniko

kaniko ClusterBuildStrategy 使用 kaniko 来构建应用程序镜像。

要使用 kaniko ClusterBuildStrategy,你可以定义一个 Function,如下所示:

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: function-kaniko
  namespace: default
spec:
  build:
    builder: openfunction/kaniko-executor:v1.7.0
    dockerfile: Dockerfile
    shipwright:
      strategy:
        kind: ClusterBuildStrategy
        name: kaniko

ko

ko ClusterBuildStrategy 使用 ko 来构建 Go 应用程序镜像。

要使用 ko ClusterBuildStrategy,你可以定义一个 Function,如下所示:

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: function-ko
  namespace: default
spec:
  build:
    builder: golang:1.17
    dockerfile: Dockerfile
    shipwright:
      strategy:
        kind: ClusterBuildStrategy
        name: ko

以下是 ko ClusterBuildStrategy 的参数:

名称类型描述默认值
go-flagsstringValue for the GOFLAGS environment variable.""
ko-versionstringVersion of ko, must be either ’latest’, or a release name from https://github.com/google/ko/releases.""
package-directorystringThe directory inside the context directory containing the main package.“.”

自定义策略

用户可以自定义他们自己的策略。要自定义策略,你可以参考 这里

4 - 函数触发器

函数 触发器 用于定义如何触发函数。目前有两种触发器:HTTP 触发器Dapr 触发器。默认的触发器是 HTTP 触发器

HTTP 触发器

HTTP 触发器 通过 HTTP 请求触发函数。你可以像这样为函数定义一个 HTTP 触发器

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: function-sample
spec:
  serving:
    triggers:
      http:
        port: 8080
        route:
          rules:
            - matches:
                - path:
                    type: PathPrefix
                    value: /echo

Dapr 触发器

Dapr 触发器 通过 Dapr bindingsDapr pubsub 的事件触发函数。你可以像这样定义一个带有 Dapr 触发器 的函数:

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: logs-async-handler
  namespace: default
spec:
  serving:
    bindings:
      kafka-receiver:
        metadata:
          - name: brokers
            value: kafka-server-kafka-brokers:9092
          - name: authRequired
            value: "false"
          - name: publishTopic
            value: logs
          - name: topics
            value: logs
          - name: consumerGroup
            value: logs-handler
        type: bindings.kafka
        version: v1
    triggers:
      dapr:
        - name: kafka-receiver
          type: bindings.kafka

函数输入

输入 是函数可以从中获取额外输入数据的地方,目前支持 Dapr 状态存储 作为 输入

你可以像这样定义函数输入:

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: logs-async-handler
  namespace: default
spec:
  serving:
    triggers:
      inputs:
        - dapr:
            name: mysql
            type: state.mysql

5 - 函数输出

函数输出

输出是函数可以发送数据的组件,包括:

例如,这里 你可以找到一个带有 cron 输入绑定和 Kafka 输出绑定的异步函数:

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: cron-input-kafka-output
spec:
  ...
  serving:
    ...
    outputs:
      - dapr:
          name: kafka-server
          type: bindings.kafka
          operation: "create"
    bindings:
      kafka-server:
        type: bindings.kafka
        version: v1
        metadata:
        - name: brokers
          value: "kafka-server-kafka-brokers:9092"
        - name: topics
          value: "sample-topic"
        - name: consumerGroup
          value: "bindings-with-output"
        - name: publishTopic
          value: "sample-topic"
        - name: authRequired
          value: "false"

这里 是另一个使用 Kafka Pub/sub 组件作为输入的异步函数示例

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: autoscaling-subscriber
spec:
  ...
  serving:
    ...
    runtime: "async"
    outputs:
      - dapr:
          name: kafka-server
          type: pubsub.kafka
          topic: "sample-topic"
    pubsub:
      kafka-server:
        type: pubsub.kafka
        version: v1
        metadata:
          - name: brokers
            value: "kafka-server-kafka-brokers:9092"
          - name: authRequired
            value: "false"
          - name: allowedTopics
            value: "sample-topic"
          - name: consumerID
            value: "autoscaling-subscriber"

6 - 函数缩放

缩放是 FaaS 或 Serverless 平台的核心特性之一。

OpenFunction 在 ScaleOptions 中定义了函数缩放,并在 Triggers 中定义了激活函数缩放的触发器。

ScaleOptions

您可以为同步和异步函数定义统一的函数缩放选项,如下所示,这将对同步和异步函数都有效:

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: function-sample
spec:
  serving:
    scaleOptions:
      minReplicas: 0
      maxReplicas: 10

通常,仅定义 minReplicas 和 maxReplicas 对于异步函数来说是不够的。您可以像下面这样为异步函数定义单独的缩放选项,这将覆盖 minReplicas 和 maxReplicas。

您可以在 KEDA ScaleObject SpecKEDA ScaledJob Spec 中找到更多关于异步函数缩放选项的详细信息。

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: function-sample
spec:
  serving:
    scaleOptions:
      minReplicas: 0
      maxReplicas: 10
      keda:
        scaledObject:
          pollingInterval: 15
          cooldownPeriod: 60
          advanced:
            horizontalPodAutoscalerConfig:
              behavior:
                scaleDown:
                  stabilizationWindowSeconds: 45
                  policies:
                  - type: Percent
                    value: 50
                    periodSeconds: 15
                scaleUp:
                  stabilizationWindowSeconds: 0

您也可以为 Knative 同步函数设置高级缩放选项,这将覆盖 minReplicas 和 maxReplicas。

您可以在 这里 找到更多关于 Knative 同步函数缩放选项的详细信息。

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: function-sample
spec:
  serving:
    scaleOptions:
      knative:
        autoscaling.knative.dev/initial-scale: "1"
        autoscaling.knative.dev/scale-down-delay: "0"
        autoscaling.knative.dev/window: "60s"
        autoscaling.knative.dev/panic-window-percentage: "10.0"
        autoscaling.knative.dev/metric: "concurrency"
        autoscaling.knative.dev/target: "100"

触发器

触发器定义了如何激活异步函数的函数缩放。您可以使用所有 KEDA 缩放器 中定义的触发器作为 OpenFunction 的触发器规范。

同步函数的缩放是通过 HTTP 请求的各种选项激活的,这些选项已经在前面的 ScaleOption 部分中定义了。

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: function-sample
spec:
  serving:
    scaleOptions:
      keda:
        triggers:
          - type: kafka
            metadata:
              topic: logs
              bootstrapServers: kafka-server-kafka-brokers.default.svc.cluster.local:9092
              consumerGroup: logs-handler
              lagThreshold: "20"

7 - 函数签名

不同函数签名的比较

OpenFunction中有三种函数签名:HTTPCloudEventOpenFunction。让我们使用Go函数为例,详细解释一下。

HTTPCloudEvent签名可以用来创建同步函数,而OpenFunction签名可以用来创建同步和异步函数。

此外,OpenFunction签名可以利用各种Dapr构建块,包括Bindings,Pub/sub等访问各种BaaS服务,帮助创建更强大的函数。(Dapr状态管理,配置将很快得到支持)

HTTPCloudEventOpenFunction
签名func(http.ResponseWriter, *http.Request) errorfunc(context.Context, cloudevents.Event) errorfunc(ofctx.Context, []byte) (ofctx.Out, error)
同步函数支持支持支持
异步函数不支持不支持支持
Dapr Binding不支持不支持支持
Dapr Pub/sub不支持不支持支持

示例

如您所见,OpenFunction签名是推荐的函数签名,我们正在努力在更多语言运行时支持此函数签名。

HTTPCloudEventOpenFunction
GoHello World, 多函数, 带路径参数的同步函数, 日志处理带路径参数的同步函数带路径参数的同步函数, 带输出绑定的同步函数, Kafka输入 & HTTP输出绑定, Cron输入 & Kafka输出绑定, Cron输入绑定, Kafka输入绑定, Kafka pubsub
NodejsHello World带输出绑定的同步函数, MQTT绑定 & pubsub
PythonHello World
JavaHello WorldCloudEvent带输出的同步函数, Cron输入 & Kafka输出绑定, Kafka pubsub
DotNetHello World

8 - Wasm 函数

WasmEdge 是一个轻量级、高性能、可扩展的 WebAssembly 运行时,用于云原生、边缘和去中心化应用。它支持无服务器应用、嵌入式函数、微服务、智能合约和物联网设备。

OpenFunction 现在支持使用 WasmEdge 作为工作负载运行时来构建和运行 wasm 函数。

你可以在这里找到 WasmEdge 集成提案

Wasm 容器镜像

包含 wasm 二进制文件的 wasm 镜像是一个没有操作系统层的特殊容器镜像。应该在这个 wasm 容器镜像中添加一个特殊的注解 module.wasm.image/variant: compat-smart,以便像 WasmEdge 这样的 wasm 运行时能够识别它。这在 OpenFunction 中是自动处理的,用户只需要将 workloadRuntime 指定为 wasmedge

Wasm 容器镜像的构建阶段

如果 function.spec.workloadRuntime 设置为 wasmedge,或者函数的注解包含 module.wasm.image/variant: compat-smartfunction.spec.build.shipwright.strategy 将根据名为 wasmedgeClusterBuildStrategy 自动生成,以便构建带有 module.wasm.image/variant: compat-smart 注解的 wasm 容器镜像。

Wasm 容器镜像的服务阶段

function.spec.workloadRuntime 设置为 wasmedge,或者函数的注解包含 module.wasm.image/variant: compat-smart 时:

  • 如果 function.spec.serving.annotations 不包含 module.wasm.image/variantmodule.wasm.image/variant: compat-smart 将自动添加到 function.spec.serving.annotations
  • 如果 function.spec.serving.template.runtimeClassName 没有设置,这个 runtimeClassName 将自动设置为默认的 openfunction-crun

如果你的 kubernetes 集群在像 Azure 这样的公共云中,你可以手动设置 spec.serving.template.runtimeClassName 来覆盖默认的 runtimeClassName

构建和运行 wasm 函数

要在 kubernetes 集群中设置 WasmEdge 工作负载运行时并将镜像推送到容器镜像仓库, 请参考先决条件部分以获取更多信息。

你可以在这里找到关于这个示例函数的更多信息。

  1. 创建一个 wasm 函数
cat <<EOF | kubectl apply -f -
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: wasmedge-http-server
spec:
  workloadRuntime: wasmedge
  image: openfunctiondev/wasmedge_http_server:0.1.0
  imageCredentials:
    name: push-secret
  build:
    dockerfile: Dockerfile
    srcRepo:
      revision: main
      sourceSubPath: functions/knative/wasmedge/http-server
      url: https://github.com/OpenFunction/samples
  serving:
    scaleOptions:
      minReplicas: 0
    template:
      containers:
        - command:
            - /wasmedge_hyper_server.wasm
          imagePullPolicy: IfNotPresent
          livenessProbe:
            initialDelaySeconds: 3
            periodSeconds: 30
            tcpSocket:
              port: 8080
          name: function
    triggers:
      http:
        port: 8080
        route:
          rules:
            - matches:
                - path:
                    type: PathPrefix
                    value: /echo
EOF
  1. 检查 wasm 函数的状态
kubectl get functions.core.openfunction.io -w
NAME                   BUILDSTATE   SERVINGSTATE   BUILDER         SERVING         ADDRESS                                                      AGE
wasmedge-http-server   Succeeded    Running        builder-4p2qq   serving-lrd8c   http://wasmedge-http-server.default.svc.cluster.local/echo   12m
  1. 访问 wasm 函数

一旦 BUILDSTATE 变为 SucceededSERVINGSTATE 变为 Running,你就可以通过 ADDRESS 字段中的地址访问这个函数:

kubectl run curl --image=radial/busyboxplus:curl -i --tty
curl http://wasmedge-http-server.default.svc.cluster.local/echo  -X POST -d "WasmEdge"
WasmEdge

9 - 无服务器应用

除了构建和运行无服务器函数外,你还可以使用 OpenFuntion 构建和运行无服务器应用。

OpenFunction 支持两种不同的方式将源代码构建成容器镜像:

要将镜像推送到容器镜像仓库,你需要创建一个包含镜像仓库凭据的 secret,并将 secret 添加到 imageCredentials。 请参考 先决条件 部分获取更多信息。

使用 Dockerfile 构建和运行无服务器应用

如果你已经为你的应用创建了一个 Dockerfile,比如这个 Go 应用,你可以像 这样 以无服务器的方式构建和运行这个应用:

  1. 创建示例 Go 无服务器应用
cat <<EOF | kubectl apply -f -
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: sample-go-app
  namespace: default
spec:
  build:
    builder: openfunction/buildah:v1.23.1
    shipwright:
      strategy:
        kind: ClusterBuildStrategy
        name: buildah
    srcRepo:
      revision: main
      sourceSubPath: apps/buildah/go
      url: https://github.com/OpenFunction/samples.git
  image: openfunctiondev/sample-go-app:v1
  imageCredentials:
    name: push-secret
  serving:
    template:
      containers:
        - imagePullPolicy: IfNotPresent
          name: function
    triggers:
      http:
        port: 8080
  version: v1.0.0
  workloadRuntime: OCIContainer
EOF
  1. 检查应用状态 你可以通过 kubectl get functions.core.openfunction.io -w 检查无服务器应用的状态:
kubectl get functions.core.openfunction.io -w
NAME                    BUILDSTATE   SERVINGSTATE   BUILDER         SERVING         ADDRESS                                                   AGE
sample-go-app           Succeeded    Running        builder-jgnzp   serving-q6wdp   http://sample-go-app.default.svc.cluster.local/           22m
  1. 访问这个应用 一旦 BUILDSTATE 变成 SucceededSERVINGSTATE 变成 Running,你就可以通过 ADDRESS 字段中的地址访问这个 Go 无服务器应用:
kubectl run curl --image=radial/busyboxplus:curl -i --tty
curl http://sample-go-app.default.svc.cluster.local

这里你可以找到一个 Java 无服务器应用(带有 Dockerfile)的示例。

不使用 Dockerfile 构建和运行无服务器应用

如果你有一个没有 Dockerfile 的应用,比如这个 Java 应用,你也可以像这个 Java 应用 那样以无服务器的方式构建和运行你的应用:

  1. 创建示例 Java 无服务器应用
cat <<EOF | kubectl apply -f -
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: sample-java-app-buildpacks
  namespace: default
spec:
  build:
    builder: cnbs/sample-builder:alpine
    srcRepo:
      revision: main
      sourceSubPath: apps/java-maven
      url: https://github.com/buildpacks/samples.git
  image: openfunction/sample-java-app-buildpacks:v1
  imageCredentials:
    name: push-secret
  serving:
    template:
      containers:
        - imagePullPolicy: IfNotPresent
          name: function
          resources: {}
    triggers:
      http:
        port: 8080
  version: v1.0.0
  workloadRuntime: OCIContainer

EOF
  1. 检查应用状态 你可以通过 kubectl get functions.core.openfunction.io -w 检查无服务器应用的状态:
kubectl get functions.core.openfunction.io -w
NAME                                 BUILDSTATE   SERVINGSTATE   BUILDER         SERVING         ADDRESS                                                                AGE
sample-java-app-buildpacks           Succeeded    Running        builder-jgnzp   serving-q6wdp   http://sample-java-app-buildpacks.default.svc.cluster.local/           22m
  1. 访问这个应用 一旦 BUILDSTATE 变成 SucceededSERVINGSTATE 变成 Running,你就可以通过 ADDRESS 字段中的地址访问这个 Java 无服务器应用:
kubectl run curl --image=radial/busyboxplus:curl -i --tty
curl http://sample-java-app-buildpacks.default.svc.cluster.local

10 - BaaS 集成

OpenFunction 的一个独特特性是它可以通过 Dapr 简单地集成各种后端服务 (BaaS)。目前,OpenFunction 支持 Dapr 的 pub/subbindings 构建块,未来将会添加更多。

在 OpenFunction v0.7.0 及之前的版本中,OpenFunction 通过在每个函数实例的 pod 中注入一个 dapr sidecar 容器来集成 BaaS,这导致了以下问题:

  • dapr sidecar 容器的启动拖慢了整个函数实例的启动时间。
  • dapr sidecar 容器可能消耗的资源比函数容器本身还要多。

为了解决上述问题,OpenFunction 在 v0.8.0 中引入了 Dapr 独立模式

Dapr 独立模式

在 Dapr 独立模式中,每个函数都会创建一个 Dapr 代理服务,然后由该函数的所有实例共享。这样,就不再需要为每个函数实例启动一个单独的 Dapr sidecar 容器,从而大大减少了函数的启动时间。

选择适当的 Dapr 服务模式

所以现在你有两个选项来集成 BaaS:

  • Dapr Sidecar 模式
  • Dapr 独立模式

你可以为你的函数选择适当的 Dapr 服务模式。Dapr 独立模式 是推荐的和默认的模式。如果你的函数不经常扩展,或者你难以使用 Dapr 独立模式,你可以使用 Dapr Sidecar 模式

你可以通过设置两个标志来控制如何集成 BaaS,这两个标志都可以在函数的 spec.serving.annotations 中设置:

  • openfunction.io/enable-dapr 可以设置为 truefalse
  • openfunction.io/dapr-service-mode 可以设置为 standalonesidecar
  • openfunction.io/enable-dapr 设置为 true 时,用户可以通过将 openfunction.io/dapr-service-mode 设置为 standalonesidecar 来选择 Dapr 服务模式
  • openfunction.io/enable-dapr 设置为 false 时,openfunction.io/dapr-service-mode 的值将被忽略,既不会启动 Dapr Sidecar,也不会启动 Dapr 代理服务

如果这两个标志没有设置,它们都有默认值。

  • 如果 openfunction.io/enable-daprspec.serving.annotations 中没有定义,并且函数定义包含 spec.serving.inputsspec.serving.outputs,那么 openfunction.io/enable-dapr 的值将被设置为 true。否则,它将被设置为 false
  • 如果没有设置 openfunction.io/dapr-service-mode,其默认值为 standalone

下面你可以找到一个设置这两个标志的函数示例:

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: cron-input-kafka-output
spec:
  version: "v2.0.0"
  image: openfunctiondev/cron-input-kafka-output:v1
  imageCredentials:
    name: push-secret
  build:
    builder: openfunction/builder-go:latest
    env:
      FUNC_NAME: "HandleCronInput"
      FUNC_CLEAR_SOURCE: "true"
    srcRepo:
      url: "https://github.com/OpenFunction/samples.git"
      sourceSubPath: "functions/async/bindings/cron-input-kafka-output"
      revision: "main"
  serving:
    annotations:
      openfunction.io/enable-dapr: "true"
      openfunction.io/dapr-service-mode: "standalone"
    template:
      containers:
        - name: function # DO NOT change this
          imagePullPolicy: IfNotPresent
    triggers:
      dapr:
        - name: cron
          type: bindings.cron
    outputs:
      - dapr:
          component: kafka-server
          operation: "create"
    bindings:
      cron:
        type: bindings.cron
        version: v1
        metadata:
        - name: schedule
          value: "@every 2s"
      kafka-server:
        type: bindings.kafka
        version: v1
        metadata:
        - name: brokers
          value: "kafka-server-kafka-brokers:9092"
        - name: topics
          value: "sample-topic"
        - name: consumerGroup
          value: "bindings-with-output"
        - name: publishTopic
          value: "sample-topic"
        - name: authRequired
          value: "false"

11 - 网络

11.1 - 简介

概述

从 v0.5.0 开始,OpenFunction 使用 Kubernetes Ingress 为同步函数提供统一的入口点,并且必须安装 nginx ingress 控制器。

随着 Kubernetes Gateway API 的成熟,我们决定在 OpenFunction v0.7.0 中实现基于 Kubernetes Gateway API 的 OpenFunction Gateway,以替代之前基于 ingress 的域方法。

你可以在这里找到 OpenFunction Gateway 的提案

OpenFunction Gateway 提供了一个更强大、更灵活的函数网关,包括以下特性:

  • 使用户能够以更简单、厂商中立的方式切换到任何支持 Kubernetes Gateway API网关实现,如 Contour、Istio、Apache APISIX、Envoy Gateway(未来)等。

  • 用户可以选择安装默认的网关实现(Contour),然后定义一个新的 gateway.networking.k8s.io,或者使用他们环境中的任何现有网关实现,然后引用现有的 gateway.networking.k8s.io

  • 允许用户自定义他们自己的函数访问模式,如 hostTemplate: "{{.Name}}.{{.Namespace}}.{{.Domain}}" 用于基于主机的访问。

  • 允许用户自定义他们自己的函数访问模式,如 pathTemplate: "{{.Namespace}}/{{.Name}}" 用于基于路径的访问。

  • 允许用户在函数定义中自定义每个函数的路由规则(基于主机、基于路径或两者都有),如果没有定义自定义的路由规则,将为每个函数提供默认的路由规则。

  • 直接将流量发送到 Knative 服务修订版,而不再通过 Knative 自己的网关。从 OpenFunction 0.7.0 开始,你只需要 OpenFunction Gateway 就可以访问 OpenFunction 同步函数,如果你不需要直接访问 Knative 服务,你可以忽略 Knative 的域配置错误。

  • 在函数修订版之间分割流量(未来)

下图说明了客户端流量如何通过 OpenFunction Gateway,然后直接到达函数:

11.2 - 网关

深入理解 OpenFunction 网关

OpenFunction 网关由 Kubernetes 网关支持,定义了用户如何访问同步函数。

每当创建一个 OpenFunction 网关 时,网关控制器将:

  • 如果 gatewaySpec.listeners 中没有,将添加一个名为 ofn-http-internal 的默认监听器。

  • 根据 domainclusterDomain 生成 gatewaySpec.listeners.[*].hostname

  • gatewaySpec.listenters 注入到 OpenFunction 网关gatewayRef 定义的现有 Kubernetes 网关 中。

  • 根据 OpenFunction 网关gatewayDef 中的 gatewaySpec.listenters 字段创建一个新的 Kubernetes 网关

  • 创建一个名为 gateway.openfunction.svc.cluster.local 的服务,该服务定义了同步函数的统一入口。

部署 OpenFunction 网关 后,你将能够在 OpenFunction 网关 状态中找到 Kubernetes 网关 和其 listeners 的状态:

status:
  conditions:
  - message: Gateway is scheduled
    reason: Scheduled
    status: "True"
    type: Scheduled
  - message: Valid Gateway
    reason: Valid
    status: "True"
    type: Ready
  listeners:
  - attachedRoutes: 0
    conditions:
    - message: Valid listener
      reason: Ready
      status: "True"
      type: Ready
    name: ofn-http-internal
    supportedKinds:
    - group: gateway.networking.k8s.io
      kind: HTTPRoute
  - attachedRoutes: 0
    conditions:
    - message: Valid listener
      reason: Ready
      status: "True"
      type: Ready
    name: ofn-http-external
    supportedKinds:
    - group: gateway.networking.k8s.io
      kind: HTTPRoute

默认的 OpenFunction 网关

OpenFunction 网关 使用 Contour 作为默认的 Kubernetes 网关 实现。 安装 OpenFunction 后,将自动创建以下 OpenFunction 网关

apiVersion: networking.openfunction.io/v1alpha1
kind: Gateway
metadata:
  name: openfunction
  namespace: openfunction
spec:
  domain: ofn.io
  clusterDomain: cluster.local
  hostTemplate: "{{.Name}}.{{.Namespace}}.{{.Domain}}"
  pathTemplate: "{{.Namespace}}/{{.Name}}"
  httpRouteLabelKey: "app.kubernetes.io/managed-by"
  gatewayRef:
    name: contour
    namespace: projectcontour
  gatewaySpec:
    listeners:
      - name: ofn-http-internal
        hostname: "*.cluster.local"
        protocol: HTTP
        port: 80
        allowedRoutes:
          namespaces:
            from: All
      - name: ofn-http-external
        hostname: "*.ofn.io"
        protocol: HTTP
        port: 80
        allowedRoutes:
          namespaces:
            from: All

你可以像下面这样自定义默认的 OpenFunction 网关

kubectl edit gateway openfunction -n openfunction

切换到不同的 Kubernetes 网关

你可以以更简单、厂商中立的方式切换到任何支持 Kubernetes 网关 API网关实现,如 Contour、Istio、Apache APISIX、Envoy Gateway(未来)等。

这里 你可以找到更多详细信息。

多个 OpenFunction 网关

对于 OpenFunction,多个 Gateway 没有意义,我们目前只支持一个 OpenFunction Gateway

11.3 - 路由

什么是 Route

RouteFunction 定义的一部分。Route 定义了来自 Gateway 监听器的流量如何路由到函数。

RouteGatewayRef 中指定了它将附加到的 Gateway,使其能够从 Gateway 接收流量。

一旦创建了同步 Function,函数控制器将:

  • 查找 openfunction 命名空间中名为 openfunctionGateway,然后如果函数中没有定义 route.gatewayRef,则附加到此 Gateway
  • 如果函数中没有定义 route.hostnames,则根据 Gateway.spec.hostTemplate 自动生成 route.hostnames
  • 如果函数中没有定义 route.rules,则根据路径 / 自动生成 route.rules
  • 根据 Route 创建一个 HTTPRoute 自定义资源。BackendRefs 将自动链接到相应的 Knative 服务修订版,并将 HTTPRouteLabelKey 标签添加到此 HTTPRoute
  • 创建服务 {{.Name}}.{{.Namespace}}.svc.cluster.local,此服务定义了从集群内部访问函数的入口。
  • 如果 route.gatewayRef 引用的 Gateway 发生变化,将更新 HTTPRoute

部署同步 Function 后,你将能够在 Function 的状态字段中找到 Function 地址和 Route 状态,例如:

status:
  addresses:
    - type: External
      value: http://function-sample-serving-only.default.ofn.io/
    - type: Internal
      value: http://function-sample-serving-only.default.svc.cluster.local/
  build:
    resourceHash: "14903236521345556383"
    state: Skipped
  route:
    conditions:
      - message: Valid HTTPRoute
        reason: Valid
        status: "True"
        type: Accepted
    hosts:
      - function-sample-serving-only.default.ofn.io
      - function-sample-serving-only.default.svc.cluster.local
    paths:
      - type: PathPrefix
        value: /
  serving:
    lastSuccessfulResourceRef: serving-znk54
    resourceHash: "10715302888241374768"
    resourceRef: serving-znk54
    service: serving-znk54-ksvc-nbg6f
    state: Running

基于主机的路由

基于主机 是默认的路由模式。当 route.hostnames 未定义时, 将根据 gateway.spec.hostTemplate 自动生成 route.hostnames。 如果 route.rules 未定义,将根据路径 / 自动生成 route.rules

kubectl apply -f - <<EOF
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: function-sample
spec:
  version: "v1.0.0"
  image: "openfunctiondev/v1beta1-http:latest"
  serving:
    template:
      containers:
        - name: function
          imagePullPolicy: Always
    triggers:
      http:
        route:
          gatewayRef:
            name: openfunction
            namespace: openfunction
EOF

如果你正在使用默认的 OpenFunction Gateway,函数的外部地址将如下:

http://function-sample.default.ofn.io/

基于路径的路由

如果你在函数中定义了 route.hostnames,将根据 gateway.spec.pathTemplate 自动生成 route.rules

kubectl apply -f - <<EOF
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: function-sample
spec:
  version: "v1.0.0"
  image: "openfunctiondev/v1beta1-http:latest"
  serving:
    template:
      containers:
        - name: function
          imagePullPolicy: Always
    triggers:
      http:
        route:
          gatewayRef:
            name: openfunction
            namespace: openfunction
           hostnames:
           - "sample.ofn.io"
EOF

如果你正在使用默认的 OpenFunction Gateway,函数的外部地址将如下:

http://sample.default.ofn.io/default/function-sample/

基于主机和路径的路由

你可以同时定义主机名和路径,以自定义流量应如何路由到你的函数。

kubectl apply -f - <<EOF
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: function-sample
spec:
  version: "v1.0.0"
  image: "openfunctiondev/v1beta1-http:latest"
  serving:
    template:
      containers:
        - name: function
          imagePullPolicy: Always
    triggers:
      http:
        route:
          gatewayRef:
            name: openfunction
            namespace: openfunction
          rules:
            - matches:
                - path:
                    type: PathPrefix
                    value: /v2/foo
          hostnames:
          - "sample.ofn.io"
EOF

如果你正在使用默认的 OpenFunction Gateway,函数的外部地址将如下:

http://sample.default.ofn.io/v2/foo/

11.4 - 函数入口

有几种方法可以访问同步函数。我们将在以下部分详细说明。

本文档假设你正在使用默认的 OpenFunction 网关 并且你有一个名为 function-sample 的同步函数。

从集群内部访问函数

通过内部地址访问函数

OpenFunction 会为每个同步 Function 创建此服务:{{.Name}}.{{.Namespace}}.svc.cluster.local。此服务将用于提供函数的内部地址。

通过运行以下命令获取 Function 内部地址:

export FUNC_INTERNAL_ADDRESS=$(kubectl get function function-sample -o=jsonpath='{.status.addresses[?(@.type=="Internal")].value}')

这个地址提供了在集群内部访问函数的默认方法,适合用作 EventSourcesink.url

在 pod 中使用 curl 访问 Function

kubectl run --rm ofn-test -i --tty --image=radial/busyboxplus:curl -- curl -sv $FUNC_INTERNAL_ADDRESS

从集群外部访问函数

通过 Kubernetes 网关的 IP 地址访问函数

获取 Kubernetes 网关的 ip 地址:

export IP=$(kubectl get node -l "! node.kubernetes.io/exclude-from-external-load-balancers" -o=jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')

获取函数的 HOST 和 PATH:

export FUNC_HOST=$(kubectl get function function-sample -o=jsonpath='{.status.route.hosts[0]}')
export FUNC_PATH=$(kubectl get function function-sample -o=jsonpath='{.status.route.paths[0].value}')

直接使用 curl 访问 Function

curl -sv -HHOST:$FUNC_HOST http://$IP$FUNC_PATH

通过外部地址访问函数

要通过外部地址访问同步函数,你需要先配置 DNS。Magic DNS 或真实 DNS 都可以:

  • Magic DNS

    获取 Kubernetes 网关的 ip 地址:

    export IP=$(kubectl get node -l "! node.kubernetes.io/exclude-from-external-load-balancers" -o=jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')
    

    用 Magic DNS 替换 OpenFunction 网关中定义的域:

    export DOMAIN="$IP.sslip.io"
    kubectl patch gateway.networking.openfunction.io/openfunction -n openfunction --type merge --patch '{"spec": {"domain": "'$DOMAIN'"}}'
    

    然后,你可以在 Function 的状态字段中看到 Function 外部地址:

    kubectl get function function-sample -oyaml
    
    status:
      addresses:
      - type: External
        value: http://function-sample.default.172.31.73.53.sslip.io/
      - type: Internal
        value: http://function-sample.default.svc.cluster.local/
      build:
        resourceHash: "14903236521345556383"
        state: Skipped
      route:
        conditions:
        - message: Valid HTTPRoute
          reason: Valid
          status: "True"
          type: Accepted
        hosts:
        - function-sample.default.172.31.73.53.sslip.io
        - function-sample.default.svc.cluster.local
        paths:
        - type: PathPrefix
          value: /
      serving:
        lastSuccessfulResourceRef: serving-t56fq
        resourceHash: "2638289828407595605"
        resourceRef: serving-t56fq
        service: serving-t56fq-ksvc-bv8ng
        state: Running
    
  • Real DNS

    如果你有一个外部 IP 地址,你可以配置一个通配符 A 记录作为你的域:

    # 这里 example.com 是 OpenFunction 网关中定义的域
    *.example.com == A <external-ip>
    

    如果你有一个 CNAME,你可以配置一个 CNAME 记录作为你的域:

    # 这里 example.com 是 OpenFunction 网关中定义的域
    *.example.com == CNAME <external-name>
    

    用你上面配置的域替换 OpenFunction 网关中定义的域:

    export DOMAIN="example.com"
    kubectl patch gateway.networking.openfunction.io/openfunction -n openfunction --type merge --patch '{"spec": {"domain": "'$DOMAIN'"}}'
    

    然后,你可以在 Function 的状态字段中看到 Function 外部地址:

    kubectl get function function-sample -oyaml
    
    status:
      addresses:
      - type: External
        value: http://function-sample.default.example.com/
      - type: Internal
        value: http://function-sample.default.svc.cluster.local/
      build:
        resourceHash: "14903236521345556383"
        state: Skipped
      route:
        conditions:
        - message: Valid HTTPRoute
          reason: Valid
          status: "True"
          type: Accepted
        hosts:
        - function-sample.default.example.com
        - function-sample.default.svc.cluster.local
        paths:
        - type: PathPrefix
          value: /
      serving:
        lastSuccessfulResourceRef: serving-t56fq
        resourceHash: "2638289828407595605"
        resourceRef: serving-t56fq
        service: serving-t56fq-ksvc-bv8ng
        state: Running
    

然后,你可以通过运行以下命令获取 Function 外部地址:

export FUNC_EXTERNAL_ADDRESS=$(kubectl get function function-sample -o=jsonpath='{.status.addresses[?(@.type=="External")].value}')

现在,你可以直接使用 curl 访问 Function

curl -sv $FUNC_EXTERNAL_ADDRESS

12 - CI/CD

概述

以前,用户可以使用 OpenFunction 将函数或应用程序源代码构建成容器镜像,然后直接将构建的镜像部署到底层的同步/异步 Serverless 运行时,无需用户干预。

但是,OpenFunction 无法在函数或应用程序源代码更改时重新构建镜像,然后重新部署它,也无法在此镜像更改时重新部署镜像(当镜像在另一个函数中手动构建和推送时)

从 v1.0.0 开始,OpenFunction 在一个新的组件 Revision Controller 中添加了检测源代码或镜像更改,然后重新构建和/或重新部署新构建的镜像的能力。修订控制器能够:

  • 检测 github、gitlab 或 gitee 中的源代码更改,然后在源代码更改时重新构建和重新部署新构建的镜像。
  • 检测包含源代码的捆绑容器镜像(镜像)的更改,然后在捆绑镜像更改时重新构建和重新部署新构建的镜像。
  • 检测函数或应用程序镜像的更改,然后在函数或应用程序镜像更改时重新部署新镜像。

快速开始

安装 Revision Controller

你可以在安装 OpenFunction 时启用 Revision Controller,只需在 helm 命令中添加以下标志即可。

--set revisionController.enable=true

你也可以在 OpenFunction 安装后启用 Revision Controller

kubectl apply -f https://raw.githubusercontent.com/OpenFunction/revision-controller/release-1.0/deploy/bundle.yaml

Revision Controller 默认将安装到 openfunction 命名空间。如果你想将其安装到另一个命名空间,你可以下载 bundle.yaml 并手动更改命名空间。

检测源代码或镜像更改

要检测源代码或镜像更改,你需要将修订控制器开关和参数添加到函数的注解中,如下所示。

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  annotations:
    openfunction.io/revision-controller: enable
    openfunction.io/revision-controller-params: |
      type: source
      repo-type: github
      polling-interval: 1m      
  name: function-http-java
  namespace: default
spec:
  build:
    ...
  serving:
    ...

注解

描述
openfunction.io/revision-controller是否启用修订控制器来检测此函数的源代码或镜像更改,可以设置为 enabledisable
openfunction.io/revision-controller-params修订控制器的参数。

参数

名称描述
type要检测的更改类型,包括 sourcesource-imageimage
polling-interval轮询镜像摘要或源代码头的间隔。
repo-typegit 仓库的类型,包括 githubgitlabgitee。默认为 github
base-urlgitlab 服务器的基本 url。
auth-typegitlab 服务器的认证类型。
project-idgitlab 仓库的项目 id。
insecure-registry如果镜像仓库不安全,你应该将此设置为 true。

13 - OpenFunction 事件

13.1 - 简介

概述

OpenFunction 事件是 OpenFunction 的事件管理框架。它提供以下核心特性:

  • 支持通过同步和异步调用触发目标函数
  • 用户定义的触发判断逻辑
  • OpenFunction 事件的组件可以由 OpenFunction 本身驱动

架构

以下图示说明了 OpenFunction 事件的架构。

openfunction-events

概念

EventSource

EventSource 定义了事件的生产者,例如 Kafka 服务,对象存储服务,甚至是函数。它包含了这些事件生产者的描述和发送这些事件的信息。

EventSource 支持以下类型的事件源服务器:

  • Kafka
  • Cron(调度器)
  • Redis

EventBus(ClusterEventBus)

EventBus 负责聚合事件并使其持久化。它包含了一个通常是消息队列(如 NATS Streaming 和 Kafka)的事件总线 broker 的描述,并为 EventSource 和 Trigger 提供这些配置。

EventBus 默认处理命名空间范围内的事件总线适配。对于集群范围,ClusterEventBus 可作为事件总线适配器,并在其他组件无法找到命名空间下的 EventBus 时生效。

EventBus 支持以下事件总线 broker:

  • NATS Streaming

Trigger

Trigger 是事件目的的抽象,例如接收到消息时需要做什么。它包含了由您定义的事件的目的,这告诉触发器它应该从哪个 EventSource 获取事件,并根据给定的条件决定是否触发目标函数。

参考

有关更多信息,请参阅 EventSource 规范EventBus 规范

13.2 - 使用 EventSource

本文档提供了一个示例,说明如何使用事件源触发同步函数。

在此示例中,定义了一个 EventSource,用于同步调用,使用事件源(一个 Kafka 服务器)作为函数(一个 Knative 服务)的输入绑定。当事件源生成事件时,它将通过 spec.sink 配置调用函数并获取同步返回。

创建函数

使用以下内容创建一个函数作为 EventSource Sink。有关如何创建函数的更多信息,请参见 创建同步函数

apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
  name: sink
spec:
  version: "v1.0.0"
  image: "openfunction/sink-sample:latest"
  serving:
    template:
      containers:
        - name: function
          imagePullPolicy: Always
    triggers:
      http:
        port: 8080

创建函数后,运行以下命令获取函数的 URL。

$ kubectl get functions.core.openfunction.io
NAME   BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
sink   Skipped      Running                  serving-4x5wh   https://openfunction.io/default/sink   13s

创建 Kafka 集群

  1. 运行以下命令在默认命名空间中安装 strimzi-kafka-operator

    helm repo add strimzi https://strimzi.io/charts/
    helm install kafka-operator -n default strimzi/strimzi-kafka-operator
    
  2. 使用以下内容创建一个文件 kafka.yaml

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: kafka-server
      namespace: default
    spec:
      kafka:
        version: 3.3.1
        replicas: 1
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
          - name: tls
            port: 9093
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.1"
        storage:
          type: ephemeral
      zookeeper:
        replicas: 1
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
        userOperator: {}
    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: events-sample
      namespace: default
      labels:
        strimzi.io/cluster: kafka-server
    spec:
      partitions: 10
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    
  3. 运行以下命令在默认命名空间中部署一个名为 kafka-server 的 1-replica Kafka 服务器和一个名为 events-sample 的 1-replica Kafka 主题。此命令创建的 Kafka 和 Zookeeper 集群的存储类型为 ephemeral,并使用 emptyDir 进行演示。

    kubectl apply -f kafka.yaml
    
  4. 运行以下命令检查 pod 状态,并等待 Kafka 和 Zookeeper 启动并运行。

    $ kubectl get po
    NAME                                              READY   STATUS        RESTARTS   AGE
    kafka-server-entity-operator-568957ff84-nmtlw     3/3     Running       0          8m42s
    kafka-server-kafka-0                              1/1     Running       0          9m13s
    kafka-server-zookeeper-0                          1/1     Running       0          9m46s
    strimzi-cluster-operator-687fdd6f77-cwmgm         1/1     Running       0          11m
    
  5. 运行以下命令查看 Kafka 集群的元数据。

    kafkacat -L -b kafka-server-kafka-brokers:9092
    

触发同步函数

创建 EventSource

  1. 使用以下内容创建一个 EventSource 配置文件(例如,eventsource-sink.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: my-eventsource
    spec:
      logLevel: "2"
      kafka:
        sample-one:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
      sink:
        uri: "http://openfunction.io.svc.cluster.local/default/sink"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f eventsource-sink.yaml
    
  3. 运行以下命令检查结果。

    $ kubectl get eventsources.events.openfunction.io
    NAME             EVENTBUS   SINK   STATUS
    my-eventsource                     Ready
    
    $ kubectl get components
    NAME                                                      AGE
    serving-8f6md-component-esc-kafka-sample-one-r527t        68m
    serving-8f6md-component-ts-my-eventsource-default-wz8jt   68m
    
    $ kubectl get deployments.apps
    NAME                                           READY   UP-TO-DATE   AVAILABLE   AGE
    serving-8f6md-deployment-v100-pg9sd            1/1     1            1           68m
    

创建事件生产者

要启动目标函数,需要创建一些事件来触发函数。

  1. 使用以下内容创建一个事件生产者配置文件(例如,events-producer.yaml)。

    apiVersion: core.openfunction.io/v1beta1
    kind: Function
    metadata:
      name: events-producer
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-bindings:latest
      serving:
        template:
          containers:
            - name: function
              imagePullPolicy: Always
        runtime: "async"
        inputs:
          - name: cron
            component: cron
        outputs:
          - name: target
            component: kafka-server
            operation: "create"
        bindings:
          cron:
            type: bindings.cron
            version: v1
            metadata:
              - name: schedule
                value: "@every 2s"
          kafka-server:
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-server-kafka-brokers:9092"
              - name: topics
                value: "events-sample"
              - name: consumerGroup
                value: "bindings-with-output"
              - name: publishTopic
                value: "events-sample"
              - name: authRequired
                value: "false"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f events-producer.yaml
    
  3. 运行以下命令实时检查结果。

    $ kubectl get po --watch
    NAME                                                           READY   STATUS              RESTARTS   AGE
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            0/2     ContainerCreating   0          1s
    serving-8f6md-deployment-v100-pg9sd-6666c5577f-4rpdg           2/2     Running             0          23m
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            0/2     ContainerCreating   0          1s
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            1/2     Running             0          5s
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            2/2     Running             0          8s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     Pending             0          0s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     Pending             0          0s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     ContainerCreating   0          0s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     ContainerCreating   0          2s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      1/2     Running             0          4s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      1/2     Running             0          4s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      2/2     Running             0          4s
    

13.3 - 使用 EventBus 和 Trigger

本文档提供了一个示例,说明如何使用 EventBus 和 Trigger。

先决条件

  • 您需要创建一个作为目标函数的函数以被触发。有关更多详细信息,请参见 创建函数
  • 您需要创建一个 Kafka 集群。有关更多详细信息,请参见 创建 Kafka 集群

部署 NATS 流服务器

运行以下命令部署 NATS 流服务器。本文档使用 nats://nats.default:4222 作为 NATS 流服务器的访问地址,stan 作为集群 ID。有关更多信息,请参见 NATS Streaming (STAN)

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats
helm install stan nats/stan --set stan.nats.url=nats://nats:4222

创建 OpenFuncAsync 运行时函数

  1. 使用以下内容创建目标函数的配置文件(例如,openfuncasync-function.yaml),该函数由 Trigger CRD 触发并打印接收到的消息。

    apiVersion: core.openfunction.io/v1beta2
    kind: Function
    metadata:
      name: trigger-target
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-trigger-target:latest
      serving:
        scaleOptions:
          keda:
            scaledObject:
              pollingInterval: 15
              minReplicaCount: 0
              maxReplicaCount: 10
              cooldownPeriod: 30
            triggers:
              - type: stan
                metadata:
                  natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222"
                  queueGroup: "grp1"
                  durableName: "ImDurable"
                  subject: "metrics"
                  lagThreshold: "10"
        triggers:
          dapr:
            - name: eventbus
              topic: metrics
        pubsub:
          eventbus:
            type: pubsub.natsstreaming
            version: v1
            metadata:
              - name: natsURL
                value: "nats://nats.default:4222"
              - name: natsStreamingClusterID
                value: "stan"
              - name: subscriptionType
                value: "queue"
              - name: durableSubscriptionName
                value: "ImDurable"
              - name: consumerID
                value: "grp1"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f openfuncasync-function.yaml
    

创建 EventBus 和 EventSource

  1. 使用以下内容创建 EventBus 的配置文件(例如,eventbus.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      natsStreaming:
        natsURL: "nats://nats.default:4222"
        natsStreamingClusterID: "stan"
        subscriptionType: "queue"
        durableSubscriptionName: "ImDurable"
    
  2. 使用以下内容创建 EventSource 的配置文件(例如,eventsource.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: my-eventsource
    spec:
      logLevel: "2"
      eventBus: "default"
      kafka:
        sample-two:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
    
  3. 运行以下命令应用这些配置文件。

    kubectl apply -f eventbus.yaml
    kubectl apply -f eventsource.yaml
    
  4. 运行以下命令检查结果。

    $ kubectl get eventsources.events.openfunction.io
    NAME             EVENTBUS   SINK   STATUS
    my-eventsource   default           Ready
    
    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   62m
    
    $ kubectl get components
    NAME                                                 AGE
    serving-9689d-component-ebfes-my-eventsource-cmcbw   46m
    serving-9689d-component-esc-kafka-sample-two-l99cg   46m
    serving-dxrhd-component-eventbus-t65q7               13m
    serving-zwlj4-component-ebft-my-trigger-4925n        100s
    

创建 Trigger

  1. 使用以下内容创建 Trigger 的配置文件(例如,trigger.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: Trigger
    metadata:
      name: my-trigger
    spec:
      logLevel: "2"
      eventBus: "default"
      inputs:
        inputDemo:
          eventSource: "my-eventsource"
          event: "sample-two"
      subscribers:
        - condition: inputDemo
          topic: "metrics"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f trigger.yaml
    
  3. 运行以下命令检查结果。

    $ kubectl get triggers.events.openfunction.io
    NAME         EVENTBUS   STATUS
    my-trigger   default    Ready
    
    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   62m
    
    $ kubectl get components
    NAME                                                 AGE
    serving-9689d-component-ebfes-my-eventsource-cmcbw   46m
    serving-9689d-component-esc-kafka-sample-two-l99cg   46m
    serving-dxrhd-component-eventbus-t65q7               13m
    serving-zwlj4-component-ebft-my-trigger-4925n        100s
    

创建事件生产者

  1. 使用以下内容创建事件生产者配置文件(例如,events-producer.yaml)。

    apiVersion: core.openfunction.io/v1beta2
    kind: Function
    metadata:
      name: events-producer
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-bindings:latest
      serving:
        template:
          containers:
            - name: function
              imagePullPolicy: Always
        triggers:
          dapr:
            - name: cron
              type: bindings.cron
        outputs:
          - dapr:
              name: kafka-server
              operation: "create"
        bindings:
          cron:
            type: bindings.cron
            version: v1
            metadata:
              - name: schedule
                value: "@every 2s"
          kafka-server:
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-server-kafka-brokers:9092"
              - name: topics
                value: "events-sample"
              - name: consumerGroup
                value: "bindings-with-output"
              - name: publishTopic
                value: "events-sample"
              - name: authRequired
                value: "false"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f events-producer.yaml
    
  3. 运行以下命令观察目标异步函数的变化。

    $ kubectl get functions.core.openfunction.io
    NAME                                  BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
    trigger-target                        Skipped      Running                  serving-dxrhd                                         20m
    
    $ kubectl get po --watch
    NAME                                                     READY   STATUS              RESTARTS   AGE
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     Pending             0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     Pending             0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     ContainerCreating   0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     ContainerCreating   0          2s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      1/2     Running             0          4s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      1/2     Running             0          4s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      2/2     Running             0          4s
    

13.4 - 在一个 EventSource 中使用多个源

本文档描述了如何在一个 EventSource 中使用多个源。

先决条件

  • 您需要创建一个作为目标函数的函数以被触发。有关更多详细信息,请参见 创建函数
  • 您需要创建一个 Kafka 集群。有关更多详细信息,请参见 创建 Kafka 集群

在一个 EventSource 中使用多个源

  1. 使用以下内容创建一个 EventSource 配置文件(例如,eventsource-multi.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: my-eventsource
    spec:
      logLevel: "2"
      kafka:
        sample-three:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
      cron:
        sample-three:
          schedule: "@every 5s"
      sink:
        uri: "http://openfunction.io.svc.cluster.local/default/sink"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f eventsource-multi.yaml
    
  3. 运行以下命令观察变化。

    $ kubectl get eventsources.events.openfunction.io
    NAME             EVENTBUS   SINK   STATUS
    my-eventsource                     Ready
    
    $ kubectl get components
    NAME                                                      AGE
    serving-vqfk5-component-esc-cron-sample-three-dzcpv       35s
    serving-vqfk5-component-esc-kafka-sample-one-nr9pq        35s
    serving-vqfk5-component-ts-my-eventsource-default-q6g6m   35s
    
    $ kubectl get deployments.apps
    NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
    serving-4x5wh-ksvc-wxbf2-v100-deployment   1/1     1            1           3h14m
    serving-vqfk5-deployment-v100-vdmvj        1/1     1            1           48s
    

13.5 - 使用 ClusterEventBus

本文档描述了如何使用 ClusterEventBus。

先决条件

您已完成 使用 EventBus 和 Trigger 中描述的步骤。

使用 ClusterEventBus

  1. 使用以下内容创建一个 ClusterEventBus 配置文件(例如,clustereventbus.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: ClusterEventBus
    metadata:
      name: default
    spec:
      natsStreaming:
        natsURL: "nats://nats.default:4222"
        natsStreamingClusterID: "stan"
        subscriptionType: "queue"
        durableSubscriptionName: "ImDurable"
    
  2. 运行以下命令删除 EventBus。

    kubectl delete eventbus.events.openfunction.io default
    
  3. 运行以下命令应用配置文件。

    kubectl apply -f clustereventbus.yaml
    
  4. 运行以下命令检查结果。

    $ kubectl get eventbus.events.openfunction.io
    No resources found in default namespace.
    
    $ kubectl get clustereventbus.events.openfunction.io
    NAME      AGE
    default   21s
    

13.6 - 使用带有条件的 Trigger

本文档描述了如何使用带有条件的触发器。

先决条件

您已完成 使用 EventBus 和 Trigger 中描述的步骤。

使用带有条件的触发器

创建两个事件源

  1. 使用以下内容创建一个 EventSource 配置文件(例如,eventsource-a.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: eventsource-a
    spec:
      logLevel: "2"
      eventBus: "default"
      kafka:
        sample-five:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
    
  2. 使用以下内容创建另一个 EventSource 配置文件(例如,eventsource-b.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: eventsource-b
    spec:
      logLevel: "2"
      eventBus: "default"
      cron:
        sample-five:
          schedule: "@every 5s"
    
  3. 运行以下命令应用这两个配置文件。

    kubectl apply -f eventsource-a.yaml
    kubectl apply -f eventsource-b.yaml
    

创建带有条件的触发器

  1. 使用以下内容创建一个带有 condition 的触发器配置文件(例如,condition-trigger.yaml)。

    apiVersion: events.openfunction.io/v1alpha1
    kind: Trigger
    metadata:
      name: condition-trigger
    spec:
      logLevel: "2"
      eventBus: "default"
      inputs:
        eventA:
          eventSource: "eventsource-a"
          event: "sample-five"
        eventB:
          eventSource: "eventsource-b"
          event: "sample-five"
      subscribers:
      - condition: eventB
        sink:
          uri: "http://openfunction.io.svc.cluster.local/default/sink"
      - condition: eventA && eventB
        topic: "metrics"
    
  2. 运行以下命令应用配置文件。

    kubectl apply -f condition-trigger.yaml
    
  3. 运行以下命令检查结果。

    $ kubectl get eventsources.events.openfunction.io
    NAME            EVENTBUS   SINK   STATUS
    eventsource-a   default           Ready
    eventsource-b   default           Ready
    
    $ kubectl get triggers.events.openfunction.io
    NAME                EVENTBUS   STATUS
    condition-trigger   default    Ready
    
    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   12s
    
  4. 运行以下命令,您可以从输出中看到,由于事件源 eventsource-b 是一个 cron 任务,所以触发器中的 eventB 条件匹配,触发了 Knative 服务。

    $ kubectl get functions.core.openfunction.io
    NAME                                  BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
    sink                                  Skipped      Running                  serving-4x5wh   https://openfunction.io/default/sink   3h25m
    
    $ kubectl get po
    NAME                                                        READY   STATUS    RESTARTS   AGE
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-k2jdg   2/2     Running   0          46s
    
  5. 参考 创建事件生产者 创建一个事件生产者。

  6. 运行以下命令,您可以从输出中看到,触发器中的 eventA && eventB 条件匹配,事件同时发送到事件总线的 metrics 主题。触发了 OpenFuncAsync 函数。

    $ kubectl get functions.core.openfunction.io
    NAME                                  BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
    trigger-target                        Skipped      Running                  serving-7hghp                                         103s
    
    $ kubectl get po
    NAME                                                        READY   STATUS    RESTARTS   AGE
    serving-7hghp-deployment-v100-z8wrf-946b4854d-svf55         2/2     Running   0          18s