Terraform for Confluent Cloud Flink¶
Version
- Created November 2024
- Updated January 2026
This guide covers using Terraform to deploy and manage Confluent Cloud infrastructure for Flink applications. Terraform is well suited for managing the components described in the sections below.
Overview¶
The Confluent Terraform Provider enables infrastructure-as-code management for Confluent Cloud resources including environments, Kafka clusters, Schema Registry, Flink compute pools, and Flink SQL statements.
Core Principles
When running terraform apply, Terraform calculates differences between the current state (tracked in its state file) and the desired state, then applies only necessary changes. This enables incremental building of infrastructure.
The terraform plan command performs the following steps by default:
- It first runs a refresh operation to update its in-memory state with the actual, current configuration of your remote infrastructure by making API calls to the provider.
- It then compares this updated state with the desired state defined in your local Terraform configuration files (.tf files).
- Finally, it outputs a set of proposed changes (add, change, or destroy) required to make the running infrastructure match your configuration files.
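A common variant of this workflow saves the plan to a file so that apply runs exactly what was reviewed:
terraform plan -out=tfplan   # refresh, diff, and write the proposed changes to a file
terraform apply tfplan       # apply exactly the saved plan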
For Confluent Cloud, start with an environment, service account, and role binding, then add resources over time. Terraform automatically determines the correct order of operations based on resource dependencies.
For managing different stages (development, staging, production), use separate configurations with dedicated state files.
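As one sketch of this, a remote backend can store a dedicated state file per stage. The S3 backend, directory layout, and bucket/key names below are illustrative assumptions, not part of the base example:
# environments/dev/backend.tf (hypothetical layout; one directory per stage)
terraform {
  backend "s3" {
    bucket = "my-terraform-state"                # placeholder bucket name
    key    = "confluent/dev/terraform.tfstate"   # separate state key per stage
    region = "us-west-2"
  }
}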
Key resources: the Confluent Terraform Provider documentation on the Terraform Registry and the sample projects in the confluentinc/terraform-provider-confluent repository.
Prerequisites¶
Terraform CLI¶
Install the latest Terraform CLI:
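For example, on macOS with Homebrew (other platforms can use the official HashiCorp packages):
brew tap hashicorp/tap
brew install hashicorp/tap/terraform
terraform -version   # verify the installation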
Confluent Cloud API Keys¶
Create API keys with OrganizationAdmin role at confluent.cloud/settings/api-keys.
For production, use service account keys rather than user keys. Create a service account for the Terraform runner and assign the OrganizationAdmin role following the RBAC guide.
List existing keys:
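With the Confluent CLI installed and authenticated (confluent login):
confluent api-key list              # list existing API keys
confluent iam service-account list  # list service accounts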
Environment Variables¶
Export credentials as environment variables:
export TF_VAR_confluent_cloud_api_key="<your-api-key>"
export TF_VAR_confluent_cloud_api_secret="<your-api-secret>"
Do not commit terraform.tfstate or environment variable files to git.
Provider Configuration¶
Create main.tf with the Confluent provider:
terraform {
required_providers {
confluent = {
source = "confluentinc/confluent"
version = "2.57.0"
}
}
}
provider "confluent" {
cloud_api_key = var.confluent_cloud_api_key
cloud_api_secret = var.confluent_cloud_api_secret
}
data "confluent_organization" "my_org" {}
data "confluent_flink_region" "flink_region" {
cloud = var.cloud_provider
region = var.cloud_region
}
Define variables in variables.tf:
variable "confluent_cloud_api_key" {
description = "Confluent Cloud API Key"
type = string
sensitive = true
}
variable "confluent_cloud_api_secret" {
description = "Confluent Cloud API Secret"
type = string
sensitive = true
}
variable "cloud_provider" {
description = "Cloud provider (AWS, GCP, AZURE)"
type = string
default = "AWS"
}
variable "cloud_region" {
description = "Cloud region"
type = string
default = "us-west-2"
}
variable "prefix" {
description = "Prefix for resource names"
type = string
default = "j9r"
}
Initialize Terraform:
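From the directory containing main.tf:
terraform init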
Resource Definitions¶
Build infrastructure incrementally following dependency order. The deployment/cc-terraform folder contains a complete base infrastructure example.
Environment Layer¶
Create the Confluent Cloud environment with Stream Governance:
resource "confluent_environment" "env" {
display_name = "${var.prefix}-env"
stream_governance {
package = "ADVANCED"
}
}
resource "confluent_service_account" "env-manager" {
display_name = "${var.prefix}-env-manager"
description = "Service account to manage ${var.prefix} environment"
depends_on = [confluent_environment.env]
}
resource "confluent_role_binding" "env-admin" {
principal = "User:${confluent_service_account.env-manager.id}"
role_name = "EnvironmentAdmin"
crn_pattern = confluent_environment.env.resource_name
}
Importing existing resources:
For existing environments, use an imports.tf file:
import {
to = confluent_environment.env
id = "env-abc123"
}
import {
to = confluent_service_account.env-manager
id = "sa-xyz789"
}
Kafka Layer¶
Create a Kafka cluster with API keys:
resource "confluent_kafka_cluster" "standard" {
display_name = "${var.prefix}-kafka"
availability = "SINGLE_ZONE"
cloud = var.cloud_provider
region = var.cloud_region
standard {}
environment {
id = confluent_environment.env.id
}
}
resource "confluent_api_key" "kafka-api-key" {
display_name = "kafka-api-key"
description = "Kafka API Key"
owner {
id = confluent_service_account.env-manager.id
api_version = confluent_service_account.env-manager.api_version
kind = confluent_service_account.env-manager.kind
}
managed_resource {
id = confluent_kafka_cluster.standard.id
api_version = confluent_kafka_cluster.standard.api_version
kind = confluent_kafka_cluster.standard.kind
environment {
id = confluent_environment.env.id
}
}
}
Import existing Kafka cluster:
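A sketch using an import block; the IDs are placeholders, and the Kafka cluster import ID is the environment ID and cluster ID joined by a slash, as shown in the Resource Importer section below:
import {
  to = confluent_kafka_cluster.standard
  id = "env-abc123/lkc-xyz789"   # <environment-id>/<cluster-id>, placeholder values
}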
Schema Registry Layer¶
Schema Registry is auto-provisioned with the environment. Reference it as a data source:
data "confluent_schema_registry_cluster" "essentials" {
environment {
id = confluent_environment.env.id
}
depends_on = [confluent_kafka_cluster.standard]
}
resource "confluent_api_key" "schema-registry-api-key" {
display_name = "schema-registry-api-key"
owner {
id = confluent_service_account.env-manager.id
api_version = confluent_service_account.env-manager.api_version
kind = confluent_service_account.env-manager.kind
}
managed_resource {
id = data.confluent_schema_registry_cluster.essentials.id
api_version = data.confluent_schema_registry_cluster.essentials.api_version
kind = data.confluent_schema_registry_cluster.essentials.kind
environment {
id = confluent_environment.env.id
}
}
}
Flink Layer¶
Flink requires two service accounts with specific role bindings:
- flink-app - Runtime principal for Flink statements
- flink-developer-sa - Deploys Flink statements
# Flink runtime principal
resource "confluent_service_account" "flink-app" {
display_name = "${var.prefix}-flink-app"
description = "Service account as which Flink statements run"
}
resource "confluent_role_binding" "flink-app-clusteradmin" {
principal = "User:${confluent_service_account.flink-app.id}"
role_name = "CloudClusterAdmin"
crn_pattern = confluent_kafka_cluster.standard.rbac_crn
}
resource "confluent_role_binding" "flink-app-sr-read" {
principal = "User:${confluent_service_account.flink-app.id}"
role_name = "DeveloperRead"
crn_pattern = "${data.confluent_schema_registry_cluster.essentials.resource_name}/subject=*"
}
resource "confluent_role_binding" "flink-app-sr-write" {
principal = "User:${confluent_service_account.flink-app.id}"
role_name = "DeveloperWrite"
crn_pattern = "${data.confluent_schema_registry_cluster.essentials.resource_name}/subject=*"
}
# Flink developer (deploys statements)
resource "confluent_service_account" "flink-developer-sa" {
display_name = "${var.prefix}-fd-sa"
description = "Service account to deploy Flink statements"
}
resource "confluent_role_binding" "flink-developer" {
principal = "User:${confluent_service_account.flink-developer-sa.id}"
role_name = "FlinkDeveloper"
crn_pattern = confluent_environment.env.resource_name
}
resource "confluent_role_binding" "flink-assigner" {
principal = "User:${confluent_service_account.flink-developer-sa.id}"
role_name = "Assigner"
crn_pattern = "${data.confluent_organization.my_org.resource_name}/service-account=${confluent_service_account.flink-app.id}"
}
resource "confluent_api_key" "flink-api-key" {
display_name = "flink-api-key"
description = "Flink API Key owned by flink-developer-sa"
owner {
id = confluent_service_account.flink-developer-sa.id
api_version = confluent_service_account.flink-developer-sa.api_version
kind = confluent_service_account.flink-developer-sa.kind
}
managed_resource {
id = data.confluent_flink_region.flink_region.id
api_version = data.confluent_flink_region.flink_region.api_version
kind = data.confluent_flink_region.flink_region.kind
environment {
id = confluent_environment.env.id
}
}
}
Flink Compute Pools¶
resource "confluent_flink_compute_pool" "default" {
display_name = "default"
cloud = upper(data.confluent_flink_region.flink_region.cloud)
region = data.confluent_flink_region.flink_region.region
max_cfu = 50
environment {
id = confluent_environment.env.id
}
}
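Additional pools follow the same pattern. For example, the data-generation pool referenced in the multi-pool statement example below could be defined as follows (the display name and CFU limit are illustrative):
resource "confluent_flink_compute_pool" "data-generation" {
  display_name = "data-generation"
  cloud        = upper(data.confluent_flink_region.flink_region.cloud)
  region       = data.confluent_flink_region.flink_region.region
  max_cfu      = 20   # illustrative limit
  environment {
    id = confluent_environment.env.id
  }
}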
Deploying Flink Statements¶
Flink SQL statements can be deployed using the confluent_flink_statement resource. See the Flink Statement Resource Documentation.
Single Compute Pool Configuration¶
When managing a single compute pool, configure Flink parameters in the provider block:
provider "confluent" {
organization_id = var.organization_id
environment_id = var.environment_id
flink_compute_pool_id = var.flink_compute_pool_id
flink_rest_endpoint = var.flink_rest_endpoint
flink_api_key = var.flink_api_key
flink_api_secret = var.flink_api_secret
flink_principal_id = var.flink_principal_id
}
resource "confluent_flink_statement" "create_table" {
statement = "CREATE TABLE orders (order_id STRING, amount DECIMAL(10,2));"
properties = {
"sql.current-catalog" = var.environment_display_name
"sql.current-database" = var.kafka_cluster_display_name
}
lifecycle {
prevent_destroy = true
}
}
Multiple Compute Pools Configuration¶
When managing multiple compute pools, specify resources explicitly:
resource "confluent_flink_statement" "transaction_faker" {
organization {
id = data.confluent_organization.my_org.id
}
environment {
id = confluent_environment.env.id
}
compute_pool {
id = confluent_flink_compute_pool.data-generation.id
}
principal {
id = confluent_service_account.flink-app.id
}
rest_endpoint = data.confluent_flink_region.flink_region.rest_endpoint
properties = {
"sql.current-catalog" = confluent_environment.env.display_name
"sql.current-database" = confluent_kafka_cluster.standard.display_name
}
credentials {
key = confluent_api_key.flink-api-key.id
secret = confluent_api_key.flink-api-key.secret
}
statement = file("${path.module}/sql/create_table.sql")
statement_name = "create-orders-table"
lifecycle {
prevent_destroy = true
}
}
Statement Lifecycle Management¶
Use the stopped attribute to control statement execution:
resource "confluent_flink_statement" "streaming_job" {
statement = "INSERT INTO sink SELECT * FROM source;"
stopped = false # Set to true to stop the statement
lifecycle {
prevent_destroy = true
}
}
For complex deployments with many statements and canary releases, consider using shift_left_utils.
Workflow Commands¶
Standard Workflow¶
terraform init # Initialize provider
terraform validate # Validate configuration
terraform plan # Preview changes
terraform apply # Apply changes
Auto-approve for CI/CD¶
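Skip the interactive approval prompt in automated pipelines:
terraform apply -auto-approve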
Retrieve Outputs¶
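Assuming the configuration defines output blocks (for example, cluster or compute pool IDs), read them with:
terraform output                        # print all outputs
terraform output -json                  # machine-readable form for scripts
terraform output flink_compute_pool_id  # a single value (hypothetical output name)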
Handle 401 Errors¶
If you encounter 401 errors, verify the API key environment variables are set correctly.
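For example, confirm both variables are exported in the current shell:
env | grep TF_VAR_confluent   # both the key and the secret should be listed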
Resource Importer¶
The Confluent Resource Importer exports existing Confluent Cloud resources to Terraform configuration files. This is useful for:
- Discovering existing resource IDs for import statements
- Generating Terraform configurations from existing infrastructure
- Cross-checking deployed resources against Terraform definitions
Using the Importer¶
Create a separate folder for the importer (e.g., importer/main.tf):
terraform {
required_providers {
confluent = {
source = "confluentinc/confluent"
version = "2.57.0"
}
}
}
provider "confluent" {}
resource "confluent_tf_importer" "cloud_resources" {
output_path = "${path.module}/imported_infrastructure"
# Supported resource types (cannot mix Cloud and Kafka resources)
resources = [
"confluent_environment",
"confluent_service_account",
"confluent_kafka_cluster"
]
}
Supported Resource Types¶
The importer supports these resource types (but Cloud and Kafka resources cannot be imported simultaneously):
| Mode | Resources |
|---|---|
| Cloud | confluent_environment, confluent_service_account, confluent_kafka_cluster |
| Kafka | confluent_kafka_topic, confluent_kafka_acl, confluent_connector |
| Schema | confluent_schema |
Running the Importer¶
export CONFLUENT_CLOUD_API_KEY="<your-api-key>"
export CONFLUENT_CLOUD_API_SECRET="<your-api-secret>"
cd importer
terraform init
terraform apply -auto-approve
The importer creates:
- imported_infrastructure/main.tf - Resource definitions
- imported_infrastructure/terraform.tfstate - State file with resource IDs
- imported_infrastructure/variables.tf - Variable definitions
Extracting Import IDs¶
After running the importer, extract resource IDs from the generated files:
# Find environment IDs
grep -A2 "display_name.*j9r-env" imported_infrastructure/main.tf
# Find service account IDs from state
grep -A5 "j9r" imported_infrastructure/terraform.tfstate | grep '"id"'
Creating imports.tf¶
Use the discovered IDs to create an imports.tf file in your main configuration:
import {
to = confluent_environment.env
id = "env-nknqp3"
}
import {
to = confluent_kafka_cluster.standard
id = "env-nknqp3/lkc-3mnm0m"
}
import {
to = confluent_service_account.env-manager
id = "sa-7vvrvw"
}
import {
to = confluent_flink_compute_pool.default
id = "env-nknqp3/lfcp-80r0n7"
}
Cleanup¶
After importing resources into your main Terraform state, the importer folder can be deleted:
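For example:
rm -rf importer   # the importer keeps its own state, so this does not affect your main state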
Iterative Development¶
Build Terraform manifests incrementally:
- Add resources and variables
- Run terraform validate and terraform plan
- Review outputs and add dependent resources
- Repeat
This approach provides clear understanding of dependencies and makes troubleshooting straightforward.
Adding Flink to an Existing Environment¶
This section covers adding Flink resources to an existing Confluent Cloud environment that already has Kafka and Schema Registry.
Step 1: Import Existing Resources¶
Use the Resource Importer (see previous section) or manually create imports.tf with your existing resource IDs:
# imports.tf - Reference existing resources
import {
to = confluent_environment.env
id = "env-abc123"
}
import {
to = confluent_kafka_cluster.standard
id = "env-abc123/lkc-xyz789"
}
import {
to = confluent_service_account.env-manager
id = "sa-123abc"
}
Step 2: Create Resource Definitions for Existing Resources¶
Create minimal resource blocks that match your existing infrastructure:
# env.tf - Match your existing environment
resource "confluent_environment" "env" {
display_name = "my-existing-env"
stream_governance {
package = "ADVANCED" # or "ESSENTIALS"
}
}
# kafka.tf - Match your existing cluster
resource "confluent_kafka_cluster" "standard" {
display_name = "my-existing-kafka"
availability = "SINGLE_ZONE"
cloud = "AWS"
region = "us-west-2"
standard {}
environment {
id = confluent_environment.env.id
}
}
Step 3: Add Flink-Specific Resources¶
See the quickstart TF sample for some examples.
Create flink.tf with the Flink service accounts, role bindings, and API keys:
# flink.tf - New Flink resources
# Data source for Schema Registry (already exists)
data "confluent_schema_registry_cluster" "essentials" {
environment {
id = confluent_environment.env.id
}
}
# Flink runtime principal - runs Flink statements
resource "confluent_service_account" "flink-app" {
display_name = "${var.prefix}-flink-app"
description = "Service account as which Flink statements run"
}
# Role bindings for flink-app
resource "confluent_role_binding" "flink-app-clusteradmin" {
principal = "User:${confluent_service_account.flink-app.id}"
role_name = "CloudClusterAdmin"
crn_pattern = confluent_kafka_cluster.standard.rbac_crn
}
resource "confluent_role_binding" "flink-app-sr-read" {
principal = "User:${confluent_service_account.flink-app.id}"
role_name = "DeveloperRead"
crn_pattern = "${data.confluent_schema_registry_cluster.essentials.resource_name}/subject=*"
}
resource "confluent_role_binding" "flink-app-sr-write" {
principal = "User:${confluent_service_account.flink-app.id}"
role_name = "DeveloperWrite"
crn_pattern = "${data.confluent_schema_registry_cluster.essentials.resource_name}/subject=*"
}
# Flink developer - deploys Flink statements
resource "confluent_service_account" "flink-developer-sa" {
display_name = "${var.prefix}-fd-sa"
description = "Service account to deploy Flink statements"
}
resource "confluent_role_binding" "flink-developer" {
principal = "User:${confluent_service_account.flink-developer-sa.id}"
role_name = "FlinkDeveloper"
crn_pattern = confluent_environment.env.resource_name
}
resource "confluent_role_binding" "flink-assigner" {
principal = "User:${confluent_service_account.flink-developer-sa.id}"
role_name = "Assigner"
crn_pattern = "${data.confluent_organization.my_org.resource_name}/service-account=${confluent_service_account.flink-app.id}"
}
# Flink API key for deploying statements
resource "confluent_api_key" "flink-api-key" {
display_name = "${var.prefix}-flink-api-key"
description = "Flink API Key owned by flink-developer-sa"
owner {
id = confluent_service_account.flink-developer-sa.id
api_version = confluent_service_account.flink-developer-sa.api_version
kind = confluent_service_account.flink-developer-sa.kind
}
managed_resource {
id = data.confluent_flink_region.flink_region.id
api_version = data.confluent_flink_region.flink_region.api_version
kind = data.confluent_flink_region.flink_region.kind
environment {
id = confluent_environment.env.id
}
}
}
Create pools.tf with Flink compute pools:
# pools.tf - Flink compute pools
resource "confluent_flink_compute_pool" "default" {
display_name = "default"
cloud = upper(data.confluent_flink_region.flink_region.cloud)
region = data.confluent_flink_region.flink_region.region
max_cfu = 50
environment {
id = confluent_environment.env.id
}
}
Step 4: Apply the Configuration¶
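Run the standard plan and apply workflow so Terraform imports the existing resources and creates the new Flink resources:
terraform plan
terraform apply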
The plan should show:
- Existing resources being imported (environment, Kafka cluster, service accounts)
- New Flink resources being created (flink-app, flink-developer-sa, role bindings, compute pool)
Removing Flink resources¶
Compute pools with running Flink statements cannot be deleted - stop all statements first.
- If the compute pool is in your Terraform state, remove or comment out its resource block (and any output blocks that reference it), then run terraform apply.
- To destroy only a specific compute pool without removing it from the configuration, use a targeted destroy (see the commands below).
- To stop managing a compute pool with Terraform while keeping it running, remove it from the Terraform state (see the commands below).
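Sketches of the last two options, using the confluent_flink_compute_pool.default address from the examples above:
# Destroy only this compute pool while keeping it in the configuration
terraform destroy -target=confluent_flink_compute_pool.default

# Remove the compute pool from Terraform state without deleting it in Confluent Cloud
terraform state rm confluent_flink_compute_pool.default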