SQL Translation Methodology¶
Version: Created December 2024, updated September 24, 2025.
The current implementation of this tool supports migration of:
- dbt/Spark SQL to Flink SQL statements
- ksqlDB to Flink SQL
The approach uses LLM agents. This document covers the methodology, setup, and usage of the shift_left tool for automated SQL migrations, and how to extend it to get better results.
The core idea is to leverage LLMs to understand the source SQL semantics and translate them to Flink SQL.
This is not production ready: the LLM can hallucinate, and a one-to-one mapping from a source like ksqlDB or Spark to Flink is sometimes not the best approach. We expect this agentic solution to be a strong foundation for better results, and it can be enhanced over time.
The implementation uses the OpenAI SDK, so different LLM models can be used as long as they expose an OpenAI-compatible API. The qwen3:30b model can run locally with Ollama on a Mac M3 Pro with 36 GB of RAM; other models running remotely behind OpenAI-compatible APIs may be used as well.
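As an illustration, pulling the model locally with Ollama might look like the sketch below; the ollama commands are standard, and Ollama exposes an OpenAI-compatible endpoint at http://localhost:11434/v1 that the tool can be pointed to through its LLM settings (see the SL_LLM_* variables later in this document).
# download the model locally; Ollama serves an OpenAI-compatible API at http://localhost:11434/v1
ollama pull qwen3:30b
# optional smoke test of the local model
ollama run qwen3:30b "translate SELECT 1 to Flink SQL"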
Migration Context¶
As described in the introduction, at a high level, data engineers need to take a source project, define a new Flink project, perform migrations, run Flink statement deployments, manage pipelines, and write and execute tests:

All those tasks are described under the recipe chapter and supported by this CLI.
For automatic migration, LLMs alone might not be sufficient to address complex translations in an automated process. Agents help by specializing in specific steps with feedback loops and retries.
Complexity of language translation
For any programming language translation, we need to start with a corpus of source code. This corpus can be generated programmatically from the source language; then, for each generated snippet, we implement the semantically equivalent Flink SQL counterpart.
The goal of corpus creation is to identify common ksqlDB or Spark SQL constructs (joins, aggregations, window functions, UDFs, etc.), then manually translate a smaller, diverse set of queries to establish translation rules. Using these rules, we can generate permutations and variations of queries. It is crucial to test the generated Flink SQL against a test dataset to ensure semantic equivalence.
Build query pairs to represent the source-to-target set as a corpus. For each query pair, include the relevant table schemas. This is vital for the LLM to understand data types, column names, and relationships. It is not recommended to have different prompts for different parts of a SQL statement, as the LLM's strength comes from the entire context. However, there will still be problems for SQL scripts that have many lines of code, as a 200+ line script will reach thousands of tokens.
To improve result accuracy, it is possible to use Supervised Fine-tuning techniques:
- Fine-tune the chosen LLM on the generated code. The goal is for the LLM to learn the translation patterns and nuances between ksqlDB or Spark SQL and Flink SQL.
- Prompt Engineering: Experiment with different prompt structures during fine-tuning and inference. A good prompt will guide the LLM effectively. The current implementation leverages this type of prompt: e.g., "Translate the following Spark SQL query to Flink SQL, considering the provided schema. Ensure semantic equivalence and valid Flink syntax."
- For evaluation assessment, it is recommended to add a step to the agentic workflow to validate the syntax of the generated Flink SQL. Better validation involves assessing semantic equivalence by determining if the Flink SQL query produces the same results as the ksqlDB or Spark SQL query on a given dataset.
For validation, it may be relevant to have a knowledge base of common translation errors. When the Validation Agent reports an error, the Refinement Agent attempts to correct the Flink SQL. It might feed the error message back to the LLM with instructions to fix it. The knowledge base should be populated with human-curated rules for common translation pitfalls.
It may be important to explain why a translation was done a certain way to better tune prompts. For complex queries or failures, human review ("human in the loop") and correction mechanisms will be essential, with the system learning from these corrections.
Limitations¶
LLMs cannot magically translate custom UDFs. This will likely require manual intervention or a separate mapping mechanism. The system should identify and flag untranslatable UDFs.
Flink excels at stateful stream processing. Spark SQL's batch orientation means that translating stateful Spark operations (if they exist) to their Flink streaming counterparts would be highly complex and would likely require significant human oversight or custom rules.
Spark SQL to Flink SQL¶
While Spark SQL is primarily designed for batch processing, it can be migrated to Flink real-time processing with some refactoring and tuning. Spark also supports streaming via micro-batching. Most basic SQL operators (SELECT, FROM, WHERE, JOIN) are similar between Spark and Flink.
- Example command to migrate one Spark SQL script (the required setup is reviewed later in this document).
- When using the star schema of the Kimball methodology, it is possible to process a fact table all the way back to its sources; the tool will migrate all tables recursively. This can take time if the dependency graph is large, so at the beginning of a migration project we recommend going one table at a time.
shift_left table migrate $SRC_FOLDER/facts/fct_examination_data.sql $STAGING --recursive --source-type spark
Example of Output
process SQL file ../src-dbt-project/models/facts/fct_examination_data.sql
Create folder fct_exam_data in ../flink-project/staging/facts/p1
--- Start translator AI Agent ---
--- Done translator Agent:
INSERT INTO fct_examination_data
...
--- Start clean_sql AI Agent ---
--- Done Clean SQL Agent:
--- Start ddl_generation AI Agent ---
--- Done DDL generator Agent:
CREATE TABLE IF NOT EXISTS fct_examination_data (
`exam_id` STRING,
`perf_id` STRING,
...
For a given table, the tool creates one folder with the table name, a Makefile to help manage the Flink statements with the Confluent CLI, a sql-scripts folder for the Flink DDL and DML statements, and a tests folder to add test_definitions.yaml (using shift_left table init-unit-tests) for unit testing.
Example of created folders:
facts
└── fct_examination_data
├── Makefile
├── sql-scripts
│ ├── ddl.fct_examination_data.sql
│ └── dml.fct_examination_data.sql
└── tests
As part of the process, developers need to validate the generated DDL and update the PRIMARY KEY to reflect the expected key. This information is scattered across many dbt files, and key extraction is not yet automated by the migration tools.
Normally, the DML is not executable until all dependent input tables are created.
ksqlDB to Flink SQL¶
ksqlDB offers SQL-like constructs, but it is not an ANSI SQL engine. It is tightly integrated with Kafka and uses dedicated keywords to define that integration, so the migration and prompts need to support constructs beyond the classical SELECT and CREATE TABLE statements.
- Example command to migrate a ksqlDB script
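Mirroring the migration command documented in the Migration Execution section below, a single ksqlDB script can be migrated with a command like this (table and file names are illustrative):
# migrate one ksqlDB script into the staging area of the Flink project
shift_left table migrate basic_user_table $SRC_FOLDER/user-table.ksql $STAGING --source-type ksql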
Current Agentic Approach¶
The current agentic workflow includes:
- Translate the given SQL content
- Validate the syntax and semantics
- Generate DDL derived from DML
- Get human validation on whether to continue the automation
- Deploy and test with validation agents [optional]
The system uses validation agents that execute syntactic validation and automatic deployment, with feedback loops injecting error messages back to translator agents when validation fails.
Architecture Overview¶
The multi-agent system with human-in-the-loop validation may use Confluent Cloud's Flink REST API to deploy a generated Flink statement. The following diagram represents the different agents working together:

Agent Roles¶
Since an agent is a combination of an LLM reference, prompts, and tool definitions, there are different implementations of these agents depending on whether the translation is ksqlDB to Flink SQL or Spark to Flink SQL.
KsqlDB to Flink agents¶
The supporting class of this workflow is the ksqlDB code agent.
Agent | Scope | Prompt File |
---|---|---|
Translator | Raw KSQL to Flink SQL translation | core/utils/prompts/ksql_fsql/translator.txt |
Table Detection | Identify multiple CREATE statements | core/utils/prompts/ksql_fsql/table_detection.txt |
Validation | Validate Flink SQL constructs | core/utils/prompts/ksql_fsql/mandatory_validation.txt |
Refinement | Fix deployment errors | core/utils/prompts/ksql_fsql/refinement.txt |
Spark to Flink agents¶
The supporting class of this workflow is the Spark SQL code agent. The approach is the same as for ksqlDB, with the prompts located in the core/utils/prompts/spark_fsql folder.
Agent | Scope | Prompt File |
---|---|---|
Translator | Spark SQL to Flink SQL translation | core/utils/prompts/spark_fsql/translator.txt |
Table Detection | Identify multiple CREATE statements | core/utils/prompts/spark_fsql/table_detection.txt |
Validation | Validate Flink SQL constructs | core/utils/prompts/ksql_fsql/mandatory_validation.txt |
Refinement | Fix deployment errors | core/utils/prompts/ksql_fsql/refinement.txt |
Class diagram¶
A test bed¶
The current project includes some examples of Spark and ksqlDB scripts in the tests/data folder, and the following Git project, , is a showcase of Spark and Flink projects.
Prerequisites and Setup¶
Environment Setup¶
- Clone the shift_left_utils git repository.
- Install uv for Python package management.
- Python environment: ensure Python 3.12+ and create a virtual environment.
- Install dependencies: use the uv package manager (recommended).
- Install the shift_left tool:
# under src/shift_left
ls -al dist/
# select the last version, for example:
uv tool install dist/shift_left-0.1.28-py3-none-any.whl
# Verify installation
shift_left --help
shift_left version
You can also use pip if you have an existing Python environment (see the sketch after this list).
- Make sure you are logged into Confluent Cloud and have defined at least one compute pool (see the sketch after this list).
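As a sketch of the last two items in the list above: the pip alternative reuses the same wheel file shown earlier, and the Confluent Cloud login relies on the standard Confluent CLI (the compute-pool listing command may need extra flags depending on your CLI version and configuration).
# from the src/shift_left folder, as with uv above
pip install dist/shift_left-0.1.28-py3-none-any.whl
# log in to Confluent Cloud and check that at least one compute pool exists
confluent login
confluent flink compute-pool list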
Configuration File Setup¶
- Create a configuration file (e.g., config-ccloud.yaml).
- Update the content of config-ccloud.yaml to reflect your Confluent Cloud environment. (For the commands used for migration, you do not need the Kafka settings.)
# Confluent Cloud Configuration
confluent_cloud:
  api_key: "YOUR_API_KEY"
  api_secret: "YOUR_API_SECRET"
  organization_id: "YOUR_ORGANIZATION_ID"
  environment_id: "YOUR_ENVIRONMENT_ID"
  url_scope: public
  region: "YOUR_REGION"
  provider: aws
flink:
  flink_url: flink....confluent.cloud
  compute_pool_id: "YOUR_COMPUTE_POOL_ID"
  api_key: "YOUR_API_KEY"
  api_secret: "YOUR_API_SECRET"
  catalog_name: "environment_name"
  database_name: "kafka_cluster_name"
- Set the following environment variables before using the tool. This can be done by modifying the CONFIG_FILE, FLINK_PROJECT, SRC_FOLDER, and SL_LLM_* variables in a small shell script (see the sketch after this list).
- Source it.
- Validate config-ccloud.yaml.
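A minimal sketch of such a script is shown below; the file name, the STAGING variable, and the paths are illustrative assumptions to adapt to your environment.
# set-env.sh (illustrative name)
export CONFIG_FILE=$HOME/Code/my-flink-app/config-ccloud.yaml
export FLINK_PROJECT=$HOME/Code/my-flink-app
export STAGING=$FLINK_PROJECT/staging   # assumed: used by the migrate commands later in this document
export SRC_FOLDER=$HOME/Code/src-dbt-project/models
# plus the SL_LLM_* variables (see the Specific Setup section below)
# then load it in your shell
source ./set-env.sh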
Migration Workflow¶
1. Project Initialization¶
Create a new target project to keep your Flink statements and pipelines (e.g., my-flink-app):
# Initialize project structure
shift_left project init <your-project> </path/to/your/folder>
# example
shift_left project init my-flink-app $HOME/Code
You should get:
my-flink-app
├── README.md
├── docs
├── pipelines
│ ├── common.mk
│ ├── dimensions
│ ├── facts
│ ├── intermediates
│ ├── sources
│ └── views
├── sources
└── staging
2. Specific Setup¶
- Copy your KSQL files to the sources directory.
- Optional: to run locally on a smaller model, download Ollama, then install the qwen2.5-coder model.
- Create an OpenRouter.ai API key (https://openrouter.ai/) to get access to larger models, like qwen/qwen3-coder:free, which is free to use.
- Set the environment variables (see the sketch after this list).
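A sketch of these setup steps, with illustrative paths and assuming the LLM endpoint is configured through the SL_LLM_* variables (confirm the exact names against the tool's documentation):
# copy the ksqlDB scripts to migrate into the project sources folder
cp $SRC_FOLDER/*.ksql $FLINK_PROJECT/sources/
# optional: pull a smaller local model with Ollama
ollama pull qwen2.5-coder
# or use a remote model through OpenRouter.ai (variable names are assumptions)
export SL_LLM_MODEL=qwen/qwen3-coder:free
export SL_LLM_BASE_URL=https://openrouter.ai/api/v1
export SL_LLM_API_KEY=<your_openrouter_api_key>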
3. Migration Execution¶
Basic Table Migration¶
# Migrate a simple table
shift_left table migrate basic_user_table $SRC_FOLDER/user-table.ksql $STAGING --source-type ksql
The command generates:
# ├── staging/basic_user_table/sql-scripts
# │ ├── ddl.basic_user_table.sql # Flink DDL
# │ ├── dml.basic_user_table.sql # Flink DML (if any)
4. Validation and Deployment¶
# Deploy to Confluent Cloud for Flink
cd ${STAGING}/basic_user_table
# Deploy DDL statements
make create_flink_ddl
# Deploy DML statements
make create_flink_dml
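After running the make targets, the deployed statements can be checked from the Confluent Cloud console or with the Confluent CLI; a possible check is sketched below (environment and compute-pool flags may be required depending on your CLI configuration).
# list the Flink statements and their status
confluent flink statement list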
5. Prepare for pipeline management¶
Flink statements have dependencies, so it is important to use shift_left to manage those dependencies:
- Run after new tables are created (see the command sketch after this list)
- Build all the metadata
- Verify an execution plan
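The exact shift_left subcommands for these three steps are not reproduced here; the sketch below uses assumed subcommand names, so confirm them with shift_left --help before running.
# rebuild the table inventory after new tables are created (assumed subcommand name)
shift_left table build-inventory $FLINK_PROJECT/pipelines
# build all the pipeline metadata (assumed subcommand name)
shift_left pipeline build-all-metadata
# verify the execution plan for a given table (assumed subcommand name)
shift_left pipeline build-execution-plan --table-name fct_examination_data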
6. Next¶
- Organize the Flink statements into pipeline folders, possibly using sources, intermediates, dimensions, and facts classification. Think about data products. A candidate hierarchy may look like this:
my-flink-app
├── pipelines
│   ├── common.mk
│   ├── dimensions
│   │   ├── data_product_a
│   ├── facts
│   │   ├── data_product_a
│   ├── intermediates
│   │   ├── data_product_a
│   ├── sources
│   │   ├── data_product_a
│   │   ├── src_stream
│   │   │   ├── Makefile
│   │   │   ├── pipeline_definition.json
│   │   │   ├── sql-scripts
│   │   │   │   ├── ddl.src_stream.sql
│   │   │   │   └── dml.src_stream.sql
│   │   │   ├── tests
│   ├── views
└── data_product_a
- Add unit tests per table (at least for the complex DML ones); see the test harness and the sketch after this list.
- Add source data into the first tables of the pipeline
- Verify the created records within the sink tables.
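For the unit-test item above, the shift_left table init-unit-tests command mentioned earlier scaffolds the test_definitions.yaml for a table; the argument form below is an assumption, so check the command help first.
# scaffold unit test definitions for a table (argument form is an assumption)
shift_left table init-unit-tests fct_examination_data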