Skip to content

Window Processing

A window is a batch of messages made with respect to time, with which we are able to perform processing that can analyze or aggregate the messages of the window. This is useful in stream processing as the dataset is never "complete", and therefore in order to perform analysis against a collection of messages, we must do so by creating a continuous feed of windows (collections), where our analysis is made against each window.

For example, given a stream of messages relating to cars passing through various traffic lights:

{
  "traffic_light": "cbf2eafc-806e-4067-9211-97be7e42cee3",
  "created_at": "2021-08-07T09:49:35Z",
  "registration_plate": "AB1C DEF",
  "passengers": 3
}

Windowing allows us to produce a stream of messages representing the total traffic for each light every hour:

{
  "traffic_light": "cbf2eafc-806e-4067-9211-97be7e42cee3",
  "created_at": "2021-08-07T10:00:00Z",
  "unique_cars": 15,
  "passengers": 43
}

Creating Windows

The first step in processing windows is producing the windows themselves, this can be done by configuring a window-producing buffer after your input:

System Clock

system_window buffer creates windows by following the system clock of the running machine. Windows will be created and emitted at predictable times, but this also means windows for historical data will not be emitted and therefore prevents backfills of traffic data:

input:
  kafka:
    addresses: [ TODO ]
    topics: [ traffic_data ]
    consumer_group: traffic_consumer
    checkpoint_limit: 1000

buffer:
  system_window:
    timestamp_mapping: root = this.created_at
    size: 1h
    allowed_lateness: 3m

For more information about this buffer, refer to the system_window buffer docs.

Grouping

With a window buffer chosen, our stream of messages will be emitted periodically as batches of all messages that fit within each window. Since we want to analyze the window separately for each traffic light, we need to expand this single batch out into one for each traffic light identifier within the window. For that purpose, we have two processor options: group_by and group_by_value.

In our case, we want to group by the value of the field traffic_light of each message, which we can do with the following:

pipeline:
  processors:
    - group_by_value:
        value: ${! json("traffic_light") }

Aggregating

Once our window has been grouped, the next step is to calculate the aggregated passenger and unique car counts. For this purpose, the Benthos mapping language Bloblang comes in handy as the method from_all executes the target function against the entire batch and returns an array of the values, allowing us to mutate the result with chained methods such as sum:

pipeline:
  processors:
    - group_by_value:
        value: ${! json("traffic_light") }

    - mapping: |
        let is_first_message = batch_index() == 0

        root.traffic_light = this.traffic_light
        root.created_at = @window_end_timestamp
        root.total_cars = if $is_first_message {
          json("registration_plate").from_all().unique().length()
        }
        root.passengers = if $is_first_message {
          json("passengers").from_all().sum()
        }

        # Only keep the first batch message containing the aggregated results.
        root = if ! $is_first_message {
          deleted()
        }

Bloblang is very powerful, and by using from and from_all, it's possible to perform a wide range of batch-wide processing. If you fancy a challenge, try updating the above mapping to only count passengers from the first journey of each registration plate in the window (hint: the fold method might come in handy).