Flink Getting started¶
Update
- Created 2018
- Updated 1/18/2025 - improve note, review done.
This chapter reviews the different environments for deploying Flink and Flink jobs on a developer's workstation. Options include downloading the product tar file, using Docker Compose, Minikube or Colima with k3s, or adopting a hybrid approach that combines a Confluent Cloud Kafka cluster with a local Flink instance. This last option is not supported for production but is helpful for development purposes. To get started with Confluent Cloud for Flink, see this summary chapter.
The section covers the open source product, Confluent Platform for Flink, and Confluent Cloud for Flink.
Install locally¶
The Flink open source tar file can be downloaded. The `install-local.sh` script in the `deployment/product-tar` folder performs the download and untar operations.
- Once done, start Flink using the `start-cluster.sh` script in `flink-1.19.1/bin`. See the Flink OSS product documentation.
- Access the Web UI and submit one of the examples using the Flink client CLI: `./bin/flink run examples/streaming/WordCount.jar`.
- As an option, start the SQL client (see the sketch after this list).
- [Optional] Start the SQL Gateway so that multiple client applications can submit SQL queries concurrently:

```sh
./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost
# stop it
./bin/sql-gateway.sh stop-all -Dsql-gateway.endpoint.rest.address=localhost
```
- Stop the Flink job and the Task Manager cluster (see the sketch below).
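For reference, a minimal sketch of these optional client and shutdown steps, assuming the commands are run from the root of the extracted Flink distribution:

```sh
# start the interactive SQL client against the local session cluster
./bin/sql-client.sh

# stop the local session cluster (Job Manager and Task Managers) when done
./bin/stop-cluster.sh
```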
With docker images¶
Pre-requisites¶
- Get docker cli, helm, and kubectl
- Clone this repository.
- For Docker container execution, you need a Docker engine with the docker compose CLI. As an alternative, you can use Colima or Minikube with the docker-ce engine.
Three options:
- Colima with Kubernetes
- Minikube
- docker compose
For each of these environments, see the next sections; for Flink Kubernetes operator deployment and configuration, see the dedicated k8s deployment chapter.
Colima with Kubernetes¶
As an alternative to Docker Desktop, Colima is an open source tool to run containers on Linux or macOS.
- Start a k3s cluster (see the sketch after this list).
- Deploy the Flink and Confluent Platform operators (see the Makefile in `deployment/k8s` and its readme).
- Define a Flink cluster, and a Kafka cluster if needed.
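A possible start command, assuming default Colima settings otherwise (the CPU and memory sizing below are only examples):

```sh
# start Colima with its embedded k3s Kubernetes cluster
colima start --kubernetes --cpu 4 --memory 8
# verify the cluster is reachable
kubectl get nodes
```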
Minikube¶
- Install Minikube, and review some best practices on how to configure and use it.
- Start it with enough memory and CPU (a sketch follows after this list).
- On the newly created minikube profile, install the Flink Kubernetes Operator.
- If we want integration with Kafka and Schema Registry, select one of the Kafka platforms:
- Install the Confluent Platform Operator:

```sh
kubectl create namespace confluent
kubectl config set-context --current --namespace confluent
helm repo add confluentinc https://packages.confluent.io/helm
helm repo update
helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes
```
- Or the Kafka OSS Strimzi Operator in the `kafka` namespace:

```sh
kubectl create namespace kafka
kubectl config set-context --current --namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
```
Pair it with the Apicurio Registry Operator for schema management.
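Putting the Minikube steps together, a possible sequence is sketched below; the resource sizing, profile name, and operator version are assumptions (see the dedicated k8s deployment chapter for the full operator setup, including cert-manager for the operator webhook):

```sh
# create a dedicated minikube profile with enough resources for Flink
minikube start --cpus 4 --memory 8192 -p flink-demo

# install the Flink Kubernetes operator with helm (the operator version in the URL is an example)
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm repo update
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
```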
Docker Desktop and Compose¶
During development, we can use Docker Compose to start a simple Flink session cluster, or a standalone job manager that executes a single job with the application jar mounted inside the Docker image. The same environment can be used for SQL-based Flink apps.
As the Task Manager executes the job, the container running the Flink code must have access to the jars needed to connect to external sources like Kafka, or to tools like Flink Faker. A Dockerfile is therefore provided to download these jars and build a custom Flink image used for the Task Manager and the SQL client. Always update the jar versions when moving to a new Flink version.
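As an illustration, such a custom image could look like the minimal sketch below; the base image tag, connector choice, and jar version are assumptions to align with your Flink version (the actual Dockerfile is in `deployment/custom-flink-image`):

```dockerfile
# example only: align the tag and connector versions with the Flink version you run
FROM flink:1.19.1-scala_2.12

# add the Kafka SQL connector jar so the Task Manager and SQL client can read/write Kafka topics
# (wget is available in the official Flink image)
RUN wget -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar
```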
- If specific integrations are needed, get the required jar references, update the Dockerfile, and then build the custom Flink image under the `deployment/custom-flink-image` folder.
- Start the Flink session cluster with docker compose (see the commands after the compose file below).

The Flink OSS docker compose file starts one Job Manager and one Task Manager:
```yaml
services:
  jobmanager:
    image: flink:latest
    hostname: jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    user: "flink:flink"
    environment:
      FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
    volumes:
      - .:/home
  taskmanager:
    image: flink:latest
    hostname: taskmanager
    depends_on:
      - jobmanager
    command: taskmanager
    user: "flink:flink"
    scale: 1
    volumes:
      - .:/home
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
```
The docker compose file mounts the local folder to `/home` in both the Job Manager and Task Manager containers so that we can submit jobs from the Job Manager (accessing the compiled jar) and access the input data files and connector jars in the Task Manager container.
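A typical interaction with this setup, assuming the compose file above is in the current folder and the compiled job jar sits under the mounted folder (the jar path below is a placeholder):

```sh
# start the session cluster in the background
docker compose up -d

# submit a job from inside the Job Manager container (jar path is hypothetical)
docker compose exec jobmanager ./bin/flink run -d /home/target/my-flink-job.jar

# follow the Task Manager logs
docker compose logs -f taskmanager
```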
See this section to deploy an application with Flink.
Docker compose with Kafka and Flink¶
In the `deployment/docker` folder, the docker compose file starts a one-node OSS Kafka broker, one Zookeeper, one OSS Flink Job Manager and one Task Manager.
The SQL client can be used to compute some aggregations on the sale events created by the e-commerce simulator. Start the simulator from a Python virtual environment:
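A possible sequence, assuming the simulator ships with a `requirements.txt` and a main script (the file names below are placeholders for the actual ones in the repository):

```sh
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
# the script name is hypothetical, use the simulator's actual entry point
python ecommerce_simulator.py
```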
The application sends events like the following:
```
{'event_type': 'user_action',
 'timestamp': '2024-09-04T15:24:59.450582',
 'user_id': 'user5',
 'action': 'add_to_cart',
 'page': 'category',
 'product': 'headphones'
}
```
- Use the Kafdrop interface to verify the messages in the topic
- Connect to the SQL client container and create a table over the Kafka topic.
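One way to open the client, assuming the compose service is named `sql-client` (the service name is an assumption, check the compose file):

```sh
docker compose exec sql-client ./bin/sql-client.sh
```

Then define a table over the `ecommerce_events` topic: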
```sql
CREATE TABLE user_page_views (
    event_type STRING,
    user_id STRING,
    `action` STRING,
    page STRING,
    product STRING,
    timestamp_str STRING,  -- (1)
    -- computed column: derive an event-time timestamp from the string field
    -- (adjust the pattern to match the producer's actual timestamp format)
    timestamp_sec AS TO_TIMESTAMP(timestamp_str, 'yyyy-MM-dd HH:mm:ss'),
    WATERMARK FOR timestamp_sec AS timestamp_sec - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'ecommerce_events',
    'properties.bootstrap.servers' = 'kafka:29092',
    'properties.group.id' = 'sql-flink-grp-1',
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'json'
);
```
- The event timestamp is a string created by the Kafka producer
The WATERMARK clause defines a watermark strategy for handling event time in streaming applications. Watermarks are crucial for dealing with out-of-order events, allowing Flink to manage late arrivals and trigger processing based on event time rather than processing time. A watermark is a timestamp indicating that no events with an earlier timestamp are expected to arrive.
It is important to set the consumer properties, such as the consumer group id and the offset reset strategy.
The next SQL statement counts the number of pages viewed per user.
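A minimal version of such a query, assuming the `user_page_views` table defined above (the exact statement in the repository may differ):

```sql
-- continuous aggregation: the result updates as new events arrive
SELECT user_id, COUNT(*) AS page_views
FROM user_page_views
GROUP BY user_id;
```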
The results are updated continuously in the SQL client as new events arrive.
Confluent Cloud¶
See the getting started product documentation and this summary.
To use the Confluent CLI Flink SQL client, see this note.
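As a hedged example, a typical session with the Confluent CLI looks like the following; the flags and IDs are placeholders, so verify them against the Confluent documentation:

```sh
confluent login
# open the interactive Flink SQL shell bound to a compute pool and environment (IDs are placeholders)
confluent flink shell --compute-pool <compute-pool-id> --environment <environment-id>
```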