Getting started¶
- Created 2018
- Updated 10/2024
This chapter discusses various environments for deploying Flink jobs on a developer's workstation. Options include using Docker Compose, Minikube, or a hybrid approach that combines a Confluent Cloud Kafka cluster with a local Flink instance.
For detailed instructions on using Confluent Cloud with Flink, refer to this chapter.
Pre-requisites¶
- A Docker engine with the Docker Compose CLI, or Minikube with the docker-ce engine.
- The docker CLI, helm, and kubectl installed.
- Clone this repository.
Minikube¶
- Install Minikube following its documented best practices.
- Start it with enough memory and CPU.
- Install the Flink Kubernetes Operator (only needed once per cluster), as sketched below.
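A minimal sketch of those steps, based on the Flink Kubernetes Operator quick start; the resource sizes and versions are illustrative, so adjust them to your machine and to the current operator release:

```sh
# Start Minikube with enough memory and CPU for a Flink session cluster (example values)
minikube start --cpus 4 --memory 8192

# One-time install of the Flink Kubernetes Operator; cert-manager is required for its webhook
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
```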
If we want integration with Kafka and the Schema Registry, select one platform:
- Install the Confluent Platform Operator:

```sh
kubectl create namespace confluent
kubectl config set-context --current --namespace confluent
helm repo add confluentinc https://packages.confluent.io/helm
helm repo update
helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes
```
- Or the Strimzi Operator in the `kafka` namespace, as sketched below:
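A possible installation sketch using the Strimzi Helm chart (the release name is arbitrary):

```sh
kubectl create namespace kafka
kubectl config set-context --current --namespace kafka
helm repo add strimzi https://strimzi.io/charts/
helm repo update
helm install strimzi-operator strimzi/strimzi-kafka-operator --namespace kafka
```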
Flink Kubernetes deployment¶
Running Flink Java in Standalone JVM¶
Docker Desktop and Compose¶
During development, we can use docker compose to start a simple Flink session cluster, or a standalone job manager that executes a single job with the application jar mounted inside the docker image. We can use this same environment for SQL-based Flink apps.
Because the task manager executes the job, the container running Flink must have access to the jars needed to connect to external sources such as Kafka, or to tools like FlinkFaker. A Dockerfile is therefore provided to download those jars and build a custom Flink image used for the task manager and the SQL client.
- Build the custom Flink image from the `custom-flink-image` folder.
- Start the Flink session cluster using the commands below:
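For example (the image tag below is illustrative; use the tag referenced by your docker-compose.yaml):

```sh
# Build the custom Flink image with the extra connector jars
cd custom-flink-image
docker build -t custom-flink:latest .
cd ..

# Start the session cluster (job manager + task manager) in the background
docker compose up -d
```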
The docker compose file starts one job manager and one task manager:
```yaml
services:
  jobmanager:
    image: flink:latest
    hostname: jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    user: "flink:flink"
    environment:
      FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
    volumes:
      - .:/home
  taskmanager:
    image: flink:latest
    hostname: taskmanager
    depends_on:
      - jobmanager
    command: taskmanager
    user: "flink:flink"
    scale: 1
    volumes:
      - .:/home
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
```
The docker compose file mounts the local folder to `/home` in both the job manager and task manager containers, so that we can submit the job from the job manager (which needs access to the compiled jar) and access the input data files from the task manager container.
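For example, a packaged job could be submitted from the job manager container as follows; the jar path is hypothetical and depends on where your build places it under the mounted folder:

```sh
# Submit a compiled job in detached mode from inside the job manager container
docker compose exec jobmanager flink run -d /home/target/my-flink-job.jar
```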
Docker compose with Kafka and Flink¶
In the `deployment/local` folder, the docker compose file starts a one-node Kafka broker, one Zookeeper, one job manager, and one task manager.
The SQL client can be used to compute aggregations on the sale events created by the e-commerce simulator. To start the simulator, use a Python virtual environment:
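The script and requirements file names below are assumptions; use the files shipped with the simulator.

```sh
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt        # simulator dependencies
python ecommerce_simulator.py          # start publishing events to Kafka
```

The simulator publishes events such as: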
```
{'event_type': 'user_action',
 'timestamp': '2024-09-04T15:24:59.450582',
 'user_id': 'user5',
 'action': 'add_to_cart',
 'page': 'category',
 'product': 'headphones'
}
```
- Use the Kafdrop interface to verify the messages in the topic.
- Connect to the SQL client container, as sketched below.
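A possible way to do that, assuming the compose service is named `sql-client` (check the compose file for the actual name):

```sh
docker compose exec sql-client bash
./bin/sql-client.sh
```

From the SQL CLI prompt, create a table that maps to the Kafka topic: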
```sql
CREATE TABLE user_page_views (
  event_type STRING,
  user_id STRING,
  action STRING,
  page STRING,
  product STRING,
  timestamp_str STRING,  -- (1)
  timestamp_sec AS TO_TIMESTAMP(timestamp_str, 'yyyy-MM-dd HH:mm:ss'),  -- derived field
  WATERMARK FOR timestamp_sec AS timestamp_sec - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'ecommerce_events',
  'properties.bootstrap.servers' = 'kafka:29092',
  'properties.group.id' = 'sql-flink-grp-1',
  'properties.auto.offset.reset' = 'earliest',
  'format' = 'json'
);
```
1. The event timestamp as a string, created by the Kafka producer.
The WATERMARK clause defines a watermark strategy for handling event time in streaming applications. Watermarks are crucial for dealing with out-of-order events, allowing Flink to manage late arrivals and to trigger processing based on event time rather than processing time. A watermark is a timestamp indicating that no events with an earlier timestamp are expected to arrive.
It is important to set the consumer properties, such as the consumer group id and the offset reset strategy.
The next SQL statement counts the number of page views per user.
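A minimal version of that aggregation, assuming the `user_page_views` table defined above, could look like:

```sql
-- Continuous count of page view events per user
SELECT user_id, COUNT(*) AS page_views
FROM user_page_views
GROUP BY user_id;
```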
The results are continuously updated as new events are read from the topic.
SQL Client¶
The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code in any programming language.
Build the image within the `sql-client` folder using the Dockerfile, modifying the Flink version as needed.
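For example (the image tag is illustrative):

```sh
cd sql-client
docker build -t flink-sql-client:latest .
```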
Then, to interact with Flink using the SQL client, open a bash session in the running container and launch the SQL CLI.
Then use Flink SQL CLI commands (see the SQL Client documentation), for example:
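A few standard commands to get oriented once the CLI is open, shown here as a sketch:

```sql
SHOW TABLES;
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT 'hello' AS greeting;
```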
See this folder to get some basic examples.