system_window
Chops a stream of messages into tumbling or sliding windows of fixed temporal size, following the system clock.
```yaml
# Config fields, showing default values
buffer:
  system_window:
    timestamp_mapping: root = now()
    size: ""
    slide: ""
    offset: ""
    allowed_lateness: ""
```
A window is a grouping of messages that fit within a discrete measure of time following the system clock. Messages are allocated to a window either by the processing time (the time at which they're ingested) or by the event time, and this is controlled via the timestamp_mapping field.
In tumbling mode (the default), each window begins immediately where the previous window ends. When the buffer is initialized, the first window to be created and populated is, by default, aligned against the zeroth minute of the zeroth hour of the day (UTC), and may therefore be open for a shorter period than the specified size.
A window is flushed only once the system clock surpasses its scheduled end. If an allowed_lateness is specified, the window will not be flushed until the scheduled end plus that length of time.
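For example, the following is a minimal sketch assuming each message carries its event time in a created_at field (an assumed field name). With one-minute windows and ten seconds of allowed lateness, the window scheduled to end at 10:01:00 would not be flushed until the system clock reads 10:01:10, so messages stamped within that window that arrive up to ten seconds late can still be included:

```yaml
buffer:
  system_window:
    # Event time read from the message (assumed field name)
    timestamp_mapping: root = this.created_at
    size: 1m
    # Keep each window open for 10s past its scheduled end before flushing
    allowed_lateness: 10s
```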
When a message is added to a window, it has a metadata field window_end_timestamp added to it, containing the timestamp of the end of the window as an RFC3339 string.
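A later processor can read this metadata field like any other. As a minimal sketch, assuming messages are JSON objects, the following mapping copies it into the message body under window_end, a hypothetical field name chosen for illustration:

```yaml
pipeline:
  processors:
    # Keep the original document and add the window end timestamp to it.
    # "window_end" is an illustrative field name, not a convention.
    - mapping: |
        root = this
        root.window_end = meta("window_end_timestamp")
```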
Sliding Windows
Sliding windows begin from an offset of the prior window's beginning rather than its end, and therefore messages may belong to multiple windows. To produce sliding windows, specify a slide duration.
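As a minimal sketch using processing time (the default timestamp_mapping), the following would begin a new one-hour window every ten minutes. Since each window spans six slide intervals (1h / 10m), a message would typically belong to six overlapping windows:

```yaml
buffer:
  system_window:
    size: 1h
    # A new window begins every 10 minutes; slide must be smaller than size
    slide: 10m
```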
Back Pressure
If back pressure is applied to this buffer, either because output services are unavailable or because resources are saturated, windows older than the current and previous windows (according to the system clock) are dropped in order to prevent unbounded resource usage. This means you should ensure that, in the worst case, you have enough system memory to hold two windows' worth of data at once (plus extra for redundancy and other services).
If messages could potentially arrive with event timestamps in the future (according to the system clock) then you should also factor in these extra messages in memory usage estimates.
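As a rough worked estimate with hypothetical figures (1,000 messages per second, about 1 KB of payload per message with 1 KB taken as 1,000 bytes, and a 1h tumbling window), the two-window worst case comes to the following, counting payload bytes only:

```latex
\begin{aligned}
\text{one window}  &= 1000\ \tfrac{\text{msg}}{\text{s}} \times 1\ \tfrac{\text{KB}}{\text{msg}} \times 3600\ \text{s} = 3.6\ \text{GB} \\
\text{two windows} &= 2 \times 3.6\ \text{GB} = 7.2\ \text{GB}
\end{aligned}
```

Per-message overhead, headroom for other services, and any future-dated messages would come on top of this figure.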
Delivery Guarantees
This buffer honours the transaction model within Benthos in order to ensure that messages are not acknowledged until they are either intentionally dropped or successfully delivered to outputs. However, since messages belonging to an expired window are intentionally dropped, there are circumstances where not all messages entering the system will be delivered.
When this buffer is configured with a slide duration, it is possible for messages to belong to multiple windows, and therefore be delivered multiple times. In this case, a message is acked (or nacked) on its first delivery, and subsequent deliveries of the same message are made on a "best attempt" basis.
During graceful termination, if the current window is partially populated, its messages are nacked so that they are re-consumed the next time the service starts.
Examples
Counting Passengers at Traffic
Given a stream of messages relating to cars passing through various traffic lights of the form:
```json
{
  "traffic_light": "cbf2eafc-806e-4067-9211-97be7e42cee3",
  "created_at": "2021-08-07T09:49:35Z",
  "registration_plate": "AB1C DEF",
  "passengers": 3
}
```
We can use a window buffer in order to create periodic messages summarising the traffic for a period of time of this form:
```json
{
  "traffic_light": "cbf2eafc-806e-4067-9211-97be7e42cee3",
  "created_at": "2021-08-07T10:00:00Z",
  "total_cars": 15,
  "passengers": 43
}
```
With the following config:
```yaml
buffer:
  system_window:
    timestamp_mapping: root = this.created_at
    size: 1h

pipeline:
  processors:
    # Group messages of the window into batches of common traffic light IDs
    - group_by_value:
        value: '${! json("traffic_light") }'

    # Reduce each batch to a single message by deleting indexes > 0, and
    # aggregate the car and passenger counts.
    - mapping: |
        root = if batch_index() == 0 {
          {
            "traffic_light": this.traffic_light,
            "created_at": meta("window_end_timestamp"),
            "total_cars": json("registration_plate").from_all().unique().length(),
            "passengers": json("passengers").from_all().sum(),
          }
        } else { deleted() }
```
Fields
timestamp_mapping
A Bloblang mapping applied to each message during ingestion that provides the timestamp to use for allocating it a window. By default, the function now() is used to generate a fresh timestamp at the time of ingestion (the processing time), whereas this mapping can instead extract a timestamp from the message itself (the event time).
The timestamp value assigned to root must be either a numerical unix time in seconds (with up to nanosecond precision via decimals) or a string in ISO 8601 format. If the mapping fails or provides an invalid result, the message will be dropped (with logging to describe the problem).
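If an event time is stored as unix milliseconds rather than seconds, the mapping can scale it down, with the decimal part of the result carrying the sub-second precision. A minimal sketch, assuming a hypothetical numeric field named created_at_ms:

```yaml
buffer:
  system_window:
    # created_at_ms is an assumed numeric field holding unix time in
    # milliseconds; dividing by 1000 yields seconds with a fractional part.
    timestamp_mapping: root = this.created_at_ms / 1000
    size: 1h
```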
Type: string
Default: "root = now()"
```yaml
# Examples

timestamp_mapping: root = this.created_at

timestamp_mapping: root = meta("kafka_timestamp_unix").number()
```
size
A duration string describing the size of each window. By default, windows are aligned to the zeroth minute and zeroth hour on the UTC clock, meaning windows of 1 hour duration will match the turn of each hour in the day. This alignment can be adjusted with the offset field.
Type: string
slide
An optional duration string describing how far the beginning of each window should be offset from the beginning of the previous window, which creates sliding windows instead of tumbling ones. When specified, this duration must be smaller than the size of the window.
Type: string
Default: ""
offset
An optional duration string to offset the beginning of each window by. Without an offset, windows are aligned to the zeroth minute and zeroth hour on the UTC clock. The offset must be smaller than both the window size and the slide.
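As a minimal sketch, the following shifts one-hour tumbling windows by thirty minutes, so that each window would run from half past one hour to half past the next (for example 09:30 to 10:30 UTC) rather than on the hour:

```yaml
buffer:
  system_window:
    size: 1h
    # Must be smaller than the window size (and the slide, when one is used)
    offset: 30m
```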
Type: string
Default: ""
allowed_lateness
An optional duration string describing the length of time to wait after a window has ended before flushing it, allowing late arrivals to be included. Since this windowing buffer uses the system clock, an allowed lateness can improve the matching of messages when using event time.
Type: string
Default: ""