要获取更多使用 OpenFunction 的示例,请参考 示例仓库,例如 基于队列深度的服务自动扩展。
最佳实践
1 - 创建一个基于 Knative 的函数以通过 Dapr 组件与中间件交互
本文档描述了如何创建一个基于 Knative 的函数,通过 Dapr 组件与中间件交互。
概述
与异步函数类似,基于 Knative 运行时的函数可以通过 Dapr 组件与中间件交互。本文档使用两个函数,function-front
和 kafka-input
,进行演示。
以下图表说明了这些函数之间的关系。
先决条件
- 您已经安装了 OpenFunction。
- 您已经创建了一个 secret。
创建 Kafka 服务器和主题
运行以下命令,在默认命名空间中安装 strimzi-kafka-operator。
helm repo add strimzi https://strimzi.io/charts/ helm install kafka-operator -n default strimzi/strimzi-kafka-operator
使用以下内容创建一个文件
kafka.yaml
。apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: kafka-server namespace: default spec: kafka: version: 3.3.1 replicas: 1 listeners: - name: plain port: 9092 type: internal tls: false - name: tls port: 9093 type: internal tls: true config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 inter.broker.protocol.version: "3.1" storage: type: ephemeral zookeeper: replicas: 1 storage: type: ephemeral entityOperator: topicOperator: {} userOperator: {} --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: sample-topic namespace: default labels: strimzi.io/cluster: kafka-server spec: partitions: 10 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824
运行以下命令,在默认命名空间中部署一个名为
kafka-server
的 1 副本 Kafka 服务器和一个名为sample-topic
的 1 副本 Kafka 主题。kubectl apply -f kafka.yaml
运行以下命令,检查 pod 状态,等待 Kafka 和 Zookeeper 启动并运行。
$ kubectl get po NAME READY STATUS RESTARTS AGE kafka-server-entity-operator-568957ff84-nmtlw 3/3 Running 0 8m42s kafka-server-kafka-0 1/1 Running 0 9m13s kafka-server-zookeeper-0 1/1 Running 0 9m46s strimzi-cluster-operator-687fdd6f77-cwmgm 1/1 Running 0 11m
运行以下命令,查看 Kafka 集群的元数据。
# 启动一个实用工具 pod。 $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm # 检查 Kafka 集群的元数据。 $ kafkacat -L -b kafka-server-kafka-brokers:9092
创建函数
使用以下示例 YAML 文件创建一个清单
kafka-input.yaml
,并修改spec.image
的值以设置您自己的镜像仓库地址。字段spec.serving.inputs
定义了一个指向 Kafka 服务器的 Dapr 组件的输入源。这意味着kafka-input
函数将由 Kafka 服务器的主题sample-topic
中的事件驱动。apiVersion: core.openfunction.io/v1beta2 kind: Function metadata: name: kafka-input spec: version: "v1.0.0" image: <your registry name>/kafka-input:latest imageCredentials: name: push-secret build: builder: openfunction/builder-go:latest env: FUNC_NAME: "HandleKafkaInput" FUNC_CLEAR_SOURCE: "true" srcRepo: url: "https://github.com/OpenFunction/samples.git" sourceSubPath: "functions/async/bindings/kafka-input" revision: "main" serving: scaleOptions: minReplicas: 0 maxReplicas: 10 keda: triggers: - type: kafka metadata: topic: sample-topic bootstrapServers: kafka-server-kafka-brokers.default.svc:9092 consumerGroup: kafka-input lagThreshold: "20" scaledObject: pollingInterval: 15 cooldownPeriod: 60 advanced: horizontalPodAutoscalerConfig: behavior: scaleDown: stabilizationWindowSeconds: 45 policies: - type: Percent value: 50 periodSeconds: 15 scaleUp: stabilizationWindowSeconds: 0 triggers: dapr: - name: target-topic type: bindings.kafka bindings: target-topic: type: bindings.kafka version: v1 metadata: - name: brokers value: "kafka-server-kafka-brokers:9092" - name: topics value: "sample-topic" - name: consumerGroup value: "kafka-input" - name: publishTopic value: "sample-topic" - name: authRequired value: "false" template: containers: - name: function imagePullPolicy: Always
运行以下命令创建函数
kafka-input
。kubectl apply -f kafka-input.yaml
使用以下示例 YAML 文件创建一个清单
function-front.yaml
,并修改spec.image
的值以设置您自己的镜像仓库地址。apiVersion: core.openfunction.io/v1beta2 kind: Function metadata: name: function-front spec: version: "v1.0.0" image: "<your registry name>/sample-knative-dapr:latest" imageCredentials: name: push-secret build: builder: openfunction/builder-go:latest env: FUNC_NAME: "ForwardToKafka" FUNC_CLEAR_SOURCE: "true" srcRepo: url: "https://github.com/OpenFunction/samples.git" sourceSubPath: "functions/knative/with-output-binding" revision: "main" serving: hooks: pre: - plugin-custom - plugin-example post: - plugin-example - plugin-custom scaleOptions: minReplicas: 0 maxReplicas: 5 outputs: - dapr: name: kafka-server operation: "create" bindings: kafka-server: type: bindings.kafka version: v1 metadata: - name: brokers value: "kafka-server-kafka-brokers:9092" - name: authRequired value: "false" - name: publishTopic value: "sample-topic" - name: topics value: "sample-topic" - name: consumerGroup value: "function-front" template: containers: - name: function imagePullPolicy: Always
注意
metadata.plugins.pre
定义了在执行用户函数之前需要调用的插件的顺序。metadata.plugins.post
定义了在执行用户函数之后需要调用的插件的顺序。有关这两个插件的逻辑以及执行插件后的效果的更多信息,请参阅 插件机制。在清单中,
spec.serving.outputs
定义了一个指向 Kafka 服务器的 Dapr 组件的输出。这使您可以在function-front
函数中向输出target
发送自定义内容。func Sender(ctx ofctx.Context, in []byte) (ofctx.Out, error) { ... _, err := ctx.Send("target", greeting) ... }
运行以下命令创建函数
function-front
。kubectl apply -f function-front.yaml
检查结果
运行以下命令查看函数的状态。
$ kubectl get functions.core.openfunction.io NAME BUILDSTATE SERVINGSTATE BUILDER SERVING URL AGE function-front Succeeded Running builder-bhbtk serving-vc6jw https://openfunction.io/default/function-front 2m41s kafka-input Succeeded Running builder-dprfd serving-75vrt 2m21s
注意
URL
,由 OpenFunction Domain 提供,是可以访问的地址。要通过此 URL 地址访问函数,您需要确保 DNS 可以解析此地址。运行以下命令在集群中创建一个用于访问函数的 pod。
kubectl run curl --image=radial/busyboxplus:curl -i --tty --rm
运行以下命令通过
URL
访问函数。[ root@curl:/ ]$ curl -d '{"message":"Awesome OpenFunction!"}' -H "Content-Type: application/json" -X POST http://openfunction.io.svc.cluster.local/default/function-front
运行以下命令查看
function-front
的日志。kubectl logs -f \ $(kubectl get po -l \ openfunction.io/serving=$(kubectl get functions function-front -o jsonpath='{.status.serving.resourceRef}') \ -o jsonpath='{.items[0].metadata.name}') \ function
输出如下所示。
dapr client initializing for: 127.0.0.1:50001 I0125 06:51:55.584973 1 framework.go:107] Plugins for pre-hook stage: I0125 06:51:55.585044 1 framework.go:110] - plugin-custom I0125 06:51:55.585052 1 framework.go:110] - plugin-example I0125 06:51:55.585057 1 framework.go:115] Plugins for post-hook stage: I0125 06:51:55.585062 1 framework.go:118] - plugin-custom I0125 06:51:55.585067 1 framework.go:118] - plugin-example I0125 06:51:55.585179 1 knative.go:46] Knative Function serving http: listening on port 8080 2022/01/25 06:52:02 http - Data: {"message":"Awesome OpenFunction!"} I0125 06:52:02.246450 1 plugin-example.go:83] the sum is: 2
运行以下命令查看
kafka-input
的日志。kubectl logs -f \ $(kubectl get po -l \ openfunction.io/serving=$(kubectl get functions kafka-input -o jsonpath='{.status.serving.resourceRef}') \ -o jsonpath='{.items[0].metadata.name}') \ function
输出如下所示。
dapr client initializing for: 127.0.0.1:50001 I0125 06:35:28.332381 1 framework.go:107] Plugins for pre-hook stage: I0125 06:35:28.332863 1 framework.go:115] Plugins for post-hook stage: I0125 06:35:28.333749 1 async.go:39] Async Function serving grpc: listening on port 8080 message from Kafka '{Awesome OpenFunction!}'
2 - 使用 SkyWalking 为 OpenFunction 提供可观测能力
功能概览
尽管 FaaS 允许开发者专注于他们的业务代码而不用担心底层的实现,但对函数服务进行功能调试和故障排除是很困难的。因此,OpenFunction 设法引入了可观测性能力来提高其可用性和稳定性。
SkyWalking 提供了在许多不同场景下观测和监控分布式系统的解决方案。OpenFunction 将 go2sky(SkyWalking 的 Go 语言代理)捆绑在 OpenFunction 追踪器选项中,以提供分布式追踪、函数性能统计和函数依赖关系图。
先决条件
- 您已安装了 OpenFunction。
- 您已创建了一个 Kubernetes Secret。
- 您已安装了 SkyWalking v9。
- 您已创建了一个 Kafka 服务和主题。
追踪功能的可配置参数
下表描述了 OpenFunction 中目前可用的追踪参数。
名称 | 描述 | 示例 |
---|---|---|
enabled | 使能追踪功能,默认为 false | true , false |
provider.name | 可以设置为 skywalking 、opentelemetry (待定) | skywalking |
provider.oapServer | SkyWalking OAP 服务器地址 | skywalking-opa:11800 |
tags | 一组键值对,用于追踪中的追踪所使用的 Span 自定义标签 | |
tags.func | 函数的名称,该值将被自动填充 | function-a |
tags.layer | 表示被追踪的服务类型,当你使用该函数时,它应该被设置为 faas | faas |
baggage | 一个键值对的集合,存在于追踪中,也需要跨进程边界传输 |
下面是一个 JSON 格式的配置参考,您可以基于此了解追踪配置的大致数据格式。
{
"enabled": true,
"provider": {
"name": "skywalking",
"oapServer": "skywalking-oap:11800"
},
"tags": {
"func": "function-a",
"layer": "faas",
"tag1": "value1",
"tag2": "value2"
},
"baggage": {
"key": "key1",
"value": "value1"
}
}
启用 OpenFunction 的追踪功能
选项一:全局配置
下文使用 skywalking-oap.default:11800
作为集群中 skywalking-oap
服务的样例地址。
运行下面的命令,修改
openfunction
命名空间中的 ConfigMapopenfunction-config
。kubectl edit configmap openfunction-config -n openfunction
参照下面的例子修改
data.plugins.tracing
中的内容,并保存这个修改。data: plugins.tracing: | enabled: true provider: name: "skywalking" oapServer: "skywalking-oap:11800" tags: func: tracing-function layer: faas tag1: value1 tag2: value2 baggage: key: "key1" value: "value1"
选项二:函数级别配置
要在函数级别启用跟踪配置,请在函数清单的 metadata.annotations
下添加 plugins.tracing
字段,如下所示。
metadata:
name: tracing-function
annotations:
plugins.tracing: |
enabled: true
provider:
name: "skywalking"
oapServer: "skywalking-oap:11800"
tags:
func: tracing-function
layer: faas
tag1: value1
tag2: value2
baggage:
key: "key1"
value: "value1"
建议您使用全局跟踪配置,否则您必须为您创建的每个函数逐一添加函数级跟踪配置。
使用 SkyWalking 作为分布式追踪解决方案
通过参考 本文档 创建函数。
然后,您可以在 SkyWalking UI 界面上观察整个函数调用的链路。
您还可以比较 Knative 运行时函数(
function-front
)在运行状态和冷启动时的响应时间。在冷启动时:
在运行状态下:
3 - Elastic 日志告警
本文档描述了如何创建一个异步函数来查找错误日志。
概述
本文档使用一个异步函数来分析 Kafka 中的日志流,以找出错误日志。异步函数将然后向 Slack 发送告警。以下图表说明了整个工作流程。
先决条件
- 您已经安装了 OpenFunction。
- 您已经创建了一个 secret。
创建 Kafka 服务器和主题
运行以下命令在默认命名空间中安装 strimzi-kafka-operator。
helm repo add strimzi https://strimzi.io/charts/ helm install kafka-operator -n default strimzi/strimzi-kafka-operator
使用以下内容创建一个文件
kafka.yaml
。apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: kafka-logs-receiver namespace: default spec: kafka: version: 3.3.1 replicas: 1 listeners: - name: plain port: 9092 type: internal tls: false - name: tls port: 9093 type: internal tls: true config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 inter.broker.protocol.version: "3.1" storage: type: ephemeral zookeeper: replicas: 1 storage: type: ephemeral entityOperator: topicOperator: {} userOperator: {} --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: logs namespace: default labels: strimzi.io/cluster: kafka-logs-receiver spec: partitions: 10 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824
运行以下命令在默认命名空间中部署一个名为
kafka-logs-receiver
的 1 副本 Kafka 服务器和一个名为logs
的 1 副本 Kafka 主题。kubectl apply -f kafka.yaml
运行以下命令检查 pod 状态,等待 Kafka 和 Zookeeper 启动并运行。
$ kubectl get po NAME READY STATUS RESTARTS AGE kafka-logs-receiver-entity-operator-57dc457ccc-tlqqs 3/3 Running 0 8m42s kafka-logs-receiver-kafka-0 1/1 Running 0 9m13s kafka-logs-receiver-zookeeper-0 1/1 Running 0 9m46s strimzi-cluster-operator-687fdd6f77-cwmgm 1/1 Running 0 11m
运行以下命令查看 Kafka 集群的元数据。
# 启动一个实用工具 pod。 $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm # 检查 Kafka 集群的元数据。 $ kafkacat -L -b kafka-logs-receiver-kafka-brokers:9092
创建日志处理函数
- 使用以下示例 YAML 文件创建一个清单
logs-handler-function.yaml
,并修改spec.image
的值以设置您自己的镜像仓库地址。
apiVersion: core.openfunction.io/v1beta2
kind: Function
metadata:
name: logs-async-handler
namespace: default
spec:
build:
builder: openfunction/builder-go:latest
env:
FUNC_CLEAR_SOURCE: "true"
FUNC_NAME: LogsHandler
srcRepo:
revision: main
sourceSubPath: functions/async/logs-handler-function/
url: https://github.com/OpenFunction/samples.git
image: openfunctiondev/logs-async-handler:v1
imageCredentials:
name: push-secret
serving:
bindings:
kafka-receiver:
metadata:
- name: brokers
value: kafka-server-kafka-brokers:9092
- name: authRequired
value: "false"
- name: publishTopic
value: logs
- name: topics
value: logs
- name: consumerGroup
value: logs-handler
type: bindings.kafka
version: v1
notification-manager:
metadata:
- name: url
value: http://notification-manager-svc.kubesphere-monitoring-system.svc.cluster.local:19093/api/v2/alerts
type: bindings.http
version: v1
outputs:
- dapr:
name: notification-manager
operation: post
type: bindings.http
scaleOptions:
keda:
scaledObject:
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleDown:
policies:
- periodSeconds: 15
type: Percent
value: 50
stabilizationWindowSeconds: 45
scaleUp:
stabilizationWindowSeconds: 0
cooldownPeriod: 60
pollingInterval: 15
triggers:
- metadata:
bootstrapServers: kafka-server-kafka-brokers.default.svc.cluster.local:9092
consumerGroup: logs-handler
lagThreshold: "20"
topic: logs
type: kafka
maxReplicas: 10
minReplicas: 0
template:
containers:
- imagePullPolicy: IfNotPresent
name: function
triggers:
dapr:
- name: kafka-receiver
type: bindings.kafka
workloadType: Deployment
version: v2.0.0
workloadRuntime: OCIContainer
运行以下命令创建函数
logs-async-handler
。kubectl apply -f logs-handler-function.yaml
日志处理函数将由 Kafka 中 logs 主题的消息触发。