system_window

Chops a stream of messages into tumbling or sliding windows of fixed temporal size, following the system clock.

# Config fields, showing default values
buffer:
  system_window:
    timestamp_mapping: root = now()
    size: ""
    slide: ""
    offset: ""
    allowed_lateness: ""

A window is a grouping of messages that fit within a discrete measure of time following the system clock. Messages are allocated to a window either by the processing time (the time at which they're ingested) or by the event time, and this is controlled via the timestamp_mapping field.

In tumbling mode (default) the beginning of a window immediately follows the end of a prior window. When the buffer is initialized the first window to be created and populated is aligned against the zeroth minute of the zeroth hour of the day by default, and may therefore be open for a shorter period than the specified size.
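As a rough illustration of the default alignment, the sketch below assumes a one hour window and a hypothetical service start time of 09:49:35 UTC; the times in the comments follow from the alignment rule described above and are not output from a real run.

```yaml
buffer:
  system_window:
    size: 1h
    # Assumed start time: 09:49:35 UTC (hypothetical).
    # First window:  09:00:00 to 10:00:00, open for only 10m25s after start.
    # Second window: 10:00:00 to 11:00:00, open for the full hour.
```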

A window is flushed only once the system clock surpasses its scheduled end. If an allowed_lateness is specified then the window will not be flushed until the scheduled end plus that length of time.
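A minimal sketch of how the flush time shifts with allowed_lateness, assuming event timestamps are taken from a created_at field; the clock times in the comments are hypothetical and only illustrate the rule above.

```yaml
buffer:
  system_window:
    timestamp_mapping: root = this.created_at
    size: 1m
    allowed_lateness: 10s
    # The window ending at 10:01:00 is flushed once the system clock passes
    # 10:01:10, so a message with a created_at of 10:00:58 that arrives at
    # 10:01:05 can still be allocated to it.
```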

When a message is added to a window it has a metadata field window_end_timestamp added to it containing the timestamp of the end of the window as an RFC3339 string.
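Since the value lives in metadata rather than in the payload, a downstream processor may want to copy it into the document. The following is one possible sketch; the window_end field name is an arbitrary choice made for illustration.

```yaml
pipeline:
  processors:
    - mapping: |
        # Keep the original document and add the window end as a field.
        root = this
        root.window_end = meta("window_end_timestamp")
```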

Sliding Windows

Sliding windows begin from an offset of the prior window's beginning rather than its end, and therefore a message may belong to multiple windows. To produce sliding windows, specify a slide duration.
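To make the overlap concrete, here is a sketch with a one hour window sliding every ten minutes. The window boundaries in the comments assume the default alignment to the turn of the hour, and the message timestamp is hypothetical.

```yaml
buffer:
  system_window:
    size: 1h
    slide: 10m
    # Windows begin every 10 minutes and each spans 1 hour:
    #   09:00 to 10:00, 09:10 to 10:10, 09:20 to 10:20, ...
    # A message timestamped 09:49:35 falls within the six windows that
    # begin at 08:50, 09:00, 09:10, 09:20, 09:30 and 09:40.
```

In general, when the size is a whole multiple of the slide, each message belongs to size divided by slide windows.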

Back Pressure

If back pressure is applied to this buffer, either due to output services being unavailable or resources being saturated, windows older than the current window and the one immediately before it (according to the system clock) will be dropped in order to prevent unbounded resource usage. This means you should ensure that under the worst case scenario you have enough system memory to store two windows' worth of data at a given time (plus extra for redundancy and other services).

If messages could potentially arrive with event timestamps in the future (according to the system clock) then you should also factor in these extra messages in memory usage estimates.
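A back-of-the-envelope estimate can help when sizing memory. The throughput and message size below are hypothetical figures chosen only to show the arithmetic, and the result covers payload bytes alone, not per-message overhead.

```yaml
buffer:
  system_window:
    size: 1m
    # Hypothetical load: 1,000 messages/s at roughly 1 KiB each.
    #   one window:  1,000 * 60 * 1 KiB = 60,000 KiB, about 59 MiB
    #   two windows: about 117 MiB of payload, before any overhead
```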

Delivery Guarantees

This buffer honours the transaction model within Benthos in order to ensure that messages are not acknowledged until they are either intentionally dropped or successfully delivered to outputs. However, since messages belonging to an expired window are intentionally dropped there are circumstances where not all messages entering the system will be delivered.

When this buffer is configured with a slide duration it is possible for messages to belong to multiple windows, and therefore be delivered multiple times. In this case the first time the message is delivered it will be acked (or nacked) and subsequent deliveries of the same message will be a "best attempt".

During graceful termination, if the current window is partially populated with messages, they will be nacked so that they are re-consumed the next time the service starts.

Examples

Counting Passengers at Traffic

Given a stream of messages relating to cars passing through various traffic lights of the form:

{
  "traffic_light": "cbf2eafc-806e-4067-9211-97be7e42cee3",
  "created_at": "2021-08-07T09:49:35Z",
  "registration_plate": "AB1C DEF",
  "passengers": 3
}

We can use a window buffer in order to create periodic messages summarising the traffic for a period of time of this form:

{
  "traffic_light": "cbf2eafc-806e-4067-9211-97be7e42cee3",
  "created_at": "2021-08-07T10:00:00Z",
  "total_cars": 15,
  "passengers": 43
}

With the following config:

buffer:
  system_window:
    timestamp_mapping: root = this.created_at
    size: 1h

pipeline:
  processors:
    # Group messages of the window into batches of common traffic light IDs
    - group_by_value:
        value: '${! json("traffic_light") }'

    # Reduce each batch to a single message by deleting indexes > 0, and
    # aggregate the car and passenger counts.
    - mapping: |
        root = if batch_index() == 0 {
          {
            "traffic_light": this.traffic_light,
            "created_at": meta("window_end_timestamp"),
            "total_cars": json("registration_plate").from_all().unique().length(),
            "passengers": json("passengers").from_all().sum(),
          }
        } else { deleted() }

Fields

timestamp_mapping

Bloblang mapping applied to each message during ingestion that provides the timestamp to use for allocating it a window. By default, the function now() is used in order to generate a fresh timestamp at the time of ingestion (the processing time). Alternatively, the mapping can extract a timestamp from the message itself (the event time).

The timestamp value assigned to root must either be a numerical unix time in seconds (with up to nanosecond precision via decimals), or a string in ISO 8601 format. If the mapping fails or provides an invalid result the message will be dropped (with logging to describe the problem).

Type: string

Default: "root = now()"

# Examples

timestamp_mapping: root = this.created_at

timestamp_mapping: root = meta("kafka_timestamp_unix").number()
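Because a failed mapping causes the message to be dropped, it may be worth falling back to the processing time when the event field is absent. The sketch below uses the Bloblang or method for this, assuming the event time lives in a created_at field. A value that is present but not a valid timestamp would still produce an invalid result.

```yaml
buffer:
  system_window:
    # Use the event time when present, otherwise the ingestion time.
    timestamp_mapping: root = this.created_at.or(now())
    size: 1h
```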

size

A duration string describing the size of each window. By default, windows are aligned to the zeroth minute and zeroth hour on the UTC clock, meaning windows of 1 hour duration will match the turn of each hour in the day. This alignment can be adjusted with the offset field.

Type: string

# Examples

size: 30s

size: 10m

slide

An optional duration string describing how far the beginning of each window should be offset from the beginning of the previous one. Specifying it creates sliding windows instead of tumbling windows. When specified, this duration must be smaller than the size of the window.

Type: string

Default: ""

# Examples

slide: 30s

slide: 10m

offset

An optional duration string by which to offset the beginning of each window. When omitted, windows are aligned to the zeroth minute and zeroth hour on the UTC clock. The offset must be smaller than both the window size and the slide.

Type: string

Default: ""

# Examples

offset: -6h

offset: 30m
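As a sketch of the effect, the config below shifts hourly windows by half an hour. The boundaries in the comment assume the offset is simply added to the default alignment.

```yaml
buffer:
  system_window:
    size: 1h
    offset: 30m
    # Assuming the offset is added to the default alignment, windows span
    # 00:30 to 01:30, 01:30 to 02:30, and so on (UTC).
```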

allowed_lateness

An optional duration string describing the length of time to wait after a window has ended before flushing it, allowing late arrivals to be included. Since this windowing buffer uses the system clock, an allowed lateness can improve the matching of messages when using event time.

Type: string

Default: ""

# Examples

allowed_lateness: 10s

allowed_lateness: 1m