
Table API

Update
  • Created 10/2024
  • Reorganized to improve the documentation 10/2025

Concepts

The Table API is the language-integrated API for executing Flink SQL logic, allowing stream processing implementations in Java and Python. The Table API encapsulates a stream as a logical table, enabling developers to implement stream processing by programming against these tables.
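To make the idea concrete, here is a small sketch (assuming an `orders` table with `product_id` and `price` columns is already registered in the current catalog and database) showing the same query expressed first as a SQL string and then as Table API calls:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

TableEnvironment env = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

// The same query, first as a SQL string ...
Table viaSql = env.sqlQuery(
        "SELECT product_id, price FROM orders WHERE price > 10");

// ... then as an equivalent Table API expression
Table viaApi = env.from("orders")
        .filter($("price").isGreater(10))
        .select($("product_id"), $("price"));
```

Both forms produce the same logical plan; the Table API version is type-checked at compile time.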

See the main concepts and APIs. A Table API program is structured as follows:

  1. Create a TableEnvironment for batch or streaming execution. A table environment is the base class, entry point, and central context for creating Flink Table and SQL API programs. TableEnvironment uses an EnvironmentSettings object that defines the execution mode.

    Apache Flink - Table Environment
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    
    EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inStreamingMode()
        //.inBatchMode()
        .withBuiltInCatalogName("default_catalog")
        .withBuiltInDatabaseName("default_database")
        .build();
    // Initialize the session context to get started
    TableEnvironment tableEnv = TableEnvironment.create(settings);
    // specific setting
    tableEnv.getConfig().set("parallelism.default", "4");
    

    In the context of Confluent Cloud, the Table API program acts as a client-side library for interacting with the Flink engine hosted in the cloud. Use the dedicated Confluent plugin package to interact with Confluent Cloud via its REST API. The connection settings may be defined in a properties file, program arguments, or environment variables (recommended).

    Connect to Remote Environment
    import io.confluent.flink.plugin.ConfluentSettings;
    import io.confluent.flink.plugin.ConfluentTools;
    ...
    // With a properties file
    EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties");
    // Or from environment variables:
    // EnvironmentSettings settings = ConfluentSettings.fromGlobalVariables();
    // Or from program arguments:
    // EnvironmentSettings settings = ConfluentSettings.fromArgs(args);
    // Initialize the session context to get started
    TableEnvironment env = TableEnvironment.create(settings);
    

  2. Create one or more source table(s)

    Read a source table
    env.useCatalog(TARGET_CATALOG);
    env.useDatabase(TARGET_DATABASE);
    Table rawTable = env.from("orders").select(withAllColumns());
    

  3. Create one or more sink table(s), or use the print sink:

    INSERT INTO fct_transactions
    SELECT
        txn_id,
        account_id,
        amount,
        currency,
        `timestamp`,
        status
    FROM src_transactions
    WHERE txn_id IS NOT NULL AND txn_id <> '';
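The same insert can also be sketched with Table API calls instead of a SQL string (this assumes the `src_transactions` and `fct_transactions` tables above are registered in the current catalog and database):

```java
import static org.apache.flink.table.api.Expressions.$;

// Table API equivalent of the INSERT INTO statement above
env.from("src_transactions")
   .filter($("txn_id").isNotNull().and($("txn_id").isNotEqual("")))
   .select($("txn_id"), $("account_id"), $("amount"),
           $("currency"), $("timestamp"), $("status"))
   .insertInto("fct_transactions")
   .execute();
```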
    
    
    
  4. Create processing logic using SQL string or Table API functions

  5. Package:

    • For Java, run mvn package to build a jar
    • For Python
  6. Deploy: once packaged with Maven as an uber-jar, the application may be executed locally to submit the dataflow to the Confluent Cloud for Flink JobManager, or it can be deployed as a FlinkApplication within a Kubernetes cluster.

    ./bin/flink run ./flink-studies/code/table-api/simplest-table-api-for-flink-oss/target/simplest-table-api-for-flink-oss-1.0.0.jar
    

    To be done

    Execute the program:

    java -jar target/flink-table-api-java-examples-1.0.jar
    

Summary of important concepts:

  • The main function is a Flink client that compiles the code into a dataflow graph and submits it to the JobManager.
  • A TableEnvironment maintains a map of catalogs of tables
  • Tables can be either virtual (VIEWS) or regular TABLES which describe external data (csv, sql, kafka topic).
  • Tables may be temporary (tied to the lifecycle of a single Flink session), or permanent (visible across multiple Flink sessions and clusters).
  • A temporary table may shadow a permanent table.
  • Tables are always registered with a 3-part identifier consisting of catalog, database, and table name.
  • TableSink is a generic interface to write results to. A batch Table can only be written to a BatchTableSink, while a streaming Table requires either an AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink.
  • A pipeline can be explained with TablePipeline.explain() and executed invoking TablePipeline.execute().
  • Recall that High-Availability in Application Mode is only supported for single-execute() applications.
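The last two bullets can be sketched as follows (assuming a TableEnvironment `env` with registered `orders` source and `orders_sink` sink tables; the table names are illustrative):

```java
import org.apache.flink.table.api.TablePipeline;
import static org.apache.flink.table.api.Expressions.$;

// Build a pipeline from a source table into a sink table
TablePipeline pipeline = env.from("orders")
        .select($("product_id"), $("price"))
        .insertInto("orders_sink");

// Inspect the optimized plan without submitting anything
System.out.println(pipeline.explain());

// Submit the job; a single execute() call keeps the application
// compatible with High-Availability in Application Mode
pipeline.execute();
```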

It is important to note that Table API and SQL queries can be easily integrated with and embedded into DataStream programs.
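A minimal sketch of that integration (it assumes the flink-table-api-java-bridge dependency is on the classpath):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

// Turn a DataStream into a Table ...
DataStream<String> names = streamEnv.fromElements("Paul", "Jerome", "Peter");
Table namesTable = tableEnv.fromDataStream(names);

// ... and a Table back into a DataStream
DataStream<Row> rows = tableEnv.toDataStream(namesTable);
```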

Confluent Specifics

In Confluent Platform Manager for Flink deployments, only Flink Application mode is supported. A Flink Application is any user program that spawns one or multiple Flink jobs from its main() method. The execution of these jobs can happen in a local JVM (LocalEnvironment) or on a remote cluster of multiple machines (Kubernetes).

In the context of Confluent Cloud, the integration code enables the submission of Statements and retrieval of StatementResults. The provided Confluent plugin integrates specific components for configuring the TableEnvironment, eliminating the need for a local Flink cluster. By including the confluent-flink-table-api-java-plugin dependency, Flink's internal components, such as CatalogStore, Catalog, Planner, Executor, and configuration, are managed by the plugin and fully integrated with Confluent Cloud. This integration goes through the REST API, so the Confluent Table API plugin is a higher-level encapsulation of the Confluent Cloud REST API.

Getting Started

Java

Any main function needs to connect to the Flink environment.

The development approach includes at least the following steps:

  1. Create a maven project with a command like:

    mvn archetype:generate -DgroupId=j9r.flink -DartifactId=my-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.5 -DinteractiveMode=false
    

  2. Add the Flink Table API and Kafka client dependencies:

    <groupId>org.apache.flink</groupId>
    
    <artifactId>flink-java</artifactId>
    <artifactId>flink-clients</artifactId>
    <artifactId>flink-table-api-java</artifactId>
    <artifactId>flink-table-common</artifactId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <artifactId>flink-table-runtime</artifactId>
    <artifactId>flink-connector-kafka</artifactId>
    <artifactId>flink-connector-base</artifactId>
    <!-- Depending on the serialization needs -->
    <artifactId>flink-json</artifactId>
    <artifactId>flink-avro</artifactId>
    <!-- when using schema registry -->
    <artifactId>flink-avro-confluent-registry</artifactId>
    
    (see an example of pom.xml). Use provided scope for the Flink dependencies so the jars come from the deployed product.

    Verify this pom.xml for the currently supported versions.

     <dependency>
        <groupId>io.confluent.flink</groupId>
        <artifactId>confluent-flink-table-api-java-plugin</artifactId>
        <version>${confluent-plugin.version}</version>
    </dependency>
    

  3. Implement and unit test the flow.

  4. Execute the java program

Get the catalog and databases, and use the environment to get the list of tables. In Confluent Cloud, there is a predefined catalog with some table samples: examples.marketplace.

// using a SQL string
env.executeSql("SHOW TABLES IN `examples`.`marketplace`").print();
// or using the API
env.useCatalog("examples");
env.useDatabase("marketplace");
Arrays.stream(env.listTables()).forEach(System.out::println);
// work on one table
env.from("`customers`").printSchema();

Python

The Flink Python API communicates with a Java process. You must have at least Java 11 installed, and be sure to have $JAVA_HOME set.

Python Table API Quick Start on Confluent Cloud for Apache Flink - documentation.

How to

All the Java based Table API examples, for Confluent Cloud are in the folder, in different java classes.

Code structure

Clearly separate the creation of sources, sinks, and the processing workflow into different methods. Reference those methods in main().

Joining two tables

Table joinedTable = customers
                        .join(transformedOrders, $("customer_id").isEqual($("o_customer_id")))
                        .dropColumns($("o_customer_id"));

Deduplication
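As a hedged sketch of a common deduplication pattern (the `src_transactions` table and its columns are illustrative), keep only the first event per key using ROW_NUMBER() in a SQL string executed by the environment:

```java
// Keep the first event per txn_id, ordered by event time
Table deduplicated = env.sqlQuery(
    "SELECT txn_id, account_id, amount " +
    "FROM ( " +
    "  SELECT *, ROW_NUMBER() OVER ( " +
    "    PARTITION BY txn_id ORDER BY `timestamp` ASC) AS row_num " +
    "  FROM src_transactions " +
    ") WHERE row_num = 1");
```

The planner recognizes this ROW_NUMBER-with-filter shape as a deduplication query, which is cheaper than a generic Top-N.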

FAQs

Create a data generator

There is the FlinkFaker tool, which is very convenient for sending different types of data. It uses the Datafaker Java library, which can be extended with custom data providers. The FlinkFaker jar is added to the custom Flink image in the Dockerfile.

Create a table with Kafka topic as persistence in Confluent Cloud?
Create a table
import io.confluent.flink.plugin.ConfluentSettings;
import io.confluent.flink.plugin.ConfluentTableDescriptor;
//...
env.createTable(
        SINK_TABLE,
        ConfluentTableDescriptor.forManaged()
            .schema(
                    Schema.newBuilder()
                            .column("user_id", DataTypes.STRING())
                            .column("name", DataTypes.STRING())
                            .column("email", DataTypes.STRING())
                            .build())
            .distributedBy(4, "user_id")
            .option("kafka.retention.time", "0")
            .option("key.format", "json-registry")
            .option("value.format", "json-registry")
            .build());
Access to the schema of an existing topic / table?
Access to Table Schema Definition
import org.apache.flink.table.api.DataTypes;
//...
DataType productsRow = env.from("examples.marketplace.products")
                .getResolvedSchema()
                .toPhysicalRowDataType();
List<String> columnNames = DataType.getFieldNames(productsRow);
List<DataType> columnTypes = DataType.getFieldDataTypes(productsRow);
// use in the schema function to create a new topic ...
Schema.newBuilder()
        .fromFields(columnNames, columnTypes)
        .column("additionalColumn", DataTypes.STRING())
        .build()
How to split records to two topics, using StatementSet?
StatementSet statementSet = env.createStatementSet()
                    .add(
                        env.from("`examples`.`marketplace`.`orders`")
                           .select($("product_id"), $("price"))
                           .insertInto("PricePerProduct"))
                    .add(
                        env.from("`examples`.`marketplace`.`orders`")
                           .select($("customer_id"), $("price"))
                           .insertInto("PricePerCustomer"));
// submit both inserts as a single Flink statement
statementSet.execute();

Create some test data

Use one of the TableEnvironment fromValues() methods:

env.fromValues("Paul", "Jerome", "Peter", "Robert")
                .as("name")
                .filter($("name").like("%e%"))
                .execute()
                .print();
import org.apache.flink.table.api.DataTypes;
Table customers = env.fromValues(
                        DataTypes.ROW(
                                DataTypes.FIELD("customer_id", DataTypes.INT()),
                                DataTypes.FIELD("name", DataTypes.STRING()),
                                DataTypes.FIELD("email", DataTypes.STRING())),
                        row(3160, "Bob", "bob@corp.com"),
                        row(3107, "Alice", "alice.smith@example.org"),
                        row(3248, "Robert", "robert@someinc.com"));

Confluent tools for printing and stopping statements

See this git repository

References

APIs

The important classes are:

Confluent Repositories