Flink Kubernetes Deployment¶
This chapter's updates:
- Created 10/2024
- 12/24: move some content to hands-on readme, clean content
- 01/25: sql processing section
- 05/25: merge content, simplify, add some details on deployment - fully test k8s deployment on Colima
- 07/25: Update for Confluent Platform v8
- 09/29: Update to diagrams and doc structure.
- 10/12: update to Minio and snapshot / checkpoint configuration
- 10/20: Reorganize content
Apache Flink defines a Kubernetes Operator (FKO) to deploy and manage custom resources for Flink deployments. Confluent Manager for Flink (CMF) is also deployed on Kubernetes with its own operator, leveraging the FKO. As part of the Confluent Platform, it is integrated with Confluent for Kubernetes (CFK).
We assume the reader has a good understanding of Kubernetes and kubectl.
Let us start by reviewing the Apache Flink Kubernetes Operator concepts.
Apache Flink Kubernetes Operator Concepts¶
The Apache Flink Kubernetes Operator (FKO) acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. This note summarizes how to use this operator.
The operator takes care of submitting, savepointing, upgrading and generally managing Flink jobs using the built-in Flink Kubernetes integration. The operator fully automates the lifecycle of the JobManager, the TaskManagers, and the applications. A FlinkDeployment is a manifest that defines what needs to be deployed; the FKO then manages the deployment using native Kubernetes Deployments, Pods, and so on. It also supports querying the custom resources it manages.
Failures of the JobManager pod are handled by the Kubernetes Deployment controller, which takes care of spawning a new JobManager.
Like any Kubernetes operator, the FKO can run namespace-scoped, which allows multiple versions of the operator in the same Kubernetes cluster, or cluster-scoped for highly distributed deployments. The operator maps its custom resources to existing Kubernetes resources: Deployments, ReplicaSets, ConfigMaps, Secrets, ServiceAccounts, and so on.
The following figure represents a simple deployment view of a Flink cluster, alongside a Kafka cluster, running on a Kubernetes platform:
The FKO may have two instances running in parallel. A Flink application may run in its own namespace and consists of one JobManager and n TaskManagers. The Kafka cluster runs in its own namespace. File services are needed for Flink to persist checkpoints and savepoints.
Confluent for Kubernetes Operator (CFK)¶
The deployment of Confluent Platform and Confluent Manager for Flink may look like the following figure:
The CFK operator is the control plane for deploying and managing Confluent in your Kubernetes private cloud environment. It provides custom resource definitions for Kafka-based resources such as topics, brokers, and connectors. It also defines how to deploy Schema Registry.
Once you have a Kubernetes cluster, the approach is to first deploy CFK and then define Kafka resources using manifests:
- See this Makefile and documentation.
- See the KraftCluster manifest used for all demonstrations in this repository.
Confluent Manager for Flink (CMF)¶
Confluent Manager for Flink (CMF) is a Kubernetes operator that manages Flink Applications, Environments, Compute Pools, and Catalogs; we will detail those in a later section.
The following figure illustrates the relationships between those operators:
It is important to note that all CR deployments via kubectl go to CFK with apiVersion: platform.confluent.io/v1beta1; CRs touching Flink resources are delegated to the CMF operator. When deploying Flink components via the Confluent CLI, the CMF resources use a different apiVersion, so it is possible to run CMF without CFK. Any CR with cmf.confluent.io/v1 as apiVersion needs to be created with the Confluent CLI; using kubectl will not work because those CRDs are not registered with Kubernetes.
- Helpful commands to work on CRDs:
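For example, a few generic kubectl commands to inspect the installed CRDs (the CRD and API group names below assume the FKO and CFK are installed):
kubectl get crds | grep -E 'flink|confluent'              # list Flink and Confluent CRDs
kubectl api-resources --api-group=platform.confluent.io   # resources managed by CFK
kubectl describe crd flinkdeployments.flink.apache.org    # inspect a CRD definition
kubectl explain flinkdeployment.spec                      # browse the spec schema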
The examples in the Confluent GitHub repository provide scenario workflows to deploy and manage Confluent on Kubernetes, including Flink, and the article How to Use Confluent for Kubernetes to Manage Resources Outside of Kubernetes covers part of the deployment.
Custom Resources¶
Apache Flink specific custom resources¶
A Flink application is any user program that spawns one or multiple Flink jobs from its main() method; deploying it creates a JobManager and n TaskManagers. Applications may run in their own namespace.
The Flink Kubernetes Operator watches different FlinkDeployments, so each can be isolated within its own namespace. When deploying the FKO, it is important to specify the namespaces to watch for future deployments. The following command modifies this list:
helm upgrade --install cp-flink-kubernetes-operator --version "~1.120.0" confluentinc/flink-kubernetes-operator --set watchNamespace="{flink, confluent, el-demo, rental}" -n flink
It is important to delete the operator pod and let Kubernetes restart the FKO pod with the new configuration.
The custom resource definition that describes the schema of a FlinkDeployment is a cluster-wide resource. The operator continuously tracks cluster events relating to the FlinkDeployment and FlinkSessionJob custom resources. The operator control flow is described in this note. The important points to remember are:
- The operator control flow is: 1) observe the status of the currently deployed resource, 2) validate the new resource spec, 3) reconcile any required changes based on the new spec and the observed status.
- The Observer module assesses the current status of any deployed Flink resources; the Reconciler then executes the required actions, such as application upgrades.
- The JobManager is validated via a call to its REST API, and the status is recorded in jobManagerDeploymentStatus.
- A job cannot be in running state without a healthy JobManager.
Confluent Manager for Flink and FKO¶
Confluent Manager for Apache Flink® (CMF) manages a fleet of Flink applications (clusters) across multiple environments. Confluent Control Center is integrated with CMF. CMF exposes a REST API and CLI integration for managing Flink applications and statements.
CMF integrates with the FKO to support the Flink native custom resources. The following figure illustrates the current (Oct 2025) configuration of a Flink solution deployment using the different CR apiVersions.
- Flink Environments can be created with manifests or with the Confluent CLI. The metadata is persisted in an embedded database. The 'Environment' concept groups multiple Flink applications together. It is an isolation layer for RBAC and a place to define Flink configuration shared across the compute pools and applications deployed within an environment. The Flink configuration may include common observability and checkpointing storage (HDFS or S3) definitions. See one definition of FlinkEnvironment.
- A REST API supports all external integration with the operator. Confluent Control Center and the confluent CLI use this endpoint.
- CMF manages FlinkDeployment resources internally.
- Confluent for Kubernetes (CFK) is the Confluent operator that manages Kafka resources, but it also supports the deployment of FlinkEnvironment and an "adapted" FlinkApplication (which references an Environment), so the kubectl command can reach this operator for Confluent Platform CRs (apiVersion: platform.confluent.io/v1beta1).
It is still possible to deploy pure OSS FlinkDeployment CRs, but this is not recommended: you would not leverage the full power of Confluent Platform nor get Confluent support.
Let us review the Kubernetes custom resources for Flink.
Flink Custom Resources¶
Once the Flink Kubernetes Operator is running, we can submit jobs using the FlinkDeployment custom resource (for a Flink application, or for the JobManager and TaskManagers of a session cluster) and the FlinkSessionJob custom resource for session jobs. The following figure represents those concepts:
On the left, a FlinkSessionJob references an existing FlinkDeployment, as multiple session jobs can run in the same Flink cluster. The job declaration specifies the code to run with its specific configuration. On the right, application mode has the job definition as part of the FlinkDeployment, together with the JobManager and TaskManager minimum resource requirements.
The Apache Flink FlinkDeployment spec is here and is used to define a Flink application (it will have a job section) or a session cluster (only the JobManager and TaskManager configuration).
It is important to note that the FlinkDeployment and FlinkApplication CRDs have a podTemplate, so ConfigMap(s) and Secret(s) can be used to configure environment variables for the Flink application (be sure to keep the container name as flink-main-container):
spec:
podTemplate:
spec:
containers:
- name: flink-main-container
envFrom:
- configMapRef:
name: flink-app-cm
Confluent Specific CRs¶
First an important document to read: The Operator API references.
Confluent Manager for Flink manages Flink application mode only and uses its own CRDs to define FlinkEnvironment and FlinkApplication. The CRDs are defined here. To be complete, it also defines KafkaCatalog and ComputePool CRDs to map the same concepts introduced by Confluent Cloud.
- The new CRs for Environment, Application, Compute pool, and Flink Catalog:
- A FlinkEnvironment defines access control to Flink resources and may define Flink configuration across applications. The environment level has precedence over the Flink configuration of individual Flink applications. See one example in deployment/k8s/cmf.
apiVersion: platform.confluent.io/v1beta1
kind: FlinkEnvironment
metadata:
  name: dev-env
  namespace: confluent
spec:
  kubernetesNamespace: el-demo
  flinkApplicationDefaults:
    metadata:
      labels:
        env: dev-env
    spec:
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: '1'
        state.backend.type: rocksdb
        state.checkpoints.dir: 's3a://flink/checkpoints'
        state.savepoints.dir: 's3a://flink/savepoints'
        state.backend.incremental: 'true'
        state.backend.rocksdb.use-bloom-filter: 'true'
        state.checkpoints.num-retained: '3'
        ...
      podTemplate:
        metadata:
          name: flink-pod-template
        spec:
          containers:
            - name: flink-main-container
              env:
                - name: S3_ENDPOINT
                  valueFrom:
                    secretKeyRef:
                      name: minio-s3-credentials
                      key: s3.endpoint
              ...
  cmfRestClassRef:
    name: default
    namespace: confluent
Some important elements to consider are:
- kubernetesNamespace is the namespace where the Flink deployment(s) will be deployed. One environment establishes the foundations for those Flink applications: it can define default Flink configuration for all applications and add common labels, like the name of the environment they run in. A FlinkApplication references back its Flink environment, which is not something a Flink OSS application does. The last piece is the cmfRestClassRef, which references the Kubernetes resource that defines the access point to the CMF REST API.
- CMFRestClass defines the client configuration to access the CMF REST APIs. This resource is referenced by other CFK resources (e.g. FlinkEnvironment, FlinkApplication) to access the CMF REST APIs. In a more advanced configuration, this CR defines security settings such as the authentication mechanism and mTLS used to access the REST API.
- FlinkApplication in the context of Confluent Manager for Flink is the same as in Flink OSS, but it supports references to the Environment and to the CMFRestClass. Every application runs on its own cluster, providing isolation between applications.
- Service account: service accounts provide a secure way for applications (like Flink jobs deployed via CMF) to interact with Confluent resources (e.g., Kafka clusters, Schema Registry) without relying on individual user credentials. Service accounts are central to the RBAC system. You need one service account per application, or most likely per environment. The service account, cluster role, role, and role bindings need to be defined in the target namespace where the Flink application will be deployed. See this example for one application or this one based on the Table API.
- KafkaCatalog is used to expose Kafka topics as tables for Flink. This CRD defines a Kafka catalog object to connect to Kafka cluster(s) and Schema Registry. Each referenced Kafka cluster is mapped as a database. See the Kafka catalog examples in the external lookup demo or the rental demo.
- ComputePools are used in the context of Flink SQL to execute SQL queries or statements. The compute pool is only used when the statement is deployed, which happens after compilation. It is a second level of Flink configuration for Flink cluster settings. See the example in the external lookup demo. One important element is to specify the image attribute to reference a Flink image with SQL support, like confluentinc/cp-flink-sql:1.19-cp1. See Docker Hub for the latest tags.
The configuration flexibility:
- Flink configuration defined at the environment level can apply to all compute pools and applications of this environment.
- Compute pool configuration can apply to all SQL statements executed within the compute pool.
- A Flink application has its own configuration, knowing that an application can be built with the DataStream or Table API.
Installation¶
The components to install for each deployment approach:
In the context of a Confluent Platform deployment, the components to install are represented in the following figure from bottom to higher layer:

For an equivalent open-source deployment, the components are:

Prerequisites¶
Any Kubernetes deployment should include the following prerequisites:
- kubectl
- A Kubernetes cluster. Colima can be used for local Kubernetes: see deployment/k8s/start_colima.sh or run make start_colima under the deployment/k8s folder.
- Be sure to have the Helm CLI installed (see the installation instructions).
- Install the Confluent CLI, or update an existing CLI with:
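A sketch of the install and update commands (verify the install script against the Confluent CLI documentation for your platform):
# install the latest CLI
curl -sL --http1.1 https://cnfl.io/cli | sh -s -- latest
# or update an existing install
confluent update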
Colima playground¶
See the Colima installation instructions.
- Start a Kubernetes cluster, using one of the following options:
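For example, with Colima directly (the CPU and memory sizes are only suggestions), or with the repository's make target:
colima start --kubernetes --cpu 4 --memory 8
# or
cd deployment/k8s && make start_colima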
External Components¶
The certificate manager and the MinIO operator may be deployed with one command under deployment/k8s:
Certificate manager¶
- See the certificate manager current releases, update CERT_MGR_VERSION=v1.18.1 in the Makefile, then run the command:
- Which does the following:
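A sketch of the underlying step, assuming the standard cert-manager manifest for the version set above (the Makefile may add extra options):
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.18.1/cert-manager.yaml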
- Verify the deployment with:
kubectl get pods -n cert-manager
# or make verify_cert_manager
Using MinIO¶
MinIO is an object storage solution that provides an Amazon Web Services S3-compatible API and supports all core S3 features on Kubernetes. It may be used as file storage for Flink checkpoint and snapshot persistence, or for deploying application jar files to Flink.
- First, be sure to have the MinIO CLI (mc) installed.
- Deploy the MinIO operator under the minio-dev namespace, using the make target.
- Access the MinIO S3 API and Console.
- Log in to the Console with the credentials minioadmin | minioadmin.
- Set up a MinIO client with credentials saved to $HOME/.mc/config.json.
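A sketch of the client setup (the service name minio in the minio-dev namespace and the alias name local are assumptions):
kubectl port-forward svc/minio 9000:9000 -n minio-dev &
mc alias set local http://localhost:9000 minioadmin minioadmin   # writes to $HOME/.mc/config.json
mc ls local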
CFK installation¶
See the Confluent Platform product installation documentation for details. We can summarize it as follows:
- The deployment leverages the Kubernetes native API to configure, deploy, and manage Kafka clusters, Connect workers, Schema Registry, Confluent Control Center, Confluent REST Proxy, and application resources such as topics.
- The following diagram illustrates those components in one namespace.
Confluent Platform Components - k8s deployment
- Under the deployment/k8s/cfk folder, run make deploy, which performs the following operations:
  - Create a namespace for the Confluent products deployment. By default, Confluent Platform is deployed namespaced, and its components are managed in this namespace.
  - Add the Confluent Platform Helm repositories.
  - Deploy the Confluent Kafka brokers using one KRaft controller, one to three brokers, and the new Confluent Control Center, with the REST API and Schema Registry.
- The console may be accessed via port-forwarding:
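A sketch, assuming the Control Center service is named controlcenter in the confluent namespace and listens on port 9021:
kubectl port-forward svc/controlcenter 9021:9021 -n confluent
# then open http://localhost:9021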

Confluent Manager for Flink (CMF)¶
Updated 10.18.2025: For CFK version 2.0.3 and CP v8.0.2
See the Makefile under deployment/k8s/cmf which includes a set of targets to simplify the deployment. See Confluent Manager for Flink product documentation for deeper information. The following steps are a summary of what should be done.
- Install the Confluent Manager for Flink operator, under deployment/k8s/cmf:
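A sketch of the underlying Helm install (the chart name comes from the Confluent Helm repository; the release name and namespace are assumptions, and the version should be pinned to match your CP release):
helm repo add confluentinc https://packages.confluent.io/helm
helm upgrade --install cmf confluentinc/confluent-manager-for-apache-flink --namespace flink --create-namespace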
See the deploy application section for SQL or Java application deployment.
HA configuration¶
Within Kubernetes, we can enable Flink HA in the ConfigMap of the cluster configuration that will be shared with deployments:
flinkConfiguration:
  taskmanager.numberOfTaskSlots: "2"
  state.backend: rocksdb
  state.savepoints.dir: file:///flink-data/savepoints
  state.checkpoints.dir: file:///flink-data/checkpoints
  high-availability.type: kubernetes
  high-availability.storageDir: file:///flink-data/ha
  job.autoscaler.enabled: true
These configuration settings are supported via the FKO. See the product documentation, and the autoscaler section for deeper parameter explanations. The Flink autoscaler monitors the number of unprocessed records in the input (pending records) and allocates more resources to absorb the lag. It adjusts parallelism at the Flink operator level within the DAG.
JobManager metadata is persisted in the file system specified by high-availability.storageDir. This storageDir stores all the metadata needed to recover from a JobManager failure.
Crashed JobManager pods are restarted automatically by Kubernetes, and since Flink persists metadata and the job artifacts, it is important to mount persistent volumes at the expected paths.
podTemplate:
spec:
containers:
- name: flink-main-container
volumeMounts:
- mountPath: /flink-data
name: flink-volume
volumes:
- name: flink-volume
hostPath:
# directory location on host
path: /tmp/flink
# this field is optional
type: Directory
Recall that podTemplate is a base declaration common to the JobManager and TaskManager pods. It can be overridden by the jobManager and taskManager pod template sub-elements (spec.taskManager.podTemplate). The previous declaration works for a local Kubernetes with hostPath access; for a Kubernetes cluster with a separate storage class, the volume declaration references a PersistentVolumeClaim instead, as sketched below.
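A minimal sketch (the claim name flink-data-pvc is an assumption):
volumes:
  - name: flink-volume
    persistentVolumeClaim:
      claimName: flink-data-pvc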
The podTemplate can also include nodeAffinity and tolerations to allocate TaskManagers to nodes with specific characteristics:
podTemplate:
spec:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: cfk-cr
operator: In
values:
- flink
tolerations:
- key: cfk-cr
operator: Equal
value: flink
effect: NoSchedule
Documentations¶
- Confluent Platform for Flink has another operator integrated with FKO. See my CP Flink summary.
- Confluent Flink operator documentation
- Getting started with Flink OSS Standalone Kubernetes Setup.
- Apache Flink Native Kubernetes deployment.
- A Confluent Platform demonstration git repo: confluentinc/confluent-demo
- The next steps are to upload the jar files for the different applications to deploy, or data sets for SQL tables. See the application section.
TO UPDATE
Durable Storage¶
Durable storage is used to store consistent checkpoints of the Flink state. Review the state management section in the concept chapter. The checkpoints are saved to object storage compatible with the S3 or HDFS protocols. The FlinkConfiguration can be set at the Application, ComputePool, or Environment level.
Two important elements to configure:
1. The environment variable ENABLE_BUILT_IN_PLUGINS.
2. state.checkpoints.dir pointing to the location of the S3 bucket.
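For example, the S3 filesystem plugin can be enabled through the pod template (a sketch; the plugin jar name must match your Flink version):
podTemplate:
  spec:
    containers:
      - name: flink-main-container
        env:
          - name: ENABLE_BUILT_IN_PLUGINS
            value: flink-s3-fs-presto-1.19.1.jar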
The following is a configuration using MinIO and the Presto S3FileSystem, a specific implementation (created by Presto) of the file system interface within Apache Flink (see the S3FileSystemFactory class).
"flinkConfiguration": {
"pipeline.operator-chaining.enabled": "false",
"execution.checkpointing.interval": "10s",
"taskmanager.numberOfTaskSlots": "4",
"fs.s3.impl": "org.apache.flink.fs.s3presto.S3FileSystem",
"presto.s3.endpoint": "http://minio.minio-dev.svc.cluster.local:9000",
"presto.s3.path.style.access": "true",
"presto.s3.connection.ssl.enabled": "false",
"presto.s3.access-key": "admin",
"presto.s3.secret-key": "admin123",
"state.checkpoints.dir": "s3://flink/stateful-flink/checkpoints",
"state.savepoints.dir": "s3://flink/stateful-flink/savepoints",
"state.checkpoints.interval": "10000",
"state.checkpoints.timeout": "600000"
},
For Minio settings:
s3.endpoint: http://minio.minio-dev.svc.cluster.local:9000
s3.path.style.access: "true"
s3.connection.ssl.enabled: "false"
s3.access-key: minioadmin
s3.secret-key: minioadmin
state.checkpoints.dir: s3://flink/stateful-flink/checkpoints
state.savepoints.dir: s3://flink/stateful-flink/savepoints
state.checkpoints.interval: "10000"
state.checkpoints.timeout: "600000"
TO BE CONTINUED
A shared RWX PersistentVolumeClaim (PVC) for the Flink JobManagers and TaskManagers provides persistence for stateful checkpoints and savepoints of Flink jobs.
A flow is packaged as a jar, so developers need to define a Docker image with the Flink API and any connector jars. See the example Dockerfile and FlinkApplication manifest.
Another solution is to use MinIO to persist the application jars.
Apache Flink OSS¶
- Add the Apache Flink Helm repositories:
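For example (a sketch; replace the operator version with the current release):
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm repo update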
Deploy Apache Flink Kubernetes Operator¶
The Apache Flink Kubernetes operator product documentation lists the setup steps.
- Get the list of Apache Flink releases and tags here
- To get access to k8s deployment manifests and a Makefile that simplifies the deployment of Apache Flink or Confluent Platform on k8s (local Colima or Minikube), see the deployment/k8s/flink-oss folder.
- Access Flink UI
- Mount a host folder as a PV to access data or SQL scripts, using hostPath.
Flink Config Update¶
- If a write operation fails when the pod creates a folder or updates the Flink config, verify the following:
- Assess PVC and R/W access. Verify PVC configuration. Some storage classes or persistent volume types may have restrictions on directory creation
- Verify security context for the pod. Modify the pod's security context to allow necessary permissions.
- The podTemplate can be configured at the same level as the task and job managers so any mounted volumes will be available to those pods. See basic-reactive.yaml from Flink Operator examples.
Flink Session Cluster¶
For a session cluster, there is no jobSpec. See this deployment definition. Once a cluster is defined, it has a name and can be referenced to submit SessionJobs.
A session cluster is executed as a long-running Kubernetes Deployment. We may run multiple Flink jobs on a session cluster; each job needs to be submitted to the cluster after the cluster has been deployed. To deploy a session cluster, we need at least three components:
- a Deployment which runs a JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager’s REST and UI ports
For a deployment, select the execution mode: application or session. For production, it is recommended to deploy in application mode for better isolation and a cloud-native approach: we can simply build a Dockerfile for our application using the Flink jars.
Session Deployment¶
Flink has a set of examples, like the car top speed computation with simulated records. As this code is packaged in a jar available in the Maven repository, we can declare a session job.
Deploy a config map to define the log4j-console.properties and other parameters for Flink (flink-conf.yaml)
The diagram below illustrates the standard deployment of a job on k8s with session mode:
src: apache Flink site
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: car-top-speed-job
spec:
deploymentName: flink-session-cluster
job:
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.17.2/flink-examples-streaming_2.12-1.17.2-TopSpeedWindowing.jar
parallelism: 4
upgradeMode: stateless
Before deploying this job, be sure to deploy a session cluster using the following command:
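A minimal sketch (the manifest name flink-session-cluster.yaml and the flink namespace are assumptions):
kubectl apply -f flink-session-cluster.yaml -n flink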
Once the job is deployed, we can see the pod and then, using the user interface, the job continuously running:
- Example of deploying a Java-based SQL runner to interpret a Flink SQL script: package it as a Docker image, and deploy it with a SessionJob. There is an equivalent for Python using PyFlink.
  - See the ported code for Java
  - And the Python implementation
Flink State Snapshot¶
To help manage snapshots, there is another custom resource called FlinkStateSnapshot.
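For example, a hedged sketch of a manually triggered savepoint (field names follow the FKO v1beta1 FlinkStateSnapshot spec; the referenced deployment name is an assumption, so verify the exact schema against the operator documentation):
apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
spec:
  backoffLimit: 1
  jobReference:
    kind: FlinkDeployment
    name: my-flink-app     # an existing FlinkDeployment
  savepoint:
    disposeOnDelete: false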
Flink Application Deployment¶
There are two types of Flink applications: the Java packaged application, or the SQL client with an open session to the cluster.
Flink SQL processing¶
There are multiple choices to run Flink SQL: use the SQL client, package the SQL scripts in a Docker container with the Java SQL runner executing the SQL statements from a file, or use the Table API. The application deployment is Java-based even if SQL scripts are used for stream processing.
With Apache Flink OSS, a Flink session cluster is the most suitable deployment mode for the SQL client. This is a long-running Flink cluster (JobManager and TaskManagers) to which you can submit multiple jobs. The SQL client is a long-running, interactive application that submits jobs to an existing cluster.
For Confluent Manager for Flink, the recommended approach is to use a Flink application, which by design is one JobManager with multiple TaskManagers, or to use the Flink SQL shell.
Confluent Manager for Flink¶
As seen previously, with Confluent Manager for Flink the method is to create an environment and a compute pool to run the SQL statements. Those concepts and components are the same as in Confluent Cloud for Flink.
- Be sure that the port-forward to svc/cmf-service is active.
- Define an environment:
export CONFLUENT_CMF_URL=http://localhost:8084
# be sure to not be connected to Confluent Cloud; if you are, do: confluent logout
# look at current environments
confluent flink environment list
# create a new env
confluent flink environment create dev --kubernetes-namespace el-demo
# or under deployment/k8s/cmf
make create_flink_env
- Define a compute pool (verify the current Docker image tag); see compute_pool.json.
- Flink SQL uses the concept of catalogs to connect to external storage systems. CMF features built-in KafkaCatalogs to connect to Kafka and Schema Registry. A KafkaCatalog exposes Kafka topics as tables and derives their schemas from Schema Registry. Define a Flink catalog as a JSON file (see cmf/kafka_catalog.json). The catalog is configured with connection properties for the Kafka and Schema Registry clients.
- Define a secret to access the Kafka cluster. See this secret and the mapping.
- Use the Confluent CLI to start a Flink SQL shell.
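A sketch of the shell command (verify the flag names with confluent flink shell --help; the compute pool name must match the one defined in compute_pool.json):
confluent flink shell --environment dev --compute-pool default-pool
# CONFLUENT_CMF_URL must point to the CMF REST endpoint, as exported earlier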
Apache Flink (OSS)¶
You can run the SQL Client in a couple of ways:
- As a separate Docker container: The Flink Docker images include the SQL Client. You can run a container and connect to the JobManager. You will need to mount a volume to persist SQL scripts and other data.
When running the SQL Client as a pod within the same Kubernetes cluster, you can use the internal DNS name of the JobManager service to connect; the format is typically <jobmanager-service>.<namespace>.svc.cluster.local.
- Locally: Download the Flink distribution, extract it, and run the SQL Client from your local machine.
Flink Application¶
An application deployment must define the job (JobSpec) field with the jarURI, parallelism, upgradeMode (one of stateless / savepoint / last-state), and the desired state of the job (running / suspended). See this sample app or the cmf_app_deployment.yaml in the e-com-sale demonstration.
Here is an example of FlinkApplication, the CRD managed by the CMF operator:
apiVersion: "cmf.confluent.io/v1
kind: FlinkApplication"
spec:
flinkVersion: v1_19
image: confluentinc/cp-flink:1.19.1-cp2 # or a custom image based on this one.
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
# For your own deployment, use your own jar instead, e.g.:
# jarURI: local:///opt/flink/usrlib/yourapp01.0.0.jar
parallelism: 2
upgradeMode: stateless
state: running
jobManager:
resource:
cpu: 1
memory: 1048m
taskManager:
resource:
cpu: 1
memory: 1048m
Fault tolerance¶
For Flink jobs or applications that are stateful, and for fault tolerance, it is important to enable checkpointing and savepointing:
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
upgradeMode: savepoint
#savepointTriggerNonce: 0
# initialSavepointPath: file:///
The other upgradeMode values are stateless and last-state.
How to validate checkpointing?
Checkpointing lets Flink periodically save the state of a job into the configured storage. Look at the pod name of the TaskManager and stop it with kubectl delete pod/.... Flink should automatically restart the job and recover from the latest checkpoint. Use the Flink UI or CLI to see the job status.
How to validate savepointing?
Savepoints are manually triggered snapshots of the job state, which can be used to upgrade a job or to perform manual recovery. To trigger a savepoint, we need to set a value for savepointTriggerNonce in the FlinkDeployment descriptor and then apply the changes. Get the location of the savepoint, then add initialSavepointPath to the YAML and redeploy the application: it will reload its state from the savepoint. There is also a custom resource definition (FlinkStateSnapshotSpec) to trigger savepoints.
flinkConfiguration is a map used to define the Flink configuration, such as the task slots, HA, and checkpointing parameters.
flinkConfiguration:
high-availability.type: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: 'file:///opt/flink/volume/flink-ha'
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: '10'
restart-strategy.failure-rate.failure-rate-interval: '10 min'
restart-strategy.failure-rate.delay: '30 s'
execution.checkpointing.interval: '5000'
execution.checkpointing.unaligned: 'false'
state.backend.type: rocksdb
state.backend.incremental: 'true'
state.backend.rocksdb.use-bloom-filter: 'true'
state.checkpoints.dir: 'file:///opt/flink/volume/flink-cp'
state.checkpoints.num-retained: '3'
state.savepoints.dir: 'file:///opt/flink/volume/flink-sp'
taskmanager.numberOfTaskSlots: '10'
table.exec.source.idle-timeout: '30 s'
The application jar needs to be in a custom Flink Docker image built using a Dockerfile as in the e-com-sale demo, or uploaded to a MinIO bucket.
The following Dockerfile is used to deploy a solution in application mode: it packages the Flink Java jars with the application and any connector jars needed for the integration, and starts the main() function.
FROM confluentinc/cp-flink:1.19.1-cp2
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job-*.jar $FLINK_HOME/usrlib/my-flink-job.jar
- With Confluent Platform for Flink:
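A sketch of creating the application through CMF with the Confluent CLI (the manifest file name is the one mentioned earlier; verify the exact syntax with confluent flink application create --help):
confluent flink application create cmf_app_deployment.yaml --environment $(ENV_NAME)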
Access to the user interface
To forward your JobManager's web UI port to local 8081:
kubectl port-forward ${flink-jobmanager-pod} 8081:8081
# Or using confluent cli CP for Flink command:
confluent flink application web-ui-forward $(APP_NAME) --environment $(ENV_NAME) --port 8081 --url http://localhost:8080
And navigate to http://localhost:8081.
Using MinIO for app deployment¶
- Upload an application jar to a MinIO bucket:
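A sketch with the MinIO client (the alias local matches the earlier setup; the bucket name flink-apps is an assumption):
mc mb --ignore-existing local/flink-apps
mc cp target/my-flink-job.jar local/flink-apps/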
- Start the application using the Confluent CLI:
- Open the Flink UI:
- Produce messages to the Kafka topic.
- Clean up.
Practices¶
- It is not recommended to host a Flink cluster across multiple Kubernetes clusters. Flink nodes exchange data between TaskManagers, so it is better to run them in the same region and within the same Kubernetes cluster.