# Table API

Updates:

- Created 10/2024
- Reorganized to improve the documentation 10/2025
## Concepts

The Table API serves as the lower-level API for executing Flink SQL, allowing stream processing implementations in Java and Python. The Table API encapsulates streams and physical tables, enabling developers to implement stream processing by programming against these tables.

See the main Flink concepts and APIs. The structure of a program looks like:
1. Create a `TableEnvironment` for batch or streaming execution. A table environment is the base class, entry point, and central context for creating Flink Table and SQL API programs. `TableEnvironment` uses `EnvironmentSettings` that define the execution mode.

    ```java
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inStreamingMode()
        // .inBatchMode()
        .withBuiltInCatalogName("default_catalog")
        .withBuiltInDatabaseName("default_database")
        .build();

    // Initialize the session context to get started
    TableEnvironment tableEnv = TableEnvironment.create(settings);
    // specific setting
    tableEnv.getConfig().set("parallelism.default", "4");
    ```

    In the context of Confluent Cloud, the Table API program acts as a client-side library for interacting with the Flink engine hosted in the cloud. A specific package is used to interact with Confluent Cloud via its REST API. The integration may be defined in a properties file, in program arguments, or in environment variables (recommended):

    ```java
    import io.confluent.flink.plugin.ConfluentSettings;
    import io.confluent.flink.plugin.ConfluentTools;
    // ...
    // With a properties file:
    EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties");
    // With environment variables:
    EnvironmentSettings settings = ConfluentSettings.fromGlobalVariables();
    // With program arguments:
    EnvironmentSettings settings = ConfluentSettings.fromArgs(args);
    // Initialize the session context to get started
    TableEnvironment env = TableEnvironment.create(settings);
    ```

2. Create one or more source table(s).
3. Create one or more sink table(s), or use the print sink.
4. Create the processing logic using SQL strings or Table API functions.
5. Package the application: for Java, use `mvn package` to build a jar; for Python, see the Python section below.
6. Deploy: once packaged with Maven as an uber-jar, the application may be executed locally to send the dataflow to the Confluent Cloud for Flink JobManager, or it can be deployed as a `FlinkApplication` within a Kubernetes cluster.
### Summary of important concepts

- The main() function is a Flink client that compiles the code into a dataflow graph and submits it to the JobManager.
- A TableEnvironment maintains a map of catalogs of tables. It is the base class and central context for creating Table and SQL API programs.
- Table is the core class of the Table API. It describes a pipeline of data transformations.
- Tables can be either virtual (VIEWS) or regular (TABLES); regular tables describe external data such as a CSV file, a database table, or a Kafka topic.
- Tables may be temporary (tied to the lifecycle of a single Flink session) or permanent (visible across multiple Flink sessions and clusters).
- A temporary table may shadow a permanent table.
- Tables are always registered with a 3-part identifier consisting of catalog, database, and table name. Every Table object has a schema that is available through the `getResolvedSchema()` method.
- TableSink is a generic interface to write results to. A batch Table can only be written to a `BatchTableSink`, while a streaming Table requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`.
- A pipeline can be explained with `TablePipeline.explain()` and executed by invoking `TablePipeline.execute()`.
- Recall that high availability in Application mode is only supported for single-execute() applications.

Note that Table API and SQL queries can be easily integrated with and embedded into DataStream programs.
## Confluent Specifics

In a Confluent Platform Manager for Flink deployment, only Flink Application mode is supported. A Flink Application is any user program that spawns one or multiple Flink jobs from its main() method. These jobs can execute in a local JVM (LocalEnvironment) or on a remote cluster of multiple machines (Kubernetes).

In the context of Confluent Cloud, the integration code enables the submission of Statements and the retrieval of StatementResults. The provided Confluent plugin supplies the specific components for configuring the TableEnvironment, eliminating the need for a local Flink cluster. By including the confluent-flink-table-api-java-plugin dependency, Flink's internal components, such as CatalogStore, Catalog, Planner, Executor, and configuration, are managed by the plugin and fully integrated with Confluent Cloud. This integration goes through the REST API, so the Confluent Table API plugin is a higher-level encapsulation of the Confluent Cloud REST API.
## Getting Started

### Java

Any main() function needs to connect to the Flink environment. The development approach includes at least the following steps:
1. Create a Maven project.
2. Add the Flink Table API and Kafka client dependencies (see an example of pom.xml). The relevant `org.apache.flink` artifacts are:

    ```xml
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <artifactId>flink-clients</artifactId>
    <artifactId>flink-table-api-java</artifactId>
    <artifactId>flink-table-common</artifactId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <artifactId>flink-table-runtime</artifactId>
    <artifactId>flink-connector-kafka</artifactId>
    <artifactId>flink-connector-base</artifactId>
    <!-- Depending on the serialization needs -->
    <artifactId>flink-json</artifactId>
    <artifactId>flink-avro</artifactId>
    <!-- when using schema registry -->
    <artifactId>flink-avro-confluent-registry</artifactId>
    ```

    Use `provided` scope to get the Flink jars from the deployed product. Verify this pom.xml for the currently supported versions.

3. Implement and unit test the flow.
4. Execute the Java program.
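Step 1 above can be bootstrapped with the standard Flink quickstart archetype; replace `<flink-version>` and the project coordinates with your own values:

```shell
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=<flink-version> \
  -DgroupId=com.example \
  -DartifactId=ccf-table-api-app \
  -DinteractiveMode=false
```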
Get the catalog and databases, and use the environment to get the list of tables. In Confluent Cloud, there is a predefined catalog with some sample tables: `examples.marketplace`.

```java
// using a SQL string
env.executeSql("SHOW TABLES IN `examples`.`marketplace`").print();
// or using the API
env.useCatalog("examples");
env.useDatabase("marketplace");
Arrays.stream(env.listTables()).forEach(System.out::println);
// work on one table
env.from("`customers`").printSchema();
```
### Python

The Flink Python API communicates with a Java process, so you must have at least Java 11 installed and `JAVA_HOME` set.

See the Python Table API Quick Start on Confluent Cloud for Apache Flink documentation.
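A minimal local setup sketch; the package name below follows Confluent's Python Table API quick start and should be verified against the current documentation:

```shell
# the Python Table API drives a local JVM: Java 11+ and JAVA_HOME are required
python -m venv .venv && source .venv/bin/activate
pip install confluent-flink-table-api-python-plugin
```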
## How to

All the Java-based Table API examples for Confluent Cloud are in the code/table-api/ccf-table-api folder, with each example in a different Java class.

### Code structure

Clearly separate the creation of sources, sinks, and the pipeline logic into different methods, and reference those methods from main().
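As a sketch, a main() following this layout might look as below; the table names, DDL placeholders, and helper methods are illustrative, not taken from an existing example:

```java
import io.confluent.flink.plugin.ConfluentSettings;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TablePipeline;

public class PipelineApp {

    public static void main(String[] args) {
        EnvironmentSettings settings = ConfluentSettings.fromArgs(args);
        TableEnvironment env = TableEnvironment.create(settings);
        createSourceTables(env);      // DDL for the source tables
        createSinkTables(env);        // DDL for the sink tables
        buildPipeline(env).execute(); // pipeline logic kept separate
    }

    private static void createSourceTables(TableEnvironment env) {
        env.executeSql("CREATE TABLE IF NOT EXISTS src_table (...)");
    }

    private static void createSinkTables(TableEnvironment env) {
        env.executeSql("CREATE TABLE IF NOT EXISTS sink_table (...)");
    }

    private static TablePipeline buildPipeline(TableEnvironment env) {
        return env.from("src_table").insertInto("sink_table");
    }
}
```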
### Execute Query

Two functions may be used:

- `sqlQuery`: from the TableEnvironment; it evaluates the SQL query on registered tables and returns a `Table` object describing the pipeline, for further transformations.
- `executeSql`: executes the given single statement and returns the execution result. The statement can be DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE. For DML and DQL, this method returns a `TableResult` once the job has been submitted.

```java
Table rawToSrcAccounts = env.sqlQuery(SELECT_RAW_TO_SRC_ACCOUNTS);
// another example
env.executeSql(DDL_SRC_ACCOUNTS);
```

Finally, a `TablePipeline` is used to define a pipeline from one or more source tables to one sink table. It is equivalent to a SQL INSERT statement.

```java
Table table = tableEnv.sqlQuery("SELECT * FROM src_table");
TablePipeline tablePipeline = table.insertInto("fact_table");
TableResult tableResult = tablePipeline.execute();
tableResult.await();
```

Keeping the `TableResult` is not mandatory. A program may have multiple pipelines; in this case, use a `StatementSet`, which accepts pipelines defined by DML statements or Table objects.
### Joining two tables

```java
Table joinedTable = customers
    .join(transformedOrders, $("customer_id").isEqual($("o_customer_id")))
    .dropColumns($("o_customer_id"));
```
### Deduplication
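This can be illustrated with the classic Flink SQL deduplication pattern (ROW_NUMBER over a partition). The `customers` table, its columns, and the use of the Confluent Cloud `$rowtime` system column are assumptions for the sketch:

```java
// Keep only the first record seen per user_id (table and column names are hypothetical).
// On Confluent Cloud, `$rowtime` exposes the Kafka record timestamp;
// on plain Flink, order by your own time attribute instead.
Table dedupedCustomers = env.sqlQuery(
    "SELECT user_id, name, email "
    + "FROM ( "
    + "  SELECT *, ROW_NUMBER() OVER ( "
    + "    PARTITION BY user_id ORDER BY `$rowtime` ASC) AS row_num "
    + "  FROM customers) "
    + "WHERE row_num = 1");
```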
## FAQs

**Create a data generator?**

There is the FlinkFaker tool, which is very efficient at generating different types of data. It uses the Datafaker Java library, which can be extended to add your own data providers. The FlinkFaker jar is added to the custom Flink image in the Dockerfile.

**How to name a statement for Confluent Cloud?**

**Create a table with a Kafka topic as persistence in Confluent Cloud?**
```java
import io.confluent.flink.plugin.ConfluentSettings;
import io.confluent.flink.plugin.ConfluentTableDescriptor;
// ...
env.createTable(
    SINK_TABLE,
    ConfluentTableDescriptor.forManaged()
        .schema(
            Schema.newBuilder()
                .column("user_id", DataTypes.STRING())
                .column("name", DataTypes.STRING())
                .column("email", DataTypes.STRING())
                .build())
        .distributedBy(4, "user_id")
        .option("kafka.retention.time", "0")
        .option("key.format", "json-registry")
        .option("value.format", "json-registry")
        .build());
```
**Access the schema of an existing topic / table?**

```java
import org.apache.flink.table.api.DataTypes;
// ...
DataType productsRow = env.from("examples.marketplace.products")
    .getResolvedSchema()
    .toPhysicalRowDataType();
List<String> columnNames = DataType.getFieldNames(productsRow);
List<DataType> columnTypes = DataType.getFieldDataTypes(productsRow);

// use in the schema builder to create a new table ...
Schema.newBuilder()
    .fromFields(columnNames, columnTypes)
    .column("additionalColumn", DataTypes.STRING())
    .build();
```
**How to split records to two topics, using StatementSet?**
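A possible sketch, assuming a source table `events` with a `type` column and two existing sink tables `topic_a` and `topic_b` (all names hypothetical):

```java
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import static org.apache.flink.table.api.Expressions.$;
// ...
StatementSet stmtSet = env.createStatementSet();
Table events = env.from("events");
// route each subset of records to its own sink table / topic
stmtSet.addInsert("topic_a", events.filter($("type").isEqual("A")));
stmtSet.addInsert("topic_b", events.filter($("type").isEqual("B")));
// all inserts are optimized and submitted together as a single job
stmtSet.execute();
```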
### Create some test data

Use one of the TableEnvironment `fromValues()` methods:
```java
import org.apache.flink.table.api.DataTypes;
import static org.apache.flink.table.api.Expressions.row;

Table customers = env.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("customer_id", DataTypes.INT()),
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("email", DataTypes.STRING())),
    row(3160, "Bob", "bob@corp.com"),
    row(3107, "Alice", "alice.smith@example.org"),
    row(3248, "Robert", "robert@someinc.com"));
```
### Confluent tools for printing and stopping statements
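As a sketch of what this section covers, the plugin's ConfluentTools helpers can materialize results locally and manage the remote statement. The method names below come from the Confluent plugin's ConfluentTools class; verify the exact signatures against your installed plugin version:

```java
import io.confluent.flink.plugin.ConfluentTools;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
// ...
Table orders = env.from("examples.marketplace.orders");
// run the statement on Confluent Cloud, materialize up to 100 rows locally, and print them
ConfluentTools.printMaterialized(orders, 100);

// for a long-running statement: keep the TableResult to name and stop it
TableResult result = orders.execute();
String statementName = ConfluentTools.getStatementName(result);
ConfluentTools.stopStatement(result);
```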
## References

### APIs

The important classes are:

- TableEnvironment
- Table
- Row
- Expressions: contains static methods for referencing table columns, creating literals, and building more complex Expression chains.
### Confluent Repositories

- Confluent Table API Tutorial.
- The Confluent Flink cookbook, for more Table API and DataStream examples.
- The Learn-apache-flink-table-api-for-java-exercises git repository.
- The Table API in Java documentation.
- Connecting the Apache Flink Table API to Confluent Cloud, with its matching GitHub repository; part of this code was ported into flink-sql-demos/02-table-api-java.