
Confluent Platform for Flink

The official product documentation for the 07/2025 release is available here.

CP Flink is Confluent's Flink product. It is highly flexible and configurable to address a large set of user requirements. It is a supported Flink distribution with a subset of Flink components (SQL, Table API, DataStream API, ProcessFunction), the Kafka, FileSystem, JDBC, and CDC connectors, and a central management feature for Kubernetes deployment and security.

The main features are:

  • Fully compatible with open-source Flink.
  • Deploy on Kubernetes using Helm; Kubernetes is the only supported platform. Machine or Flink process failures are recovered by Kubernetes, providing high uptime and low latency.
  • Define environments, which logically group Flink applications to provide access isolation and configuration sharing.
  • Deploy applications with a user interface and a task manager cluster.
  • Expose a custom Kubernetes operator for specific CRDs.
  • When integrated with Kafka, Confluent Platform Kafka can run on bare metal while CP Flink runs on Kubernetes.

The figure below presents the Confluent Flink components deployed on Kubernetes:

Confluent Platform for Flink
  • CFK supports the management of custom CRDs, based on the Flink Kubernetes Operator. (CFK 3.0.0 supports CP 8.0.0)
  • CMF (Confluent Manager for Apache Flink) adds security controls and a REST API server for the CLI or an HTTP client
  • FKO is the open-source Flink Kubernetes Operator
  • Flink clusters are created from commands and CRDs, and run Flink applications within an environment
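
For orientation, an application deployed through CMF is declared as a FlinkApplication resource whose spec mirrors the FKO FlinkDeployment CRD. The sketch below is illustrative only: the image tag, resource sizes, and jar path are assumptions to adapt to your installation.

```yaml
apiVersion: cmf.confluent.io/v1
kind: FlinkApplication
metadata:
  name: basic-example
spec:
  image: confluentinc/cp-flink:1.19.1-cp1   # assumed image tag
  flinkVersion: v1_19
  serviceAccount: flink
  jobManager:
    resource: { memory: 1024m, cpu: 1 }
  taskManager:
    resource: { memory: 1024m, cpu: 1 }
  job:
    # example jar shipped with the Flink image
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 1
    state: running
```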

Be sure to have the Confluent CLI installed.

References Compendium

  • Confluent Manager for Flink (CMF) provides:

    • Job life-cycle management for Flink jobs.
    • Integration with Confluent Platform for authentication and authorization (RBAC).
    • Well-defined REST APIs and command-line interfaces (CLIs).
    • Stores metadata in its own embedded database
    • Runs as a Kubernetes operator to manage custom resources
  • Environment: provides access-control isolation and Flink configuration sharing

  • Compute pool: represents the resources used to run the Task Manager and Job Manager. Each Flink SQL statement is associated with exactly one compute pool. See the example pool definition in the cmf folder.
  • SQL catalog: groups databases for Flink SQL table queries. It references a Schema Registry instance and one or more Kafka clusters.
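
As an illustration, a compute pool definition might look like the following sketch. The apiVersion, field names, and sizes are assumptions for illustration; check the example pool definition in the cmf folder for the exact schema.

```yaml
apiVersion: cmf.confluent.io/v1
kind: ComputePool
metadata:
  name: default-pool
spec:
  clusterSpec:
    flinkVersion: v1_19
    jobManager:
      resource: { cpu: 1, memory: 1024m }   # defaults used to create JM pods
    taskManager:
      resource: { cpu: 1, memory: 1024m }   # defaults used to create TM pods
```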

  • See Product FAQs

Product Set Up

The Confluent Flink images and Helm chart references:

Demonstrations:

Deployment architecture

  • A Flink cluster always needs to run in one K8s cluster in one region, as the Flink nodes typically exchange a lot of data and require a low-latency network. See the K8S deployment architecture chapter
  • For failover between data centers, the approach is to share durable storage (HDFS, S3, MinIO) accessible from both data centers, replicate Kafka topics while preserving offset numbers, and restart the Flink application from checkpoints.
  • Also recall that Flink can interact with multiple Kafka clusters at the same time, which is useful to read from one Kafka cluster in Region A and write to another Kafka cluster in Region B.

Important Source of Information for Deployment

Authentication and Authorization

Authentication and authorization cover access to Confluent Platform components from external systems and users, as well as intra-component communication. The following figure illustrates what needs to be covered:

By default, CP components are not configured with encryption, authentication, or authorization.

Authentication

The product documentation for CP authentication can be summarized as:

  • Kafka broker access from producer/consumer applications can be secured using SASL (Simple Authentication and Security Layer) or mutual TLS to verify client and server identities, ensuring that traffic is secure and trusted in both directions.
  • Admin REST API authentication is done via HTTP Basic Auth
  • Access to Control Center (C3) may be done via SSO
  • It supports LDAP or OAuth/OIDC for authentication, where all identities across Confluent Platform workloads and interfaces can be hosted on a single identity provider, providing a unified identity management solution. See a Keycloak docker compose demo, and my Keycloak deployment on k8s.
  • See SSL/TLS on how to use openssl for cryptography encryption.
  • By default, inter-broker communication is PLAINTEXT, but it can be set to SSL
  • Secrets can be stored in AWS Secrets Manager or HashiCorp Vault.

Authorization

The product documentation for CP authorization highlights the following major points:

  • Access to Confluent Platform resources such as clusters, topics, consumer groups, and connectors is controlled via ACLs. ACLs are stored in the KRaft-based Kafka cluster metadata.
  • Group-based authorization can be defined in LDAP
  • Who can access what is controlled via RBAC, with roles such as ClusterAdmin or DeveloperRead assigned to users and service accounts.
  • ACLs may be used instead of RBAC

A Flink job represents code, so it is critical to set up authentication and authorization. Flink supports TLS/SSL for authentication and encryption of network traffic between Flink processes, both for internal connectivity (between Flink processes) and external connectivity (from outside to Flink processes).

Recall that the keystore and truststore must be set up such that the truststore trusts the keystore's certificates.

CMF supports mTLS and OAuth authentication. As CMF is deployed via a Kubernetes operator, SREs have control over who may deploy Flink applications. All access should go through CMF to comply with authentication and authorization requirements.

  • For mTLS, the approach is to define an mtls-values.yaml file and use it when deploying or upgrading the CMF Helm chart.

    helm upgrade --install cmf confluentinc/confluent-manager-for-apache-flink -f mtls-values.yaml
    

    See an example of mtls-values.yaml, which declares the truststore file to use. The certificates should be defined in a secret.
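
    For orientation, such a values file roughly declares where the truststore is mounted and that client certificates are required. The key names below are assumptions for illustration only; check the chart's values.yaml for the real schema.

```yaml
# hypothetical shape of mtls-values.yaml -- key names are assumptions
cmf:
  ssl:
    truststore: /mnt/ssl/truststore.p12   # truststore holding the client CA certs
    truststorePassword: changeit
    clientAuth: need                      # require client certificates (mTLS)
```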

Defining SSL certs
Use [keytool](https://docs.oracle.com/javase/8/docs/technotes/tools/unix/keytool.html) to create self-signed certificates and keys.
```sh
keytool -genkeypair -alias flink.internal -keystore internal.keystore -dname "CN=flink.internal" \
        -storepass internal_store_password -keyalg RSA -keysize 4096 -storetype PKCS12
```
Define a secret with the base64 string of the certificates:
```sh
cat your_certificate.crt | base64 -w 0
cat your_private_key.key | base64 -w 0
```

And a secret.yaml
```yaml
apiVersion: v1
kind: Secret
metadata:
    name: cmf-certs
type: kubernetes.io/tls
data:
    tls.crt: <base64_encoded_certificate>
    tls.key: <base64_encoded_private_key>
```

Roles

The Flink Kubernetes operator installs two custom roles: flink-operator and flink:

  • flink-operator is used to manage FlinkDeployment resources: meaning it creates and manages the JobManager deployment for each Flink job (and related resources).
  • flink role is used by the jobManager process of each job to create and manage the taskManager and configMap resources.

With FKO, job and task managers are deployed together, so there is no need for the flink service account to have permissions to launch additional pods; Kubernetes handles the TaskManager pod lifecycle. With native Kubernetes mode, Flink code could inherit the JobManager's Kubernetes permissions, and so be able to launch other pods and access sensitive cluster resources.

RBAC

  • CMF communicates with the MDS to validate whether the principal of the incoming request is allowed to perform a specific action.
  • CMF requires its own service principal to authorize user principals.

Metadata Service (MDS)

MDS manages a variety of metadata about your Confluent Platform installation. It is configured on each Kafka broker. It is possible to centralize it into a hub-and-spoke architecture across multiple Kafka clusters, Connect, and Schema Registry. MDS maintains a local cache of authorization data that is persisted to an internal Kafka topic named _confluent-metadata-auth.

SQL specific

As of November 2025, SQL support is still in preview.

Applications

Important points:

Understanding Sizing

The observed core performance rule is that Flink can process roughly 10,000 records per second per CPU core. This baseline decreases with larger messages, bigger state, key skew, or a high number of distinct keys.
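
As a back-of-envelope check, the baseline translates directly into a core count. The target throughput below is a made-up example:

```sh
# ~10,000 records/sec per core, per the rule above
RECORDS_PER_SEC=120000    # example target throughput
PER_CORE=10000
# ceiling division: round up to whole cores
CORES=$(( (RECORDS_PER_SEC + PER_CORE - 1) / PER_CORE ))
echo "estimated cores: $CORES"    # -> 12 for 120k records/sec
```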

There are a set of characteristics to assess before doing sizing:

Statement Complexity Impact

Statement complexity reflects the usage of complex operators like:

  • Joins between multiple streams
  • Windowed aggregations
  • Complex analytics functions

More complex operations require additional CPU and memory resources.

Architecture Assumptions

  • Source & Sink latency: Assumed to be minimal
  • Key size: Assumed to be small (few bytes)
  • Minimum cluster size: 3 nodes required
  • CPU limit: Maximum 8 CPU cores per Flink node

State Management & Checkpointing

  • Working State: Kept locally in RocksDB
  • Backup: State backed up to distributed storage
  • Recovery: Checkpoint size affects recovery time
  • Frequency: Checkpoint interval determines state capture frequency
  • Aggregate state size consumes bandwidth and impacts recovery latency.

Resource Guidelines

Typical Flink node configuration includes:

  • 4 CPU cores with 16GB memory
  • Should process 20-50 MB/s of data
  • Jobs with significant state benefit from more memory

Scaling Strategy: Scale vertically before horizontally

Latency Impact on Resources

Lower latency requirements significantly increase resource needs:

  • Sub-500ms latency: +50% CPU, frequent checkpoints (10s), boosted parallelism
  • Sub-1s latency: +20% CPU, extra buffering memory, 30s checkpoints
  • Sub-5s latency: +10% CPU, moderate buffering, 60s checkpoints
  • Relaxed latency (>5s): Standard resource allocation

Stricter latency requires more frequent checkpoints for faster recovery, additional memory for buffering, and extra CPU for low-latency optimizations.

Estimation Heuristics

The estimator increases task managers until there are sufficient resources for:

  • Expected throughput requirements
  • Latency-appropriate checkpoint intervals
  • Acceptable recovery times
  • Handling data skew and key distribution
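
A minimal sketch of that loop, using the resource guidelines above (20-50 MB/s per 4-core node; the 30 MB/s midpoint and the target throughput are assumptions):

```sh
TARGET_MBPS=160       # example aggregate throughput to cover
MBPS_PER_TM=30        # assumed capacity of one 4-core/16GB task manager
TMS=1
# grow the task manager count until pooled capacity covers the target
while [ $(( TMS * MBPS_PER_TM )) -lt "$TARGET_MBPS" ]; do
  TMS=$(( TMS + 1 ))
done
echo "task managers needed: $TMS"   # -> 6 for 160 MB/s at 30 MB/s each
```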

Important Notes

  • These are estimates - always test with your actual workload
  • Start with conservative estimates and adjust based on testing
  • Monitor your cluster performance and tune as needed
  • Consider your specific data patterns and business requirements

See this Flink estimator project for a tool to help estimate cluster sizing.

Monitoring

Troubleshooting

Submitted query pending

When submitting a SQL query from the Flink SQL shell, it may stay pending: the job manager pod is created and running, but the task manager pod is pending. Assess what is going on with kubectl describe pod <pod_id> -n flink, which gives an error message like insufficient cpu. Increasing the compute pool CPU makes it worse, as the compute pool spec for the task manager and job manager defines the default settings used to create those pods.