1 - Function Definition

Function

Function is the control plane of Build and Serving, and it is also the interface through which users interact with OpenFunction. Users don’t need to create Build or Serving resources separately because Function is the only place to define a function’s Build and Serving.

Once a function is created, it controls the lifecycle of Build and Serving without user intervention:

  • If Build is defined in a function, a builder custom resource is created to build the function’s container image once the function is deployed.

  • If Serving is defined in a function, a serving custom resource is created to control the function’s serving and autoscaling.

  • Build and Serving can be defined together, in which case the function image is built first and then used for serving.

  • Build can be defined without Serving, in which case the function is used only to build an image.

  • Serving can be defined without Build, in which case the function uses a previously built function image for serving.
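The rules above can be summarized in a few lines of Python. This is only an illustrative sketch, not the controller's actual code: the function name plan_resources and the plain-dict representation of a Function spec are assumptions made for this example.

```python
def plan_resources(function_spec: dict) -> list:
    """Return, in order, the custom resources a Function spec leads to.

    Hypothetical helper: mirrors the bullet list above, nothing more.
    """
    resources = []
    if "build" in function_spec:
        # Build is defined: a builder resource builds the image first.
        resources.append("Builder")
    if "serving" in function_spec:
        # Serving is defined: a serving resource runs the (built or existing) image.
        resources.append("Serving")
    return resources


if __name__ == "__main__":
    print(plan_resources({"image": "example/image:v1", "build": {}, "serving": {}}))
    print(plan_resources({"image": "example/image:v1", "serving": {}}))
```

A spec with only a build section yields just the builder, and a spec with only a serving section skips the build entirely.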

Build

OpenFunction uses Shipwright and Cloud Native Buildpacks to build the function source code into container images.

Once a function is created with a Build spec, a builder custom resource is created, which uses Shipwright to manage the build tools and strategy. Shipwright then uses Tekton to control the process of building container images, including fetching source code, generating image artifacts, and publishing images.

Serving

Once a function is created with a Serving spec, a Serving custom resource is created to control the function’s serving phase. Currently OpenFunction Serving supports two runtimes: the Knative sync runtime and the OpenFunction async runtime.

The sync runtime

For sync functions, OpenFunction currently supports Knative Serving as the runtime. We are planning to add another sync function runtime powered by the KEDA http-addon.

The async runtime

OpenFunction’s async runtime is an event-driven runtime built on KEDA and Dapr. Async functions can be triggered by various event sources such as message queues, cron jobs, and MQTT.

Reference

For more information, see Function Specifications.

2 - Function Build

Currently, OpenFunction builds function images with Cloud Native Buildpacks. The traditional Dockerfile build method will be supported in the future.

Build functions by adding a build section in the function definition

You can build a function image by simply adding a build section in the Function definition like below. If there is a serving section defined as well, the function will be launched as soon as the build completes.

apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: logs-async-handler
spec:
  version: "v2.0.0"
  image: openfunctiondev/logs-async-handler:v1
  imageCredentials:
    name: push-secret
  build:
    builder: openfunction/builder-go:latest
    env:
      FUNC_NAME: "LogsHandler"
      FUNC_CLEAR_SOURCE: "true"
      ## Customize functions framework version, valid for functions-framework-go for now
      ## Usually you don't need to do so because the builder ships with the latest functions-framework
      # FUNC_FRAMEWORK_VERSION: "v0.4.0"
      ## Use FUNC_GOPROXY to set the goproxy
      # FUNC_GOPROXY: "https://goproxy.cn"
    srcRepo:
      url: "https://github.com/OpenFunction/samples.git"
      sourceSubPath: "functions/async/logs-handler-function/"
      revision: "main"

To push the function image to a container registry, you have to create a secret containing the registry’s credential and add the secret to imageCredentials. You can refer to the prerequisites for more info.

Build functions with the pack CLI

It is often necessary to build function images directly from local source code, especially for debugging or in offline environments. You can use the pack CLI for this.

Pack is a tool maintained by the Cloud Native Buildpacks project to support the use of buildpacks. It enables the following functionality:

  • Build an application using buildpacks.
  • Rebase application images created using buildpacks.
  • Creation of various components used within the ecosystem.

Follow the instructions here to install the pack CLI tool. You can find more details on how to use the pack CLI here.

To build OpenFunction function images from source code locally, you can follow the steps below:

Download function samples

git clone https://github.com/OpenFunction/samples.git
cd samples/functions/Knative/hello-world-go

Build the function image with the pack CLI

pack build func-helloworld-go --builder openfunction/builder-go:v2.4.0-1.17 --env FUNC_NAME="HelloWorld"  --env FUNC_CLEAR_SOURCE=true

Launch the function image locally

docker run --rm --env="FUNC_CONTEXT={\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}" --env="CONTEXT_MODE=self-host" --name func-helloworld-go -p 8080:8080 func-helloworld-go

Visit the function

curl http://localhost:8080

Output example:

hello, world!
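The FUNC_CONTEXT value in the docker run command above is a JSON document, and escaping it by hand is easy to get wrong. The following sketch generates the same value and a shell-safe command line. The helper names func_context and docker_run_command are made up for this example; the field names and values are taken from the command above.

```python
import json
import shlex


def func_context(name: str, version: str = "v1.0.0",
                 port: str = "8080", runtime: str = "Knative") -> str:
    """Serialize the function context as compact JSON."""
    context = {"name": name, "version": version, "port": port, "runtime": runtime}
    return json.dumps(context, separators=(",", ":"))


def docker_run_command(image: str, func_name: str) -> str:
    """Build a shell-quoted docker run command equivalent to the one above."""
    args = [
        "docker", "run", "--rm",
        "--env", "FUNC_CONTEXT=" + func_context(func_name),
        "--env", "CONTEXT_MODE=self-host",
        "--name", image,
        "-p", "8080:8080",
        image,
    ]
    return shlex.join(args)  # quotes the JSON so the shell passes it through intact


if __name__ == "__main__":
    print(docker_run_command("func-helloworld-go", "HelloWorld"))
```

Because shlex.join handles the quoting, the JSON needs no manual backslash escapes.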

OpenFunction Builders

To build a function image with Cloud Native Buildpacks, a builder image is needed.

Here you can find builders for popular languages maintained by the OpenFunction community:

| Language | Builders |
| --- | --- |
| Go | openfunction/builder-go:v2.4.0 (openfunction/builder-go:latest) |
| Nodejs | openfunction/builder-node:v2-16.15 (openfunction/builder-node:latest) |
| Java | openfunction/builder-java:v2-11, openfunction/builder-java:v2-16, openfunction/builder-java:v2-17, openfunction/builder-java:v2-18 |
| Python | openfunction/gcp-builder:v1 |
| DotNet | openfunction/gcp-builder:v1 |

3 - Function Inputs and Outputs

Functions usually have inputs and outputs.

Function Inputs

For a sync function, the input is always the payload of the HTTP request.

For an async function, the input data comes from Dapr input binding components or Dapr Pub/sub components.

Function Outputs

For a sync function, the output can be sent through the HTTP response.

Both sync functions and async functions can send outputs to Dapr components, including output binding components and Pub/sub components.

For example, the following async function has a cron input binding and a Kafka output binding:

apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: cron-input-kafka-output
spec:
  ...
  serving:
    ...
    runtime: "async"
    inputs:
      - name: cron
        component: cron
    outputs:
      - name: sample
        component: kafka-server
        operation: "create"
    bindings:
      cron:
        type: bindings.cron
        version: v1
        metadata:
        - name: schedule
          value: "@every 2s"
      kafka-server:
        type: bindings.kafka
        version: v1
        metadata:
        - name: brokers
          value: "kafka-server-kafka-brokers:9092"
        - name: topics
          value: "sample-topic"
        - name: consumerGroup
          value: "bindings-with-output"
        - name: publishTopic
          value: "sample-topic"
        - name: authRequired
          value: "false"

Here is another async function example that uses a Kafka Pub/sub component as input:

apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: autoscaling-subscriber
spec:
  ...
  serving:
    ...
    runtime: "async"
    inputs:
      - name: producer
        component: kafka-server
        topic: "sample-topic"
    pubsub:
      kafka-server:
        type: pubsub.kafka
        version: v1
        metadata:
          - name: brokers
            value: "kafka-server-kafka-brokers:9092"
          - name: authRequired
            value: "false"
          - name: allowedTopics
            value: "sample-topic"
          - name: consumerID
            value: "autoscaling-subscriber"

Sync functions can also send output to Dapr output binding components or Pub/sub components. Here is an example:

apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: function-front
spec:
  serving:
    ...
    runtime: knative
    outputs:
      - name: target
        component: kafka-server
        operation: "create"
    bindings:
      kafka-server:
        type: bindings.kafka
        version: v1
        metadata:
          - name: brokers
            value: "kafka-server-kafka-brokers:9092"
          - name: authRequired
            value: "false"
          - name: publishTopic
            value: "sample-topic"
          - name: topics
            value: "sample-topic"
          - name: consumerGroup
            value: "function-front"

4 - Function Scaling and Triggers

Scaling is one of the core features of a FaaS or Serverless platform.

OpenFunction defines function scaling in ScaleOptions and defines the triggers that activate function scaling in Triggers.

ScaleOptions

You can define unified scale options that apply to both sync and async functions, as shown below:

apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: function-sample
spec:
  serving:
    scaleOptions:
      minReplicas: 0
      maxReplicas: 10

For async functions, defining only minReplicas and maxReplicas is usually not enough. You can define separate scale options for async functions, as shown below, which override minReplicas and maxReplicas.

You can find more details about async function scale options in the KEDA ScaledObject Spec and the KEDA ScaledJob Spec.

apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: function-sample
spec:
  serving:
    scaleOptions:
      keda:
        scaledObject:
          pollingInterval: 15
          minReplicaCount: 0
          maxReplicaCount: 10
          cooldownPeriod: 60
          advanced:
            horizontalPodAutoscalerConfig:
              behavior:
                scaleDown:
                  stabilizationWindowSeconds: 45
                  policies:
                  - type: Percent
                    value: 50
                    periodSeconds: 15
                scaleUp:
                  stabilizationWindowSeconds: 0

You can also set advanced scale options for Knative sync functions, which override minReplicas and maxReplicas.

You can find more details about the Knative sync function scale options here.

apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: function-sample
spec:
  serving:
    scaleOptions:
      knative:
        autoscaling:
          autoscaling.knative.dev/min-scale: 0
          autoscaling.knative.dev/max-scale: 10
          autoscaling.knative.dev/initial-scale: "1"
          autoscaling.knative.dev/scale-down-delay: "0"
          autoscaling.knative.dev/window: "60s"
          autoscaling.knative.dev/panic-window-percentage: "10.0"
          autoscaling.knative.dev/metric: "concurrency"
          autoscaling.knative.dev/target: 100
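The override behaviour described in this section can be sketched as follows. This is a simplified illustration of the precedence stated above, not the controller's implementation. The helper name effective_replica_bounds and the use of a plain dict for scaleOptions are assumptions made for this example.

```python
def effective_replica_bounds(scale_options: dict, runtime: str) -> tuple:
    """Return (min, max) replicas after applying runtime-specific overrides."""
    lower = scale_options.get("minReplicas")
    upper = scale_options.get("maxReplicas")
    if runtime == "async":
        # KEDA ScaledObject settings take precedence for async functions.
        scaled_object = scale_options.get("keda", {}).get("scaledObject", {})
        lower = scaled_object.get("minReplicaCount", lower)
        upper = scaled_object.get("maxReplicaCount", upper)
    elif runtime == "knative":
        # Knative autoscaling annotations take precedence for sync functions.
        annotations = scale_options.get("knative", {}).get("autoscaling", {})
        if "autoscaling.knative.dev/min-scale" in annotations:
            lower = int(annotations["autoscaling.knative.dev/min-scale"])
        if "autoscaling.knative.dev/max-scale" in annotations:
            upper = int(annotations["autoscaling.knative.dev/max-scale"])
    return lower, upper


if __name__ == "__main__":
    options = {
        "minReplicas": 0,
        "maxReplicas": 10,
        "keda": {"scaledObject": {"minReplicaCount": 1, "maxReplicaCount": 5}},
    }
    print(effective_replica_bounds(options, "async"))
    print(effective_replica_bounds(options, "knative"))
```

The unified minReplicas and maxReplicas act as defaults, and the runtime-specific block wins only for the runtime it belongs to.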

Triggers

Triggers define how to activate function scaling for async functions. You can use the triggers defined by any KEDA scaler as OpenFunction’s trigger spec.

Scaling of sync functions is activated by HTTP requests, according to the options already defined in the ScaleOptions section above.

apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: function-sample
spec:
  serving:
    runtime: "async"
    triggers:
      - type: kafka
        metadata:
          topic: logs
          bootstrapServers: kafka-server-kafka-brokers.default.svc.cluster.local:9092
          consumerGroup: logs-handler
          lagThreshold: "20"
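To get a feel for what lagThreshold means, the sketch below estimates the replica count for a given consumer lag. It is a rough approximation that assumes the scaler targets an average of lagThreshold messages per replica. Real behaviour also depends on the number of topic partitions, the polling interval, the cooldown period, and the HPA behaviour settings. The function name and the default bounds (taken from the ScaleOptions examples above) are assumptions.

```python
import math


def estimate_replicas(total_lag: int, lag_threshold: int = 20,
                      min_replicas: int = 0, max_replicas: int = 10) -> int:
    """Approximate the desired replica count for a given total consumer lag."""
    if total_lag <= 0:
        # No backlog: fall back to the lower bound (possibly zero).
        return min_replicas
    wanted = math.ceil(total_lag / lag_threshold)
    # Clamp the result to the configured bounds.
    return max(min_replicas, min(wanted, max_replicas))


if __name__ == "__main__":
    for lag in (0, 1, 45, 1000):
        print(lag, estimate_replicas(lag))
```

With the values from the example, a lag of 45 messages maps to three replicas, and any lag above 200 is capped at the maximum of ten.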

5 - Function Signatures

Comparison of different function signatures

There are three function signatures in OpenFunction: HTTP, CloudEvent, and OpenFunction. Let’s explain them in more detail, using Go functions as an example.

HTTP and CloudEvent signatures can be used to create sync functions, while the OpenFunction signature can be used to create both sync and async functions.

Furthermore, the OpenFunction signature can use various Dapr building blocks, including Bindings and Pub/sub, to access BaaS services, which helps to create more powerful functions. (Dapr State management and Configuration will be supported soon.)

| | HTTP | CloudEvent | OpenFunction |
| --- | --- | --- | --- |
| Signature | func(http.ResponseWriter, *http.Request) error | func(context.Context, cloudevents.Event) error | func(ofctx.Context, []byte) (ofctx.Out, error) |
| Sync Functions | Supported | Supported | Supported |
| Async Functions | Not supported | Not supported | Supported |
| Dapr Binding | Not supported | Not supported | Supported |
| Dapr Pub/sub | Not supported | Not supported | Supported |

Samples

As the comparison above shows, the OpenFunction signature is the recommended function signature, and we’re working on supporting it in more language runtimes.

| | HTTP | CloudEvent | OpenFunction |
| --- | --- | --- | --- |
| Go | Hello World, Multi-functions, Sync function with path parameters, log processing | Sync function with path parameters | Sync function with path parameters, Sync function with output binding, Kafka input & HTTP output binding, Cron input & Kafka output binding, Cron input binding, Kafka input binding, Kafka pubsub |
| Nodejs | Hello World | | Sync function with output binding, MQTT binding & pubsub |
| Python | Hello World | | |
| Java | Hello World | | |
| DotNet | Hello World | | |

6 - Networking

6.1 - Introduction

Overview

Starting from v0.5.0, OpenFunction used Kubernetes Ingress to provide a unified entrypoint for sync functions, which required an nginx ingress controller to be installed.

As the Kubernetes Gateway API matured, we decided to implement OpenFunction Gateway on top of it, replacing the previous ingress-based domain method in OpenFunction v0.7.0.

You can find the OpenFunction Gateway proposal here.

OpenFunction Gateway provides a more powerful and more flexible function gateway including features like:

  • Enable users to switch to any gateway implementation that supports the Kubernetes Gateway API, such as Contour, Istio, Apache APISIX, or Envoy Gateway (in the future), in an easier and vendor-neutral way.

  • Users can either install a default gateway implementation (Contour) and then define a new gateway.networking.k8s.io, or use an existing gateway implementation in their environment and reference an existing gateway.networking.k8s.io.

  • Allow users to customize their own function access pattern like hostTemplate: "{{.Name}}.{{.Namespace}}.{{.Domain}}" for host-based access.

  • Allow users to customize their own function access pattern like pathTemplate: "{{.Namespace}}/{{.Name}}" for path-based access.

  • Allow users to customize each function’s route rules (host-based, path-based, or both) in the function definition. Default route rules are provided for each function if no customized route rules are defined.

  • Send traffic to Knative service revisions directly, without going through Knative’s own gateway. Starting from OpenFunction v0.7.0, OpenFunction Gateway is the only gateway you need to access OpenFunction sync functions, and you can ignore Knative’s domain config errors if you don’t need to access Knative services directly.

  • Traffic splitting between function revisions (in the future).

The following diagram illustrates how client traffic goes through OpenFunction Gateway and then reaches a function directly:

6.2 - OpenFunction Gateway

Inside OpenFunction Gateway

Backed by the Kubernetes Gateway, an OpenFunction Gateway defines how users can access sync functions.

Whenever an OpenFunction Gateway is created, the gateway controller will:

  • Add a default listener named ofn-http-internal to gatewaySpec.listeners if there isn’t one there.

  • Generate gatewaySpec.listeners.[*].hostname based on domain or clusterDomain.

  • Inject gatewaySpec.listeners into the existing Kubernetes Gateway defined by the gatewayRef of the OpenFunction Gateway.

  • Create a new Kubernetes Gateway based on the gatewaySpec.listeners field in the gatewayDef of the OpenFunction Gateway.

  • Create a service named gateway.openfunction.svc.cluster.local that defines a unified entry for sync functions.

After an OpenFunction Gateway is deployed, you’ll be able to find the status of Kubernetes Gateway and its listeners in OpenFunction Gateway status:

status:
  conditions:
  - lastTransitionTime: "2022-08-04T10:20:57Z"
    message: Gateway is scheduled
    observedGeneration: 2
    reason: Scheduled
    status: "True"
    type: Scheduled
  - lastTransitionTime: "2022-08-04T10:20:57Z"
    message: Valid Gateway
    observedGeneration: 2
    reason: Valid
    status: "True"
    type: Ready
  listeners:
  - attachedRoutes: 0
    conditions:
    - lastTransitionTime: "2022-08-04T10:20:57Z"
      message: Valid listener
      observedGeneration: 2
      reason: Ready
      status: "True"
      type: Ready
    name: ofn-http-internal
    supportedKinds:
    - group: gateway.networking.k8s.io
      kind: HTTPRoute
  - attachedRoutes: 0
    conditions:
    - lastTransitionTime: "2022-08-04T10:20:57Z"
      message: Valid listener
      observedGeneration: 2
      reason: Ready
      status: "True"
      type: Ready
    name: ofn-http-external
    supportedKinds:
    - group: gateway.networking.k8s.io
      kind: HTTPRoute

The Default OpenFunction Gateway

OpenFunction Gateway uses Contour as the default Kubernetes Gateway implementation. The following OpenFunction Gateway will be created automatically once you install OpenFunction:

apiVersion: networking.openfunction.io/v1alpha1
kind: Gateway
metadata:
  name: openfunction
  namespace: openfunction
spec:
  domain: ofn.io
  clusterDomain: cluster.local
  hostTemplate: "{{.Name}}.{{.Namespace}}.{{.Domain}}"
  pathTemplate: "{{.Namespace}}/{{.Name}}"
  httpRouteLabelKey: "app.kubernetes.io/managed-by"
  gatewayRef:
    name: contour
    namespace: projectcontour
  gatewaySpec:
    listeners:
      - name: ofn-http-internal
        hostname: "*.cluster.local"
        protocol: HTTP
        port: 80
        allowedRoutes:
          namespaces:
            from: All
      - name: ofn-http-external
        hostname: "*.ofn.io"
        protocol: HTTP
        port: 80
        allowedRoutes:
          namespaces:
            from: All

You can customize the default OpenFunction Gateway like below:

kubectl edit gateway openfunction -n openfunction

Switch to a different Kubernetes Gateway

You can switch to any gateway implementation that supports the Kubernetes Gateway API, such as Contour, Istio, Apache APISIX, or Envoy Gateway (in the future), in an easier and vendor-neutral way.

Here you can find more details.

Multiple OpenFunction Gateways

Multiple Gateways are not meaningful for OpenFunction. Currently only one OpenFunction Gateway is supported.

6.3 - Route

What is Route?

Route is part of the Function definition. Route defines how traffic from the Gateway listener is routed to a function.

A Route specifies, in its GatewayRef, the Gateway to which it attaches. This attachment allows it to receive traffic from the Gateway.

Once a sync Function is created, the function controller will:

  • Look for the Gateway called openfunction in the openfunction namespace and attach to it if route.gatewayRef is not defined in the function.
  • Automatically generate route.hostnames based on Gateway.spec.hostTemplate if route.hostnames is not defined in the function.
  • Automatically generate route.rules based on Gateway.spec.pathTemplate or a path of / if route.rules is not defined in the function.
  • Create an HTTPRoute custom resource based on Route. BackendRefs are automatically linked to the corresponding Knative service revision, and the label HTTPRouteLabelKey is added to this HTTPRoute.
  • Create the service {{.Name}}.{{.Namespace}}.svc.cluster.local, which defines an entry for accessing the function from within the cluster.
  • Update the HTTPRoute if the Gateway referenced by route.gatewayRef changes.

After a sync Function is deployed, you’ll be able to find the Function addresses and Route status in the Function’s status field, for example:

status:
  addresses:
    - type: External
      value: http://function-sample-serving-only.default.ofn.io/
    - type: Internal
      value: http://function-sample-serving-only.default.svc.cluster.local/
  build:
    resourceHash: "14903236521345556383"
    state: Skipped
  route:
    conditions:
      - lastTransitionTime: "2022-08-04T10:43:29Z"
        message: Valid HTTPRoute
        observedGeneration: 1
        reason: Valid
        status: "True"
        type: Accepted
    hosts:
      - function-sample-serving-only.default.ofn.io
      - function-sample-serving-only.default.svc.cluster.local
    paths:
      - type: PathPrefix
        value: /
  serving:
    lastSuccessfulResourceRef: serving-znk54
    resourceHash: "10715302888241374768"
    resourceRef: serving-znk54
    service: serving-znk54-ksvc-nbg6f
    state: Running

Host Based Routing

Host-based is the default routing mode. When route.hostnames is not defined, route.hostnames will be generated based on gateway.spec.hostTemplate. If route.rules is not defined, route.rules will be generated based on path of /.

kubectl apply -f - <<EOF
apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: function-sample
spec:
  version: "v1.0.0"
  image: "openfunctiondev/v1beta1-http:latest"
  port: 8080
  serving:
    runtime: knative
    template:
      containers:
        - name: function
          imagePullPolicy: Always
  route:
    gatewayRef:
      name: openfunction
      namespace: openfunction
EOF

If you are using the default OpenFunction Gateway, the function external address will be as below:

http://function-sample.default.ofn.io/

Path Based Routing

If you define route.hostnames in a function, route.rules will be generated based on gateway.spec.pathTemplate.

kubectl apply -f - <<EOF
apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: function-sample
spec:
  version: "v1.0.0"
  image: "openfunctiondev/v1beta1-http:latest"
  port: 8080
  serving:
    runtime: knative
    template:
      containers:
        - name: function
          imagePullPolicy: Always
  route:
    gatewayRef:
      name: openfunction
      namespace: openfunction
    hostnames:
    - "sample.ofn.io"
EOF

If you are using the default OpenFunction Gateway, the function external address will be as below:

http://sample.ofn.io/default/function-sample/
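The defaulting rules from the host-based and path-based sections above can be condensed into a short sketch. It is an illustration under stated assumptions rather than the function controller's real logic: resolve_route and fill are hypothetical helpers, the templates are expanded with plain string replacement instead of a real template engine, and the gateway values come from the default OpenFunction Gateway shown earlier.

```python
def fill(template: str, name: str, namespace: str, domain: str = "") -> str:
    """Expand the {{.Name}}, {{.Namespace}} and {{.Domain}} placeholders."""
    return (template.replace("{{.Name}}", name)
                    .replace("{{.Namespace}}", namespace)
                    .replace("{{.Domain}}", domain))


def resolve_route(route: dict, gateway: dict, name: str, namespace: str) -> dict:
    """Fill in the parts of a route that the function does not define."""
    resolved = dict(route)
    resolved.setdefault("gatewayRef", {"name": "openfunction", "namespace": "openfunction"})
    user_hosts = bool(route.get("hostnames"))
    if not user_hosts:
        # Host-based mode: hostname generated from hostTemplate.
        resolved["hostnames"] = [fill(gateway["hostTemplate"], name, namespace, gateway["domain"])]
    if not route.get("rules"):
        # Path-based mode uses pathTemplate; otherwise the path is "/".
        path = "/" + fill(gateway["pathTemplate"], name, namespace) if user_hosts else "/"
        resolved["rules"] = [{"matches": [{"path": {"type": "PathPrefix", "value": path}}]}]
    return resolved


if __name__ == "__main__":
    default_gateway = {
        "domain": "ofn.io",
        "hostTemplate": "{{.Name}}.{{.Namespace}}.{{.Domain}}",
        "pathTemplate": "{{.Namespace}}/{{.Name}}",
    }
    print(resolve_route({}, default_gateway, "function-sample", "default"))
    print(resolve_route({"hostnames": ["sample.ofn.io"]}, default_gateway, "function-sample", "default"))
```

The first call yields a generated hostname with a path of /. The second keeps the user-defined hostname and derives the path from pathTemplate.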

Host and Path based routing

You can define hostname and path at the same time to customize how traffic should be routed to your function.

kubectl apply -f - <<EOF
apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: function-sample
spec:
  version: "v1.0.0"
  image: "openfunctiondev/v1beta1-http:latest"
  port: 8080
  serving:
    runtime: knative
    template:
      containers:
        - name: function
          imagePullPolicy: Always
  route:
    gatewayRef:
      name: openfunction
      namespace: openfunction
    rules:
      - matches:
          - path:
              type: PathPrefix
              value: /v2/foo
    hostnames:
    - "sample.ofn.io"
EOF

If you are using the default OpenFunction Gateway, the function external address will be as below:

http://sample.ofn.io/v2/foo/

6.4 - Function Entrypoints

There are several methods to access a sync function. Let’s elaborate on this in the following section.

This documentation assumes that you are using the default OpenFunction Gateway and that you have a sync function named function-sample.

Access functions from within the cluster

Access functions by the internal address

OpenFunction will create this service for every sync Function: {{.Name}}.{{.Namespace}}.svc.cluster.local. This service will be used to provide the Function internal address.

Get the Function internal address by running the following command:

export FUNC_INTERNAL_ADDRESS=$(kubectl get function function-sample -o=jsonpath='{.status.addresses[?(@.type=="Internal")].value}')

This address provides the default method for accessing functions within the cluster. It is suitable for use as the sink.url of an EventSource.

Access the Function using curl in a pod:

kubectl run --rm ofn-test -i --tty --image=radial/busyboxplus:curl -- curl -sv $FUNC_INTERNAL_ADDRESS
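The jsonpath expression used above filters status.addresses by type. If you prefer to work with the full JSON output (for example from kubectl get function function-sample -o json), the same selection can be sketched in Python. The helper name address_of is made up for this example, and the sample status is copied from the Route section.

```python
from typing import Optional


def address_of(status: dict, kind: str) -> Optional[str]:
    """Return the first address value whose type matches kind, or None."""
    for address in status.get("addresses", []):
        if address.get("type") == kind:
            return address.get("value")
    return None


if __name__ == "__main__":
    status = {
        "addresses": [
            {"type": "External", "value": "http://function-sample-serving-only.default.ofn.io/"},
            {"type": "Internal", "value": "http://function-sample-serving-only.default.svc.cluster.local/"},
        ]
    }
    print(address_of(status, "Internal"))
    print(address_of(status, "External"))
```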

Access functions by the external address

The Function external address is usually used for external access, but you can also use it within the cluster for development or testing. You should configure the local domain first; see Configure Local Domain.

Get the Function external address by running the following command:

export FUNC_EXTERNAL_ADDRESS=$(kubectl get function function-sample -o=jsonpath='{.status.addresses[?(@.type=="External")].value}')

Access the Function using curl in a pod:

kubectl run --rm ofn-test -i --tty --image=radial/busyboxplus:curl -- curl -sv $FUNC_EXTERNAL_ADDRESS

Access functions from outside the cluster

Access functions by the Kubernetes Gateway’s IP address

Get the Kubernetes Gateway’s IP address and port:

export IP=<your node ip>
export PORT=$(kubectl get svc -n projectcontour contour-envoy -o=jsonpath='{.spec.ports[?(@.name=="http")].nodePort}')

Get the Function’s HOST and PATH:

export FUNC_HOST=$(kubectl get function function-sample -o=jsonpath='{.status.route.hosts[0]}')
export FUNC_PATH=$(kubectl get function function-sample -o=jsonpath='{.status.route.paths[0].value}')

Access the Function using curl directly:

curl -sv -H "Host: $FUNC_HOST" "http://$IP:$PORT$FUNC_PATH"
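The curl command above connects to the gateway's node address while overriding the Host header so that the gateway can match the function's route. A minimal Python sketch of the same request follows. The helper name gateway_request and the IP address and port are placeholders chosen for illustration.

```python
from urllib.request import Request


def gateway_request(ip: str, port: int, host: str, path: str = "/") -> Request:
    """Build a request to the gateway address with an explicit Host header."""
    url = f"http://{ip}:{port}{path}"
    return Request(url, headers={"Host": host})


if __name__ == "__main__":
    # 192.0.2.10 and 30080 are placeholder values, not real endpoints.
    request = gateway_request("192.0.2.10", 30080, "function-sample.default.ofn.io")
    print(request.full_url, request.get_header("Host"))
```

Passing the returned object to urllib.request.urlopen sends the request with the overridden Host header.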

Access by domain defined in OpenFunction Gateway

You should configure your real DNS first. The relevant configuration depends on the network topology of your environment.

Get the Function external address by running the following command:

export FUNC_EXTERNAL_ADDRESS=$(kubectl get function function-sample -o=jsonpath='{.status.addresses[?(@.type=="External")].value}')

Access the Function using curl directly:

curl -sv $FUNC_EXTERNAL_ADDRESS

7 - OpenFunction Events

7.1 - Introduction

Overview

OpenFunction Events is OpenFunction’s event management framework. It provides the following core features:

  • Support for triggering target functions by synchronous and asynchronous calls
  • User-defined trigger judgment logic
  • The components of OpenFunction Events can be driven by OpenFunction itself

Architecture

The following diagram illustrates the architecture of OpenFunction Events.

[Figure: openfunction-events architecture diagram]

Concepts

EventSource

EventSource defines the producer of an event, such as a Kafka service, an object storage service, and even a function. It contains descriptions of these event producers and information about where to send these events.

EventSource supports the following types of event source servers:

  • Kafka
  • Cron (scheduler)
  • Redis

EventBus (ClusterEventBus)

EventBus is responsible for aggregating events and making them persistent. It contains descriptions of an event bus broker that usually is a message queue (such as NATS Streaming and Kafka), and provides these configurations for EventSource and Trigger.

EventBus handles event bus adaptation for namespace scope by default. For cluster scope, ClusterEventBus is available as an event bus adapter and takes effect when other components cannot find an EventBus under a namespace.
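The fallback from a namespaced EventBus to a ClusterEventBus can be pictured as a two-step lookup. The sketch below only illustrates the rule stated above. The function name and the dictionaries standing in for the cluster's resources are assumptions made for this example.

```python
def resolve_event_bus(name: str, namespace: str,
                      event_buses: dict, cluster_event_buses: dict):
    """Return (kind, spec) for the event bus to use, or None if none exists.

    event_buses is keyed by (namespace, name); cluster_event_buses by name.
    """
    bus = event_buses.get((namespace, name))
    if bus is not None:
        # A namespaced EventBus always wins.
        return ("EventBus", bus)
    bus = cluster_event_buses.get(name)
    if bus is not None:
        # Fall back to the cluster-scoped adapter.
        return ("ClusterEventBus", bus)
    return None


if __name__ == "__main__":
    namespaced = {("default", "bus"): {"broker": "namespaced-broker"}}
    clustered = {"bus": {"broker": "cluster-broker"}}
    print(resolve_event_bus("bus", "default", namespaced, clustered))
    print(resolve_event_bus("bus", "other", namespaced, clustered))
```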

EventBus supports the following event bus brokers:

  • NATS Streaming

Trigger

Trigger is an abstraction of the purpose of an event, such as what needs to be done when a message is received. It contains the purpose you define for an event, which tells the trigger which EventSource to fetch events from and, based on the given conditions, whether to trigger the target function.

Reference

For more information, see EventSource Specifications and EventBus Specifications.

7.2 - Use EventSource

This document gives an example of how to use an event source to trigger a synchronous function.

In this example, an EventSource is defined for synchronous invocation, using the event source (a Kafka server) as an input binding of a function (a Knative service). When the event source generates an event, it invokes the function and gets a synchronous return through the spec.sink configuration.

Create a Function

Use the following content to create a function as the EventSource Sink. For more information about how to create a function, see Create sync functions.

apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: sink
spec:
  version: "v1.0.0"
  image: "openfunction/sink-sample:latest"
  port: 8080
  serving:
    runtime: "knative"
    template:
      containers:
        - name: function
          imagePullPolicy: Always

After the function is created, run the following command to get the URL of the function.

$ kubectl get functions.core.openfunction.io
NAME   BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
sink   Skipped      Running                  serving-4x5wh   https://openfunction.io/default/sink   13s

Create a Kafka Cluster

  1. Run the following commands to install strimzi-kafka-operator in the default namespace.

    helm repo add strimzi https://strimzi.io/charts/
    helm install kafka-operator -n default strimzi/strimzi-kafka-operator
    
  2. Use the following content to create a file kafka.yaml.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: kafka-server
      namespace: default
    spec:
      kafka:
        version: 3.1.0
        replicas: 1
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
          - name: tls
            port: 9093
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.1"
        storage:
          type: ephemeral
      zookeeper:
        replicas: 1
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
        userOperator: {}
    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: events-sample
      namespace: default
      labels:
        strimzi.io/cluster: kafka-server
    spec:
      partitions: 10
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    
  3. Run the following command to deploy a 1-replica Kafka server named kafka-server and a 1-replica Kafka topic named events-sample in the default namespace. The Kafka and Zookeeper clusters created by this command use the ephemeral storage type, backed by emptyDir, for demonstration purposes.

    kubectl apply -f kafka.yaml
    
  4. Run the following command to check pod status and wait for Kafka and Zookeeper to be up and running.

    $ kubectl get po
    NAME                                              READY   STATUS        RESTARTS   AGE
    kafka-server-entity-operator-568957ff84-nmtlw     3/3     Running       0          8m42s
    kafka-server-kafka-0                              1/1     Running       0          9m13s
    kafka-server-zookeeper-0                          1/1     Running       0          9m46s
    strimzi-cluster-operator-687fdd6f77-cwmgm         1/1     Running       0          11m
    
  5. Run the following command to view the metadata of the Kafka cluster.

    kafkacat -L -b kafka-server-kafka-brokers:9092
    

Trigger a Synchronous Function

Create an EventSource

  1. Use the following content to create an EventSource configuration file (for example, eventsource-sink.yaml).

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: my-eventsource
    spec:
      logLevel: "2"
      kafka:
        sample-one:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
      sink:
        uri: "http://openfunction.io.svc.cluster.local/default/sink"
    
  2. Run the following command to apply the configuration file.

    kubectl apply -f eventsource-sink.yaml
    
  3. Run the following commands to check the results.

    $ kubectl get eventsources.events.openfunction.io
    NAME             EVENTBUS   SINK   STATUS
    my-eventsource                     Ready
    
    $ kubectl get components
    NAME                                                      AGE
    serving-8f6md-component-esc-kafka-sample-one-r527t        68m
    serving-8f6md-component-ts-my-eventsource-default-wz8jt   68m
    
    $ kubectl get deployments.apps
    NAME                                           READY   UP-TO-DATE   AVAILABLE   AGE
    serving-8f6md-deployment-v100-pg9sd            1/1     1            1           68m
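
The brokers value in the EventSource above is the fully qualified in-cluster DNS name of the Kafka brokers Service. The following sketch shows how such an address is composed, assuming the default cluster domain cluster.local. The function name and its defaults are hypothetical.

```python
def service_address(service: str, namespace: str = "default",
                    port: int = 9092,
                    cluster_domain: str = "cluster.local") -> str:
    """Build <service>.<namespace>.svc.<cluster-domain>:<port>.

    cluster_domain is an assumption: "cluster.local" is the usual
    Kubernetes default, but a cluster can be configured differently.
    """
    return f"{service}.{namespace}.svc.{cluster_domain}:{port}"


print(service_address("kafka-server-kafka-brokers"))
```

From a pod in the same namespace, the short form kafka-server-kafka-brokers:9092 resolves to the same Service, which is why the event producer configuration can use it.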
    

Create an Event Producer

To start the target function, create some events to trigger it.

  1. Use the following content to create an event producer configuration file (for example, events-producer.yaml).

    apiVersion: core.openfunction.io/v1beta1
    kind: Function
    metadata:
      name: events-producer
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-bindings:latest
      serving:
        template:
          containers:
            - name: function
              imagePullPolicy: Always
        runtime: "async"
        inputs:
          - name: cron
            component: cron
        outputs:
          - name: target
            component: kafka-server
            operation: "create"
        bindings:
          cron:
            type: bindings.cron
            version: v1
            metadata:
              - name: schedule
                value: "@every 2s"
          kafka-server:
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-server-kafka-brokers:9092"
              - name: topics
                value: "events-sample"
              - name: consumerGroup
                value: "bindings-with-output"
              - name: publishTopic
                value: "events-sample"
              - name: authRequired
                value: "false"
    
  2. Run the following command to apply the configuration file.

    kubectl apply -f events-producer.yaml
    
  3. Run the following command to check the results in real time.

    $ kubectl get po --watch
    NAME                                                           READY   STATUS              RESTARTS   AGE
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            0/2     ContainerCreating   0          1s
    serving-8f6md-deployment-v100-pg9sd-6666c5577f-4rpdg           2/2     Running             0          23m
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            0/2     ContainerCreating   0          1s
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            1/2     Running             0          5s
    serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh            2/2     Running             0          8s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     Pending             0          0s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     Pending             0          0s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     ContainerCreating   0          0s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      0/2     ContainerCreating   0          2s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      1/2     Running             0          4s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      1/2     Running             0          4s
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk      2/2     Running             0          4s
    

7.3 - Use EventBus and Trigger

This document gives an example of how to use EventBus and Trigger.

Prerequisites

  • You need to create a function as the target function to be triggered. Please refer to Create a function for more details.
  • You need to create a Kafka cluster. Please refer to Create a Kafka cluster for more details.

Deploy a NATS Streaming Server

Run the following commands to deploy a NATS streaming server. This document uses nats://nats.default:4222 as the access address of the NATS streaming server and stan as the cluster ID. For more information, see NATS Streaming (STAN).

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats
helm install stan nats/stan --set stan.nats.url=nats://nats:4222

Create an Async Runtime Function

  1. Use the following content to create a configuration file (for example, openfuncasync-function.yaml) for the target function, which is triggered by the Trigger CRD and prints the received message.

    apiVersion: core.openfunction.io/v1beta1
    kind: Function
    metadata:
      name: trigger-target
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-trigger-target:latest
      port: 8080
      serving:
        runtime: "async"
        scaleOptions:
          keda:
            scaledObject:
              pollingInterval: 15
              minReplicaCount: 0
              maxReplicaCount: 10
              cooldownPeriod: 30
        triggers:
          - type: stan
            metadata:
              natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222"
              queueGroup: "grp1"
              durableName: "ImDurable"
              subject: "metrics"
              lagThreshold: "10"
        inputs:
          - name: autoscaling-pubsub
            component: eventbus
            topic: metrics
        pubsub:
          eventbus:
            type: pubsub.natsstreaming
            version: v1
            metadata:
              - name: natsURL
                value: "nats://nats.default:4222"
              - name: natsStreamingClusterID
                value: "stan"
              - name: subscriptionType
                value: "queue"
              - name: durableSubscriptionName
                value: "ImDurable"
              - name: consumerID
                value: "grp1"
    
  2. Run the following command to apply the configuration file.

    kubectl apply -f openfuncasync-function.yaml
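
The scaleOptions and the stan trigger in this configuration control how many replicas of trigger-target run. As a rough sketch, assuming KEDA treats lagThreshold as the target average lag per replica and hands the calculation to the Horizontal Pod Autoscaler, the replica count can be approximated as follows. This is an approximation for illustration, not KEDA's or OpenFunction's actual code, and the function name is hypothetical.

```python
import math


def desired_replicas(lag: int, lag_threshold: int = 10,
                     min_replicas: int = 0, max_replicas: int = 10) -> int:
    """Approximate replica count for a number of pending messages.

    Defaults mirror lagThreshold, minReplicaCount and maxReplicaCount
    from the configuration above.
    """
    if lag <= 0:
        # Nothing pending: fall back to the minimum (0 allows scale to zero).
        return min_replicas
    wanted = math.ceil(lag / lag_threshold)
    # Clamp the result to the configured bounds.
    return max(min_replicas, min(max_replicas, wanted))


for lag in (0, 1, 25, 500):
    print(f"lag={lag:>3} -> replicas={desired_replicas(lag)}")
```

The sketch ignores timing: pollingInterval (15 seconds here) is how often KEDA checks the trigger, and cooldownPeriod (30 seconds here) is how long it waits after the last activity before scaling back to zero.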
    

Create an EventBus and an EventSource

  1. Use the following content to create a configuration file (for example, eventbus.yaml) for an EventBus.

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      natsStreaming:
        natsURL: "nats://nats.default:4222"
        natsStreamingClusterID: "stan"
        subscriptionType: "queue"
        durableSubscriptionName: "ImDurable"
    
  2. Use the following content to create a configuration file (for example, eventsource.yaml) for an EventSource.

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: my-eventsource
    spec:
      logLevel: "2"
      eventBus: "default"
      kafka:
        sample-two:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
    
  3. Run the following commands to apply these configuration files.

    kubectl apply -f eventbus.yaml
    kubectl apply -f eventsource.yaml
    
  4. Run the following commands to check the results.

    $ kubectl get eventsources.events.openfunction.io
    NAME             EVENTBUS   SINK   STATUS
    my-eventsource   default           Ready
    
    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   6m53s
    
    $ kubectl get components
    NAME                                                 AGE
    serving-6r5dl-component-eventbus-jlpqf               11m
    serving-9689d-component-ebfes-my-eventsource-cmcbw   6m57s
    serving-9689d-component-esc-kafka-sample-two-l99cg   6m57s
    serving-k6zw8-component-cron-9x8hl                   61m
    serving-k6zw8-component-kafka-server-sjrzs           61m
    
    $ kubectl get deployments.apps
    NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
    serving-6r5dl-deployment-v100-m7nq2        0/0     0            0           12m
    serving-9689d-deployment-v100-5qdvk        1/1     1            1           7m17s
    

Create a Trigger

  1. Use the following content to create a configuration file (for example, trigger.yaml) for a Trigger.

    apiVersion: events.openfunction.io/v1alpha1
    kind: Trigger
    metadata:
      name: my-trigger
    spec:
      logLevel: "2"
      eventBus: "default"
      inputs:
        inputDemo:
          eventSource: "my-eventsource"
          event: "sample-two"
      subscribers:
        - condition: inputDemo
          topic: "metrics"
    
  2. Run the following command to apply the configuration file.

    kubectl apply -f trigger.yaml
    
  3. Run the following commands to check the results.

    $ kubectl get triggers.events.openfunction.io
    NAME         EVENTBUS   STATUS
    my-trigger   default    Ready
    
    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   62m
    
    $ kubectl get components
    NAME                                                 AGE
    serving-9689d-component-ebfes-my-eventsource-cmcbw   46m
    serving-9689d-component-esc-kafka-sample-two-l99cg   46m
    serving-dxrhd-component-eventbus-t65q7               13m
    serving-zwlj4-component-ebft-my-trigger-4925n        100s
    

Create an Event Producer

  1. Use the following content to create an event producer configuration file (for example, events-producer.yaml).

    apiVersion: core.openfunction.io/v1beta1
    kind: Function
    metadata:
      name: events-producer
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-bindings:latest
      serving:
        template:
          containers:
            - name: function
              imagePullPolicy: Always
        runtime: "async"
        inputs:
          - name: cron
            component: cron
        outputs:
          - name: target
            component: kafka-server
            operation: "create"
        bindings:
          cron:
            type: bindings.cron
            version: v1
            metadata:
              - name: schedule
                value: "@every 2s"
          kafka-server:
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-server-kafka-brokers:9092"
              - name: topics
                value: "events-sample"
              - name: consumerGroup
                value: "bindings-with-output"
              - name: publishTopic
                value: "events-sample"
              - name: authRequired
                value: "false"
    
  2. Run the following command to apply the configuration file.

    kubectl apply -f events-producer.yaml
    
  3. Run the following commands to observe changes to the target asynchronous function.

    $ kubectl get functions.core.openfunction.io
    NAME                                  BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
    trigger-target                        Skipped      Running                  serving-dxrhd                                         20m
    
    $ kubectl get po --watch
    NAME                                                     READY   STATUS              RESTARTS   AGE
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     Pending             0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     Pending             0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     ContainerCreating   0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      0/2     ContainerCreating   0          2s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      1/2     Running             0          4s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      1/2     Running             0          4s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm      2/2     Running             0          4s
    

7.4 - Use Multiple Sources in One EventSource

This document describes how to use multiple sources in one EventSource.

Prerequisites

  • You need to create a function as the target function to be triggered. Please refer to Create a function for more details.
  • You need to create a Kafka cluster. Please refer to Create a Kafka cluster for more details.

Use Multiple Sources in One EventSource

  1. Use the following content to create an EventSource configuration file (for example, eventsource-multi.yaml).

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: my-eventsource
    spec:
      logLevel: "2"
      kafka:
        sample-three:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
      cron:
        sample-three:
          schedule: "@every 5s" 
      sink:
        uri: "http://openfunction.io.svc.cluster.local/default/sink"
    
  2. Run the following command to apply the configuration file.

    kubectl apply -f eventsource-multi.yaml
    
  3. Run the following commands to observe changes.

    $ kubectl get eventsources.events.openfunction.io
    NAME             EVENTBUS   SINK   STATUS
    my-eventsource                     Ready
    
    $ kubectl get components
    NAME                                                      AGE
    serving-vqfk5-component-esc-cron-sample-three-dzcpv       35s
    serving-vqfk5-component-esc-kafka-sample-three-nr9pq      35s
    serving-vqfk5-component-ts-my-eventsource-default-q6g6m   35s
    
    $ kubectl get deployments.apps
    NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
    serving-4x5wh-ksvc-wxbf2-v100-deployment   1/1     1            1           3h14m
    serving-vqfk5-deployment-v100-vdmvj        1/1     1            1           48s
    

7.5 - Use ClusterEventBus

This document describes how to use a ClusterEventBus.

Prerequisites

You have finished the steps described in Use EventBus and Trigger.

Use a ClusterEventBus

  1. Use the following content to create a ClusterEventBus configuration file (for example, clustereventbus.yaml).

    apiVersion: events.openfunction.io/v1alpha1
    kind: ClusterEventBus
    metadata:
      name: default
    spec:
      natsStreaming:
        natsURL: "nats://nats.default:4222"
        natsStreamingClusterID: "stan"
        subscriptionType: "queue"
        durableSubscriptionName: "ImDurable"
    
  2. Run the following command to delete the EventBus named default that you created in Use EventBus and Trigger.

    kubectl delete eventbus.events.openfunction.io default
    
  3. Run the following command to apply the configuration file.

    kubectl apply -f clustereventbus.yaml
    
  4. Run the following commands to check the results.

    $ kubectl get eventbus.events.openfunction.io
    No resources found in default namespace.
    
    $ kubectl get clustereventbus.events.openfunction.io
    NAME      AGE
    default   21s
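
Step 2 suggests that a namespaced EventBus would otherwise be used in place of the ClusterEventBus with the same name. The lookup order is not spelled out in this document, so the following sketch rests on the assumption that a namespaced EventBus takes precedence and the ClusterEventBus is the fallback. All names in it are hypothetical.

```python
from typing import Optional, Set, Tuple


def resolve_event_bus(name: str, namespace: str,
                      event_buses: Set[Tuple[str, str]],
                      cluster_event_buses: Set[str]) -> Optional[str]:
    """Pick the bus that a reference like eventBus: "default" would use.

    Assumption: namespaced EventBus first, ClusterEventBus second.
    """
    if (namespace, name) in event_buses:
        return f"EventBus {namespace}/{name}"
    if name in cluster_event_buses:
        return f"ClusterEventBus {name}"
    return None


cluster_buses = {"default"}

# Before step 2: the namespaced EventBus still exists and wins.
print(resolve_event_bus("default", "default", {("default", "default")}, cluster_buses))

# After step 2: only the ClusterEventBus is left.
print(resolve_event_bus("default", "default", set(), cluster_buses))
```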
    

7.6 - Use Trigger Conditions

This document describes how to use Trigger conditions.

Prerequisites

You have finished the steps described in Use EventBus and Trigger.

Use Trigger Conditions

Create two event sources

  1. Use the following content to create an EventSource configuration file (for example, eventsource-a.yaml).

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: eventsource-a
    spec:
      logLevel: "2"
      eventBus: "default"
      kafka:
        sample-five:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false
    
  2. Use the following content to create another EventSource configuration file (for example, eventsource-b.yaml).

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: eventsource-b
    spec:
      logLevel: "2"
      eventBus: "default"
      cron:
        sample-five:
          schedule: "@every 5s" 
    
  3. Run the following commands to apply these two configuration files.

    kubectl apply -f eventsource-a.yaml
    kubectl apply -f eventsource-b.yaml
    

Create a trigger with conditions

  1. Use the following content to create a configuration file (for example, condition-trigger.yaml) for a Trigger with conditions.

    apiVersion: events.openfunction.io/v1alpha1
    kind: Trigger
    metadata:
      name: condition-trigger
    spec:
      logLevel: "2"
      eventBus: "default"
      inputs:
        eventA:
          eventSource: "eventsource-a"
          event: "sample-five"
        eventB:
          eventSource: "eventsource-b"
          event: "sample-five"
      subscribers:
      - condition: eventB
        sink:
          uri: "http://openfunction.io.svc.cluster.local/default/sink"
      - condition: eventA && eventB
        topic: "metrics"
    
  2. Run the following command to apply the configuration file.

    kubectl apply -f condition-trigger.yaml
    
  3. Run the following commands to check the results.

    $ kubectl get eventsources.events.openfunction.io
    NAME            EVENTBUS   SINK   STATUS
    eventsource-a   default           Ready
    eventsource-b   default           Ready
    
    $ kubectl get triggers.events.openfunction.io
    NAME                EVENTBUS   STATUS
    condition-trigger   default    Ready
    
    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   12s
    
  4. Run the following commands. The output shows that the eventB condition in the Trigger is matched and the Knative service is triggered, because the event source eventsource-b is a cron task that emits an event every 5 seconds.

    $ kubectl get functions.core.openfunction.io
    NAME                                  BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
    sink                                  Skipped      Running                  serving-4x5wh   https://openfunction.io/default/sink   3h25m
    
    $ kubectl get po
    NAME                                                        READY   STATUS    RESTARTS   AGE
    serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-k2jdg   2/2     Running   0          46s
    
  5. Create an event producer by referring to Create an Event Producer.

  6. Run the following commands. The output shows that the eventA && eventB condition in the Trigger is now matched as well, so the event is also sent to the metrics topic of the event bus, which triggers the async function trigger-target.

    $ kubectl get functions.core.openfunction.io
    NAME                                  BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                   AGE
    trigger-target                        Skipped      Running                  serving-7hghp                                         103s
    
    $ kubectl get po
    NAME                                                        READY   STATUS    RESTARTS   AGE
    serving-7hghp-deployment-v100-z8wrf-946b4854d-svf55         2/2     Running   0          18s
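
The behavior in steps 4 and 6 can be sketched with a small condition evaluator. This is a minimal illustration, not OpenFunction's implementation: it treats an input name as true once an event has arrived from it, and it does not model how that state is reset or windowed. Support for || is an assumption added for completeness, since this document only shows &&. The function and variable names are hypothetical.

```python
def matches(condition: str, seen: set) -> bool:
    """Return True if the condition holds for the inputs that have fired.

    Handles "a && b" and (as an assumption) "a || b", without parentheses.
    """
    return any(
        all(name.strip() in seen for name in term.split("&&"))
        for term in condition.split("||")
    )


# Subscribers from condition-trigger.yaml: condition -> destination.
subscribers = {
    "eventB": "sink http://openfunction.io.svc.cluster.local/default/sink",
    "eventA && eventB": "topic metrics",
}


def targets(seen: set) -> list:
    """List the destinations whose condition is currently matched."""
    return [dest for cond, dest in subscribers.items() if matches(cond, seen)]


# Step 4: only the cron source (eventB) has produced events.
print(targets({"eventB"}))

# Step 6: the event producer has also fed Kafka (eventA).
print(targets({"eventA", "eventB"}))
```

With only eventB seen, just the sink subscriber is selected. Once eventA has arrived too, the metrics topic is selected as well, which is what starts trigger-target.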