SQL Translation Methodology

Version

Created December 2024, updated September 24, 2025

The current implementation of this tool supports the migration of:

  • dbt/Spark SQL to Flink SQL statements
  • ksqlDB to Flink SQL

The approach uses LLM agents. This document covers the methodology, setup, and usage of the shift_left tool for automated SQL migrations, and how to extend it to get better results.

The core idea is to leverage LLMs to understand the source SQL semantics and translate them to Flink SQL.
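
As a quick illustration (a hypothetical example, not produced by the tool), a ksqlDB windowed aggregation and a semantically close Flink SQL counterpart could look like the following, assuming the pageviews stream has a watermarked view_time column and the target table already exists:

    -- ksqlDB source (illustrative)
    CREATE TABLE pageviews_per_region AS
      SELECT region, COUNT(*) AS view_count
      FROM pageviews
      WINDOW TUMBLING (SIZE 1 MINUTE)
      GROUP BY region;

    -- Possible Flink SQL counterpart (illustrative)
    INSERT INTO pageviews_per_region
    SELECT region, window_start, window_end, COUNT(*) AS view_count
    FROM TABLE(TUMBLE(TABLE pageviews, DESCRIPTOR(view_time), INTERVAL '1' MINUTE))
    GROUP BY region, window_start, window_end;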

This is not production ready: LLMs can hallucinate, and a one-to-one mapping from a source dialect such as ksqlDB or Spark SQL to Flink SQL is sometimes not the best approach. We expect this agentic solution to be a strong foundation that can be enhanced over time.

The implementation uses the OpenAI SDK, so any LLM model exposing an OpenAI-compatible API can be used. The qwen3:30b model can run locally with Ollama on a Mac M3 Pro with 36 GB of RAM; other models running remotely behind OpenAI-compatible APIs may be used as well.

Migration Context

As described in the introduction, at a high level, data engineers need to take a source project, define a new Flink project, perform migrations, run Flink statement deployments, manage pipelines, and write and execute tests:

Shift Left project system context

All these tasks are described in the recipe chapter and supported by this CLI.

For automatic migration, LLMs alone might not be sufficient to address complex translations in an automated process. Agents help by specializing in specific steps with feedback loops and retries.

Complexity of language translation

Any programming language translation effort starts with a corpus of source code. The corpus can be generated programmatically from the source language; for each generated snippet, the semantically equivalent Flink SQL counterpart is then implemented.

The goal of corpus creation is to identify common ksqlDB or Spark SQL constructs (joins, aggregations, window functions, UDFs, etc.), then manually translate a smaller, diverse set of queries to establish translation rules. Using these rules, we can generate permutations and variations of queries. It is crucial to test the generated Flink SQL against a test dataset to ensure semantic equivalence.

Build query pairs to represent the source-to-target set as a corpus. For each query pair, include the relevant table schemas. This is vital for the LLM to understand data types, column names, and relationships. It is not recommended to have different prompts for different parts of a SQL statement, as the LLM's strength comes from the entire context. However, there will still be problems for SQL scripts that have many lines of code, as a 200+ line script will reach thousands of tokens.
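
For example, a single corpus entry might pair the relevant schema, the Spark SQL source, and the Flink SQL target (table and column names below are illustrative):

    -- Schema provided with the pair (illustrative)
    -- orders(order_id STRING, customer_id STRING, amount DOUBLE, order_ts TIMESTAMP(3))

    -- Spark SQL source: tumbling window aggregation
    SELECT customer_id, window(order_ts, '10 minutes') AS w, SUM(amount) AS total_amount
    FROM orders
    GROUP BY customer_id, window(order_ts, '10 minutes');

    -- Flink SQL target: windowing TVF, assuming order_ts carries a watermark
    SELECT customer_id, window_start, window_end, SUM(amount) AS total_amount
    FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_ts), INTERVAL '10' MINUTE))
    GROUP BY customer_id, window_start, window_end;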

To improve result accuracy, supervised fine-tuning and related techniques can be applied:

  • Fine-tune the chosen LLM on the generated code. The goal is for the LLM to learn the translation patterns and nuances between ksqlDB or Spark SQL and Flink SQL.
  • Prompt Engineering: Experiment with different prompt structures during fine-tuning and inference. A good prompt will guide the LLM effectively. The current implementation leverages this type of prompt: e.g., "Translate the following Spark SQL query to Flink SQL, considering the provided schema. Ensure semantic equivalence and valid Flink syntax."
  • For evaluation, it is recommended to add a step to the agentic workflow that validates the syntax of the generated Flink SQL. A stronger validation assesses semantic equivalence by checking whether the Flink SQL query produces the same results as the ksqlDB or Spark SQL query on a given dataset (see the sketch after this list).
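
As a minimal sketch of such a semantic-equivalence check on a bounded test dataset (the result table names below are illustrative), the two result sets can be diffed in both directions with EXCEPT, run in batch mode:

    -- Rows produced by the translated Flink SQL query but missing from the reference results
    SELECT customer_id, total_amount FROM flink_query_results
    EXCEPT
    SELECT customer_id, total_amount FROM source_query_results;

    -- Reverse direction: rows the translation failed to produce
    SELECT customer_id, total_amount FROM source_query_results
    EXCEPT
    SELECT customer_id, total_amount FROM flink_query_results;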

For validation, it may be relevant to have a knowledge base of common translation errors. When the Validation Agent reports an error, the Refinement Agent attempts to correct the Flink SQL. It might feed the error message back to the LLM with instructions to fix it. The knowledge base should be populated with human-curated rules for common translation pitfalls.

It may be important to explain why a translation was done a certain way to better tune prompts. For complex queries or failures, human review ("human in the loop") and correction mechanisms will be essential, with the system learning from these corrections.

Limitations

LLMs cannot magically translate custom UDFs. This will likely require manual intervention or a separate mapping mechanism. The system should identify and flag untranslatable UDFs.
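
For instance, a query calling a custom UDF (MASK_PII below is a hypothetical user-defined function) should be flagged for manual mapping rather than silently translated:

    -- ksqlDB query using a custom UDF with no Flink SQL equivalent (hypothetical UDF)
    SELECT user_id, MASK_PII(email) AS masked_email
    FROM users
    EMIT CHANGES;
    -- Expected behavior: flag MASK_PII as untranslatable instead of guessing a mapping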

Flink excels at stateful stream processing. Spark SQL's batch orientation means that translating stateful Spark operations (if they exist) to their Flink streaming counterparts would be highly complex and would likely require significant human oversight or custom rules.

While Spark SQL is primarily designed for batch processing, it can be migrated to Flink real-time processing with some refactoring and tuning. Spark also supports streaming via micro-batching. Most basic SQL operators (SELECT, FROM, WHERE, JOIN) are similar between Spark and Flink.
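
For instance, a basic join with filtering (illustrative tables and columns) typically runs unchanged on both engines:

    -- Valid in both Spark SQL and Flink SQL
    SELECT o.order_id, c.customer_name, o.amount
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    WHERE o.amount > 100;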

  • Example command to migrate one Spark SQL script
    # set SRC_FOLDER to one of the Spark source folders, e.g. tests/data/spark-project
    # set STAGING to the target folder for the migrated content
    shift_left table migrate customer_journey $SRC_FOLDER/sources/src_customer_journey.sql $STAGING --source-type spark
    

Setup is reviewed later in this document (see Prerequisites and Setup).

  • When using the star schema of the Kimball methodology, it is possible to process a fact table up to its sources; the tool migrates all dependent tables recursively. This can take time if the dependency graph is large, so at the beginning of a migration project we recommend going one table at a time.
shift_left table migrate $SRC_FOLDER/facts/fct_examination_data.sql $STAGING --recursive --source-type spark
Example of Output
process SQL file ../src-dbt-project/models/facts/fct_examination_data.sql
Create folder fct_exam_data in ../flink-project/staging/facts/p1

--- Start translator AI Agent ---
--- Done translator Agent: 
INSERT INTO fct_examination_data
...
--- Start clean_sql AI Agent ---
--- Done Clean SQL Agent: 
--- Start ddl_generation AI Agent ---
--- Done DDL generator Agent:
CREATE TABLE IF NOT EXISTS fct_examination_data (
    `exam_id` STRING,
    `perf_id` STRING,
...

For a given table, the tool creates a folder with the table name, a Makefile to help manage the Flink statements with the Confluent CLI, a sql-scripts folder for the Flink DDL and DML statements, and a tests folder where test_definitions.yaml can be added (using shift_left table init-unit-tests) for unit testing.

Example of created folders:

facts
└── fct_examination_data
    ├── Makefile
    ├── sql-scripts
    │   ├── ddl.fct_examination_data.sql
    │   └── dml.fct_examination_data.sql
    └── tests

As part of the process, developers need to validate the generated DDL and update the PRIMARY KEY to reflect the expected key. In dbt this information is spread across many files, and key extraction is not yet automated by the migration tool.
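
For example, the expected key often has to be added by hand to the generated DDL; in Flink SQL the key columns must be NOT NULL and the constraint is declared NOT ENFORCED (choosing exam_id as the key here is illustrative):

    CREATE TABLE IF NOT EXISTS fct_examination_data (
        `exam_id` STRING NOT NULL,
        `perf_id` STRING,
        -- ... other columns ...
        PRIMARY KEY (`exam_id`) NOT ENFORCED
    );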

Normally, the DML is not executable until all dependent input tables are created.

ksqlDB offers SQL-like constructs, but it is not an ANSI SQL engine. It is tightly integrated with Kafka and uses dedicated keywords to define that integration. The migration process and its prompts therefore need to handle constructs beyond classical SELECT and CREATE TABLE statements.
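
For example, a ksqlDB stream declares its Kafka integration through WITH properties, while a Flink SQL table expresses it through table options; the sketch below uses open-source Kafka connector options for illustration (Confluent Cloud for Flink maps tables to topics and uses a different set of options):

    -- ksqlDB: Kafka integration via WITH properties
    CREATE STREAM user_events (user_id VARCHAR, action VARCHAR)
      WITH (KAFKA_TOPIC='user_events', VALUE_FORMAT='JSON', PARTITIONS=3);

    -- Flink SQL counterpart (illustrative connector options)
    CREATE TABLE user_events (
        `user_id` STRING,
        `action` STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_events',
        'properties.bootstrap.servers' = 'broker:9092',
        'format' = 'json',
        'scan.startup.mode' = 'earliest-offset'
    );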

  • Example command to migrate a ksqlDB script
    shift_left table migrate w2_processing $SRC_FOLDER/w2processing.ksql $STAGING --source-type ksql 
    

Current Agentic Approach

The current agentic workflow includes:

  1. Translate the given SQL content
  2. Validate the syntax and semantics
  3. Generate DDL derived from DML
  4. Get human validation on whether to continue the automation
  5. Deploy and test with validation agents [optional]

The system uses validation agents that execute syntactic validation and automatic deployment, with feedback loops injecting error messages back to translator agents when validation fails.

Architecture Overview

The multi-agent system with human-in-the-loop validation may use Confluent Cloud's Flink REST API to deploy a generated Flink statement. The following diagram represents the different agents working together:

AI Agent Flow

Agent Roles

As an agent is a combination of an LLM reference, prompts, and tool definitions, the agents are implemented differently for ksqlDB-to-Flink SQL and Spark-to-Flink SQL migrations.

The supporting class for this workflow is the ksqlDB code agent.

Agent             Scope                                 Prompt File
Translator        Raw KSQL to Flink SQL translation     core/utils/prompts/ksql_fsql/translator.txt
Table Detection   Identify multiple CREATE statements   core/utils/prompts/ksql_fsql/table_detection.txt
Validation        Validate Flink SQL constructs         core/utils/prompts/ksql_fsql/mandatory_validation.txt
Refinement        Fix deployment errors                 core/utils/prompts/ksql_fsql/refinement.txt

The supporting class for this workflow is the Spark SQL code agent. The approach is the same as for ksqlDB, with the prompts located in the core/utils/prompts/spark_fsql folder.

Agent             Scope                                 Prompt File
Translator        Spark SQL to Flink SQL translation    core/utils/prompts/spark_fsql/translator.txt
Table Detection   Identify multiple CREATE statements   core/utils/prompts/spark_fsql/table_detection.txt
Validation        Validate Flink SQL constructs         core/utils/prompts/ksql_fsql/mandatory_validation.txt
Refinement        Fix deployment errors                 core/utils/prompts/ksql_fsql/refinement.txt

Class diagram

A test bed

The current project includes some examples of Spark and ksqlDB scripts in the tests/data folder, and a companion git project showcases Spark and Flink projects.

Prerequisites and Setup

Environment Setup

  1. Clone the shift_left_utils git repository

    git clone https://github.com/jbcodeforce/shift_left_utils
    

  2. Install uv for Python package management

    # Install uv if not already installed
    curl -LsSf https://astral.sh/uv/install.sh | sh
    # To update existing version
    uv self update
    

  3. Python Environment: Ensure Python 3.12+ and create a virtual environment

    uv venv --python 3.12.0
    source .venv/bin/activate  # On native Windows: .venv\Scripts\activate
    

  4. Install Dependencies: Use uv package manager (recommended)

    cd src/shift_left
    # Install project dependencies
    uv sync
    

  5. Install shift_left Tool:

    # under src/shift_left
    ls -al dist/
    # select the latest version, for example:
    uv tool install dist/shift_left-0.1.28-py3-none-any.whl
    # Verify installation
    shift_left --help
    shift_left version
    

    You can also use pip if you have an existing Python environment:

    pip install src/shift_left/dist/shift_left-0.1.28-py3-none-any.whl
    

  6. Make sure you are logged into Confluent Cloud and have defined at least one compute pool.

Configuration File Setup

  • Create a configuration file (e.g., config-ccloud.yaml):
    cp src/shift_left/src/shift_left/core/templates/config_tmpl.yaml ./config-ccloud.yaml
    
  • Update the content of the config-ccloud.yaml to reflect your Confluent Cloud environment. (For the commands used for migration, you do not need Kafka settings.)

    # Confluent Cloud Configuration
    confluent_cloud:
      api_key: "YOUR_API_KEY"
      api_secret: "YOUR_API_SECRET"
      organization_id: "YOUR_ORGANIZATION_ID"
      environment_id: "YOUR_ENVIRONMENT_ID"
      url_scope: public
      region: "YOUR_REGION"
      provider: aws
    flink:
      flink_url: flink....confluent.cloud
      compute_pool_id: "YOUR_COMPUTE_POOL_ID"
      api_key: "YOUR_API_KEY"
      api_secret: "YOUR_API_SECRET"
      catalog_name: "environment_name"
      database_name: "kafka_cluster_name"
    

  • Set the following environment variables before using the tool. This can be done by:

    cp src/shift_left/src/shift_left/core/templates/set_env_temp ./set_env
    

    Modify the CONFIG_FILE, FLINK_PROJECT, SRC_FOLDER, and SL_LLM_* variables.

  • Source it:

    source set_env
    

  • Validate config-ccloud.yaml:

    shift_left project validate-config

Migration Workflow

1. Project Initialization

Create a new target project to keep your Flink statements and pipelines (e.g., my-flink-app):

# Initialize project structure
shift_left project init <your-project> </path/to/your/folder>
# example 
shift_left project init my-flink-app $HOME/Code

You should get:

my-flink-app
├── README.md
├── docs
├── pipelines
│   ├── common.mk
│   ├── dimensions
│   ├── facts
│   ├── intermediates
│   ├── sources
│   └── views
├── sources
└── staging

2. Specific Setup

  • Copy your KSQL files to the sources directory:

    # Copy KSQL files
    cp *.ksql ${SRC_FOLDER}/
  • Optional: To run locally on a smaller model, download Ollama, then install the qwen2.5-coder model:

    # Select the size of the model according to your memory
    ollama pull qwen2.5-coder:32b
    ollama list
    

  • Create an OpenRouter.ai API key (https://openrouter.ai/) to get access to larger models, such as qwen/qwen3-coder:free, which is free to use.

  • Set the SL_LLM_* environment variables for the chosen model and provider, as configured in the set_env file (see Configuration File Setup).

3. Migration Execution

Basic Table Migration

# Migrate a simple table
shift_left table migrate basic_user_table $SRC_FOLDER/user-table.ksql $STAGING --source-type ksql 

The command generates:

staging/basic_user_table
└── sql-scripts
    ├── ddl.basic_user_table.sql     # Flink DDL
    └── dml.basic_user_table.sql     # Flink DML (if any)

4. Validation and Deployment

# Deploy to Confluent Cloud for Flink
cd ${STAGING}/basic_user_table

# Deploy DDL statements
make create_flink_ddl

# Deploy DML statements  
make create_flink_dml

5. Prepare for pipeline management

Flink statements have dependencies, so it is important to use shift_left to manage those dependencies:

  • Run after new tables are created

    shift_left table build-inventory
    

  • Build all the metadata

    shift_left pipeline build-all-metadata 
    

  • Verify an execution plan

    shift_left pipeline build-execution-plan --table-name <>
    

6. Next

  • Organize the Flink statements into pipeline folders, possibly using sources, intermediates, dimensions, and facts classification. Think about data products. A candidate hierarchy may look like this:

    my-flink-app
    └── pipelines
        ├── common.mk
        ├── dimensions
        │   └── data_product_a
        ├── facts
        │   └── data_product_a
        ├── intermediates
        │   └── data_product_a
        ├── sources
        │   └── data_product_a
        │       └── src_stream
        │           ├── Makefile
        │           ├── pipeline_definition.json
        │           ├── sql-scripts
        │           │   ├── ddl.src_stream.sql
        │           │   └── dml.src_stream.sql
        │           └── tests
        └── views
            └── data_product_a
    

  • Add unit tests per table (at least for the complex DML ones) (see test harness)

  • Add source data into the first tables of the pipeline
  • Verify the created records within the sink tables.