Flink Kubernetes Deployment

Updates to this chapter:
  • Created 10/2024
  • 12/24: move some content to hands-on readme, clean content
  • 01/25: SQL processing section
  • 05/25: merge content, simplify, add some details on deployment - fully test k8s deployment on Colima
  • 07/25: Update for Confluent Platform v8

The Apache Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. This note summarizes how to use this operator and presents the different getting-started YAML files.

The operator takes care of submitting, savepointing, upgrading and generally managing Flink jobs using the built-in Flink Kubernetes integration.

Figure: Flink Operator managing Flink Job and Task Managers

The operator fully automates the entire lifecycle of job managers, task managers, and applications. Failures of JobManager pods are handled by the Deployment controller, which takes care of spawning a new JobManager.

Like other operators, it can run namespace-scoped, which allows multiple versions of the operator in the same Kubernetes cluster, or cluster-scoped for highly distributed deployments.

The following figure represents a simple deployment view of Flink and Kafka clusters on a Kubernetes platform:

Figure: K8S deployment

The custom resource definition that describes the schema of a FlinkDeployment is a cluster-wide resource. The operator continuously tracks cluster events relating to the FlinkDeployment and FlinkSessionJob custom resources. The operator control flow is described in this note.

FlinkDeployment

FlinkDeployment CR defines Flink Application and Session cluster deployments

Important documentation

Pre-requisites

Any Flink on Kubernetes deployment should include the following pre-requisites:

There are two options to run Flink on Kubernetes: 1/ Confluent Platform for Flink, or 2/ Apache Flink open source

See the Makefile under deployment/k8s/cp-flink

  • Add Confluent Platform Helm repositories

    helm repo add confluentinc https://packages.confluent.io/helm
    # Verify helm repo entries exist
    helm repo list
    # try to update repo content
    helm repo update
    helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes
    

  • Install Confluent CLI or update existing CLI with:

    confluent update
    

  • Install the Confluent plugin for kubectl. If the plugin was previously installed, delete the old version from the directory where it was installed.

  • Get Confluent Platform releases information.

  • For Confluent Platform for Flink deployment see details in this section.
  • Add the Apache Flink Helm repositories:

    helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.11.0
    # Verify helm repo entries exist
    helm repo list
    # If the URL is no longer valid, remove the repo and re-add it with a current URL
    helm repo remove flink-operator-repo
    # try to update repo content
    helm repo update
    

    See Apache Flink Operator documentation

Colima playground

  • Start a kubernetes cluster, for colima do:

    colima start --kubernetes
    # or under deployment/k8s folder
    ./start_colima.sh
    

  • Install cert-manager. See current releases:

    kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.18.1/cert-manager.yaml
    # verify
    kubectl get pods -n cert-manager
    

  • Install the Apache Flink Kubernetes Operator (see the helm command sketch after this list)

  • Install MinIO to expose object storage in the Kubernetes cluster, see this section.
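
A typical operator install with Helm, assuming the flink-operator-repo added above and a flink namespace (both names are illustrative):

helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace flink --create-namespace
# verify the operator pod is running
kubectl get pods -n flink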

Using MinIO

MinIO is an object storage solution that provides an Amazon Web Services S3-compatible API and supports all core S3 features, running on Kubernetes.

  • First be sure the MinIO CLI is installed.

    brew install minio/stable/mc
    # or to upgrade to a new version
    brew upgrade minio/stable/mc
    # Verify installation
    mc --help
    

    mc cli command summary

  • Configure MinIO under the minio-dev namespace

    # under deployment/k8s/MinIO
    kubectl apply -f minio-dev.yaml
    kubectl get pods -n minio-dev
    

  • Access MinIO S3 API and Console

    kubectl port-forward pod/minio 9000 9090 -n minio-dev
    

  • Log in to the Console with the credentials minioadmin | minioadmin

  • Set up a MinIO client alias with credentials saved to $HOME/.mc/config.json

    mc alias set dev-minio http://localhost:9000 minioadmin minioadmin
    # make a bucket
    mc mb dev-minio/flink
    

  • The next step is to upload jar files for the different applications to deploy, or data sets for SQL tables. See the later section.

The Apache Flink Kubernetes Operator product documentation lists the setup steps.

Updated 07.01.2025: For CFK version 3.0.0 and CP v8.0.0

See the README in deployment/k8s/cp-flink; the operations can be summarized as follows (a consolidated command sketch follows the list):

  • Create a namespace for the Confluent products deployment. By default, CMF deploys Confluent Platform in a namespaced deployment and manages Confluent Platform component clusters and resources in the same Kubernetes namespace where CFK itself is deployed.
  • Install Helm repo for ConfluentInc
  • Install cert-manager, see the release version here.
  • Deploy the CFK bundle using Helm
  • Install Confluent plugin for kubectl
    sudo tar -xvf kubectl-plugin/kubectl-confluent-darwin-arm64.tar.gz -C /usr/local/bin/
    
  • Deploy Confluent Platform Flink operator
  • Deploy the Confluent Kafka brokers using one KRaft controller, three brokers, the new Confluent Console, with REST API and Schema Registry
  • Then deploy one basic Flink application
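
A consolidated command sketch of these steps, assuming a confluent namespace; the two Flink chart names at the end are assumptions to verify against the repository README:

kubectl create namespace confluent
kubectl config set-context --current --namespace confluent
# cert-manager (check the release page for the current version)
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.18.1/cert-manager.yaml
# CFK bundle
helm repo add confluentinc https://packages.confluent.io/helm
helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes
# kubectl plugin
sudo tar -xvf kubectl-plugin/kubectl-confluent-darwin-arm64.tar.gz -C /usr/local/bin/
# Confluent Platform for Flink operator and manager (chart names assumed)
helm upgrade --install cp-flink-kubernetes-operator confluentinc/flink-kubernetes-operator
helm upgrade --install cmf confluentinc/confluent-manager-for-apache-flink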

Custom Resources

Once the operator is running, we can submit jobs using the FlinkDeployment (for a Flink application) and FlinkSessionJob (for a session job) custom resources. Confluent Manager for Apache Flink supports application mode only and uses a new CRD, FlinkApplication.

The Apache Flink FlinkDeployment spec is here and is used to define a Flink application (which has a job section) or a session cluster (only job and task manager configuration).
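
A minimal application-mode FlinkDeployment, adapted from the operator's basic example (image tag, Flink version, and resource values are illustrative):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless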

It is important to note that the FlinkDeployment and FlinkApplication CRDs have a podTemplate, so ConfigMaps and Secrets can be used to configure environment variables for the Flink app (be sure to keep the container name flink-main-container):

spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          envFrom:
            - configMapRef:
                name: flink-app-cm

A RWX, shared PersistentVolumeClaim (PVC) for the Flink JobManagers and TaskManagers provides persistence for stateful checkpoints and savepoints of Flink jobs.
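
A sketch of such a shared PVC, assuming a storage class that supports ReadWriteMany (the class name is illustrative):

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-pvc
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: nfs-client   # any RWX-capable storage class
  resources:
    requests:
      storage: 10Gi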

A flow is packaged as a jar, so developers need to define a Docker image with the Flink API and any connector jars. See the example Dockerfile and FlinkApplication manifest.

Another option is to use MinIO to persist application jars.

HA configuration

Within Kubernetes, we can enable Flink HA in the ConfigMap of the cluster configuration:

  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend: rocksdb
    state.savepoints.dir: file:///flink-data/savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    high-availability.type: kubernetes
    high-availability.storageDir: file:///flink-data/ha

JobManager metadata is persisted in the file system path defined by high-availability.storageDir. This directory stores all the metadata needed to recover from a JobManager failure.

A JobManager pod that crashes is restarted automatically by the Kubernetes scheduler, and since Flink persists metadata and job artifacts, it is important to mount persistent volumes at the expected paths:

podTemplate:
  spec:
    containers:
      - name: flink-main-container
        volumeMounts:
        - mountPath: /flink-data
          name: flink-volume
    volumes:
    - name: flink-volume
      hostPath:
        # directory location on host
        path: /tmp/flink
        # this field is optional
        type: Directory

Recall that podTemplate is a base declaration common to the job and task manager pods; it can be overridden by the jobManager and taskManager pod template sub-elements (see the sketch after the next block). The previous declaration works for minikube with hostPath access; for a Kubernetes cluster with a separate storage class, the volume declaration is:

volumes:
  - name: flink-volume
    persistentVolumeClaim:
      claimName: flink-pvc
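
When only one pod type needs the extra settings, the same declaration can be scoped under the jobManager or taskManager sub-element, which overrides the base podTemplate; a sketch:

jobManager:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          persistentVolumeClaim:
            claimName: flink-pvc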

For a Flink job or application, it is important to enable checkpointing and savepointing:

job:
  jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar

  parallelism: 2
  upgradeMode: savepoint
  #savepointTriggerNonce: 0
  # initialSavepointPath: file:///
How to validate checkpointing?

Checkpointing lets Flink periodically save the state of a job to the configured checkpoint storage. Look at the pod name of the task manager and stop it with kubectl delete pod/.... Flink should automatically restart the job and recover from the latest checkpoint. Use the Flink UI or CLI to see the job status.
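
A possible validation sequence, assuming the deployment runs in the flink namespace (pod and deployment names are illustrative):

kubectl get pods -n flink
# delete one task manager pod, for example:
kubectl delete pod basic-example-taskmanager-1-1 -n flink
# the job should restart and recover from the latest checkpoint
kubectl get flinkdeployment basic-example -n flink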

How to validate savepointing?

Savepoints are manually triggered snapshots of the job state, which can be used to upgrade a job or to perform manual recovery. To trigger a savepoint, set a value for savepointTriggerNonce in the FlinkDeployment descriptor and then apply the changes. Get the location of the savepoint, then add initialSavepointPath to the YAML and redeploy the application: it will reload its state from the savepoint. There is also a custom resource definition (FlinkStateSnapshot) to trigger savepoints.
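
The relevant fields in the job spec, with illustrative values (the savepoint location can be read from the FlinkDeployment status or the JobManager logs):

job:
  upgradeMode: savepoint
  savepointTriggerNonce: 1     # change this value and re-apply to trigger a new savepoint
  # once the savepoint location is known, redeploy from it:
  # initialSavepointPath: file:///flink-data/savepoints/savepoint-xxxx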

  • If a write operation fails when the pod creates a folder or updates the Flink config, verify the following:

    • Assess PVC and R/W access. Verify the PVC configuration. Some storage classes or persistent volume types may have restrictions on directory creation.
    • Verify security context for the pod. Modify the pod's security context to allow necessary permissions.
    • The podTemplate can be configured at the same level as the task and job managers so any mounted volumes will be available to those pods. See basic-reactive.yaml from Flink Operator examples.

See PVC and PV declarations

For a Session cluster, there is no job spec. See this deployment definition. Once a cluster is defined, it has a name and can be referenced when submitting SessionJobs.
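
A minimal session cluster declaration, assuming the name flink-session-cluster referenced by the session job below (image and resource values are illustrative):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session-cluster
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  # no job section: this is a session cluster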

A Session cluster is executed as a long-running Kubernetes Deployment, and we may run multiple Flink jobs on it. Each job needs to be submitted to the cluster after the cluster has been deployed. To deploy a session cluster, we need at least three components:

  • a Deployment which runs a JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager’s REST and UI ports

For a deployment, select the execution mode: application or session. For production, it is recommended to deploy in application mode for better isolation, using a cloud-native approach: we can just build a Dockerfile for our application using the Flink jars.

Session Deployment

Flink has a set of examples, like the car top-speed computation with simulated records. As this code is packaged in a jar available in the Maven repository, we can declare a session job.

Deploy a ConfigMap to define log4j-console.properties and other Flink parameters (flink-conf.yaml).
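
A sketch of such a ConfigMap, with the hypothetical name flink-config and minimal content (with the operator, the flinkConfiguration and logConfiguration fields of the FlinkDeployment spec can generate these files instead):

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
data:
  flink-conf.yaml: |
    taskmanager.numberOfTaskSlots: 2
  log4j-console.properties: |
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n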

The diagram below illustrates the standard deployment of a job on k8s with session mode:

Figure: session mode deployment (source: Apache Flink site)

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: car-top-speed-job
spec:
  deploymentName: flink-session-cluster
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.17.2/flink-examples-streaming_2.12-1.17.2-TopSpeedWindowing.jar
    parallelism: 4
    upgradeMode: stateless

Before deploying this job, be sure to deploy a session cluster using the following command:

# under deployment/k8s
kubectl apply -f basic-job-task-mgrs.yaml 

Once the job is deployed, we can see the pod and, using the user interface, the job continuously running.

To help manage snapshots, there is another CR called FlinkStateSnapshot.

An application deployment must define the job (JobSpec) field with the jarURI, parallelism, upgradeMode (one of stateless/savepoint/last-state), and the desired state of the job (running/suspended). See this sample app or the cmf_app_deployment.yaml in the e-com-sale demonstration.

job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    # For your own deployment, use your own jar, e.g.:
    # jarURI: local:///opt/flink/usrlib/yourapp01.0.0.jar
    parallelism: 2
    upgradeMode: stateless
    state: running

flinkConfiguration is a hash map used to define the Flink configuration, such as the task slots, HA, and checkpointing parameters.

  flinkConfiguration:
    high-availability.type: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: 'file:///opt/flink/volume/flink-ha'
    restart-strategy: failure-rate
    restart-strategy.failure-rate.max-failures-per-interval: '10'
    restart-strategy.failure-rate.failure-rate-interval: '10 min'
    restart-strategy.failure-rate.delay: '30 s'
    execution.checkpointing.interval: '5000'
    execution.checkpointing.unaligned: 'false'
    state.backend.type: rocksdb
    state.backend.incremental: 'true'
    state.backend.rocksdb.use-bloom-filter: 'true'
    state.checkpoints.dir: 'file:///opt/flink/volume/flink-cp'
    state.checkpoints.num-retained: '3'
    state.savepoints.dir: 'file:///opt/flink/volume/flink-sp'
    taskmanager.numberOfTaskSlots: '10'
    table.exec.source.idle-timeout: '30 s'

The application jar needs to be in a custom Flink docker image built using the Dockerfile as in e-com-sale-demo, or uploaded to a MinIO bucket.

The following Dockerfile is used for deploying a solution in application mode: it packages the Java Flink jars with the app and any connector jars needed for the integration, and starts the main() function.

FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job-*.jar $FLINK_HOME/usrlib/my-flink-job.jar
  • With Confluent Platform for Flink:

      # First be sure the service is exposed
      kubectl port-forward svc/cmf-service 8080:80 -n flink
      # Deploy the app given its deployment
      confluent flink application create k8s/cmf_app_deployment.yaml  --environment $(ENV_NAME) --url http://localhost:8080 
    
Access to user interface

To forward your JobManager's web UI port to local port 8081:

kubectl port-forward ${flink-jobmanager-pod} 8081:8081 
# Or using CP for Flink
confluent flink application web-ui-forward $(APP_NAME) --environment $(ENV_NAME) --port 8081 --url http://localhost:8080

And navigate to http://localhost:8081.

Using MinIO for app deployment

  • Upload an application jar to the MinIO bucket:

    mc cp ./target/flink-app-0.1.0.jar dev-minio/flink/flink-app-0.1.0.jar
    mc ls dev-minio/flink
    
  • Start the application using confluent cli:

    confluent flink application create --environment env1 --url http://localhost:8080 app-deployment.json
    
  • Open Flink UI:

    confluent flink application web-ui-forward --environment env1 flink-app --url http://localhost:8080
    
  • Produce messages to kafka topic

    echo 'message1' | kubectl exec -i -n confluent kafka-0 -- /bin/kafka-console-producer --bootstrap-server kafka.confluent.svc.cluster.local:9092 --topic in
    
  • Cleanup

    # the Flink app
    confluent flink application delete kafka-reader-writer-example --environment development --url http://localhost:8080
    # the Kafka cluster
    # the operators
    

There are multiple ways to run Flink SQL: use the SQL client, or package the SQL scripts with a Java SQL runner that executes the SQL statements from a file, so the application deployment is Java-based even if SQL scripts are used for stream processing.

Demonstrations

  • Deploy a Kafka Python producer as a pod to write to a Confluent Platform topic, and create a Flink app that deduplicates the records.
  • A schema evolution study.

Practices

  • It is not recommended to host a Flink cluster across multiple Kubernetes clusters. Flink nodes exchange data between task managers, so it is better to run them in the same region and within the same Kubernetes cluster.