Create Table SQL (DDLs)

Updates

Created 10/24, Updated 12/20/24 Revised 12/06/24

This chapter offers a compilation of best practices for implementing Flink SQL solutions, applicable to local Flink open-source, the Confluent Platform for Flink or the Confluent Cloud for Flink.

Data Definition Language (DDL) statements define metadata in Flink SQL by creating, altering, or dropping tables. See the Flink SQL Examples in Confluent Cloud for Apache Flink documentation.

A table registered with the CREATE TABLE statement can be used as a table source or a table sink.

CREATE TABLE statements do not change between managed services and standalone Flink deployments, except for the metadata. Each example with standalone-specific content is marked as Flink OSS, and also works for Confluent Platform for Flink.

Sources Of Information

Create table syntax

Common Principles

  • A primary key can have one or more columns. All of them must be NOT NULL, and the constraint can only be declared as NOT ENFORCED.
  • Inside Flink processing, the primary key declaration implicitly partitions the table by the key column(s).
  • Flink uses the primary key for state management and for deduplication in upsert mode, while the partition key determines which Kafka partition a message is written to. The latter is a Kafka-level concept.
  • For upsert mode, the bucket key must be equal to the primary key, while for append/retract mode the bucket key can be a subset of the primary key. See more on changelog.mode.

  • Understanding the table structure and mapping from a Kafka topic: the basic statement below helps a lot to understand the table configuration and column types. It is helpful to assess time columns, for example to confirm whether a column such as event_time is a STRING, TIMESTAMP or TIMESTAMP_LTZ.

    show create table
    

    TYPEOF(column_name) can also be used in a SELECT to find the type of a column and to troubleshoot a type error.

  • Understanding the execution plan of a query, using EXPLAIN on an INSERT or SELECT statement, helps to assess whether the query supports the expected semantics, like window aggregation, interval joins or regular joins. A change of changelog mode from append to retract or upsert is also very important for state assessment. Recall that the properties of the destination table influence the plan.
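
As an illustration of the inspection steps above, the statements below are a minimal sketch. The table names `orders` and `order_totals` and the columns `event_time`, `customer_id` and `price` are hypothetical and are not taken from this chapter's examples.

```sql
-- Display the full DDL of the table, including its WITH options
SHOW CREATE TABLE `orders`;

-- Check how a time column was inferred (STRING, TIMESTAMP or TIMESTAMP_LTZ)
SELECT TYPEOF(event_time) AS event_time_type FROM `orders` LIMIT 1;

-- Inspect the plan of a statement before running it
EXPLAIN
INSERT INTO `order_totals`
SELECT customer_id, SUM(price) AS total
FROM `orders`
GROUP BY customer_id;
```

Running EXPLAIN on the INSERT rather than on the bare SELECT matters, because the plan also depends on the properties of the destination table.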

Table Creation How-Tos

Confluent Cloud Specifics

  • In Confluent Cloud, a partition key generates a key schema, except when using the option ('key.format' = 'raw').
  • If the destination topic does not define a partitioning key, then CC Flink SQL writes the records with whatever partitioning was in use at the end of the query. If the last operation in the query is GROUP BY foo or JOIN ON A.foo = B.foo, the output records are partitioned on the foo values and are not re-partitioned before being written to Kafka. The foo partitioning is preserved.
Primary key and partition by considerations
  • If you have parallel queries without any data shuffling, like INSERT INTO Table_A SELECT * FROM Table_B, then any skew from Table_B is repeated in Table_A. Otherwise, if a partitioning key is defined (like DISTRIBUTED BY HASH(metric)), any write into that topic is shuffled by that new key.
  • In case of key skew, add more fields to the distribution so that the data is not partitioned on a single key. This lets Flink read the data with more parallelism, which mitigates the Kafka read bottleneck of about 18 MB/s per connection (partition).
  • A useful rule of thumb for Confluent Cloud Flink: 5 partitions feed 1 CFU. A CFU is constrained in memory and CPU.
  • The actual performance bottleneck is often record serialization and deserialization; Avro is slower than Protobuf.
-- simplest table
CREATE TABLE humans (race STRING, origin STRING);
-- with primary key 
CREATE TABLE manufactures (m_id INT PRIMARY KEY NOT ENFORCED, site_name STRING);
-- with hash distribution to 2 partitions that match the primary key
CREATE TABLE humans (hid INT PRIMARY KEY NOT ENFORCED, race STRING, gender INT) DISTRIBUTED BY (hid) INTO 2 BUCKETS;
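
To illustrate the key-skew advice above, the following sketch distributes a table over two columns instead of one. The table name, the column names and the bucket count are hypothetical example values.

```sql
-- Hypothetical table: distributing on (device_id, metric) spreads a hot
-- device_id over several partitions instead of concentrating it in one.
CREATE TABLE device_metrics (
    device_id STRING NOT NULL,
    metric STRING NOT NULL,
    reading DOUBLE,
    PRIMARY KEY (device_id, metric) NOT ENFORCED
) DISTRIBUTED BY HASH(device_id, metric) INTO 6 BUCKETS;
```

The trade-off is that records with the same device_id may land in different partitions, so ordering per device is no longer guaranteed by Kafka.
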
Create a table with csv file as persistence - Flink OSS

We need to use the file system connector.

create table `user` (
    `user_id` VARCHAR(250),
    `name` VARCHAR(50)
) partitioned by (`user_id`)
WITH (
    'format' = 'csv', -- other formats include json and parquet
    'connector' = 'filesystem',
    'path' = '/tmp/users'
);
How to consume from a Kafka topic to a SQL table? -- Flink OSS

On Confluent Cloud for Flink, tables are already created for each topic. For local Flink, we need to create a table with column definitions that map to the attributes of the record. The right operand of FROM proposes the list of topics/tables for the selected catalog and database. For Flink OSS or Confluent Platform for Flink, the WITH clause specifies the source topic.

CREATE TABLE TableName (
    ....
) WITH (
    'connector' = 'kafka',
    'topic' = 'flight_schedules',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'fs_grp',
    'scan.startup.mode' = 'earliest-offset'
    -- the Kafka connector also requires a 'format' (or 'value.format') option
);

For Avro and Schema Registry with open-source Flink, see the tool extract_sql_from_avro.py, which queries Confluent Schema Registry and builds the matching SQL to create a table connected to the topic using this schema.

CREATE TABLE shoe_customers (
    id STRING,
    first_name STRING,
    last_name STRING,
    email STRING,
    phone STRING,
    street_address STRING,
    state STRING,
    zip_code STRING,
    country STRING,
    country_code STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'shoe_customers',
    'properties.bootstrap.servers' = 'broker:29092',
    'scan.startup.mode' = 'earliest-offset',
    'key.format' = 'raw',
    'key.fields' = 'id',
    'value.format' = 'avro-confluent',
    'properties.group.id' = 'flink-sql-consumer',
    'value.fields-include' = 'ALL',
    'value.avro-confluent.url' = 'http://schema-registry:8081'
);
How to load data from a csv file using filesystem connector using SQL - Flink OSS

Enter the following statement in a SQL client session:

SET 'execution.runtime-mode' = 'batch';
Create a table from the content of the file, mounted inside the container or accessible on local file system:

CREATE TABLE employee_info (
    emp_id INT,
    name VARCHAR,
    dept_id INT
) WITH ( 
    'connector' = 'filesystem',
    'path' = '/home/flink-sql-demos/00-basic-sql/data/employees.csv',
    'format' = 'csv'
);

Show tables and list some elements within the table.

SHOW TABLES;

SELECT * from employee_info WHERE dept_id = 101;

Show how the table is created:

show create table employee_info;

See complete example in the readme

How to add a metadata field in a table?

Use ALTER TABLE to modify an existing table. Below is an example of adding a dt attribute.

alter table flight_schedules add(dt string metadata virtual);
Create a table as another table by inserting all records (CTAS create table as select)

CREATE TABLE AS SELECT is used to create table and insert values in the same statement. It derives the physical column data types and names (from aliased columns), the changelog.mode (from involved tables, operations, and upsert keys), and the primary key.

By using a primary key:

create table shoe_customer_keyed(
    primary key(id) not enforced
) distributed by(id) into 1 buckets
as select id, first_name, last_name, email from shoe_customers;
How to support nested rows, DDL and inserts?

Avro, Protobuf or JSON schemas are very often hierarchical by design. ROW and ARRAY are the types with nested elements. In the example below, StatesTable is a column in table t. It is a row with a states field, which is itself an array of (name, city, lg, lat) rows.

-- DDL
create table t (
    group_id STRING,
    StatesTable ROW< states ARRAY<ROW<name STRING, city STRING, lg DOUBLE, lat DOUBLE>>>,
    creationDate STRING
);

insert into t(group_id, StatesTable, creationDate)
values(
    'grp_1',
    ROW(
    ARRAY[ row('California', 'San Francisco', -122.4194, 37.7749),
        row('New York', 'New York City', -74.0060, 40.7128),
        row('Texas', 'Austin', -97.7431, 30.2672)
    ]
    ),            -- StatesTable
    '2020-10-10'  -- creationDate
);

How to support nested rows, and inserts from previous table definition?

From the previous table definition, to extract the longitude and latitude of the first state:

select
  group_id,
  -- array indexes start at 1; the fields of the nested row are accessed by name
  CAST(StatesTable.states[1].lg AS DECIMAL(10, 4)) AS longitude,
  CAST(StatesTable.states[1].lat AS DECIMAL(10, 4)) AS latitude
from t;
-- results
 'grp_1', -122.4194, 37.7749

See also the CROSS JOIN UNNEST keywords.
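
As a sketch of the CROSS JOIN UNNEST approach, assuming the table t defined earlier with a group_id column, the query below expands the states array so that each nested row becomes one output row. The alias s and its column names are chosen for illustration.

```sql
SELECT
    t.group_id,
    s.name,
    s.city,
    s.lg  AS longitude,
    s.lat AS latitude
FROM t
-- UNNEST turns each element of the array into its own row
CROSS JOIN UNNEST(t.StatesTable.states) AS s (name, city, lg, lat);
```

With the three states inserted earlier, this produces one row per state rather than one row per record of t.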

See also a running demo in flink-sql/03-nested-row

How to generate data using Flink Faker? (Flink OSS)

Create a table with records generated by the Flink Faker connector using DataFaker expressions. Valid only on OSS Flink or Confluent Platform for Flink.

CREATE TABLE `bounded_pageviews` (
  `url` STRING,
  `user_id` STRING,
  `browser` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'number-of-rows' = '500',
  'rows-per-second' = '100',
  'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
  'fields.user_id.expression' = '#{numerify ''user_##''}',
  'fields.browser.expression' = '#{Options.option ''chrome'', ''firefox'', ''safari'')}',
  'fields.ts.expression' =  '#{date.past ''5'',''1'',''SECONDS''}'
);
This only works in a customized Flink client that includes the Flink Faker jar.

Generate data with DataGen for Flink OSS

Use DataGen for in-memory data generation. See also the new feature in the product how-to guide documentation.
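
A minimal sketch of a DataGen-backed table on Flink OSS could look like the following. The table name, the columns and the value ranges are hypothetical; the options are those of the built-in datagen connector.

```sql
CREATE TABLE orders_gen (
    order_id BIGINT,
    customer_id INT,
    price DOUBLE,
    ts AS LOCALTIMESTAMP            -- computed column, not generated by the connector
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    -- a sequence field makes the table bounded: generation stops at 'end'
    'fields.order_id.kind' = 'sequence',
    'fields.order_id.start' = '1',
    'fields.order_id.end' = '1000',
    -- random fields constrained to a value range
    'fields.customer_id.min' = '1',
    'fields.customer_id.max' = '50',
    'fields.price.min' = '1.0',
    'fields.price.max' = '100.0'
);

SELECT * FROM orders_gen;
```

Unlike Flink Faker, this connector ships with Flink, so no additional jar is needed.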

How to generate test data to Confluent Cloud Flink?

Use Kafka connectors with DataGen. These connectors come with many predefined models. It is also possible to define a custom Avro schema and then use predicates to generate data. There is a Produce sample data quick start tutorial on the Confluent Cloud home page. See also this readme.

The Shift_left tool also has a test harness to generate synthetic data, taking into account the SQL content.

How to transfer the source timestamp to another table

As $rowtime is the timestamp of the record in Kafka, it may be interesting to keep the source timestamp for the downstream tables.

create table `some_clicks` (
      `order_id` STRING NOT NULL,
      ...
      `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
      PRIMARY KEY(order_id) NOT ENFORCED
) distributed by hash(order_id) into 4 buckets;

Then the statement to insert records into the new table:

insert into `some_clicks`
select
    order_id, 
    user_id,
    $rowtime as event_time
from  `src_table`;
Views over Tables

Recall that views are read-only, have no insert operation, and are used to encapsulate complex queries and reference them like regular tables. A view acts as a virtual table that refers to the result of the specified statement expression.

CREATE VIEW IF NOT EXISTS table_view AS SELECT ...

Views and tables share the same namespace in Flink. See array of row view example.
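
As a small sketch, the view below wraps a filter over the `examples`.`marketplace`.`orders` table used elsewhere in this chapter and is then queried like a regular table. The view name cheap_orders is hypothetical.

```sql
-- Encapsulate the filter once
CREATE VIEW IF NOT EXISTS cheap_orders AS
SELECT order_id, customer_id, product_id, price
FROM `examples`.`marketplace`.`orders`
WHERE price < 20;

-- Reference the view like a table; it cannot be the target of an INSERT
SELECT customer_id, COUNT(*) AS nb_orders
FROM cheap_orders
GROUP BY customer_id;
```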

Dealing with late events

Any streams mapped to a table have records arriving more-or-less in order, according to the $rowtime, and the watermarks let the Flink SQL runtime know how much buffering of the incoming stream is needed to iron out any out-of-order-ness before emitting the sorted output stream.

We need to specify the watermark strategy, for example within 30 seconds of the event time:

create table new_table (
    ....
    `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
 watermark for `event_time` as `event_time` - INTERVAL '30' SECOND
);

event_time could be a timestamp column from the source table (for example a CDC table). On Confluent Cloud for Flink (CCF), the watermark is on $rowtime by default, but it can be changed.

How to change system watermark?

Modify the WATERMARK metadata using alter table.

ALTER TABLE table_name MODIFY WATERMARK FOR $rowtime AS $rowtime - INTERVAL '1' SECOND;
-- in case we need to reverse back
ALTER TABLE table_name DROP WATERMARK;

This can be used when doing an enrichment join on a reference table. We do not want to wait for the watermark to advance on the reference table, so set the watermark of this reference table to the max INT using ALTER TABLE table_name SET $rowtime TO_TIMESTAMP(,0)

Create a table with topic as one day persistence

See the WITH options.

create table `small-orders` (
    `order_id` STRING NOT NULL,
    `customer_id` INT NOT NULL,
    `product_id` STRING NOT NULL,
    `price` DOUBLE NOT NULL
) distributed by hash(order_id) into 1 buckets
with (
    'kafka.retention.time' = '1 d'
);

insert into `small-orders` select * from `examples`.`marketplace`.`orders` where price < 20;
distributed by hash(order_id) and into 1 buckets specify that the table is backed by a Kafka topic with 1 partition, and the order_id field will be used as the Kafka partition key.

Table with Kafka Topic metadata

headers and timestamp are the only metadata columns that are not read-only; all the others must be declared VIRTUAL. Like the $rowtime system column, virtual columns are excluded by default from a SELECT *.

ALTER TABLE <table_name> ADD (
    `headers` MAP<STRING,STRING> METADATA,
    `leader-epoch` INT METADATA VIRTUAL,
    `offset` BIGINT METADATA VIRTUAL,
    `partition` BIGINT METADATA VIRTUAL,
    `timestamp` TIMESTAMP_LTZ(3) METADATA,
    `timestamp-type` STRING METADATA VIRTUAL,
    `topic` STRING METADATA VIRTUAL
);

The headers can be updated within SQL statements; some values may be static, others may come from the value of one of the selected fields. The timestamp can also be updated; for example, in most time window queries it is set to the window_time.

CREATE TABLE clicks_per_seconds (
    events_per_second BIGINT,
    window_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
);

INSERT INTO clicks_per_seconds
SELECT
    COUNT(*) AS events_per_second,
    window_time
FROM TABLE(TUMBLE(TABLE clicks, DESCRIPTOR(`$rowtime`), INTERVAL '1' SECOND))
GROUP BY window_time, window_end, window_start;

Applying to a Medallion Architecture

The current approach can be used for Flink pipeline processing:

| Table type | Goals | Parameters |
| --- | --- | --- |
| Raw table, non-Debezium format | Get the raw data from CDC or outbox pattern | cleanup.policy = 'delete', changelog.mode = 'append', value.format = 'avro-registry'... |
| Raw table, Debezium format | Same goals | cleanup.policy = 'delete', changelog.mode = 'retract' (retract is the default for Debezium connector), value.format = 'avro-debezium-registry' |
| Sources | Deduplicate and keep last record per key | cleanup.policy = 'delete', changelog.mode = 'upsert' |
| Intermediates | Enrichment, transformation | cleanup.policy = 'delete', changelog.mode = 'upsert' |
| Sink tables: Facts, Dimensions, Views | Create star schema elements | cleanup.policy = 'compact', changelog.mode = 'retract' or 'upsert' |
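
As a sketch of the "Sources" layer, the statements below declare an upsert table and fill it with the last record per key from a raw table. The table and column names (src_customers, raw_customers, customer_id, name) are hypothetical, and the example assumes Confluent Cloud for Flink, where the cleanup policy is set through the 'kafka.cleanup-policy' table option and `$rowtime` is available as a system column.

```sql
-- Hypothetical source-layer table: one row per customer_id
CREATE TABLE src_customers (
    customer_id STRING NOT NULL,
    name STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
) DISTRIBUTED BY HASH(customer_id) INTO 1 BUCKETS
WITH (
    'changelog.mode' = 'upsert',
    'kafka.cleanup-policy' = 'delete'
);

-- Deduplication: keep only the most recent record for each key
INSERT INTO src_customers
SELECT customer_id, name
FROM (
    SELECT
        customer_id,
        name,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY `$rowtime` DESC
        ) AS row_num
    FROM raw_customers
)
WHERE row_num = 1;
```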

Deeper dive

See the product documentation for some specificities: for example, source and sink tables are directly mapped to Kafka topics. The $rowtime TIMESTAMP_LTZ(3) NOT NULL column is provided as a system column.

  • For each topic there is an inferred table created. The catalog is the Confluent environment and the Kafka cluster is the database. We can use the ALTER TABLE statement to evolve schemas for those inferred tables.

  • By default, a table is mapped to a topic with 6 partitions and an append changelog. A primary key leads to an implicit DISTRIBUTED BY(k), and value and key schemas are created in Schema Registry. It is possible to create a table with a primary key in append mode, while with a primary key the default is upsert mode.

    CREATE TABLE telemetries (
        device_id INT PRIMARY KEY NOT ENFORCED, 
        geolocation STRING, metric BIGINT,
        ts TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp')
    DISTRIBUTED INTO 4 BUCKETS
    WITH ('changelog.mode' = 'append');
    

The statement above also creates a metadata column for writing a Kafka message timestamp. This timestamp will not be defined in the schema registry. Compared to $rowtime which is declared as a METADATA VIRTUAL column, ts is selected in a SELECT * and is writable.

  • When a primary key is specified, it is not part of the value schema, unless we specify (using 'value.fields-include' = 'all') that the value contains the full table schema. In that case the payload of k is stored twice in the Kafka message:

    CREATE TABLE telemetries (k INT, v STRING)
    DISTRIBUTED BY (k)
    WITH ('value.fields-include' = 'all');
    
  • If the key is a string, it may make sense not to have a schema for the key. In this case, declare the raw key format (the key columns are determined by the DISTRIBUTED BY clause). This does not work if the key column is not named key.

    CREATE TABLE telemetries (`key` STRING, metric BIGINT)
    DISTRIBUTED BY (`key`)
    WITH ('key.format' = 'raw');
    
  • To keep the records in the topic forever, add 'kafka.retention.time' = '0' to the options in the WITH clause. The supported units are:

    "d", "day", "h", "hour", "m", "min", "minute", "ms", "milli", "millisecond",
    "micro", "microsecond", "ns", "nano", "nanosecond"
    

CREATE TABLE in Confluent Cloud for Flink

Table creation creates the topic and the -key and -value schemas in the Schema Registry, in the same environment as the compute pool in which the query is run. The connector is automatically set to confluent. Non-null and nullable columns are translated into the following Avro fields:

  "fields": [
        {
        "name": "customer_id",
        "type": "string"
        },
        {
        "default": null,
        "name": "first_name",
        "type": [
            "null",
            "string"
        ]
        }
  ]

A nullable TIMESTAMP(3) column, such as registration_date, as:

{
  "default": null,
  "name": "registration_date",
  "type": [
    "null",
    {
      "logicalType": "local-timestamp-millis",
      "type": "long"
    }
  ]
}

Analyzing Table

Understand the type of attribute or get the table structure with metadata

show create table `tablename`;
-- for a specific attribute
select typeof(column_name) from table_name limit 1;

The Flink SQL planner performs type checking. Assessing the types of an inferred table is helpful, especially around timestamps. See the Data type mapping documentation.