
Pipeline Management

Version

Created March 21, 2025 - Updated March 28, 2025

The goal of this chapter is to present the requirements, design, and validation of the pipeline management tools.

Context

Flink statements are inherently interdependent, consuming and joining tables produced by other statements, forming a complex pipeline. Careful deployment is crucial. The following diagram illustrates this interconnectedness for a simple example and outlines a pipeline management strategy.

a pipeline of Flink statements

This graph is generated by running a report like: shift_left pipeline report fct_order --graph

Test data

The folder src/shift_left/tests/data includes a Flink project with the DDLs and DMLs supporting the graph above; it is used in all test cases.

Managing the pipeline

The recipe chapter has how-to descriptions for the specific commands to use during development and pipeline management. The following high-level concepts are the foundations for this management:

  1. The git folder is the source of truth for pipeline definitions.
  2. The table inventory, which lists all the Flink tables of a project, is used as the foundation to find basic metadata about Flink statements. It must be created with a simple command like:

    shift_left table build-inventory $PIPELINES
    

    The inventory.json file is persisted in the $PIPELINES folder and committed to git. It is used extensively by the pipeline commands and could be updated at each PR by a CI tool.

  3. The per-table pipeline_definition.json file includes a single level of information about the pipeline. Those files are built from the sink tables going up to the sources. During the Flink development phase, developers may use this tool to build the metadata:

    shift_left pipeline build-metadata $PIPELINES/facts/p1/fct_order/sql-scripts/dml.fct_order.sql $PIPELINES
    

    The created file may look like:

    {
        "table_name": "fct_order",
        "type": "fact",
        "dml_ref": "pipelines/facts/p1/fct_order/sql-scripts/dml.fct_order.sql",
        "ddl_ref": "pipelines/facts/p1/fct_order/sql-scripts/ddl.fct_order.sql",
        "path": "pipelines/facts/p1/fct_order",
        "state_form": "Stateful",
        "parents": [
            {
                "table_name": "int_table_2",
                "type": "intermediate",
                "dml_ref": "pipelines/intermediates/p1/int_table_2/sql-scripts/dml.int_table_2.sql",
                "ddl_ref": "pipelines/intermediates/p1/int_table_2/sql-scripts/ddl.int_table_2.sql",
                "path": "pipelines/intermediates/p1/int_table_2",
                "state_form": "Stateful",
                "parents": [],
                "children": [
                    {
                    "table_name": "fct_order",
                    "type": "fact",
                    "dml_ref": "pipelines/facts/p1/fct_order/sql-scripts/dml.fct_order.sql",
                    "ddl_ref": "pipelines/facts/p1/fct_order/sql-scripts/ddl.fct_order.sql",
                    "path": "pipelines/facts/p1/fct_order",
                    "state_form": "Stateful",
                    "parents": [],
                    "children": []
                    }
                ]
            },
            {
                "table_name": "int_table_1",
                "type": "intermediate",
                "dml_ref": "pipelines/intermediates/p1/int_table_1/sql-scripts/dml.int_table_1.sql",
                "ddl_ref": "pipelines/intermediates/p1/int_table_1/sql-scripts/ddl.int_table_1.sql",
                "path": "pipelines/intermediates/p1/int_table_1",
                "state_form": "Stateful",
                "parents": [],
                "children": [
                    {
                    "table_name": "fct_order",
                    "type": "fact",
                    "dml_ref": "pipelines/facts/p1/fct_order/sql-scripts/dml.fct_order.sql",
                    "ddl_ref": "pipelines/facts/p1/fct_order/sql-scripts/ddl.fct_order.sql",
                    "path": "pipelines/facts/p1/fct_order",
                    "state_form": "Stateful",
                    "parents": [],
                    "children": []
                    }
                ]
            }
        ],
        "children": []
    }
    

    Developers or SREs may use another command to walk over all the facts, dimensions, and views folders and create all the pipeline_definition.json files:

    shift_left pipeline build-all-metadata $PIPELINES
    

    Note that walking up a second pipeline from a new sink may modify the pipeline_definition.json of an existing parent table, to update its list of children with the new sink. The same applies to a new intermediate table. The parents and children lists are in fact sets, so there is no duplicate entry when a table is used by multiple pipelines.

  4. A hierarchy view of a pipeline can be used for reporting, or by the developer to understand the complex tree:

    shift_left pipeline report fct_order --json
    

    The --json, --yaml, or --graph output formats can be used.

  5. The hierarchy view is also used to deploy a selected table and its children (a minimal traversal sketch follows this list).
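
To make the traversal concrete, the following is a minimal Python sketch, not the shift_left implementation, that walks a pipeline definition shaped like the JSON example above and returns the order in which tables would have to be deployed (sources first, sink last). The file path, function names, and the single in-memory tree are assumptions for illustration: the real tool keeps only one level of information per file and follows each parent's own definition file.

    import json

    def load_definition(path: str) -> dict:
        """Load a pipeline definition file shaped like the example above."""
        with open(path) as f:
            return json.load(f)

    def deployment_order(node: dict, ordered=None, seen=None) -> list:
        """Post-order walk of the 'parents' tree: ancestors are listed before
        the table that consumes them. 'seen' mimics the set behavior, so a
        table shared by several pipelines appears only once."""
        if ordered is None:
            ordered, seen = [], set()
        for parent in node.get("parents", []):
            deployment_order(parent, ordered, seen)
        if node["table_name"] not in seen:
            seen.add(node["table_name"])
            ordered.append(node["table_name"])
        return ordered

    if __name__ == "__main__":
        # Hypothetical location of the definition built for the fct_order sink.
        definition = load_definition("pipelines/facts/p1/fct_order/pipeline_definition.json")
        # With the sample JSON above this prints ['int_table_2', 'int_table_1', 'fct_order'].
        print(deployment_order(definition))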

Different constraints for pipeline deployment

Deploying a fact table

During development, Flink SQL developers use the makefile: see the recipe to deploy a statement. While preparing for staging or integration tests, it may be relevant to deploy a full pipeline from a sink table: for example, SREs may want to deploy the sink fct_order table. For the DML to run successfully, as it joins two tables, both tables need to be created, so the tool needs to walk up the hierarchy to deploy the parents, up to the sources. In the diagram, the white-colored topics and Flink statements are currently running, and their tables and topics have messages. Before deploying the fct_order DML, the tool needs to assess which parent tables are currently running. If tables are missing, the tool needs to deploy them, taking into consideration parents of parents. For example, since int_table_1 is not created, the tool first needs to run the src_table_1 DDL and any DML for src_table_1 (in the tests the source DMLs just insert records, but in a real project those DMLs may consume from an existing Kafka topic created via CDC), then run the int_table_1 DDL and DML, and finally deploy the fct_order DDL and DML.

Sink table deployment - with parent deployment

The red color highlights the goal of the deployment. White represents what is stable, while the orange elements are impacted by the deployment.
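
A minimal Python sketch of this walk-up logic follows; it is illustrative only, not the shift_left implementation. The is_running, deploy_ddl, and deploy_dml callables are placeholders for the real status check and deployment calls, and the demo data at the bottom is fabricated from the table names used in this chapter.

    from typing import Callable

    def ensure_running(table: dict,
                       is_running: Callable[[str], bool],
                       deploy_ddl: Callable[[dict], None],
                       deploy_dml: Callable[[dict], None]) -> None:
        """Deploy every missing ancestor of 'table' before the table itself."""
        for parent in table.get("parents", []):
            if not is_running(parent["table_name"]):
                # Recurse first: a missing parent may itself have missing parents.
                ensure_running(parent, is_running, deploy_ddl, deploy_dml)
        deploy_ddl(table)   # create the table and its backing topic
        deploy_dml(table)   # start the statement that populates the table

    if __name__ == "__main__":
        # Demo: only src_table_1 is already running.
        running = {"src_table_1"}
        fct_order = {"table_name": "fct_order",
                     "parents": [{"table_name": "int_table_1",
                                  "parents": [{"table_name": "src_table_1", "parents": []}]}]}
        ensure_running(fct_order,
                       is_running=lambda name: name in running,
                       deploy_ddl=lambda node: print("DDL", node["table_name"]),
                       deploy_dml=lambda node: print("DML", node["table_name"]))
        # Prints DDL/DML for int_table_1, then for fct_order; src_table_1 is left untouched.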

Steps to demonstrate a sink deployment
  • Remove any older logs with rm ~/.shift_left/logs/*.log*
  • Be sure config.yaml has the correct parameters, in particular the Flink and Confluent Cloud access keys and secrets, a default compute_pool_id, and the logging level.
  • Define the PIPELINES and CONFIG_FILE environment variables.
  • Ensure the inventory is up to date; if not, run shift_left table build-inventory $PIPELINES
  • If for any reason the pipeline definitions for the given pipeline need to be recreated: shift_left pipeline build-metadata fct_order $PIPELINES
  • Verify a report works on the fact table: shift_left pipeline report fct_order $PIPELINES.
  • Deploy the fact table: shift_left deploy fct_order
  • Verify in the Confluent Cloud console that the Flink statements are running and the topics are created.

Deploying an intermediate table

Intermediate table deployment follows the same principle as above when parents in the upward hierarchy are not running but, more importantly, it may impact children. The behavior of the deployment differs depending on whether the DMLs are stateful, both for the current DML to deploy and for the children's. Red means those elements will be recreated, and orange means they may be impacted by a redeployment. For a stateful DML consuming from the earliest offset, the topic needs to be recreated and the downstream children recreated. For a stateless DML, stopping the statement, getting the offset, and restarting from the saved offset will work.

Intermediate table deployment
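
A small, hypothetical Python sketch of that decision logic (not the shift_left implementation): it only builds a plan of actions for the redeployed intermediate table and its children, assuming the statefulness of each statement is already known.

    def redeploy_plan(table: dict, stateful: dict) -> list:
        """Return (table_name, action) pairs for an intermediate table redeployment.
        Stateful children consuming from the earliest offset must be recreated
        (DDL + DML); stateless children can be stopped and restarted from a saved
        offset. 'stateful' maps table names to booleans and is assumed known."""
        plan = [(table["table_name"], "recreate: DDL + DML")]
        for child in table.get("children", []):
            if stateful.get(child["table_name"], True):
                plan.append((child["table_name"], "recreate: DDL + DML"))
            else:
                plan.append((child["table_name"], "stop, save offset, restart DML from offset"))
        return plan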

Deploying a source table

Source table deployment may impact even more children. Most source processing performs deduplication or transforms the stream into an upsert table with different primary keys, which means the statements become stateful.

Source table deployment

Tool Requirements

The following list presents the requirements to implement for the shift_left deploy command:

  • The expected command to deploy should be as simple as:
shift_left pipeline deploy [OPTIONS] TABLE_NAME INVENTORY_PATH

   --compute-pool-id     TEXT  Flink compute pool ID. If not provided, the tool will create a pool. [default: None]
   --dml-only            By default the deployment does both DDL and DML; with this flag it deploys only the DML. [default: no-dml-only]
   --force               Children are deleted only if they are stateful. This flag forces dropping the table and recreating everything (DDL, DML). [default: no-force]
  • Deploy DML and DDL: given the table name, execute the DDL and DML to deploy a pipeline. If a compute pool id is provided, it is used; otherwise the tool takes the existing pool_id from the table already deployed, and if none is defined it creates a new pool and assigns its pool_id. A deployment may impact children statements depending on the semantics of the current DDL and of the children's ones.

  • Support deploying only DML, or both DDL and DML (default)

  • Deploying a DDL means dropping the existing table if it exists.
  • Deploying a non-existing sink means deploying all its parents, if not already deployed, up to the sources. This is the way to deploy a pipeline. In this case, deploy the sources first (DDL and DML), except if they are already running, which means the table was created by another pipeline. This is recursive.
  • Deploying an existing sink means dropping the table if the force flag is true and redeploying it (DDL and DML). If the force flag is false, only deploy the DML. When the DML is stateful, deploy both DDL and DML (equivalent to forced).
  • For a given table with children, deploy the current table, and for each child redeploy the DML if that DML is stateful. When stateless, manage the offset and modify the DML to read from the retrieved offset.
  • Support deleting a full pipeline: delete the tables not used by another pipeline, i.e. the number of children is one, or none of the children are running (see the sketch after this list).
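
A minimal sketch of the last rule, hypothetical and not the shift_left implementation: a table of the pipeline can be dropped when it is not shared with another still-running pipeline.

    from typing import Callable

    def can_delete(table: dict, is_running: Callable[[str], bool]) -> bool:
        """A table can be dropped when it has at most one child (it only serves the
        pipeline being deleted) or none of its children statements are running."""
        children = table.get("children", [])
        return len(children) <= 1 or not any(is_running(c["table_name"]) for c in children)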

Questions

The following may be considered:

  • Does it make sense to have a DDL-only deployment from a source-to-sink pipeline?

Developer's notes

The modules supporting pipeline management are pipeline_mgr.py and deployment_mgr.py.

  • For testing a Flink deployment, see [test - ]