Apache Flink Personal Studies¶
Site updates
- Created 2018
- Updated 10/2024: reorganized the content, separated SQL from Java, added Confluent Cloud and Platform content to the technology chapters.
This chapter's updates
- Created 2018
- 10/24: reworked the intro and the stream processing description; moved fault tolerance to the architecture chapter
- 12/24: worked on the cookbook and added more SQL how-tos
Why Flink?¶
In classical IT architecture, we can observe two types of data processing: 1/ transactional and 2/ analytics.
- Transactional processing: In a 'monolithic' application, the database system serves multiple applications that may access the same database instances and tables. This approach can create challenges in supporting evolution and scalability. Microservice architecture addresses some of these issues by isolating data storage per service.
- Analytics: To gain insights from the data, the traditional approach involves developing a data warehouse and ETL jobs to copy and transform data from transactional systems to the warehouse. ETL processes extract data from a transactional database, transform it into a common representation (including validation, normalization, encoding, deduplication, and schema transformation), and then load the new records into the target analytical database. These processes run periodically in batches.
From the data warehouse, analysts build queries, metrics, and dashboards / reports to address specific business questions. Data may be stale. Massive storage is needed, and often the same data is replicated in multiple places, increasing cost and energy consumption.
Today, there is a new way to think about data by considering them as continuous streams of events that can be processed in real time. These event streams form the basis for stateful stream processing applications where analytics are brought to the data.
We can categorize applications implemented with stateful stream processing into three classes:
- Event-driven applications: These applications follow the reactive manifesto for scaling, resilience, and responsiveness, utilizing messaging as the communication system. Microservice applications produce and consume events.
- Data pipeline applications: These applications aim to replace ETL or ELT with low-latency stream processing for data transformation, enrichment, deduplication, etc.
- Data analytics applications: These applications focus on computing aggregates and taking immediate action on data, as well as querying live data or creating reports.
Overview of Apache Flink¶
Apache Flink (a top-level Apache project since 2014) is a powerful framework and distributed processing engine designed for stateful computations over both unbounded and bounded data streams. It also supports batch and graph processing, and it has become an industry standard thanks to its impressive performance and comprehensive feature set.
Flink's functionality is built upon four core capabilities:
- Streaming
- State Management
- Time Handling
- Checkpointing
Key characteristics of Flink include:
- Low Latency Processing: Offers event time semantics for consistent and accurate results, even with out-of-order events.
- Exactly-Once Consistency: Ensures reliable state management to avoid duplicates and message loss.
- High Throughput: Achieves millisecond latencies while processing millions of events per second.
- Powerful APIs: Provides APIs for operations such as map, reduce, join, window, split, and connect.
- Fault Tolerance and High Availability: Supports failover for task manager nodes, eliminating single points of failure.
- Multilingual Support: Enables streaming logic implementation in Java, Scala, Python, and SQL.
- Extensive Connectors: Integrates seamlessly with various systems, including Kafka, Cassandra, Pulsar, Elasticsearch, file systems, JDBC-compliant databases, HDFS, and S3.
- Kubernetes Compatibility: Supports containerization and deployment on Kubernetes, with a dedicated k8s operator to manage session and application deployments as well as job and task managers.
- Dynamic Code Updates: Allows for application code updates and job migrations across different Flink clusters without losing application state.
- Batch Processing: Also transparently supports traditional batch workloads, since reading a table at rest becomes a bounded stream in Flink.
Stream processing concepts¶
A Flink application is executed as a job, representing a processing pipeline known as a Dataflow (a minimal code sketch follows the list below).
- Dataflows can be modified using user-defined operators. Each step of the pipeline is handled by an operator.
- These dataflows are structured as directed acyclic graphs (DAGs), beginning with one or more sources and concluding with one or more sinks.
- Sources retrieve data from streams, such as Kafka topics or partitions.
- The source operator forwards records to downstream operators.
- The graph can run in parallel, consuming data from different partitions.
- Operators can filter records from the streams or perform enrichments.
- Some operators can run in parallel after data redistribution.
- Operators like Group By may require reshuffling or repartitioning of data, or rebalancing to merge streams.
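To make these concepts concrete, here is a minimal Java DataStream sketch with one source, two stateless operators, and one sink. The class name and the sample records are illustrative; a real job would typically read from a Kafka source rather than a fixed collection.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleDataflow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source operator: a fixed collection stands in for a Kafka topic
        DataStream<String> lines =
                env.fromElements("error: disk full", "info: started", "error: timeout");

        lines.filter(line -> line.startsWith("error"))   // stateless filter operator
             .map(String::toUpperCase)                    // stateless transformation/enrichment
             .print();                                    // sink operator

        // Submitting the program builds the DAG and runs it as a job
        env.execute("simple-dataflow");
    }
}
```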
Bounded and unbounded data¶
A stream is a sequence of events and can be bounded (finite, with a known start and end) or unbounded (infinite, continuously produced).
Dataflow¶
In Flink, applications are composed of streaming dataflows. A dataflow can consume from Kafka, Kinesis, message queues, and many other data sources. A typical high-level view of a Flink application is presented in the figure below:
src: Apache Flink product documentation
The figure below, from the product documentation, summarizes the APIs used to develop a data stream processing flow:
src: Apache Flink product documentation
Stream processing includes a set of functions to transform data and produce a new output stream. Intermediate steps compute rolling aggregations like min, max, and mean, or collect and buffer records in time windows to compute metrics over a finite set of events.
Distributed platform¶
To improve throughput, the data is partitioned so that operators can run in parallel. Programs in Flink are inherently parallel and distributed. During execution, a stream has one or more stream partitions, and each operator has one or more operator subtasks.
src: Apache Flink site
Some operations, such as GROUP BY or COUNT, may require reshuffling or repartitioning the data. This can be a costly operation, involving serialization and network transmission. Ultimately, events can be assigned to a single sink through rebalancing from multiple streams.
In this example, the SQL statement can be executed in parallel. During the grouping phase, the streams are repartitioned, followed by rebalancing to send the results to one sink.
The following figure shows the integration of the stream processing runtime with an append-log system such as Kafka, using internal local state persistence and continuous checkpointing to remote storage for high availability:
Using local state persistence improves latency, while backing state up to remote storage increases fault tolerance.
Stateless¶
Stateless applications are designed to tolerate data loss and prioritize rapid recovery following failures. They continuously process the most recent incoming data, making them ideal for scenarios where only low-latency alerts are valuable. This category includes alerting systems and applications that only require the latest data point to function effectively.
When checkpointing is disabled, Apache Flink does not provide built-in guarantees against failures. As a result, you may encounter issues such as data loss, duplicate messages, and a complete loss of application state. This lack of reliability necessitates careful consideration when designing stateless systems, particularly in environments where data integrity is crucial.
The following queries are stateless (a sketch follows this list):
- INSERT INTO, FROM (reading and writing to Kafka)
- WHERE (filters)
- CROSS JOIN UNNEST & LATERAL
- SELECT (projection)
- scalar and table functions
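As a sketch of how such stateless statements can be submitted from Java, assuming hypothetical `orders` and `large_orders` tables have already been declared (for example with CREATE TABLE statements pointing at Kafka topics); the table and column names are illustrative:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StatelessQuerySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Projection + filter: processed record by record, no state kept between events
        tEnv.executeSql(
            "INSERT INTO large_orders " +
            "SELECT order_id, customer_id, amount " +
            "FROM orders " +
            "WHERE amount > 100");
    }
}
```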
Stateful Processing¶
Stateful applications require the retention of state information, particularly when using aggregate or window operators. The following operations lead to stateful queries (a sketch follows this list):
- JOIN (except CROSS JOIN UNNEST & LATERAL)
- GROUP BY windowed or non-windowed aggregation
- OVER aggregation
- MATCH_RECOGNIZE (for pattern matching)
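As a sketch of a stateful query, reusing the TableEnvironment `tEnv` from the stateless example above, with the same hypothetical `orders` table now assumed to declare a `ts` TIMESTAMP(3) event-time column with a watermark; the windowed GROUP BY forces Flink to keep per-key, per-window state (windowing TVFs require Flink 1.13+):

```java
// Hourly order counts and amounts per customer; Flink keeps the running
// aggregates as state until each window closes.
tEnv.executeSql(
    "SELECT customer_id, window_start, window_end, " +
    "       COUNT(*) AS order_count, SUM(amount) AS total_amount " +
    "FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '1' HOUR)) " +
    "GROUP BY customer_id, window_start, window_end");
```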
To ensure fault tolerance, Flink employs checkpoints and savepoints.
See the checkpointing section for details
Windowing¶
Windows are buckets within a stream and can be defined by time or by element count.
- Tumbling window: assigns events to non-overlapping buckets of fixed size. Once the window boundary is crossed, all events within that window are sent to an evaluation function for processing.
  - Count-based tumbling windows define how many events are collected before triggering evaluation.
  - Time-based tumbling windows define a time interval (e.g., n seconds) during which events are collected. The amount of data within a window can vary depending on the incoming event rate.
- Sliding window: allows overlapping periods, meaning an event can belong to multiple buckets. This is particularly useful for capturing trends over time. The window sliding parameters define the duration of the window and the interval at which new windows are created. For example, a new 2-second window can be created every 1 second (see the sketch after this list). As a result, each event that arrives during this period is included in multiple overlapping windows, enabling more granular analysis of the data stream.
- Session window: begins when the data stream starts processing records and ends after a defined period of inactivity. The inactivity threshold is set with a timer that determines how long to wait before closing the window. The operator creates one window for each data element received; if there is a gap of, say, 5 seconds without new events, the window closes. This makes session windows particularly useful for grouping events by user activity or interaction sessions, capturing the dynamics of intermittent data streams.
- Global window: one window per key that never closes. Processing is driven by a Trigger (see the sketch after this list).
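A minimal Java sketch of the four window types on a hypothetical SaleEvent stream keyed by product; the POJO, field names, and durations are assumptions used only to illustrate the window assigners (the event-time variants also assume timestamps and watermarks have been assigned upstream):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

public class WindowSketches {

    // Hypothetical POJO: Flink accesses the public fields by name in sum("amount")
    public static class SaleEvent {
        public String product;
        public double amount;
    }

    public static void defineWindows(DataStream<SaleEvent> sales) {
        // Tumbling: non-overlapping, fixed-size 2-second buckets
        sales.keyBy(e -> e.product)
             .window(TumblingEventTimeWindows.of(Time.seconds(2)))
             .sum("amount");

        // Sliding: a new 2-second window every 1 second, so an event falls into two windows
        sales.keyBy(e -> e.product)
             .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
             .sum("amount");

        // Session: the window for a key closes after a 5-second gap without events
        sales.keyBy(e -> e.product)
             .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
             .sum("amount");

        // Global: one never-closing window per key, evaluated every 100 elements by a trigger
        sales.keyBy(e -> e.product)
             .window(GlobalWindows.create())
             .trigger(CountTrigger.of(100))
             .sum("amount");
    }
}
```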
Event time¶
Time is a central concept in stream processing and can have different interpretations based on the context of the flow or environment:
- Processing Time refers to the system time of the machine executing the task. It offers the best performance and lowest latency since it relies on the local clock, but it may lead to non-deterministic results due to factors like ingestion delays, parallel execution, clock synchronization, and backpressure.
- Event Time is the timestamp embedded in the record at the event source level. Using event-time ensures consistent and deterministic results, regardless of the order in which events are processed. This is crucial for accurately reflecting the actual timing of events.
- Ingestion Time denotes the time when an event enters the Flink system. It captures the latency introduced during the event's journey into the processing framework.
In any time window, the order of arrival is not guaranteed, and some events with an older timestamp may fall outside the time window boundaries. To address this challenge, particularly when computing aggregates, it is essential to ensure that all relevant events have arrived within the intended time frame.
The watermark serves as a heuristic for this purpose.
Watermarks¶
Watermarks are special markers that indicate the progress of event time in the stream. They help manage late arrivals by allowing the system to understand when it can safely close a window, ensuring that all necessary events have been accounted for before processing the aggregate.
Watermarks are generated in the data stream at regular intervals and serve as indicators of the progression of time. They are special messages intermingled with the other data records. Each watermark carries a timestamp computed by subtracting an estimate of the out-of-orderness from the largest timestamp encountered so far. These timestamps are always increasing. All records following a watermark should have a higher timestamp than the watermark; if not, they are considered late data and will be discarded. This mechanism allows the stream processing system to make informed decisions about when to close windows and process aggregates, ensuring that all relevant events are considered while still accommodating the inherent variability in event arrival times.
Late-arriving events are ignored because the window they belong to has already been closed. Within a window, state is saved on disk and needs to be cleaned once the window is closed; the watermark is the limit from which this garbage collection can occur.
The out-of-orderness estimate serves as an educated guess and is defined for each individual stream. Watermarks are essential for comparing timestamps of events, allowing the system to assert that no earlier events will arrive after the watermark's timestamp.
Watermarks are crucial for effectively processing out-of-order events, especially when dealing with multiple sources. In scenarios involving IoT devices and network latency, it’s possible to receive an event with an earlier timestamp even after the operator has already processed events with that timestamp from other sources. Importantly, watermarks are applicable to any timestamps and are not limited to window semantics.
When working with Kafka topic partitions, the absence of watermarks may present some challenges. Watermarks are generated independently for each stream and partition. When two partitions are combined, the resulting watermark is the minimum (the oldest) of the two, reflecting the point up to which the system has complete information. If one partition stops receiving new events, the watermark for that partition will not progress. To ensure that processing continues over time, an idle timeout can be configured.
The watermark generator operates within the Kafka consumer before the events are injected into the Flink stream or table, ensuring that the event time semantics are preserved throughout the processing pipeline.
When using open source Flink, developers need to define 1/ the watermark delay, 2/ the idle timeout, and 3/ the maximum allowed watermark drift to control pipeline efficiency, as in the sketch below.
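A sketch of those three settings using the DataStream WatermarkStrategy API; the SaleEvent type, the timestamp field, and the chosen durations are assumptions, and watermark alignment (the drift control) requires Flink 1.15 or later:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class WatermarkSketch {

    // Hypothetical event type carrying an epoch-millisecond timestamp field
    public static class SaleEvent {
        public String product;
        public long timestamp;
    }

    public static WatermarkStrategy<SaleEvent> strategy() {
        return WatermarkStrategy
                // 1/ watermark delay: tolerate events up to 5 seconds out of order
                .<SaleEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // 2/ idle timeout: keep the watermark advancing if a partition goes quiet
                .withIdleness(Duration.ofMinutes(1))
                // 3/ max allowed watermark drift across sources (watermark alignment)
                .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20))
                // extract the event-time timestamp from the record itself
                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);
    }
}
```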
With the windowing operator, event timestamps are utilized, but the windows themselves are defined based on elapsed time—such as a 10-minute duration. Watermarks are crucial for tracking the point in time beyond which no more delayed events are expected to arrive.
When using processing time, the watermark advances at regular intervals, typically every second. Events within the window are emitted for processing once the watermark surpasses the end of that window.
The Flink API requires a WatermarkStrategy, which consists of both a TimestampAssigner and a WatermarkGenerator. A TimestampAssigner is a simple function that extracts a timestamp from a specific field in each event. Flink provides several common strategies as static methods on the WatermarkStrategy class.
Additionally, it is possible to configure the system to accept late events by specifying an allowed lateness period. This defines how late an element can arrive before it is discarded. Flink maintains the state of the window until the allowed lateness time has expired, allowing for flexible handling of late-arriving data while keeping processing efficient and accurate.
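Putting the pieces together, a minimal sketch of a TimestampAssigner plus an allowed-lateness window on a hypothetical SaleEvent stream; the field names and durations are illustrative:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AllowedLatenessSketch {

    // Hypothetical event type; the timestamp assigner below extracts `timestamp`
    public static class SaleEvent {
        public String product;
        public double amount;
        public long timestamp;   // epoch milliseconds
    }

    public static void apply(DataStream<SaleEvent> sales) {
        sales
            // WatermarkStrategy = TimestampAssigner + WatermarkGenerator
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<SaleEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                 .withTimestampAssigner((e, ts) -> e.timestamp))
            .keyBy(e -> e.product)
            .window(TumblingEventTimeWindows.of(Time.minutes(10)))
            // keep window state and fire again for elements arriving up to 30 seconds late
            .allowedLateness(Time.seconds(30))
            .sum("amount");
    }
}
```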
Some examples¶
Parallel watermarking: the following example consumes data from 4 partitions with 2 Kafka consumers and 2 window operators:
Shuffling occurs because the windows compute COUNT or GROUP BY operations. Event A arrives at 3:13 and event B at 3:20 on the green partitions; both are processed by Window 1, which covers the 60 minutes between 3:00 and 4:00.
The connector emits a watermark for each partition independently. With the out-of-orderness set to 5 minutes, a watermark is created with timestamp 3:08 (= 3:13 - 0:05) for partition 0 and 3:15 for partition 1. The generator forwards the minimum of the two. This timestamp reflects how complete the stream is so far: it can be no more complete than the partition that is furthest behind, whose latest event was at 3:13.
If a partition does not receive any events, no watermark is generated for it, which means the overall watermark does not advance; as a side effect, this prevents windows from producing results. To avoid this problem, balance the Kafka partitions so that none are empty or idle, or configure the watermark strategy to use idleness detection.
- See the example TumblingWindowOnSale.java in the my-flink folder; to test it, do the following:
# Start the SaleDataServer, which listens on socket 9181, reads the avg.txt file,
# and sends each line to the socket
java -cp target/my-flink-1.0.0-SNAPSHOT.jar jbcodeforce.sale.SaleDataServer
# Inside the job manager container, submit the job with:
flink run -d -c jbcodeforce.windows.TumblingWindowOnSale /home/my-flink/target/my-flink-1.0.0-SNAPSHOT.jar
# The job creates the data/profitPerMonthWindowed.txt file with the accumulated sales
# and record counts per 2-second tumbling window:
(June,Bat,Category5,154,6)
(August,PC,Category5,74,2)
(July,Television,Category1,50,1)
(June,Tablet,Category2,142,5)
(July,Steamer,Category5,123,6)
...
Trigger¶
A Trigger in Flink determines when a window is ready to be processed.
Each window has a default trigger associated with it. For example, a 2-second tumbling window comes with a default trigger that fires when the 2-second interval elapses, while a global window requires an explicit trigger definition.
You can implement custom triggers by creating a class that extends the abstract Trigger class, overriding methods such as onElement(..), onEventTime(..), and onProcessingTime(..), as in the sketch below.
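For illustration, a minimal custom trigger that fires on every element and purges the window once event time passes its end; this is a hypothetical example, not one of Flink's built-in triggers:

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class FireOnEveryElementTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // Register a cleanup timer at the end of the window, then evaluate it immediately
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Purge the window content once the watermark passes the end of the window
        return time == window.maxTimestamp() ? TriggerResult.PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        // Remove the pending timer when the window is purged or the job is cancelled
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
```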
Flink provides several default triggers:
- EventTimeTrigger fires based upon progress of event time
- ProcessingTimeTrigger fires based upon progress of processing time
- CountTrigger fires when the number of elements in a window exceeds a specified count.
- PurgingTrigger is used for purging the window, allowing for more flexible management of state.
Eviction¶
An Evictor is used to remove elements from a window either after the trigger fires or before/after the window function is applied. The specific logic for removing elements is application-specific and can be tailored to the needs of your use case (a usage sketch follows the list below).
The predefined evictors are:
- CountEvictor removes elements based on a specified count, allowing for fine control over how many elements remain in the window.
- DeltaEvictor evicts elements based on a DeltaFunction and a threshold, removing elements whose delta relative to the last element in the window exceeds the threshold.
- TimeEvictor removes elements based on time, allowing you to keep only the most recent elements within a given time frame.
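For example, a minimal sketch that keeps only the ten most recent elements of each window before aggregation, on a hypothetical SaleEvent stream (the POJO and durations are assumptions):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EvictorSketch {

    // Hypothetical POJO with public fields accessed by name in sum("amount")
    public static class SaleEvent {
        public String product;
        public double amount;
    }

    public static void apply(DataStream<SaleEvent> sales) {
        sales.keyBy(e -> e.product)
             .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
             // keep only the 10 most recent elements of each window before the sum is applied
             .evictor(CountEvictor.of(10))
             .sum("amount");
    }
}
```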
Sources of knowledge¶
- Product documentation.
- Official training.
- Confluent "Fundamentals of Apache Flink" training - David Anderson.
- Anatomy of a Flink Cluster - product documentation.
- Jobs and Scheduling - product documentation.
- Base docker image is: https://hub.docker.com/_/flink
- Flink docker setup and the docker-compose files in this repo.
- FAQ
- Cloudera Flink stateful tutorial: a very good example of inventory transactions and queries on items treated as a stream.
- Building real-time dashboard applications with Apache Flink, Elasticsearch, and Kibana