Confluent Cloud for Apache Flink

Chapter updates
  • Created 10/2024
  • Review 10/31/24

Confluent Cloud for Apache Flink is a cloud-native, fully managed Flink service that runs alongside the managed Kafka service.

Confluent Cloud Flink is built on open-source Apache Flink and adds the following features:

  • Auto-inference of Confluent Cloud resources: Kafka clusters, topics, and schemas are mapped to the Flink SQL constructs of catalogs, databases, and tables.
  • Autoscaling capabilities.
  • Default system column for record timestamps, exposed as the $rowtime column.
  • Default watermark strategy based on $rowtime (see the query sketch after this list).
  • Support for Avro, JSON Schema, and Protobuf.
  • CREATE statements provision resources as Kafka topics and schemas (temporary tables not supported).
  • Read from and write to Kafka in two modes: append-stream or update-stream.
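
A minimal sketch of a query using $rowtime, assuming the orders table from the read-only examples.marketplace database described below:

-- count orders per one-minute tumbling window, keyed on the Kafka record timestamp
select window_start, window_end, count(*) as order_count
from table(tumble(table `orders`, descriptor($rowtime), interval '1' minute))
group by window_start, window_end;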

Key Concepts

  • It is a regional service, available on the three major cloud providers (AWS, Azure, and Google Cloud).
  • A compute pool groups the resources used to run SQL statements and can scale down to zero. The maximum pool size is defined at creation.
  • Capacity is measured in Confluent Flink Units (CFUs); each statement consumes a minimum of 1 CFU-minute.
  • A statement may be structural (DDL), run in the background to write data to a table (DML), or run in the foreground to present data to a client application.
  • Supports multiple Kafka clusters within the same Confluent Cloud organization in a single region.
  • Kafka topics and schemas are always kept in sync with Flink.
  • Statements in different compute pools are isolated from each other.
  • Any table created in CC Flink appears as a topic in CC Kafka.
  • A catalog is a collection of databases. A database is a collection of tables.
  • A difference from the OSS version is that the catalog, database, and table constructs are mapped to physical Kafka objects: a catalog is an environment, a database is a Kafka cluster, and a table is a topic plus a schema (see the CREATE TABLE sketch after this list).
  • Confluent Cloud offers Autopilot, which automatically adjusts resources for SQL statements based on demand.
  • Supports role-based access control for both user and service accounts.
  • Stream lineage provides insights at the topic level about data origins.
  • Confluent Cloud for Apache Flink manages watermarks automatically: it uses the $rowtime column, which maps to the Kafka record timestamp, and observes the behavior of the streams to adapt the watermark configuration dynamically.
  • When message processing falls behind, Autopilot adjusts the resource allocation.
  • Service accounts are used in production deployments to enforce security boundaries. Permissions are granted with ACLs and role bindings. Service accounts can own any type of API key for CLI or API access.
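
A minimal sketch of this mapping (shipments is a hypothetical table name): the CREATE statement provisions a Kafka topic of the same name and registers the value schema in Schema Registry.

-- creates the shipments topic and its schema
create table shipments (
  shipment_id string,
  order_id string,
  shipped_at timestamp_ltz(3)
);
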
Statement life cycle

Use a service account for background statements. Submit a SQL statement using the client shell:

confluent flink shell --compute-pool ${COMPUTE_POOL_ID} --environment ${ENV_ID} --service-account ${SERVICE_ACCOUNT_ID}
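
Inside the shell, a DML statement such as the following runs in the background and writes continuously to its target table (order_counts is a hypothetical table assumed to exist):

-- background statement: continuously aggregates orders per customer
insert into order_counts
select customer_id, count(*) as nb_orders
from `orders`
group by customer_id;
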
How to change the CFU limit?

The CFU limit can be changed via the console or the CLI, up to a maximum of 50. Going above 50 requires a ticket to Confluent support.
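For example, with the CLI (a sketch, assuming ${COMPUTE_POOL_ID} holds your pool id):

# raise the maximum size of an existing compute pool
confluent flink compute-pool update ${COMPUTE_POOL_ID} --max-cfu 20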

Getting Started

Install the Confluent CLI and get a Confluent Cloud account.

See these tutorials for getting started.

There is also a Confluent CLI plugin, confluent-flink-quickstart, which creates an environment, a compute pool, and a Kafka cluster, enables Schema Registry, and starts a Flink shell:

confluent flink quickstart --name my-flink-sql --max-cfu 10 --region us-west-2 --cloud aws

For study and demonstration purposes, there is a read-only catalog named examples with a database called marketplace, which generates data into in-memory SQL tables.

Set the namespace for subsequent queries using:

use catalog examples;
use marketplace;
show tables;
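
The generated tables can then be queried directly; for example, assuming the orders table:

select * from `orders` limit 10;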

To use your dedicated environment, use the following syntax (backquotes are needed because the names contain hyphens):

use catalog `my-flink-sql_environment`;
use `my-flink-sql_kafka-cluster`;

To shut down everything:

confluent environment list
confluent environment delete <ENVIRONMENT_ID>

Some common commands to manage a Confluent Cloud environment:

# Create an environment
confluent environment create my_environment --governance-package essentials
# Set the active environment.
confluent environment use <environment id>
# Create a cluster
confluent kafka cluster create my-cluster --cloud gcp --region us-central1 --type basic
# Create Kafka API key
confluent kafka cluster list
export CLID=<kafka cluster id>
confluent api-key create --resource $CLID
# Create a compute pool (adjust cloud and region settings as required).
confluent flink compute-pool create my-compute-pool --cloud gcp --region us-central1 --max-cfu 10
# Create a Flink api key, which is scoped to an environment + region pair
confluent api-key create --resource flink --cloud gcp --region us-central1
# Define an api key for schema registry
confluent schema-registry cluster describe
confluent api-key create --resource <schema registry cluster>
# Get the user id
confluent iam user list

Using the Confluent CLI, we can access the client shell via:

# Get the environment id
confluent environment list

# Get the compute pool id
confluent flink compute-pool list
export ENV_ID=$(confluent environment list -o json | jq -r '.[] | select(.name == "aws-west") | .id')
export COMPUTE_POOL_ID=$(confluent flink compute-pool list -o json | jq -r '.[0].id')
confluent flink shell --compute-pool $COMPUTE_POOL_ID --environment $ENV_ID

Nothing special, except that once a statement is running it cannot be modified; it must be stopped before any edit.
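
For example, list the running statements and stop one before editing it (the statement name is illustrative):

confluent flink statement list
confluent flink statement stop <STATEMENT_NAME>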

Use Java Table API

The approach is to create a Maven Java project with a main class that declares the data flow. Read this chapter.

Deeper dive