概念术语
- 1: 函数定义
- 2: 函数构建
- 3: 构建策略
- 4: 函数触发器
- 5: 函数输出
- 6: 函数缩放
- 7: 函数签名
- 8: Wasm 函数
- 9: 无服务器应用
- 10: BaaS 集成
- 11: 网络
- 12: CI/CD
- 13: OpenFunction 事件
- 13.1: 简介
- 13.2: 使用 EventSource
- 13.3: 使用 EventBus 和 Trigger
- 13.4: 在一个 EventSource 中使用多个源
- 13.5: 使用 ClusterEventBus
- 13.6: 使用带有条件的 Trigger
1 - 函数定义
函数
Function
是Build
和Serving
的控制平面,也是用户使用OpenFunction的接口。用户无需单独创建Build
或Serving
,因为Function
是定义函数的Build
和Serving
的唯一地方。
一旦创建了函数,它将控制Build
和Serving
的生命周期,无需用户干预:
如果在函数中定义了
Build
,则一旦部署函数,将创建一个builder自定义资源来构建函数的容器镜像。如果在函数中定义了
Serving
,则将创建一个serving自定义资源来控制函数的服务和自动缩放。Build
和Serving
可以一起定义,这意味着首先将构建函数镜像,然后将其用于服务。可以定义
Build
而不定义Serving
,在这种情况下,函数仅用于构建镜像。可以定义
Serving
而不定义Build
,函数将使用先前构建的函数镜像进行服务。
构建
OpenFunction使用Shipwright和Cloud Native Buildpacks将函数源代码构建成容器镜像。
一旦创建了包含Build
规范的函数,就会创建一个builder
自定义资源,该资源将使用Shipwright管理构建工具和策略。然后,Shipwright将使用Tekton控制构建容器镜像的过程,包括获取源代码、生成镜像工件和发布镜像。
服务
一旦创建了包含Serving
规范的函数,就会创建一个Serving
自定义资源来控制函数的服务阶段。目前,OpenFunction Serving支持两种运行时:Knative同步运行时和OpenFunction异步运行时。
同步运行时
对于同步函数,OpenFunction目前支持使用 Knative Serving 和 KEDA http-addon 作为运行时。
异步运行时
OpenFunction的异步运行时是一个基于KEDA和Dapr实现的事件驱动运行时。异步函数可以由各种事件类型触发,如消息队列、cronjob和MQTT等。
2 - 函数构建
目前,OpenFunction支持使用Cloud Native Buildpacks构建函数镜像,无需创建Dockerfile
。
同时,您也可以使用OpenFunction通过Dockerfile
构建无服务器应用。
通过定义构建部分来构建函数
您可以从git仓库的源代码或本地存储的源代码构建函数或应用。
从git仓库的源代码构建函数
您可以通过在Function
定义中添加一个构建部分来构建函数镜像。如果还定义了服务部分,那么构建完成后将立即启动函数。
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: logs-async-handler
spec:
version: "v2.0.0"
image: openfunctiondev/logs-async-handler:v1
imageCredentials:
name: push-secret
build:
builder: openfunction/builder-go:latest
env:
FUNC_NAME: "LogsHandler"
FUNC_CLEAR_SOURCE: "true"
## 自定义函数框架版本,目前仅对functions-framework-go有效
## 通常情况下,您不需要这样做,因为构建器会自带最新的函数框架
# FUNC_FRAMEWORK_VERSION: "v0.4.0"
## 使用FUNC_GOPROXY设置goproxy
# FUNC_GOPROXY: "https://goproxy.cn"
srcRepo:
url: "https://github.com/OpenFunction/samples.git"
sourceSubPath: "functions/async/logs-handler-function/"
revision: "main"
要将函数镜像推送到容器仓库,您需要创建一个包含仓库凭证的秘密,并将该秘密添加到
imageCredentials
。 您可以参考先决条件以获取更多信息。
从本地源代码构建函数
要从本地源代码构建函数或应用,您需要将本地源代码打包成容器镜像,并将此镜像推送到容器仓库。
假设您的源代码在samples
目录中,您可以使用以下Dockerfile
来构建源代码包镜像。
FROM scratch
WORKDIR /
COPY samples samples/
然后,您可以像这样构建源代码包镜像:
docker build -t <your registry name>/sample-source-code:latest -f </path/to/the/dockerfile> .
docker push <your registry name>/sample-source-code:latest
建议使用空镜像scratch作为构建源代码包镜像的基础镜像,非空基础镜像可能会导致源代码复制失败。
与为git仓库方法定义spec.build.srcRepo.url字段不同,您需要定义spec.build.srcRepo.bundleContainer.image字段。
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: logs-async-handler
spec:
build:
srcRepo:
bundleContainer:
image: openfunctiondev/sample-source-code:latest
sourceSubPath: "/samples/functions/async/logs-handler-function/"
sourceSubPath是源代码包镜像中源代码的绝对路径。
使用pack CLI构建函数
通常,直接从本地源代码构建函数镜像是必要的,特别是出于调试目的或用于离线环境。您可以使用pack CLI来实现这一点。
Pack是由Cloud Native Buildpacks项目维护的一个工具,用于支持使用buildpacks。它启用了以下功能:
- 使用buildpacks构建应用。
- 重新基准使用buildpacks创建的应用镜像。
- 创建生态系统内使用的各种组件。
按照这里的说明安装pack
CLI工具。
您可以在这里找到更多关于如何使用pack CLI的详细信息。
要从本地源代码构建OpenFunction函数镜像,您可以按照以下步骤操作:
下载函数示例
git clone https://github.com/OpenFunction/samples.git
cd samples/functions/knative/hello-world-go
使用pack CLI构建函数镜像
pack build func-helloworld-go --builder openfunction/builder-go:v2.4.0-1.17 --env FUNC_NAME="HelloWorld" --env FUNC_CLEAR_SOURCE=true
本地启动函数镜像
docker run --rm --env="FUNC_CONTEXT={\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}" --env="CONTEXT_MODE=self-host" --name func-helloworld-go -p 8080:8080 func-helloworld-go
访问函数
curl http://localhost:8080
输出示例:
hello, world!
OpenFunction构建器
要使用Cloud Native Buildpacks构建函数镜像,需要一个构建器镜像。
在这里,您可以找到OpenFunction社区维护的流行语言的构建器:
构建器 | |
---|---|
Go | openfunction/builder-go:v2.4.0 (openfunction/builder-go:latest) |
Nodejs | openfunction/builder-node:v2-16.15 (openfunction/builder-node:latest) |
Java | openfunction/builder-java:v2-11, openfunction/builder-java:v2-16, openfunction/builder-java:v2-17, openfunction/builder-java:v2-18 |
Python | openfunction/gcp-builder:v1 |
DotNet | openfunction/gcp-builder:v1 |
3 - 构建策略
构建策略用于控制构建过程。有两种类型的策略,ClusterBuildStrategy
和 BuildStrategy
。
这两种策略都定义了一组步骤,这些步骤是控制应用程序构建过程所必需的。
ClusterBuildStrategy
是集群范围的,而 BuildStrategy
是命名空间范围的。
在 OpenFunction 中有 4 种内置的 ClusterBuildStrategy
,你可以在以下各节中找到更多详细信息。
openfunction
openfunction
ClusterBuildStrategy 使用 Buildpacks 来构建函数镜像,这是默认的构建策略。
以下是 openfunction
ClusterBuildStrategy 的参数:
名称 | 类型 | 描述 |
---|---|---|
RUN_IMAGE | string | 使用的运行镜像的引用 |
CACHE_IMAGE | string | 缓存镜像是一种在不同构建之间保留缓存层的方式,当构建具有大量依赖项的函数或应用程序(如 Java 函数)时,可以提高构建性能。 |
BASH_IMAGE | string | 策略使用的 bash 镜像。 |
ENV_VARS | string | 在 构建时 设置的环境变量。格式为 key1=value1,key2=value2 。 |
用户可以像这样设置这些参数:
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: logs-async-handler
spec:
build:
shipwright:
params:
RUN_IMAGE: ""
ENV_VARS: ""
buildah
buildah
ClusterBuildStrategy 使用 buildah 来构建应用程序镜像。
要使用 buildah
ClusterBuildStrategy,你可以定义一个 Function
,如下所示:
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: sample-go-app
namespace: default
spec:
build:
builder: openfunction/buildah:v1.23.1
dockerfile: Dockerfile
shipwright:
strategy:
kind: ClusterBuildStrategy
name: buildah
以下是 buildah
ClusterBuildStrategy 的参数:
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
registry-search | string | 用于搜索短名称镜像(如 golang:latest )的镜像仓库,用逗号分隔。 | docker.io,quay.io |
registry-insecure | string | 不安全镜像仓库的全名。不安全的镜像仓库是没有有效 SSL 证书或只支持 HTTP 的镜像仓库。 | |
registry-block | string | 需要阻止拉取访问的镜像仓库。 | "" |
kaniko
kaniko
ClusterBuildStrategy 使用 kaniko 来构建应用程序镜像。
要使用 kaniko
ClusterBuildStrategy,你可以定义一个 Function
,如下所示:
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: function-kaniko
namespace: default
spec:
build:
builder: openfunction/kaniko-executor:v1.7.0
dockerfile: Dockerfile
shipwright:
strategy:
kind: ClusterBuildStrategy
name: kaniko
ko
ko
ClusterBuildStrategy 使用 ko 来构建 Go
应用程序镜像。
要使用 ko
ClusterBuildStrategy,你可以定义一个 Function
,如下所示:
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: function-ko
namespace: default
spec:
build:
builder: golang:1.17
dockerfile: Dockerfile
shipwright:
strategy:
kind: ClusterBuildStrategy
name: ko
以下是 ko
ClusterBuildStrategy 的参数:
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
go-flags | string | Value for the GOFLAGS environment variable. | "" |
ko-version | string | Version of ko, must be either ’latest’, or a release name from https://github.com/google/ko/releases. | "" |
package-directory | string | The directory inside the context directory containing the main package. | “.” |
自定义策略
用户可以自定义他们自己的策略。要自定义策略,你可以参考 这里。
4 - 函数触发器
函数 触发器
用于定义如何触发函数。目前有两种触发器:HTTP 触发器
和 Dapr 触发器
。默认的触发器是 HTTP 触发器
。
HTTP 触发器
HTTP 触发器
通过 HTTP 请求触发函数。你可以像这样为函数定义一个 HTTP 触发器
:
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: function-sample
spec:
serving:
triggers:
http:
port: 8080
route:
rules:
- matches:
- path:
type: PathPrefix
value: /echo
Dapr 触发器
Dapr 触发器
通过 Dapr bindings
或 Dapr pubsub
的事件触发函数。你可以像这样定义一个带有 Dapr 触发器
的函数:
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: logs-async-handler
namespace: default
spec:
serving:
bindings:
kafka-receiver:
metadata:
- name: brokers
value: kafka-server-kafka-brokers:9092
- name: authRequired
value: "false"
- name: publishTopic
value: logs
- name: topics
value: logs
- name: consumerGroup
value: logs-handler
type: bindings.kafka
version: v1
triggers:
dapr:
- name: kafka-receiver
type: bindings.kafka
函数输入
输入
是函数可以从中获取额外输入数据的地方,目前支持 Dapr 状态存储
作为 输入
。
你可以像这样定义函数输入:
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: logs-async-handler
namespace: default
spec:
serving:
triggers:
inputs:
- dapr:
name: mysql
type: state.mysql
5 - 函数输出
函数输出
输出是函数可以发送数据的组件,包括:
例如,这里 你可以找到一个带有 cron 输入绑定和 Kafka 输出绑定的异步函数:
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: cron-input-kafka-output
spec:
...
serving:
...
outputs:
- dapr:
name: kafka-server
type: bindings.kafka
operation: "create"
bindings:
kafka-server:
type: bindings.kafka
version: v1
metadata:
- name: brokers
value: "kafka-server-kafka-brokers:9092"
- name: topics
value: "sample-topic"
- name: consumerGroup
value: "bindings-with-output"
- name: publishTopic
value: "sample-topic"
- name: authRequired
value: "false"
这里 是另一个使用 Kafka Pub/sub 组件作为输入的异步函数示例
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: autoscaling-subscriber
spec:
...
serving:
...
runtime: "async"
outputs:
- dapr:
name: kafka-server
type: pubsub.kafka
topic: "sample-topic"
pubsub:
kafka-server:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: "kafka-server-kafka-brokers:9092"
- name: authRequired
value: "false"
- name: allowedTopics
value: "sample-topic"
- name: consumerID
value: "autoscaling-subscriber"
6 - 函数缩放
缩放是 FaaS 或 Serverless 平台的核心特性之一。
OpenFunction 在 ScaleOptions
中定义了函数缩放,并在 Triggers
中定义了激活函数缩放的触发器。
ScaleOptions
您可以为同步和异步函数定义统一的函数缩放选项,如下所示,这将对同步和异步函数都有效:
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: function-sample
spec:
serving:
scaleOptions:
minReplicas: 0
maxReplicas: 10
通常,仅定义 minReplicas 和 maxReplicas 对于异步函数来说是不够的。您可以像下面这样为异步函数定义单独的缩放选项,这将覆盖 minReplicas 和 maxReplicas。
您可以在 KEDA ScaleObject Spec 和 KEDA ScaledJob Spec 中找到更多关于异步函数缩放选项的详细信息。
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: function-sample
spec:
serving:
scaleOptions:
minReplicas: 0
maxReplicas: 10
keda:
scaledObject:
pollingInterval: 15
cooldownPeriod: 60
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleDown:
stabilizationWindowSeconds: 45
policies:
- type: Percent
value: 50
periodSeconds: 15
scaleUp:
stabilizationWindowSeconds: 0
您也可以为 Knative 同步函数设置高级缩放选项,这将覆盖 minReplicas 和 maxReplicas。
您可以在 这里 找到更多关于 Knative 同步函数缩放选项的详细信息。
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: function-sample
spec:
serving:
scaleOptions:
knative:
autoscaling.knative.dev/initial-scale: "1"
autoscaling.knative.dev/scale-down-delay: "0"
autoscaling.knative.dev/window: "60s"
autoscaling.knative.dev/panic-window-percentage: "10.0"
autoscaling.knative.dev/metric: "concurrency"
autoscaling.knative.dev/target: "100"
触发器
触发器定义了如何激活异步函数的函数缩放。您可以使用所有 KEDA 缩放器 中定义的触发器作为 OpenFunction 的触发器规范。
同步函数的缩放是通过 HTTP 请求的各种选项激活的,这些选项已经在前面的 ScaleOption 部分中定义了。
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: function-sample
spec:
serving:
scaleOptions:
keda:
triggers:
- type: kafka
metadata:
topic: logs
bootstrapServers: kafka-server-kafka-brokers.default.svc.cluster.local:9092
consumerGroup: logs-handler
lagThreshold: "20"
7 - 函数签名
不同函数签名的比较
OpenFunction中有三种函数签名:HTTP
,CloudEvent
和OpenFunction
。让我们使用Go函数为例,详细解释一下。
HTTP
和CloudEvent
签名可以用来创建同步函数,而OpenFunction
签名可以用来创建同步和异步函数。
此外,OpenFunction
签名可以利用各种Dapr构建块,包括Bindings,Pub/sub等访问各种BaaS服务,帮助创建更强大的函数。(Dapr状态管理,配置将很快得到支持)
HTTP | CloudEvent | OpenFunction | |
---|---|---|---|
签名 | func(http.ResponseWriter, *http.Request) error | func(context.Context, cloudevents.Event) error | func(ofctx.Context, []byte) (ofctx.Out, error) |
同步函数 | 支持 | 支持 | 支持 |
异步函数 | 不支持 | 不支持 | 支持 |
Dapr Binding | 不支持 | 不支持 | 支持 |
Dapr Pub/sub | 不支持 | 不支持 | 支持 |
示例
如您所见,OpenFunction
签名是推荐的函数签名,我们正在努力在更多语言运行时支持此函数签名。
HTTP | CloudEvent | OpenFunction | |
---|---|---|---|
Go | Hello World, 多函数, 带路径参数的同步函数, 日志处理 | 带路径参数的同步函数 | 带路径参数的同步函数, 带输出绑定的同步函数, Kafka输入 & HTTP输出绑定, Cron输入 & Kafka输出绑定, Cron输入绑定, Kafka输入绑定, Kafka pubsub |
Nodejs | Hello World | 带输出绑定的同步函数, MQTT绑定 & pubsub | |
Python | Hello World | ||
Java | Hello World | CloudEvent | 带输出的同步函数, Cron输入 & Kafka输出绑定, Kafka pubsub |
DotNet | Hello World |
8 - Wasm 函数
WasmEdge
是一个轻量级、高性能、可扩展的 WebAssembly 运行时,用于云原生、边缘和去中心化应用。它支持无服务器应用、嵌入式函数、微服务、智能合约和物联网设备。
OpenFunction 现在支持使用 WasmEdge
作为工作负载运行时来构建和运行 wasm 函数。
你可以在这里找到 WasmEdge 集成提案
Wasm 容器镜像
包含 wasm 二进制文件的 wasm 镜像是一个没有操作系统层的特殊容器镜像。应该在这个 wasm 容器镜像中添加一个特殊的注解 module.wasm.image/variant: compat-smart
,以便像 WasmEdge 这样的 wasm 运行时能够识别它。这在 OpenFunction 中是自动处理的,用户只需要将 workloadRuntime
指定为 wasmedge
。
Wasm 容器镜像的构建阶段
如果 function.spec.workloadRuntime
设置为 wasmedge
,或者函数的注解包含 module.wasm.image/variant: compat-smart
,function.spec.build.shipwright.strategy
将根据名为 wasmedge
的 ClusterBuildStrategy
自动生成,以便构建带有 module.wasm.image/variant: compat-smart
注解的 wasm 容器镜像。
Wasm 容器镜像的服务阶段
当 function.spec.workloadRuntime
设置为 wasmedge
,或者函数的注解包含 module.wasm.image/variant: compat-smart
时:
- 如果
function.spec.serving.annotations
不包含module.wasm.image/variant
,module.wasm.image/variant: compat-smart
将自动添加到function.spec.serving.annotations
。 - 如果
function.spec.serving.template.runtimeClassName
没有设置,这个runtimeClassName
将自动设置为默认的openfunction-crun
如果你的 kubernetes 集群在像
Azure
这样的公共云中,你可以手动设置spec.serving.template.runtimeClassName
来覆盖默认的runtimeClassName
。
构建和运行 wasm 函数
要在 kubernetes 集群中设置
WasmEdge
工作负载运行时并将镜像推送到容器镜像仓库, 请参考先决条件部分以获取更多信息。
你可以在这里找到关于这个示例函数的更多信息。
- 创建一个 wasm 函数
cat <<EOF | kubectl apply -f -
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: wasmedge-http-server
spec:
workloadRuntime: wasmedge
image: openfunctiondev/wasmedge_http_server:0.1.0
imageCredentials:
name: push-secret
build:
dockerfile: Dockerfile
srcRepo:
revision: main
sourceSubPath: functions/knative/wasmedge/http-server
url: https://github.com/OpenFunction/samples
serving:
scaleOptions:
minReplicas: 0
template:
containers:
- command:
- /wasmedge_hyper_server.wasm
imagePullPolicy: IfNotPresent
livenessProbe:
initialDelaySeconds: 3
periodSeconds: 30
tcpSocket:
port: 8080
name: function
triggers:
http:
port: 8080
route:
rules:
- matches:
- path:
type: PathPrefix
value: /echo
EOF
- 检查 wasm 函数的状态
kubectl get functions.core.openfunction.io -w
NAME BUILDSTATE SERVINGSTATE BUILDER SERVING ADDRESS AGE
wasmedge-http-server Succeeded Running builder-4p2qq serving-lrd8c http://wasmedge-http-server.default.svc.cluster.local/echo 12m
- 访问 wasm 函数
一旦 BUILDSTATE
变为 Succeeded
,SERVINGSTATE
变为 Running
,你就可以通过 ADDRESS
字段中的地址访问这个函数:
kubectl run curl --image=radial/busyboxplus:curl -i --tty
curl http://wasmedge-http-server.default.svc.cluster.local/echo -X POST -d "WasmEdge"
WasmEdge
9 - 无服务器应用
除了构建和运行无服务器函数外,你还可以使用 OpenFuntion 构建和运行无服务器应用。
OpenFunction 支持两种不同的方式将源代码构建成容器镜像:
- 使用 Cloud Native Buildpacks 在没有
Dockerfile
的情况下构建源代码 - 使用 Buildah 或 BuildKit 在有
Dockerfile
的情况下构建源代码
要将镜像推送到容器镜像仓库,你需要创建一个包含镜像仓库凭据的 secret,并将 secret 添加到
imageCredentials
。 请参考 先决条件 部分获取更多信息。
使用 Dockerfile 构建和运行无服务器应用
如果你已经为你的应用创建了一个 Dockerfile
,比如这个 Go 应用,你可以像 这样 以无服务器的方式构建和运行这个应用:
- 创建示例 Go 无服务器应用
cat <<EOF | kubectl apply -f -
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: sample-go-app
namespace: default
spec:
build:
builder: openfunction/buildah:v1.23.1
shipwright:
strategy:
kind: ClusterBuildStrategy
name: buildah
srcRepo:
revision: main
sourceSubPath: apps/buildah/go
url: https://github.com/OpenFunction/samples.git
image: openfunctiondev/sample-go-app:v1
imageCredentials:
name: push-secret
serving:
template:
containers:
- imagePullPolicy: IfNotPresent
name: function
triggers:
http:
port: 8080
version: v1.0.0
workloadRuntime: OCIContainer
EOF
- 检查应用状态 你可以通过 kubectl get functions.core.openfunction.io -w 检查无服务器应用的状态:
kubectl get functions.core.openfunction.io -w
NAME BUILDSTATE SERVINGSTATE BUILDER SERVING ADDRESS AGE
sample-go-app Succeeded Running builder-jgnzp serving-q6wdp http://sample-go-app.default.svc.cluster.local/ 22m
- 访问这个应用
一旦
BUILDSTATE
变成Succeeded
,SERVINGSTATE
变成Running
,你就可以通过ADDRESS
字段中的地址访问这个 Go 无服务器应用:
kubectl run curl --image=radial/busyboxplus:curl -i --tty
curl http://sample-go-app.default.svc.cluster.local
这里你可以找到一个 Java 无服务器应用(带有 Dockerfile)的示例。
不使用 Dockerfile 构建和运行无服务器应用
如果你有一个没有 Dockerfile 的应用,比如这个 Java 应用,你也可以像这个 Java 应用 那样以无服务器的方式构建和运行你的应用:
- 创建示例 Java 无服务器应用
cat <<EOF | kubectl apply -f -
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: sample-java-app-buildpacks
namespace: default
spec:
build:
builder: cnbs/sample-builder:alpine
srcRepo:
revision: main
sourceSubPath: apps/java-maven
url: https://github.com/buildpacks/samples.git
image: openfunction/sample-java-app-buildpacks:v1
imageCredentials:
name: push-secret
serving:
template:
containers:
- imagePullPolicy: IfNotPresent
name: function
resources: {}
triggers:
http:
port: 8080
version: v1.0.0
workloadRuntime: OCIContainer
EOF
- 检查应用状态 你可以通过 kubectl get functions.core.openfunction.io -w 检查无服务器应用的状态:
kubectl get functions.core.openfunction.io -w
NAME BUILDSTATE SERVINGSTATE BUILDER SERVING ADDRESS AGE
sample-java-app-buildpacks Succeeded Running builder-jgnzp serving-q6wdp http://sample-java-app-buildpacks.default.svc.cluster.local/ 22m
- 访问这个应用
一旦
BUILDSTATE
变成Succeeded
,SERVINGSTATE
变成Running
,你就可以通过ADDRESS
字段中的地址访问这个 Java 无服务器应用:
kubectl run curl --image=radial/busyboxplus:curl -i --tty
curl http://sample-java-app-buildpacks.default.svc.cluster.local
10 - BaaS 集成
OpenFunction 的一个独特特性是它可以通过 Dapr 简单地集成各种后端服务 (BaaS)。目前,OpenFunction 支持 Dapr 的 pub/sub 和 bindings 构建块,未来将会添加更多。
在 OpenFunction v0.7.0 及之前的版本中,OpenFunction 通过在每个函数实例的 pod 中注入一个 dapr sidecar 容器来集成 BaaS,这导致了以下问题:
- dapr sidecar 容器的启动拖慢了整个函数实例的启动时间。
- dapr sidecar 容器可能消耗的资源比函数容器本身还要多。
为了解决上述问题,OpenFunction 在 v0.8.0 中引入了 Dapr 独立模式
。
Dapr 独立模式
在 Dapr 独立模式中,每个函数都会创建一个 Dapr 代理服务
,然后由该函数的所有实例共享。这样,就不再需要为每个函数实例启动一个单独的 Dapr sidecar 容器,从而大大减少了函数的启动时间。
选择适当的 Dapr 服务模式
所以现在你有两个选项来集成 BaaS:
Dapr Sidecar 模式
Dapr 独立模式
你可以为你的函数选择适当的 Dapr 服务模式。Dapr 独立模式
是推荐的和默认的模式。如果你的函数不经常扩展,或者你难以使用 Dapr 独立模式
,你可以使用 Dapr Sidecar 模式
。
你可以通过设置两个标志来控制如何集成 BaaS,这两个标志都可以在函数的 spec.serving.annotations
中设置:
openfunction.io/enable-dapr
可以设置为true
或false
openfunction.io/dapr-service-mode
可以设置为standalone
或sidecar
- 当
openfunction.io/enable-dapr
设置为true
时,用户可以通过将openfunction.io/dapr-service-mode
设置为standalone
或sidecar
来选择Dapr 服务模式
。 - 当
openfunction.io/enable-dapr
设置为false
时,openfunction.io/dapr-service-mode
的值将被忽略,既不会启动Dapr Sidecar
,也不会启动Dapr 代理服务
。
如果这两个标志没有设置,它们都有默认值。
- 如果
openfunction.io/enable-dapr
在spec.serving.annotations
中没有定义,并且函数定义包含spec.serving.inputs
或spec.serving.outputs
,那么openfunction.io/enable-dapr
的值将被设置为true
。否则,它将被设置为false
。 - 如果没有设置
openfunction.io/dapr-service-mode
,其默认值为standalone
。
下面你可以找到一个设置这两个标志的函数示例:
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: cron-input-kafka-output
spec:
version: "v2.0.0"
image: openfunctiondev/cron-input-kafka-output:v1
imageCredentials:
name: push-secret
build:
builder: openfunction/builder-go:latest
env:
FUNC_NAME: "HandleCronInput"
FUNC_CLEAR_SOURCE: "true"
srcRepo:
url: "https://github.com/OpenFunction/samples.git"
sourceSubPath: "functions/async/bindings/cron-input-kafka-output"
revision: "main"
serving:
annotations:
openfunction.io/enable-dapr: "true"
openfunction.io/dapr-service-mode: "standalone"
template:
containers:
- name: function # DO NOT change this
imagePullPolicy: IfNotPresent
triggers:
dapr:
- name: cron
type: bindings.cron
outputs:
- dapr:
component: kafka-server
operation: "create"
bindings:
cron:
type: bindings.cron
version: v1
metadata:
- name: schedule
value: "@every 2s"
kafka-server:
type: bindings.kafka
version: v1
metadata:
- name: brokers
value: "kafka-server-kafka-brokers:9092"
- name: topics
value: "sample-topic"
- name: consumerGroup
value: "bindings-with-output"
- name: publishTopic
value: "sample-topic"
- name: authRequired
value: "false"
11 - 网络
11.1 - 简介
概述
从 v0.5.0 开始,OpenFunction 使用 Kubernetes Ingress 为同步函数提供统一的入口点,并且必须安装 nginx ingress 控制器。
随着 Kubernetes Gateway API 的成熟,我们决定在 OpenFunction v0.7.0 中实现基于 Kubernetes Gateway API 的 OpenFunction Gateway,以替代之前基于 ingress 的域方法。
你可以在这里找到 OpenFunction Gateway 的提案
OpenFunction Gateway 提供了一个更强大、更灵活的函数网关,包括以下特性:
使用户能够以更简单、厂商中立的方式切换到任何支持 Kubernetes Gateway API 的 网关实现,如 Contour、Istio、Apache APISIX、Envoy Gateway(未来)等。
用户可以选择安装默认的网关实现(Contour),然后定义一个新的
gateway.networking.k8s.io
,或者使用他们环境中的任何现有网关实现,然后引用现有的gateway.networking.k8s.io
。允许用户自定义他们自己的函数访问模式,如
hostTemplate: "{{.Name}}.{{.Namespace}}.{{.Domain}}"
用于基于主机的访问。允许用户自定义他们自己的函数访问模式,如
pathTemplate: "{{.Namespace}}/{{.Name}}"
用于基于路径的访问。允许用户在函数定义中自定义每个函数的路由规则(基于主机、基于路径或两者都有),如果没有定义自定义的路由规则,将为每个函数提供默认的路由规则。
直接将流量发送到 Knative 服务修订版,而不再通过 Knative 自己的网关。从 OpenFunction 0.7.0 开始,你只需要 OpenFunction Gateway 就可以访问 OpenFunction 同步函数,如果你不需要直接访问 Knative 服务,你可以忽略 Knative 的域配置错误。
在函数修订版之间分割流量(未来)
下图说明了客户端流量如何通过 OpenFunction Gateway,然后直接到达函数:
11.2 - 网关
深入理解 OpenFunction 网关
OpenFunction 网关由 Kubernetes 网关支持,定义了用户如何访问同步函数。
每当创建一个 OpenFunction 网关
时,网关控制器将:
如果
gatewaySpec.listeners
中没有,将添加一个名为ofn-http-internal
的默认监听器。根据
domain
或clusterDomain
生成gatewaySpec.listeners.[*].hostname
。将
gatewaySpec.listenters
注入到OpenFunction 网关
的gatewayRef
定义的现有Kubernetes 网关
中。根据
OpenFunction 网关
的gatewayDef
中的gatewaySpec.listenters
字段创建一个新的Kubernetes 网关
。创建一个名为
gateway.openfunction.svc.cluster.local
的服务,该服务定义了同步函数的统一入口。
部署 OpenFunction 网关
后,你将能够在 OpenFunction 网关
状态中找到 Kubernetes 网关
和其 listeners
的状态:
status:
conditions:
- message: Gateway is scheduled
reason: Scheduled
status: "True"
type: Scheduled
- message: Valid Gateway
reason: Valid
status: "True"
type: Ready
listeners:
- attachedRoutes: 0
conditions:
- message: Valid listener
reason: Ready
status: "True"
type: Ready
name: ofn-http-internal
supportedKinds:
- group: gateway.networking.k8s.io
kind: HTTPRoute
- attachedRoutes: 0
conditions:
- message: Valid listener
reason: Ready
status: "True"
type: Ready
name: ofn-http-external
supportedKinds:
- group: gateway.networking.k8s.io
kind: HTTPRoute
默认的 OpenFunction 网关
OpenFunction 网关
使用 Contour
作为默认的 Kubernetes 网关
实现。
安装 OpenFunction 后,将自动创建以下 OpenFunction 网关
:
apiVersion: networking.openfunction.io/v1alpha1
kind: Gateway
metadata:
name: openfunction
namespace: openfunction
spec:
domain: ofn.io
clusterDomain: cluster.local
hostTemplate: "{{.Name}}.{{.Namespace}}.{{.Domain}}"
pathTemplate: "{{.Namespace}}/{{.Name}}"
httpRouteLabelKey: "app.kubernetes.io/managed-by"
gatewayRef:
name: contour
namespace: projectcontour
gatewaySpec:
listeners:
- name: ofn-http-internal
hostname: "*.cluster.local"
protocol: HTTP
port: 80
allowedRoutes:
namespaces:
from: All
- name: ofn-http-external
hostname: "*.ofn.io"
protocol: HTTP
port: 80
allowedRoutes:
namespaces:
from: All
你可以像下面这样自定义默认的 OpenFunction 网关
:
kubectl edit gateway openfunction -n openfunction
切换到不同的 Kubernetes 网关
你可以以更简单、厂商中立的方式切换到任何支持 Kubernetes 网关 API 的 网关实现,如 Contour、Istio、Apache APISIX、Envoy Gateway(未来)等。
这里 你可以找到更多详细信息。
多个 OpenFunction 网关
对于 OpenFunction,多个 Gateway
没有意义,我们目前只支持一个 OpenFunction Gateway
。
11.3 - 路由
什么是 Route
?
Route
是 Function
定义的一部分。Route
定义了来自 Gateway
监听器的流量如何路由到函数。
Route
在 GatewayRef
中指定了它将附加到的 Gateway
,使其能够从 Gateway
接收流量。
一旦创建了同步 Function
,函数控制器将:
- 查找
openfunction
命名空间中名为openfunction
的Gateway
,然后如果函数中没有定义route.gatewayRef
,则附加到此Gateway
。 - 如果函数中没有定义
route.hostnames
,则根据Gateway.spec.hostTemplate
自动生成route.hostnames
。 - 如果函数中没有定义
route.rules
,则根据路径/
自动生成route.rules
。 - 根据
Route
创建一个HTTPRoute
自定义资源。BackendRefs
将自动链接到相应的 Knative 服务修订版,并将HTTPRouteLabelKey
标签添加到此HTTPRoute
。 - 创建服务
{{.Name}}.{{.Namespace}}.svc.cluster.local
,此服务定义了从集群内部访问函数的入口。 - 如果
route.gatewayRef
引用的Gateway
发生变化,将更新HTTPRoute
。
部署同步 Function
后,你将能够在 Function
的状态字段中找到 Function
地址和 Route
状态,例如:
status:
addresses:
- type: External
value: http://function-sample-serving-only.default.ofn.io/
- type: Internal
value: http://function-sample-serving-only.default.svc.cluster.local/
build:
resourceHash: "14903236521345556383"
state: Skipped
route:
conditions:
- message: Valid HTTPRoute
reason: Valid
status: "True"
type: Accepted
hosts:
- function-sample-serving-only.default.ofn.io
- function-sample-serving-only.default.svc.cluster.local
paths:
- type: PathPrefix
value: /
serving:
lastSuccessfulResourceRef: serving-znk54
resourceHash: "10715302888241374768"
resourceRef: serving-znk54
service: serving-znk54-ksvc-nbg6f
state: Running
注意
Funtion.status
中类型为 Internal
的地址提供了从集群内部访问函数的默认方法。
这个内部地址不受 route.gatewayRef
引用的 Gateway
的影响,它适合用作 EventSource
的 sink.url
。
Funtion.status
中类型为 External
的地址提供了从集群外部访问函数的方法(你可以选择配置 Magic DNS 或真实 DNS,请参考 通过外部地址访问函数 获取更多详情)。
这个外部地址是基于 route.gatewayRef
、router.hostnames
和 route.rules
生成的。路由模式只对这个外部地址生效,下面的文档将解释它是如何工作的。
有关如何访问函数的更多信息,请参考 函数入口点。
基于主机的路由
基于主机
是默认的路由模式。当 route.hostnames
未定义时,
将根据 gateway.spec.hostTemplate
自动生成 route.hostnames
。
如果 route.rules
未定义,将根据路径 /
自动生成 route.rules
。
kubectl apply -f - <<EOF
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: function-sample
spec:
version: "v1.0.0"
image: "openfunctiondev/v1beta1-http:latest"
serving:
template:
containers:
- name: function
imagePullPolicy: Always
triggers:
http:
route:
gatewayRef:
name: openfunction
namespace: openfunction
EOF
如果你正在使用默认的 OpenFunction Gateway,函数的外部地址将如下:
http://function-sample.default.ofn.io/
基于路径的路由
如果你在函数中定义了 route.hostnames
,将根据 gateway.spec.pathTemplate
自动生成 route.rules
。
kubectl apply -f - <<EOF
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: function-sample
spec:
version: "v1.0.0"
image: "openfunctiondev/v1beta1-http:latest"
serving:
template:
containers:
- name: function
imagePullPolicy: Always
triggers:
http:
route:
gatewayRef:
name: openfunction
namespace: openfunction
hostnames:
- "sample.ofn.io"
EOF
如果你正在使用默认的 OpenFunction Gateway,函数的外部地址将如下:
http://sample.default.ofn.io/default/function-sample/
基于主机和路径的路由
你可以同时定义主机名和路径,以自定义流量应如何路由到你的函数。
注意
在这种模式下,你需要自己解决 HTTPRoutes 之间可能存在的冲突。kubectl apply -f - <<EOF
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: function-sample
spec:
version: "v1.0.0"
image: "openfunctiondev/v1beta1-http:latest"
serving:
template:
containers:
- name: function
imagePullPolicy: Always
triggers:
http:
route:
gatewayRef:
name: openfunction
namespace: openfunction
rules:
- matches:
- path:
type: PathPrefix
value: /v2/foo
hostnames:
- "sample.ofn.io"
EOF
如果你正在使用默认的 OpenFunction Gateway,函数的外部地址将如下:
http://sample.default.ofn.io/v2/foo/
11.4 - 函数入口
有几种方法可以访问同步函数。我们将在以下部分详细说明。
本文档假设你正在使用默认的 OpenFunction 网关 并且你有一个名为
function-sample
的同步函数。
从集群内部访问函数
通过内部地址访问函数
OpenFunction 会为每个同步 Function
创建此服务:{{.Name}}.{{.Namespace}}.svc.cluster.local
。此服务将用于提供函数的内部地址。
通过运行以下命令获取 Function
内部地址:
export FUNC_INTERNAL_ADDRESS=$(kubectl get function function-sample -o=jsonpath='{.status.addresses[?(@.type=="Internal")].value}')
这个地址提供了在集群内部访问函数的默认方法,适合用作 EventSource
的 sink.url
。
在 pod 中使用 curl 访问 Function
:
kubectl run --rm ofn-test -i --tty --image=radial/busyboxplus:curl -- curl -sv $FUNC_INTERNAL_ADDRESS
从集群外部访问函数
通过 Kubernetes 网关的 IP 地址访问函数
获取 Kubernetes 网关的 ip 地址:
export IP=$(kubectl get node -l "! node.kubernetes.io/exclude-from-external-load-balancers" -o=jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')
获取函数的 HOST 和 PATH:
export FUNC_HOST=$(kubectl get function function-sample -o=jsonpath='{.status.route.hosts[0]}')
export FUNC_PATH=$(kubectl get function function-sample -o=jsonpath='{.status.route.paths[0].value}')
直接使用 curl 访问 Function
:
curl -sv -HHOST:$FUNC_HOST http://$IP$FUNC_PATH
通过外部地址访问函数
要通过外部地址访问同步函数,你需要先配置 DNS。Magic DNS 或真实 DNS 都可以:
Magic DNS
获取 Kubernetes 网关的 ip 地址:
export IP=$(kubectl get node -l "! node.kubernetes.io/exclude-from-external-load-balancers" -o=jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')
用 Magic DNS 替换 OpenFunction 网关中定义的域:
export DOMAIN="$IP.sslip.io" kubectl patch gateway.networking.openfunction.io/openfunction -n openfunction --type merge --patch '{"spec": {"domain": "'$DOMAIN'"}}'
然后,你可以在
Function
的状态字段中看到Function
外部地址:kubectl get function function-sample -oyaml
status: addresses: - type: External value: http://function-sample.default.172.31.73.53.sslip.io/ - type: Internal value: http://function-sample.default.svc.cluster.local/ build: resourceHash: "14903236521345556383" state: Skipped route: conditions: - message: Valid HTTPRoute reason: Valid status: "True" type: Accepted hosts: - function-sample.default.172.31.73.53.sslip.io - function-sample.default.svc.cluster.local paths: - type: PathPrefix value: / serving: lastSuccessfulResourceRef: serving-t56fq resourceHash: "2638289828407595605" resourceRef: serving-t56fq service: serving-t56fq-ksvc-bv8ng state: Running
Real DNS
如果你有一个外部 IP 地址,你可以配置一个通配符 A 记录作为你的域:
# 这里 example.com 是 OpenFunction 网关中定义的域 *.example.com == A <external-ip>
如果你有一个 CNAME,你可以配置一个 CNAME 记录作为你的域:
# 这里 example.com 是 OpenFunction 网关中定义的域 *.example.com == CNAME <external-name>
用你上面配置的域替换 OpenFunction 网关中定义的域:
export DOMAIN="example.com" kubectl patch gateway.networking.openfunction.io/openfunction -n openfunction --type merge --patch '{"spec": {"domain": "'$DOMAIN'"}}'
然后,你可以在
Function
的状态字段中看到Function
外部地址:kubectl get function function-sample -oyaml
status: addresses: - type: External value: http://function-sample.default.example.com/ - type: Internal value: http://function-sample.default.svc.cluster.local/ build: resourceHash: "14903236521345556383" state: Skipped route: conditions: - message: Valid HTTPRoute reason: Valid status: "True" type: Accepted hosts: - function-sample.default.example.com - function-sample.default.svc.cluster.local paths: - type: PathPrefix value: / serving: lastSuccessfulResourceRef: serving-t56fq resourceHash: "2638289828407595605" resourceRef: serving-t56fq service: serving-t56fq-ksvc-bv8ng state: Running
然后,你可以通过运行以下命令获取 Function
外部地址:
export FUNC_EXTERNAL_ADDRESS=$(kubectl get function function-sample -o=jsonpath='{.status.addresses[?(@.type=="External")].value}')
现在,你可以直接使用 curl 访问 Function
:
curl -sv $FUNC_EXTERNAL_ADDRESS
12 - CI/CD
概述
以前,用户可以使用 OpenFunction 将函数或应用程序源代码构建成容器镜像,然后直接将构建的镜像部署到底层的同步/异步 Serverless 运行时,无需用户干预。
但是,OpenFunction 无法在函数或应用程序源代码更改时重新构建镜像,然后重新部署它,也无法在此镜像更改时重新部署镜像(当镜像在另一个函数中手动构建和推送时)
从 v1.0.0 开始,OpenFunction 在一个新的组件 Revision Controller
中添加了检测源代码或镜像更改,然后重新构建和/或重新部署新构建的镜像的能力。修订控制器能够:
- 检测 github、gitlab 或 gitee 中的源代码更改,然后在源代码更改时重新构建和重新部署新构建的镜像。
- 检测包含源代码的捆绑容器镜像(镜像)的更改,然后在捆绑镜像更改时重新构建和重新部署新构建的镜像。
- 检测函数或应用程序镜像的更改,然后在函数或应用程序镜像更改时重新部署新镜像。
快速开始
安装 Revision Controller
你可以在安装 OpenFunction 时启用 Revision Controller
,只需在 helm 命令中添加以下标志即可。
--set revisionController.enable=true
你也可以在 OpenFunction
安装后启用 Revision Controller
:
kubectl apply -f https://raw.githubusercontent.com/OpenFunction/revision-controller/release-1.0/deploy/bundle.yaml
Revision Controller
默认将安装到openfunction
命名空间。如果你想将其安装到另一个命名空间,你可以下载bundle.yaml
并手动更改命名空间。
检测源代码或镜像更改
要检测源代码或镜像更改,你需要将修订控制器开关和参数添加到函数的注解中,如下所示。
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
annotations:
openfunction.io/revision-controller: enable
openfunction.io/revision-controller-params: |
type: source
repo-type: github
polling-interval: 1m
name: function-http-java
namespace: default
spec:
build:
...
serving:
...
注解
键 | 描述 |
---|---|
openfunction.io/revision-controller | 是否启用修订控制器来检测此函数的源代码或镜像更改,可以设置为 enable 或 disable 。 |
openfunction.io/revision-controller-params | 修订控制器的参数。 |
参数
名称 | 描述 |
---|---|
type | 要检测的更改类型,包括 source 、source-image 和 image 。 |
polling-interval | 轮询镜像摘要或源代码头的间隔。 |
repo-type | git 仓库的类型,包括 github 、gitlab 和 gitee 。默认为 github 。 |
base-url | gitlab 服务器的基本 url。 |
auth-type | gitlab 服务器的认证类型。 |
project-id | gitlab 仓库的项目 id。 |
insecure-registry | 如果镜像仓库不安全,你应该将此设置为 true。 |
13 - OpenFunction 事件
13.1 - 简介
概述
OpenFunction 事件是 OpenFunction 的事件管理框架。它提供以下核心特性:
- 支持通过同步和异步调用触发目标函数
- 用户定义的触发判断逻辑
- OpenFunction 事件的组件可以由 OpenFunction 本身驱动
架构
以下图示说明了 OpenFunction 事件的架构。
概念
EventSource
EventSource 定义了事件的生产者,例如 Kafka 服务,对象存储服务,甚至是函数。它包含了这些事件生产者的描述和发送这些事件的信息。
EventSource 支持以下类型的事件源服务器:
- Kafka
- Cron(调度器)
- Redis
EventBus(ClusterEventBus)
EventBus 负责聚合事件并使其持久化。它包含了一个通常是消息队列(如 NATS Streaming 和 Kafka)的事件总线 broker 的描述,并为 EventSource 和 Trigger 提供这些配置。
EventBus 默认处理命名空间范围内的事件总线适配。对于集群范围,ClusterEventBus 可作为事件总线适配器,并在其他组件无法找到命名空间下的 EventBus 时生效。
EventBus 支持以下事件总线 broker:
- NATS Streaming
Trigger
Trigger 是事件目的的抽象,例如接收到消息时需要做什么。它包含了由您定义的事件的目的,这告诉触发器它应该从哪个 EventSource 获取事件,并根据给定的条件决定是否触发目标函数。
参考
有关更多信息,请参阅 EventSource 规范 和 EventBus 规范。
13.2 - 使用 EventSource
本文档提供了一个示例,说明如何使用事件源触发同步函数。
在此示例中,定义了一个 EventSource,用于同步调用,使用事件源(一个 Kafka 服务器)作为函数(一个 Knative 服务)的输入绑定。当事件源生成事件时,它将通过 spec.sink
配置调用函数并获取同步返回。
创建函数
使用以下内容创建一个函数作为 EventSource Sink。有关如何创建函数的更多信息,请参见 创建同步函数。
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: sink
spec:
version: "v1.0.0"
image: "openfunction/sink-sample:latest"
serving:
template:
containers:
- name: function
imagePullPolicy: Always
triggers:
http:
port: 8080
创建函数后,运行以下命令获取函数的 URL。
$ kubectl get functions.core.openfunction.io
NAME BUILDSTATE SERVINGSTATE BUILDER SERVING URL AGE
sink Skipped Running serving-4x5wh https://openfunction.io/default/sink 13s
创建 Kafka 集群
运行以下命令在默认命名空间中安装 strimzi-kafka-operator。
helm repo add strimzi https://strimzi.io/charts/ helm install kafka-operator -n default strimzi/strimzi-kafka-operator
使用以下内容创建一个文件
kafka.yaml
。apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: kafka-server namespace: default spec: kafka: version: 3.3.1 replicas: 1 listeners: - name: plain port: 9092 type: internal tls: false - name: tls port: 9093 type: internal tls: true config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 inter.broker.protocol.version: "3.1" storage: type: ephemeral zookeeper: replicas: 1 storage: type: ephemeral entityOperator: topicOperator: {} userOperator: {} --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: events-sample namespace: default labels: strimzi.io/cluster: kafka-server spec: partitions: 10 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824
运行以下命令在默认命名空间中部署一个名为
kafka-server
的 1-replica Kafka 服务器和一个名为events-sample
的 1-replica Kafka 主题。此命令创建的 Kafka 和 Zookeeper 集群的存储类型为 ephemeral,并使用 emptyDir 进行演示。kubectl apply -f kafka.yaml
运行以下命令检查 pod 状态,并等待 Kafka 和 Zookeeper 启动并运行。
$ kubectl get po NAME READY STATUS RESTARTS AGE kafka-server-entity-operator-568957ff84-nmtlw 3/3 Running 0 8m42s kafka-server-kafka-0 1/1 Running 0 9m13s kafka-server-zookeeper-0 1/1 Running 0 9m46s strimzi-cluster-operator-687fdd6f77-cwmgm 1/1 Running 0 11m
运行以下命令查看 Kafka 集群的元数据。
kafkacat -L -b kafka-server-kafka-brokers:9092
触发同步函数
创建 EventSource
使用以下内容创建一个 EventSource 配置文件(例如,
eventsource-sink.yaml
)。注意
- 以下示例定义了一个名为
my-eventsource
的事件源,并将指定 Kafka 服务器生成的事件标记为sample-one
事件。 spec.sink
引用了在先决条件中创建的目标函数(Knative 服务)。
apiVersion: events.openfunction.io/v1alpha1 kind: EventSource metadata: name: my-eventsource spec: logLevel: "2" kafka: sample-one: brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092" topic: "events-sample" authRequired: false sink: uri: "http://openfunction.io.svc.cluster.local/default/sink"
- 以下示例定义了一个名为
运行以下命令应用配置文件。
kubectl apply -f eventsource-sink.yaml
运行以下命令检查结果。
$ kubectl get eventsources.events.openfunction.io NAME EVENTBUS SINK STATUS my-eventsource Ready $ kubectl get components NAME AGE serving-8f6md-component-esc-kafka-sample-one-r527t 68m serving-8f6md-component-ts-my-eventsource-default-wz8jt 68m $ kubectl get deployments.apps NAME READY UP-TO-DATE AVAILABLE AGE serving-8f6md-deployment-v100-pg9sd 1/1 1 1 68m
注意
在此示例中触发同步函数,EventSource 控制器的工作流程描述如下:
- 创建一个名为
my-eventsource
的 EventSource 自定义资源。 - 创建一个名为
serving-xxxxx-component-esc-kafka-sample-one-xxxxx
的 Dapr 组件,使 EventSource 能够与事件源关联。 - 创建一个名为
serving-xxxxx-component-ts-my-eventsource-default-xxxxx
的 Dapr 组件,使 EventSource 能够与 sink 函数关联。 - 创建一个名为
serving-xxxxx-deployment-v100-xxxxx-xxxxxxxxxx-xxxxx
的 Deployment,用于处理事件。
- 创建一个名为
创建事件生产者
要启动目标函数,需要创建一些事件来触发函数。
使用以下内容创建一个事件生产者配置文件(例如,
events-producer.yaml
)。apiVersion: core.openfunction.io/v1beta1 kind: Function metadata: name: events-producer spec: version: "v1.0.0" image: openfunctiondev/v1beta1-bindings:latest serving: template: containers: - name: function imagePullPolicy: Always runtime: "async" inputs: - name: cron component: cron outputs: - name: target component: kafka-server operation: "create" bindings: cron: type: bindings.cron version: v1 metadata: - name: schedule value: "@every 2s" kafka-server: type: bindings.kafka version: v1 metadata: - name: brokers value: "kafka-server-kafka-brokers:9092" - name: topics value: "events-sample" - name: consumerGroup value: "bindings-with-output" - name: publishTopic value: "events-sample" - name: authRequired value: "false"
运行以下命令应用配置文件。
kubectl apply -f events-producer.yaml
运行以下命令实时检查结果。
$ kubectl get po --watch NAME READY STATUS RESTARTS AGE serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh 0/2 ContainerCreating 0 1s serving-8f6md-deployment-v100-pg9sd-6666c5577f-4rpdg 2/2 Running 0 23m serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh 0/2 ContainerCreating 0 1s serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh 1/2 Running 0 5s serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh 2/2 Running 0 8s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 0/2 Pending 0 0s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 0/2 Pending 0 0s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 0/2 ContainerCreating 0 0s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 0/2 ContainerCreating 0 2s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 1/2 Running 0 4s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 1/2 Running 0 4s serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk 2/2 Running 0 4s
13.3 - 使用 EventBus 和 Trigger
本文档提供了一个示例,说明如何使用 EventBus 和 Trigger。
先决条件
- 您需要创建一个作为目标函数的函数以被触发。有关更多详细信息,请参见 创建函数。
- 您需要创建一个 Kafka 集群。有关更多详细信息,请参见 创建 Kafka 集群。
部署 NATS 流服务器
运行以下命令部署 NATS 流服务器。本文档使用 nats://nats.default:4222
作为 NATS 流服务器的访问地址,stan
作为集群 ID。有关更多信息,请参见 NATS Streaming (STAN)。
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats
helm install stan nats/stan --set stan.nats.url=nats://nats:4222
创建 OpenFuncAsync 运行时函数
使用以下内容创建目标函数的配置文件(例如,
openfuncasync-function.yaml
),该函数由 Trigger CRD 触发并打印接收到的消息。apiVersion: core.openfunction.io/v1beta2 kind: Function metadata: name: trigger-target spec: version: "v1.0.0" image: openfunctiondev/v1beta1-trigger-target:latest serving: scaleOptions: keda: scaledObject: pollingInterval: 15 minReplicaCount: 0 maxReplicaCount: 10 cooldownPeriod: 30 triggers: - type: stan metadata: natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222" queueGroup: "grp1" durableName: "ImDurable" subject: "metrics" lagThreshold: "10" triggers: dapr: - name: eventbus topic: metrics pubsub: eventbus: type: pubsub.natsstreaming version: v1 metadata: - name: natsURL value: "nats://nats.default:4222" - name: natsStreamingClusterID value: "stan" - name: subscriptionType value: "queue" - name: durableSubscriptionName value: "ImDurable" - name: consumerID value: "grp1"
运行以下命令应用配置文件。
kubectl apply -f openfuncasync-function.yaml
创建 EventBus 和 EventSource
使用以下内容创建 EventBus 的配置文件(例如,
eventbus.yaml
)。apiVersion: events.openfunction.io/v1alpha1 kind: EventBus metadata: name: default spec: natsStreaming: natsURL: "nats://nats.default:4222" natsStreamingClusterID: "stan" subscriptionType: "queue" durableSubscriptionName: "ImDurable"
使用以下内容创建 EventSource 的配置文件(例如,
eventsource.yaml
)。注意
通过spec.eventBus
设置事件总线的名称。apiVersion: events.openfunction.io/v1alpha1 kind: EventSource metadata: name: my-eventsource spec: logLevel: "2" eventBus: "default" kafka: sample-two: brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092" topic: "events-sample" authRequired: false
运行以下命令应用这些配置文件。
kubectl apply -f eventbus.yaml kubectl apply -f eventsource.yaml
运行以下命令检查结果。
$ kubectl get eventsources.events.openfunction.io NAME EVENTBUS SINK STATUS my-eventsource default Ready $ kubectl get eventbus.events.openfunction.io NAME AGE default 62m $ kubectl get components NAME AGE serving-9689d-component-ebfes-my-eventsource-cmcbw 46m serving-9689d-component-esc-kafka-sample-two-l99cg 46m serving-dxrhd-component-eventbus-t65q7 13m serving-zwlj4-component-ebft-my-trigger-4925n 100s
注意
在使用事件总线的情况下,EventSource 控制器的工作流程描述如下:
- 创建一个名为
my-eventsource
的 EventSource 自定义资源。 - 检索并重新组织 EventBus 的配置,包括 EventBus 名称(在此示例中为
default
)和与 EventBus 关联的 Dapr 组件的名称。 - 创建一个名为
serving-xxxxx-component-ebfes-my-eventsource-xxxxx
的 Dapr 组件,使 EventSource 能够与事件总线关联。 - 创建一个名为
serving-xxxxx-component-esc-kafka-sample-two-xxxxx
的 Dapr 组件,使 EventSource 能够与事件源关联。 - 创建一个名为
serving-xxxxx-deployment-v100-xxxxx
的 Deployment,用于处理事件。
- 创建一个名为
创建 Trigger
使用以下内容创建 Trigger 的配置文件(例如,
trigger.yaml
)。注意
- 通过
spec.eventBus
设置与 Trigger 关联的事件总线。 - 通过
spec.inputs
设置事件输入源。 - 这是一个简单的触发器,它从名为
default
的 EventBus 中收集事件。当它从 EventSourcemy-eventsource
中检索到一个sample-two
事件时,它触发一个名为function-sample-serving-qrdx8-ksvc-fwml8
的 Knative 服务,并同时将事件发送到事件总线的metrics
主题。
apiVersion: events.openfunction.io/v1alpha1 kind: Trigger metadata: name: my-trigger spec: logLevel: "2" eventBus: "default" inputs: inputDemo: eventSource: "my-eventsource" event: "sample-two" subscribers: - condition: inputDemo topic: "metrics"
- 通过
运行以下命令应用配置文件。
kubectl apply -f trigger.yaml
运行以下命令检查结果。
$ kubectl get triggers.events.openfunction.io NAME EVENTBUS STATUS my-trigger default Ready $ kubectl get eventbus.events.openfunction.io NAME AGE default 62m $ kubectl get components NAME AGE serving-9689d-component-ebfes-my-eventsource-cmcbw 46m serving-9689d-component-esc-kafka-sample-two-l99cg 46m serving-dxrhd-component-eventbus-t65q7 13m serving-zwlj4-component-ebft-my-trigger-4925n 100s
注意
在使用事件总线的情况下,Trigger 控制器的工作流程如下:
- 创建一个名为
my-trigger
的 Trigger 自定义资源。 - 检索并重新组织 EventBus 的配置,包括 EventBus 名称(在此示例中为
default
)和与 EventBus 关联的 Dapr 组件的名称。 - 创建一个名为
serving-xxxxx-component-ebft-my-trigger-xxxxx
的 Dapr 组件,使 Trigger 能够与事件总线关联。 - 创建一个名为
serving-xxxxx-deployment-v100-xxxxx
的 Deployment,用于处理触发任务。
- 创建一个名为
创建事件生产者
使用以下内容创建事件生产者配置文件(例如,
events-producer.yaml
)。apiVersion: core.openfunction.io/v1beta2 kind: Function metadata: name: events-producer spec: version: "v1.0.0" image: openfunctiondev/v1beta1-bindings:latest serving: template: containers: - name: function imagePullPolicy: Always triggers: dapr: - name: cron type: bindings.cron outputs: - dapr: name: kafka-server operation: "create" bindings: cron: type: bindings.cron version: v1 metadata: - name: schedule value: "@every 2s" kafka-server: type: bindings.kafka version: v1 metadata: - name: brokers value: "kafka-server-kafka-brokers:9092" - name: topics value: "events-sample" - name: consumerGroup value: "bindings-with-output" - name: publishTopic value: "events-sample" - name: authRequired value: "false"
运行以下命令应用配置文件。
kubectl apply -f events-producer.yaml
运行以下命令观察目标异步函数的变化。
$ kubectl get functions.core.openfunction.io NAME BUILDSTATE SERVINGSTATE BUILDER SERVING URL AGE trigger-target Skipped Running serving-dxrhd 20m $ kubectl get po --watch NAME READY STATUS RESTARTS AGE serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 Pending 0 0s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 Pending 0 0s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 ContainerCreating 0 0s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 ContainerCreating 0 2s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 1/2 Running 0 4s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 1/2 Running 0 4s serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 2/2 Running 0 4s
13.4 - 在一个 EventSource 中使用多个源
本文档描述了如何在一个 EventSource 中使用多个源。
先决条件
- 您需要创建一个作为目标函数的函数以被触发。有关更多详细信息,请参见 创建函数。
- 您需要创建一个 Kafka 集群。有关更多详细信息,请参见 创建 Kafka 集群。
在一个 EventSource 中使用多个源
使用以下内容创建一个 EventSource 配置文件(例如,
eventsource-multi.yaml
)。注意
- 以下示例定义了一个名为
my-eventsource
的事件源,并将指定 Kafka 服务器生成的事件标记为sample-three
事件。 spec.sink
引用了目标函数(Knative 服务)。spec.cron
的配置是每5秒触发一次在spec.sink
中定义的函数。
apiVersion: events.openfunction.io/v1alpha1 kind: EventSource metadata: name: my-eventsource spec: logLevel: "2" kafka: sample-three: brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092" topic: "events-sample" authRequired: false cron: sample-three: schedule: "@every 5s" sink: uri: "http://openfunction.io.svc.cluster.local/default/sink"
- 以下示例定义了一个名为
运行以下命令应用配置文件。
kubectl apply -f eventsource-multi.yaml
运行以下命令观察变化。
$ kubectl get eventsources.events.openfunction.io NAME EVENTBUS SINK STATUS my-eventsource Ready $ kubectl get components NAME AGE serving-vqfk5-component-esc-cron-sample-three-dzcpv 35s serving-vqfk5-component-esc-kafka-sample-one-nr9pq 35s serving-vqfk5-component-ts-my-eventsource-default-q6g6m 35s $ kubectl get deployments.apps NAME READY UP-TO-DATE AVAILABLE AGE serving-4x5wh-ksvc-wxbf2-v100-deployment 1/1 1 1 3h14m serving-vqfk5-deployment-v100-vdmvj 1/1 1 1 48s
13.5 - 使用 ClusterEventBus
本文档描述了如何使用 ClusterEventBus。
先决条件
您已完成 使用 EventBus 和 Trigger 中描述的步骤。
使用 ClusterEventBus
使用以下内容创建一个 ClusterEventBus 配置文件(例如,
clustereventbus.yaml
)。apiVersion: events.openfunction.io/v1alpha1 kind: ClusterEventBus metadata: name: default spec: natsStreaming: natsURL: "nats://nats.default:4222" natsStreamingClusterID: "stan" subscriptionType: "queue" durableSubscriptionName: "ImDurable"
运行以下命令删除 EventBus。
kubectl delete eventbus.events.openfunction.io default
运行以下命令应用配置文件。
kubectl apply -f clustereventbus.yaml
运行以下命令检查结果。
$ kubectl get eventbus.events.openfunction.io No resources found in default namespace. $ kubectl get clustereventbus.events.openfunction.io NAME AGE default 21s
13.6 - 使用带有条件的 Trigger
本文档描述了如何使用带有条件的触发器。
先决条件
您已完成 使用 EventBus 和 Trigger 中描述的步骤。
使用带有条件的触发器
创建两个事件源
使用以下内容创建一个 EventSource 配置文件(例如,
eventsource-a.yaml
)。apiVersion: events.openfunction.io/v1alpha1 kind: EventSource metadata: name: eventsource-a spec: logLevel: "2" eventBus: "default" kafka: sample-five: brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092" topic: "events-sample" authRequired: false
使用以下内容创建另一个 EventSource 配置文件(例如,
eventsource-b.yaml
)。apiVersion: events.openfunction.io/v1alpha1 kind: EventSource metadata: name: eventsource-b spec: logLevel: "2" eventBus: "default" cron: sample-five: schedule: "@every 5s"
运行以下命令应用这两个配置文件。
kubectl apply -f eventsource-a.yaml kubectl apply -f eventsource-b.yaml
创建带有条件的触发器
使用以下内容创建一个带有
condition
的触发器配置文件(例如,condition-trigger.yaml
)。apiVersion: events.openfunction.io/v1alpha1 kind: Trigger metadata: name: condition-trigger spec: logLevel: "2" eventBus: "default" inputs: eventA: eventSource: "eventsource-a" event: "sample-five" eventB: eventSource: "eventsource-b" event: "sample-five" subscribers: - condition: eventB sink: uri: "http://openfunction.io.svc.cluster.local/default/sink" - condition: eventA && eventB topic: "metrics"
注意
在这个例子中,定义了两个输入源和两个订阅者,它们的触发关系描述如下:
- 当接收到输入
eventB
时,将输入事件发送到 Knative 服务。 - 当接收到输入
eventB
和输入eventA
时,将输入事件同时发送到事件总线的metrics
主题和 Knative 服务。
- 当接收到输入
运行以下命令应用配置文件。
kubectl apply -f condition-trigger.yaml
运行以下命令检查结果。
$ kubectl get eventsources.events.openfunction.io NAME EVENTBUS SINK STATUS eventsource-a default Ready eventsource-b default Ready $ kubectl get triggers.events.openfunction.io NAME EVENTBUS STATUS condition-trigger default Ready $ kubectl get eventbus.events.openfunction.io NAME AGE default 12s
运行以下命令,您可以从输出中看到,由于事件源
eventsource-b
是一个 cron 任务,所以触发器中的eventB
条件匹配,触发了 Knative 服务。$ kubectl get functions.core.openfunction.io NAME BUILDSTATE SERVINGSTATE BUILDER SERVING URL AGE sink Skipped Running serving-4x5wh https://openfunction.io/default/sink 3h25m $ kubectl get po NAME READY STATUS RESTARTS AGE serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-k2jdg 2/2 Running 0 46s
参考 创建事件生产者 创建一个事件生产者。
运行以下命令,您可以从输出中看到,触发器中的
eventA && eventB
条件匹配,事件同时发送到事件总线的metrics
主题。触发了 OpenFuncAsync 函数。$ kubectl get functions.core.openfunction.io NAME BUILDSTATE SERVINGSTATE BUILDER SERVING URL AGE trigger-target Skipped Running serving-7hghp 103s $ kubectl get po NAME READY STATUS RESTARTS AGE serving-7hghp-deployment-v100-z8wrf-946b4854d-svf55 2/2 Running 0 18s