
Flink Getting Started Guide

Updates
  • Created 2018
  • Updated 02/14/2025: improved the notes on the Kubernetes deployment, added a simple demo reference, review done.
  • Updated 03/30/2025: consolidated the notes and updated the referenced links.

This guide covers four different approaches to deploy and run Apache Flink:

  1. Local Binary Installation
  2. Docker-based Deployment
  3. Kubernetes Deployment (Colima or minikube)
  4. Confluent Cloud Managed Service

Prerequisites

Before getting started, ensure you have:

  1. Java 11 or higher installed (OpenJDK)
  2. Docker Engine and Docker CLI (for Docker and Kubernetes deployments)
  3. kubectl and Helm (for Kubernetes deployment)
  4. Confluent Cloud account (for Confluent Cloud deployment)
  5. Git (to clone this repository)
  6. Confluent CLI installed (for Confluent Cloud deployment)

1. Local Binary Installation

This approach is ideal for development and testing on a single machine.

Installation Steps

  1. Download and extract Flink binary
    # Using the provided script
    ./deployment/product-tar/install-local.sh
    
    # Or manually
    curl https://dlcdn.apache.org/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz --output flink-1.20.1-bin-scala_2.12.tgz
    tar -xzf flink-1.20.1-bin-scala_2.12.tgz
    
  2. Download and extract the Kafka binary (see the Kafka downloads page for available versions):

    curl https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz --output kafka_2.13-3.9.0.tgz
    tar -xvf kafka_2.13-3.9.0.tgz
    cd kafka_2.13-3.9.0
    # Generate a cluster id and format the storage directory for KRaft mode (no ZooKeeper)
    KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
    bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
    

  3. Set environment variables:

    export FLINK_HOME=$(pwd)/flink-1.20.1
    export KAFKA_HOME=$(pwd)/kafka_2.13-3.9.0
    export PATH=$PATH:$FLINK_HOME/bin:$KAFKA_HOME/bin
    

  4. Start the Flink cluster:

    $FLINK_HOME/bin/start-cluster.sh
    

  5. Access the Web UI at http://localhost:8081 (see the verification sketch after these steps)

  6. Submit a job

    # Submit the packaged example job
    $FLINK_HOME/bin/flink run $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar
    # List running jobs and note the job id
    $FLINK_HOME/bin/flink list
    # Cancel a job by its id
    $FLINK_HOME/bin/flink cancel <id>
    

  7. Download needed SQL connector for Kafka

    cd flink-1.20.1
    mkdir sql-lib
    cd sql-lib
    curl https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.4.0-1.20/flink-sql-connector-kafka-3.4.0-1.20.jar --output flink-sql-connector-kafka-3.4.0-1.20.jar
    

  8. Stop the cluster:

    $FLINK_HOME/bin/stop-cluster.sh
    

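Before stopping the cluster (step 8), you can verify the installation from a terminal. A minimal sketch, assuming the default REST port 8081 and the TopSpeedWindowing job submitted in step 6:

    # The JobManager REST API serves the same data as the Web UI
    curl -s http://localhost:8081/overview
    # List all jobs, including finished ones
    $FLINK_HOME/bin/flink list -a
    # Tail the JobManager log if something looks wrong
    tail -n 50 $FLINK_HOME/log/*standalonesession*.log
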
Running a simple SQL application

  1. Submit a sample job:
    $FLINK_HOME/bin/flink run $FLINK_HOME/examples/streaming/WordCount.jar
    
  2. Start Kafka cluster and create topics

    $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/kraft/server.properties
    $KAFKA_HOME/bin/kafka-topics.sh --create --topic flink-input --bootstrap-server localhost:9092
    $KAFKA_HOME/bin/kafka-topics.sh --create --topic message-count --bootstrap-server localhost:9092
    

  3. Use the Flink SQL Shell:

    $FLINK_HOME/bin/sql-client.sh --library $FLINK_HOME/sql-lib
    

    • Create a table using Kafka connector:
      CREATE TABLE flinkInput (
         `raw` STRING,
         `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
      ) WITH (
         'connector' = 'kafka',
         'topic' = 'flink-input',
         'properties.bootstrap.servers' = 'localhost:9092',
         'properties.group.id' = 'j9rGroup',
         'scan.startup.mode' = 'earliest-offset',
         'format' = 'raw'
      );
      
    • Create an output table in Debezium format so we can see the before and after state:
      CREATE TABLE msgCount (
         `count` BIGINT NOT NULL
      ) WITH (
         'connector' = 'kafka',
         'topic' = 'message-count',
         'properties.bootstrap.servers' = 'localhost:9092',
         'properties.group.id' = 'j9rGroup',
         'scan.startup.mode' = 'earliest-offset',
         'format' = 'debezium-json'
      );
      
    • Run the simplest possible Flink processing by counting the messages:
      INSERT INTO msgCount SELECT COUNT(*) as `count` FROM flinkInput;
      
      The result will look like:
      [INFO] Submitting SQL update statement to the cluster...
      [INFO] SQL update statement has been successfully submitted to the cluster:
      Job ID: 2be58d7f7f67c5362618b607da8265d7
      
    • Start a producer in one terminal:
       $KAFKA_HOME/bin/kafka-console-producer.sh --topic flink-input --bootstrap-server localhost:9092
      
    • Verify the result in a second terminal:
      $KAFKA_HOME/bin/kafka-console-consumer.sh --topic message-count --bootstrap-server localhost:9092
      
      The results are a list of Debezium records: each new input message retracts the previous count (op "d") and emits the updated count (op "c"), like:
      {"before":null,"after":{"count":1},"op":"c"}
      {"before":{"count":1},"after":null,"op":"d"}
      {"before":null,"after":{"count":2},"op":"c"}
      {"before":{"count":2},"after":null,"op":"d"}
      {"before":null,"after":{"count":3},"op":"c"}
      {"before":{"count":3},"after":null,"op":"d"}
      {"before":null,"after":{"count":4},"op":"c"}
      
  4. [Optional] Start the SQL Gateway for concurrent SQL queries (see the verification sketch after these steps):

    $FLINK_HOME/bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost
    
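Once the INSERT INTO statement is running, the whole pipeline can be checked from a terminal. A minimal sketch; the SQL Gateway check assumes its default REST port 8083:

    # Both topics created in step 2 should be listed
    $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    # The INSERT INTO msgCount job should appear as RUNNING
    $FLINK_HOME/bin/flink list
    # If the SQL Gateway was started, its REST endpoint should answer
    curl -s http://localhost:8083/v1/info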

See the product documentation for more examples. For Python Table API demonstrations, see this chapter.

Troubleshooting

  • If port 8081 is already in use, change rest.port in the Flink configuration file under $FLINK_HOME/conf (config.yaml in recent releases, flink-conf.yaml in older ones)
  • Check the logs in the $FLINK_HOME/log directory (see the sketch below)
  • Ensure Java 11+ is installed and JAVA_HOME is set correctly
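
A quick sketch for the log and Java checks:

    # Flink needs Java 11 or higher (see the prerequisites)
    java -version
    # Follow the JobManager log for startup errors
    tail -f $FLINK_HOME/log/*standalonesession*.log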

2. Docker-based Deployment

This approach provides containerized deployment using Docker Compose.

Prerequisites

  • Docker Engine
  • Docker Compose

Quick Start

  1. Build custom Flink image (if needed):

    cd deployment/custom-flink-image
    docker build -t jbcodeforce/myflink .
    

  2. Start Flink session cluster:

    cd deployment/docker
    docker compose up -d
    
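To confirm the session cluster is up, a minimal sketch; the jobmanager service name is an assumption that depends on the compose file:

    # List the containers started by the compose file
    docker compose ps
    # Follow the JobManager logs (adjust the service name to the compose file)
    docker compose logs -f jobmanager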

Docker Compose with Kafka

To run Flink with Kafka:

cd deployment/docker
docker compose -f kafka-docker-compose.yaml up -d

Customization

  • Modify deployment/custom-flink-image/Dockerfile to add required connectors
  • Update deployment/docker/flink-oss-docker-compose.yaml for configuration changes

During development, we can use Docker Compose to start a simple Flink session cluster, or a standalone JobManager that executes a single job with the application jar mounted inside the Docker image. The same environment can be used for SQL-based Flink applications.

As the TaskManager executes the job, the container running the Flink code must have access to the jars needed to connect to external systems like Kafka, or to tools like the Flink Faker. Therefore, deployment/custom-flink-image contains a Dockerfile that downloads the needed jars and builds a custom Flink image that can be used for the TaskManager and the SQL client. Always update the jar versions when moving to a new Flink version.
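
To confirm the connector jars are actually visible to the running TaskManager, a quick check helps; the flink-taskmanager container name and the /opt/flink/lib path are assumptions that depend on the compose file and base image:

    # List the Kafka connector jars inside the running TaskManager container
    docker exec flink-taskmanager ls /opt/flink/lib | grep -i kafka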

Docker Hub and Maven links

3. Kubernetes Deployment

This approach provides scalable, production-ready deployment using Kubernetes. See the K8S deployment deeper dive chapter and the lab readme for all command details.

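If the Flink Kubernetes operator is not yet installed, the following sketch follows the operator quick-start pattern; the operator version, cert-manager version, and flink namespace are assumptions, so see the K8S deployment chapter for the exact commands used in this repository:

    # cert-manager is required by the operator's admission webhook
    kubectl create -f https://github.com/cert-manager/cert-manager/releases/download/v1.8.2/cert-manager.yaml
    # Install the operator with Helm
    helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.11.0/
    helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
    # Namespace used by the example below
    kubectl create namespace flink
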
  • Deploy a State Machine example application to validate the deployment (a status-check sketch follows this list).
    kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.11/examples/basic.yaml -n flink
    
  • Verify Flink UI access
    kubectl port-forward svc/basic-example-rest 8081 -n flink
    
  • Undeploy
    kubectl delete -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.11/examples/basic.yaml -n flink
    
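To check the status of the deployed application, a minimal sketch using the same flink namespace:

    # The operator reconciles the FlinkDeployment custom resource
    kubectl get flinkdeployment basic-example -n flink
    # JobManager and TaskManager pods created for the example
    kubectl get pods -n flink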

Confluent Platform on Kubernetes

For Confluent Platform deployment:

  1. Install Confluent Flink Operator:

    make deploy_cp_flink_operator
    

  2. Deploy Confluent Platform Kafka cluster:

    make deploy_cp_cluster
    

  3. Deploy Flink applications (a verification sketch follows these steps)

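To verify that the Confluent Platform components are running, a minimal sketch; the confluent namespace is an assumption that depends on how the make targets install the operator and the cluster:

    # Operator, Kafka brokers, and Flink pods should all reach the Running state
    kubectl get pods -n confluent
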
See the Kubernetes deployment chapter for detailed instructions, as well as the Confluent operator documentation.

4. Confluent Cloud Deployment

This approach provides a fully managed Flink service.

Prerequisites

  • Confluent Cloud account
  • Confluent CLI installed
  • Environment configured

Getting Started

  1. Create a Flink compute pool (see the verification sketch after these steps):

    confluent flink compute-pool create my-pool --cloud aws --region us-west-2
    

  2. Start SQL client:

    confluent flink shell
    

  3. Submit SQL statements. In Confluent Cloud, a table is automatically backed by a Kafka topic and a schema in the current environment and Kafka cluster, so no connector, bootstrap server, or credential options are needed:

    CREATE TABLE my_table (
      id INT,
      name STRING
    );

    INSERT INTO my_table VALUES (1, 'hello');

    SELECT * FROM my_table;
    

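After step 1, you can confirm the pool exists and capture its id before opening the shell. A minimal sketch; the --compute-pool flag name should be checked against your Confluent CLI version:

    # List compute pools in the current environment and note the pool id (lfcp-xxxxxx)
    confluent flink compute-pool list
    # Open the SQL shell bound to that pool
    confluent flink shell --compute-pool <POOL_ID>
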
See Confluent Cloud Flink documentation for more details.

See the Shift Left project to manage bigger Flink project with a dedicated CLI.

Choosing the Right Deployment Approach

| Approach | Use Case | Pros | Cons |
|----------|----------|------|------|
| Local Binary | Development, testing | Simple setup, fast iteration | Limited scalability; manual configuration and maintenance across machines |
| Docker | Development, testing | Containerized, reproducible | Manual orchestration |
| Kubernetes | Production | Scalable, production-ready | Complex setup |
| Confluent Cloud | Production | Fully managed, no ops | Vendor control plane |

Application deployment

For production, it is recommended to deploy in application mode, packaging the SQL, Python, or Java application as a jar.
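
As an example, with the Flink CLI against a Kubernetes cluster, application mode looks like the following sketch; the cluster id, image, and jar path are hypothetical, the jar must be present inside the image, and option names should be checked against the Flink version in use:

    # Native Kubernetes application mode: a dedicated Flink cluster per application
    $FLINK_HOME/bin/flink run-application \
      --target kubernetes-application \
      -Dkubernetes.cluster-id=my-flink-app \
      -Dkubernetes.container.image.ref=jbcodeforce/myflink \
      local:///opt/flink/usrlib/my-job.jar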

Additional Resources