Environment Setup¶
Versions
Created January 2025 - Updated 03/21/25: setup guide separation between user of the CLI and developer of this project. Update 08/09: config file explanation and how to get the parameters
This chapter addresses how to set up the shift_left CLI tool to manage Flink project. To install the CLI, which is based on Python, use a virtual environment: venv and pip will work, but we have adopted uv for package and virtual environment management.
Additionally, when using the tool to do migration of existing code to Apache Flink SQL, a Large Language Model is needed see dedicated chapter for instruction.
Pre-requisites¶
- On Windows - enable WSL2
- All Platforms - install git
-
Make is used to encapsulate the confluent cli, to make it easier for Data engineers to deploy Flink statements.
- install make for windows
- Mac OS:
brew install make
- Linux:
sudo apt-get install build-essential
-
All Platforms - install confluent cli
- If using uv, install it using the documentation
-
All Platforms - Install Python 3.12
-
Clone this repository (this will not be necessary once the CLI will be available in pypi):
- Create a Python virtual environment:
- Activate the environment:
- Be sure to use the pip install in the virtual environment:
- Install the shift_left CLI using the command (this is temporary once the CLI will be loaded to pypi): To get the list of version of the wheel, they are under the
src/shift_left/dist
folder
Set up config.yaml file¶
The configuration file config.yaml
is used intensively to tune the shift_left per environment and will be referenced by the environment variables: CONFIG_FILE. You should have different config.yaml for the different kafka cluster, schema registry and Flink environment.
-
Copy the
config_tmpl.yaml
template file to keep some important parameters for the CLI. -
Modify the
config.yaml
with values from your Confluent Cloud settings. See the tabs below for the different sections of this file:
- Get the Kakfa cluster URL using the
Confluent Console > Cluster Settings
page. The URL has the cluster id and a second id that is used for the RESP API.
kafka:
bootstrap.servers: lkc-2qxxxx-pyyyy.us-west-2.aws.confluent.cloud:9092
cluster_id: lkc-2qxxxx
pkafka_cluster: lkc-2qxxxx-pyyyy
security.protocol: SASL_SSL
sasl.mechanisms: PLAIN
sasl.username: <key name>
sasl.password: <key seceret>
session.timeout.ms: 5000
cluster_type: stage
src_topic_prefix: clone
Kafka cluster > API Keys
. The key can be downloaded locally - Get environment ID from the
Environment details
in the Confluen Console. The cloud provider and region.
confluent_cloud:
base_api: api.confluent.cloud/org/v2
environment_id: env-20xxxx
region: us-west-2
provider: aws
organization_id: 5329.....96
api_key: T3.....HH
api_secret: secret-key
page_size: 100
glb_name: glb
url_scope: private
organization_id
is defined under the user account > Organization settings
* the url_scope is set to private
or public
. When using private link it should be private. * When using a global load balancer, the glb_name to be set. It will be use to define the URL of the REST API. * The API key and secrets are defined per user, and visible under the user account > API keys
-
Flink settings are per environment. Get the URL endpoint by going to
Environments > one_of_the_env > Flink > Endpoints
, copy the private or public endpointflink: flink_url: flink-d....7.us-west-2.aws.glb.confluent.cloud api_key: MVXI.....HY api_secret: cf.......IztA compute_pool_id: lfcp-0... catalog_name: dev-flink-us-west-2 database_name: dev-flink-us-west-2 max_cfu: 50 #-- maximum CFU when creating compute pool via the tool max_cfu_percent_before_allocation: .8 #-- percent * 100 to consider before creating a new compute pool
-
Define the API keys from
Environments > one_of_the_env > Flink > API keys
. - The compute pool id is used as default for running Flink query.
- Catalog name is the name of the environment and database name is the name of the Kafka cluster
The app section defines a set of capabilities to tune the cli.
app:
delta_max_time_in_min: 15 # this is to apply randomly to the event timestamp to create out of order
report_output_dir: ./reports
default_PK: __db
timezone: America/Los_Angeles
src_table_name_prefix: src_
logging: INFO
products: ["p1", "p2", "p3"]
accepted_common_products: ['common', 'seeds']
cache_ttl: 120 # in seconds
sql_content_modifier: shift_left.core.utils.table_worker.ReplaceEnvInSqlContent
translator_to_flink_sql_agent: shift_left.core.utils.translator_to_flink_sql.DbtTranslatorToFlinkSqlAgent
dml_naming_convention_modifier: shift_left.core.utils.naming_convention.DmlNameModifier
compute_pool_naming_convention_modifier: shift_left.core.utils.naming_convention.ComputePoolNameModifier
data_limit_where_condition : rf"where tenant_id in ( SELECT tenant_id FROM tenant_filter_pipeline WHERE product = {product_name})"
data_limit_replace_from_reg_ex: r"\s*select\s+\*\s+from\s+final\s*;?"
data_limit_table_type: source
data_limit_column_name_to_select_from: tenant_id
post_fix_unit_test: _ut
post_fix_integration_test: _it
post_fix_unit_test, post_fix_integration_test
are used to append the given string to table name during unit testing and integration test respectively.- The
data_limit_replace_from_reg_ex, data_limit_table_type, data_limit_column_name_to_select_from
are used to add data filtering to all the source tables based on one column name to filter. The regex specifies to file theselect * from final
which is the last string in most Flink statements using CTEs implementation. sql_content_modifier
specifies the custom class to use to do some SQL content modification depending of the target environment. This is a way to extend the CLI logic to specific usage.
Security access
The config.yaml file is ignored in Git. So having the keys in this file is not a major concern, as it is used by the developers only. But it may be possible, in the future, to access secrets using a Key manager API. This could be a future enhancement.
Environment variables¶
Ensure the following environment variables are set: in a set_env
file. For example, in a project where the source repository is cloned to your-src-dbt-folder and the target Flink project is flink-project, use these setting:
export FLINK_PROJECT=$HOME/Code/flink-project
export STAGING=$FLINK_PROJECT/staging
export PIPELINES=$FLINK_PROJECT/pipelines
export SRC_FOLDER=$HOME/Code/datawarehouse/models
export CONFIG_FILE=./config.yaml
# The following variables are used when deploying with the Makefile.
export CCLOUD_ENV_ID=env-xxxx
export CCLOUD_ENV_NAME=j9r-env
export CCLOUD_KAFKA_CLUSTER=jxxxxx
export CLOUD_REGION=us-west-2
export CLOUD_PROVIDER=aws
export CCLOUD_CONTEXT=login-<user_id>@<domain>.com-https://confluent.cloud
To get the environment variables configured for your Terminal session do: