Understanding Stream Processing: Insights into Windowing in Kafka and Flink
Stream processing has become a leading approach to handling event data. While batch processing remains relevant, stream processing is what enables reacting to events in real time.
So, what does it mean to react to events? You might recall the frequently referenced fraud scenario where an individual gains access to a consumer's credit card information. Thanks to the bank's responsive processing system, the fraudulent transaction is promptly rejected.
Additionally, stream processing is crucial for immediate reactions that aren't linked to isolated events. For example, consider the monitoring of temperature within a manufacturing setup; if the average temperature surpasses a designated threshold over a certain timeframe, an alert should be triggered. This scenario is not about a single spike but rather a consistent upward trend—essentially, how temperature readings behave during a defined period.
This concept brings us to windowing in event streams. While aggregations (groupings of events based on shared attributes) are essential for utilizing event streams, a comprehensive aggregation across all time periods fails to provide clarity on specific activity intervals. The following illustration exemplifies this:
Though the average temperature has shown an increase over time, it doesn't convey the complete picture. Now, let’s examine the average temperature readings across specific intervals:
By capturing readings at defined intervals (windows), one can identify significant fluctuations in the average value.
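To make the contrast concrete, here is a small, self-contained sketch in plain Java (not Kafka Streams or Flink code; the timestamps and readings are made up) that computes a global average alongside per-window averages over fixed one-minute intervals. The global average smooths the spike away, while the windowed averages expose it:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

class WindowedAverages {
    // Map a reading's timestamp to the start of its fixed (tumbling) window.
    static long windowStart(long tsMs, long sizeMs) {
        return tsMs - (tsMs % sizeMs);
    }

    // Average the readings per window; keys are window start times in ms.
    static Map<Long, Double> averagesByWindow(long[] ts, double[] readings, long sizeMs) {
        Map<Long, double[]> acc = new TreeMap<>(); // windowStart -> {sum, count}
        for (int i = 0; i < ts.length; i++) {
            double[] sc = acc.computeIfAbsent(windowStart(ts[i], sizeMs), k -> new double[2]);
            sc[0] += readings[i];
            sc[1] += 1;
        }
        Map<Long, Double> out = new TreeMap<>();
        acc.forEach((w, sc) -> out.put(w, sc[0] / sc[1]));
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {0, 30_000, 60_000, 90_000, 120_000, 150_000};
        double[] temps = {20.0, 21.0, 20.5, 22.0, 35.0, 36.0};
        // The global average hides the jump in the final minute...
        System.out.println(Arrays.stream(temps).average().orElse(0)); // 25.75
        // ...while the per-window averages make it obvious.
        System.out.println(averagesByWindow(ts, temps, 60_000L)); // last window averages 35.5
    }
}
```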
While an overall aggregation can be beneficial, in many instances, it is advantageous to focus on specific time frames. In other situations, you may wish to aggregate based on behavioral patterns rather than fixed time limits, such as session windows that are defined by periods of inactivity. We will explore session windows in a future blog post.
This article is the first in a series discussing windowing in two leading stream processing frameworks: Kafka Streams and Flink, with a specific emphasis on Flink SQL. It’s crucial to clarify that this series does not aim to directly compare the two APIs. Instead, it serves as a guide to windowed operations within Kafka Streams and Flink SQL. While competitive analysis is common, it is not our primary focus here.
The series will cover:
- Various types of windowing, their semantics, and potential applications.
- Time semantics.
- Result interpretation.
- Testing of windowed applications.
A basic understanding of Kafka Streams and Flink SQL is assumed; the examples begin directly with windowing.
Before diving into windowing, it’s essential to examine how Kafka Streams and Flink SQL organize windowing applications. This introductory post will address this foundational detail, while subsequent posts will concentrate on the windowing aspects.
Kafka Streams Windowing
To implement any windowing in Kafka Streams, you must define an aggregation. Aggregations function by combining smaller elements into a larger composition, centered around a common attribute, which, in Kafka Streams, will be the key in key-value pairs. Additionally, a reduce operation—a specialized form of aggregation—can be performed, returning the same type as its input components. Generally, aggregations can yield entirely different outputs from the inputs. However, since windowing operates similarly for both reduce and aggregation, we will use aggregation in our examples throughout this series.
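The reduce-versus-aggregate distinction can be illustrated with a plain-Java analogy using `java.util.stream` (this is not the Kafka Streams API; the readings are made up). A reduce produces the same type as its inputs, while an aggregation may build something entirely different:

```java
import java.util.List;

class ReduceVsAggregate {
    // Reduce: the result has the same type as the inputs (Double -> Double).
    static double reduceMax(List<Double> readings) {
        return readings.stream().reduce(Double.NEGATIVE_INFINITY, Math::max);
    }

    // Aggregate: the result can be an entirely different type (Double -> String).
    static String aggregateSummary(List<Double> readings) {
        return readings.stream()
                .reduce("readings:", (acc, r) -> acc + " " + r, (a, b) -> a + b);
    }

    public static void main(String[] args) {
        List<Double> readings = List.of(20.0, 21.0, 22.0);
        System.out.println(reduceMax(readings));        // 22.0
        System.out.println(aggregateSummary(readings)); // readings: 20.0 21.0 22.0
    }
}
```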
KStream<String, Double> iotHeatSensorStream =
    builder.stream("heat-sensor-input",
        Consumed.with(stringSerde, doubleSerde));

iotHeatSensorStream.groupByKey() // 1
    .windowedBy(<window specification>) // 2
    .aggregate(() -> new IotSensorAggregation(tempThreshold), // 3
        aggregator,
        Materialized.with(stringSerde, aggregationSerde))
    .toStream().to("sensor-agg-output",
        Produced.with(windowedSerde, sensorAggregationSerde)); // 4
Let’s break down the critical components involved in setting up the Kafka Streams window aggregation:
- The initial step is to group all records by key, which is necessary before any aggregation can occur. Here, KStream.groupByKey is utilized, presuming that the key-value pairs already have the correct keys for grouping. If not, KStream.groupBy can be employed, allowing you to pass a KeyValueMapper instance that maps the current key-value pair to a new one, enabling the creation of a suitable key for the aggregation grouping. Note that changing the key during grouping will lead to a re-partitioning of the records.
- You will specify the windowing—specific types will be discussed in future posts.
- This step involves detailing how to aggregate records. The first parameter is an Initializer represented as a lambda function, which provides the initial value. The second parameter is the Aggregator instance that implements the aggregation action you define. In this case, it involves a simple average while tracking the highest and lowest values. The third parameter is a Materialized instance that indicates how to store the aggregation. As the value type differs from the incoming value, you must supply the correct Serde instance for Kafka Streams to use during (de)serialization.
- The last step entails providing the Serde instances for sending the results back to Kafka. The key Serde is distinct as Kafka Streams encapsulates the incoming record key within a Windowed instance.
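To ground step three, here is a hypothetical sketch of what an IotSensorAggregation class might look like in plain Java: a running average plus the highest and lowest readings, with a threshold passed to the Initializer. The field and method names are illustrative assumptions, not the article's actual implementation:

```java
class IotSensorAggregation {
    // Hypothetical aggregation state: running average, extremes, and a threshold.
    final double tempThreshold;
    double sum;
    long count;
    double highest = Double.NEGATIVE_INFINITY;
    double lowest = Double.POSITIVE_INFINITY;

    IotSensorAggregation(double tempThreshold) {
        this.tempThreshold = tempThreshold;
    }

    // The work an Aggregator instance would perform for each new reading.
    IotSensorAggregation add(double reading) {
        sum += reading;
        count++;
        highest = Math.max(highest, reading);
        lowest = Math.min(lowest, reading);
        return this;
    }

    double average() {
        return count == 0 ? 0.0 : sum / count;
    }

    boolean overThreshold() {
        return average() > tempThreshold;
    }
}
```

In the Kafka Streams topology above, the Aggregator lambda would simply call something like `add(value)` on the current aggregation and return it.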
One notable aspect of this aggregation example is the absence of visible timestamps for the window. However, there’s a significant hint in the aggregation example explanation. At point four, Kafka Streams encapsulates the original key within a Windowed object.
As depicted in this illustration, the Windowed object incorporates the original key and the Window instance for the aggregation values. The Window object contains the start and end times for the aggregation window. While the window size is not explicitly included, it can be easily calculated by subtracting the start time from the end. Reporting and analysis of the aggregation window times will be discussed in a future blog post.
Encapsulating the original key within a Windowed object alters the type, necessitating an update to Kafka Streams regarding the serialization of results. Thankfully, Kafka Streams offers the WindowedSerdes utility class, simplifying the process of obtaining the correct Serde for producing results back to Kafka:
Serde<Windowed<String>> windowedSerde =
    WindowedSerdes.timeWindowedSerdeFrom(String.class, // 1
        60_000L); // 2

KStream<String, Double> iotHeatSensorStream =
    builder.stream("heat-sensor-input",
        Consumed.with(stringSerde, doubleSerde));

iotHeatSensorStream.groupByKey()
    .windowedBy(<window specification>)
    .aggregate(() -> new IotSensorAggregation(tempThreshold),
        aggregator,
        Materialized.with(stringSerde, aggregationSerde))
    .toStream().to("sensor-agg-output",
        Produced.with(windowedSerde, sensorAggregationSerde)); // 3
- The class type for the original key.
- The size of the window in milliseconds.
- Providing the Serde for the Windowed key.
By utilizing the WindowedSerdes class, you ensure the correct deserialization strategy for Kafka Streams when producing windowed results back to Kafka. This implies that downstream consumers will also be equipped to handle the windowed results appropriately. We will delve into that scenario in an upcoming blog post focusing on reporting.
Now, let’s transition to Flink SQL's aggregation windows.
Flink SQL Windowing
Flink provides windowing for event stream data through windowing table-valued functions (TVFs). These TVFs adhere to the SQL 2016 standard Polymorphic Table Functions (PTF). In essence, PTFs allow for user-defined functions on a table that return another table.
The intriguing aspect of PTF is that the schema of the table returned by the function is dynamic and determined at runtime by the function's output. Consequently, PTFs facilitate windowing and aggregation functions on existing tables, which is precisely what Flink SQL windowing offers. The windowing TVFs in Flink have replaced the now-obsolete Group Window Functions. Window TVFs enable more sophisticated window-based calculations, including Window TopN and Window Deduplication.
Next, let’s look at how to execute a windowed aggregation in Flink SQL. Similar to the Kafka Streams example, we will examine the structure of a windowed aggregation, with specific window implementations to be addressed in later posts.
SELECT window_start,
       window_end,
       device_id,
       AVG(reading) AS avg_reading -- 1
FROM TABLE(
       <Window Function> (         -- 2
         TABLE device_readings,    -- 3
         DESCRIPTOR(ts),           -- 4
         INTERVAL '5' MINUTES,     -- 5
         [INTERVAL '10' MINUTES]   -- 6
       )
     )
GROUP BY window_start,             -- 7
         window_end,
         device_id
Here’s the breakdown of the query:
- Selecting the columns and applying the Flink SQL AVG function, providing a descriptive name; these columns form the schema of the returned table.
- The TABLE function.
- You specify a particular window function: TUMBLE, HOP, or CUMULATE. Support for SESSION windows is forthcoming. We will cover the specific types in future posts.
- The parameters for the window function begin with the input table.
- The DESCRIPTOR denotes the time attribute column that the function uses for the window.
- Depending on the window function, the next one or two parameters define the window advance and size or just the size.
- As with standard SQL aggregate functions, every non-aggregated column in the SELECT clause must also appear in the GROUP BY clause, including window_start and window_end.
Flink SQL adds three additional columns to windowed operations: window_start, window_end, and window_time. The window_time is determined by subtracting 1ms from the window_end value.
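The relationship between these three columns can be sketched in plain Java for a tumbling window (this is illustrative arithmetic, not Flink code; the event timestamp and window size are made up):

```java
import java.time.Duration;
import java.time.Instant;

class FlinkWindowColumns {
    // Align an event timestamp to the start of its tumbling window.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        return Instant.ofEpochMilli(ts.toEpochMilli() - ts.toEpochMilli() % sizeMs);
    }

    // The window end is the start plus the window size.
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    // window_time = window_end - 1ms, as described above.
    static Instant windowTime(Instant ts, Duration size) {
        return windowEnd(ts, size).minusMillis(1);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-01-01T00:07:30Z"); // hypothetical event time
        Duration size = Duration.ofMinutes(5);
        System.out.println(windowStart(ts, size)); // 2024-01-01T00:05:00Z
        System.out.println(windowEnd(ts, size));   // 2024-01-01T00:10:00Z
        System.out.println(windowTime(ts, size));  // 2024-01-01T00:09:59.999Z
    }
}
```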
This concludes our introduction to the structure of windowing applications in Kafka Streams and Flink SQL. In the next installment, we will discuss hopping and tumbling windows.
Resources
- Apache Flink on Confluent Cloud
- Flink SQL Windows
- Kafka Streams windowing documentation
- Kafka Streams in Action, 2nd Edition