Streaming Jobs¶
Flare stream jobs are an effective solution when incoming data must be processed continuously, in real time. However, exercise caution when creating stream jobs: reserve them for cases with strict latency requirements, typically demanding a processing time of less than a minute, since they may incur higher computing costs.
Attribute configuration table for streaming jobs:¶
Feature | Description | Used in which trigger modes? | Notes |
---|---|---|---|
`triggerMode` (trigger) | Specifies the trigger mechanism that defines how and when streaming data is processed. Supported modes include `ProcessingTime`, `Continuous`, `Once`, and `AvailableNow`. | Applies to all modes (`Once`, `AvailableNow`, `ProcessingTime`, `Continuous`) | Defines how the stream job triggers micro-batches or continuous tasks. |
`triggerDuration` | Configures the time interval between micro-batches or checkpoint commits, depending on the trigger mode. Used to control the frequency of micro-batch execution or the commit interval in continuous mode. | Used in `ProcessingTime` and `Continuous` modes only | Defines the frequency (e.g., 10s) between batches or commits. |
`batchMode` | Used in orchestration layers to simulate batch-style jobs. Implied by triggers like `Once` or `AvailableNow`. | Typically used with the `Once` and `AvailableNow` triggers | Forces the stream job to behave like a batch-style job. Implied by the trigger itself. |
`foreachBatchMode` | Applies custom logic to each micro-batch, enabling batch-style integration with external systems. | Used with any trigger that processes micro-batches (`ProcessingTime`, `AvailableNow`, `Once`) | Provides per-micro-batch dataset access. Not used in pure `Continuous` mode. |
`startingOffsets` (Kafka-specific) | Specifies where the streaming job starts consuming a topic (as in Kafka) when no checkpoint exists. Accepts `earliest` or `latest`. | Effective during the first run of the query (with any trigger mode) if no checkpoint exists | Ignored if `checkpointLocation` exists (stored offsets are used). |
`checkpointLocation` | For output sinks that support end-to-end fault tolerance (such as Kafka), specifies the location to store all checkpoint information, including offsets, state, and metadata required to resume or recover streaming queries. This must be a directory in an HDFS-compatible, fault-tolerant file system. Mandatory for exactly-once semantics or stateful operations. | Required/recommended in all trigger modes if fault tolerance or exactly-once semantics is desired | Acts as the single source of truth after the first execution, enabling recovery and progress tracking. |
Important considerations
- It is mandatory to provide `isStream: true` in the input section when creating a streaming job.
- Always provide `checkpointLocation` to save all the progress information.
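Taken together, these attributes sit under the job section of a Flare stack spec. The snippet below is a minimal sketch only; the dataset address and checkpoint path are hypothetical placeholders:
streaming:
  triggerMode: ProcessingTime   # Once | AvailableNow | ProcessingTime | Continuous
  triggerDuration: 10 seconds   # only meaningful for ProcessingTime and Continuous
  checkpointLocation: dataos://lakehouse:checkpoints/example_stream_ckpt   # hypothetical path
inputs:
  - name: source_events
    dataset: dataos://examplekafka:default/events?acl=rw                   # hypothetical dataset
    isStream: true
    options:
      startingOffsets: earliest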
ForeachBatchMode¶
The `foreachBatchMode` option enables treating each micro-batch of streaming data as a standard batch dataset when set to `true`.
This is particularly useful for storage systems that do not support direct streaming writes (e.g., JDBC, Snowflake, Iceberg, MongoDB), as it allows leveraging existing batch data writers.
It also facilitates writing the same micro-batch to multiple destinations, supporting scenarios like data replication or multi-environment writes.
Note
`foreachBatchMode` does not work with the continuous processing mode, as it fundamentally relies on the micro-batch execution of a streaming query.
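To make the pattern concrete, here is a hedged sketch of a streaming block that uses `forEachBatchMode` to land each micro-batch in a batch-only sink; the depot and dataset names are hypothetical placeholders:
streaming:
  forEachBatchMode: true        # hand each micro-batch to the batch writer
  triggerMode: ProcessingTime
  triggerDuration: 30 seconds
  checkpointLocation: dataos://lakehouse:checkpoints/foreach_batch_ckpt   # hypothetical path
outputs:
  - name: finalDf
    dataset: dataos://examplejdbc:analytics/orders_stream?acl=rw          # hypothetical batch-only sink
    options:
      saveMode: append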
Trigger modes¶
The `triggerMode` controls the frequency and style in which streaming data is processed. By setting an appropriate trigger mode, you can decide whether the streaming query operates in micro-batch mode (processing data in discrete intervals) or in continuous mode (processing data as soon as it arrives).
This setting is crucial for balancing latency, throughput, and cost-efficiency, depending on your business needs. Here are the different kinds of triggers that are supported:
Unspecified (default)¶
If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing.
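In manifest terms, this default simply means omitting `triggerMode` (and `triggerDuration`) from the streaming section; the checkpoint path below is a hypothetical placeholder:
streaming:
  # no triggerMode / triggerDuration: each micro-batch starts as soon as the previous one completes
  checkpointLocation: dataos://lakehouse:checkpoints/default_trigger_ckpt   # hypothetical path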
Once¶
The query will execute only one micro-batch to process all the available data and then stop on its own. This is useful in scenarios where you want to periodically spin up a cluster, process everything that has arrived since the last period, and then shut down the cluster. In some cases, this may lead to significant cost savings.
- Latency: Not applicable (no streaming continuity; acts like a batch job for all unprocessed data).
- Use-cases: Ideal for periodic streaming jobs (e.g., nightly ETL processes).
Info
You need to run it on a cron schedule to periodically check for the events that arrived during that interval, process them, and stop. For other trigger modes, a cron is not needed.
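For instance, assuming the Workflow resource's schedule section, a cron entry like the one below sits alongside dag in the workflow definition and re-runs the Once-mode job periodically (the cron expression is illustrative):
workflow:
  schedule:
    cron: '0 * * * *'   # illustrative: run at the top of every hour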
Example
version: v1
name: read-kafka
type: workflow
tags:
  - read-kafka
description: this job reads data from kafka
workflow:
  dag:
    - name: sample-read-kafka
      title: read kafka sample data
      spec:
        stack: flare:6.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            streaming:
              batchMode: true
              triggerMode: Once # options: Once | AvailableNow | ProcessingTime | Continuous
            inputs:
              - name: sample_data
                dataset: dataos://systemstreams:audit/themis_query_events_03?acl=rw
                isStream: true
                options:
                  startingOffsets: earliest
            logLevel: INFO
            outputs:
              - name: sample_data
                dataset: dataos://lakehouse:stream/stream_read_once_04?acl=rw
                format: Iceberg
                options:
                  saveMode: append
                  checkpointLocation: dataos://lakehouse:checkpoints/tqueryeventsync01
AvailableNow¶
Similar to the `Once` trigger, the query will process all the available data and then stop on its own. The difference is that it will process the data in (possibly) multiple micro-batches based on the source options, which results in better query scalability.
- This trigger provides a strong guarantee of processing: regardless of how many batches were left over in the previous run, it ensures all available data at the time of execution gets processed before termination. All uncommitted batches are processed first.
- Watermark gets advanced per each batch, and no-data batch gets executed before termination if the last batch advances the watermark. This helps to maintain smaller and predictable state size and smaller latency on the output of stateful operators.
Example
version: v1
name: resource-metadata-lakehouse-sink-01
type: workflow
tags:
  - read
workflow:
  dag:
    - name: resource-metadata-lakehouse-sink
      title: Resource Metadata Lakehouse Sink
      description: sinks resource metadata into an iceberg warehouse
      spec:
        tags:
          - Connect
        stack: flare:6.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            showPreviewLines: 20
            streaming:
              forEachBatchMode: true
              triggerMode: AvailableNow
              checkpointLocation: dataos://lakehouse:stream/${CHECKPOINT_DATASET}
            inputs:
              - name: input
                dataset: dataos://${DEPOT_NAME}:${COLLECTION_NAME}/stream_${DATASET_SUFFIX}?acl=rw
                isStream: true
                options:
                  startingOffsets: earliest
            logLevel: INFO
            outputs:
              - name: finalDf
                dataset: dataos://${LAKEHOUSE_DEPOT_NAME}:${LAKEHOUSE_COLLECTION_NAME}/${STREAM_FOR_EACH_DATASET}?acl=rw
                format: Iceberg
                options:
                  saveMode: append
            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM input
ProcessingTime
¶
Processes data at a fixed time interval (e.g., every 5 seconds). The query will be executed in micro-batch mode, where micro-batches are kicked off at the user-specified interval.
- If the previous micro-batch completes within the interval, then the engine will wait until the interval is over before kicking off the next micro-batch.
- If the previous micro-batch takes longer than the interval to complete (i.e. if an interval boundary is missed), then the next micro-batch will start as soon as the previous one completes (i.e., it will not wait for the next interval boundary).
- If no new data is available, then no micro-batch will be kicked off.
- Latency: User-defined interval (e.g., every 5 or 10 seconds).
- Use-cases:
  - Scenarios where small batches of data need to be processed periodically.
  - Applications requiring a balance between resource efficiency and acceptable latency, such as dashboard refresh, metric aggregation, or periodic data transformation.
Example
version: v1
name: resource-metadata-lakehouse-sink-01
type: workflow
tags:
  - read
workflow:
  dag:
    - name: resource-metadata-lakehouse-sink
      title: Resource Metadata Lakehouse Sink
      description: sinks resource metadata into an iceberg warehouse
      spec:
        tags:
          - Connect
        stack: flare:6.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            showPreviewLines: 20
            streaming:
              forEachBatchMode: true
              triggerMode: ProcessingTime
              triggerDuration: 600 seconds
              checkpointLocation: dataos://lakehouse:stream/${CHECKPOINT_DATASET}
            inputs:
              - name: input
                dataset: dataos://${DEPOT_NAME}:${COLLECTION_NAME}/stream_${DATASET_SUFFIX}?acl=rw
                isStream: true
                options:
                  startingOffsets: earliest
            logLevel: INFO
            outputs:
              - name: finalDf
                dataset: dataos://${LAKEHOUSE_DEPOT_NAME}:${LAKEHOUSE_COLLECTION_NAME}/${STREAM_FOR_EACH_DATASET}?acl=rw
                format: Iceberg
                options:
                  saveMode: append
            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM input
Continuous¶
A trigger that continuously processes streaming data, asynchronously checkpointing at a specified interval (e.g., every 1 second).
- Latency: Near real-time (sub-second).
- Use-cases: Ideal for use cases with strict low-latency requirements and immediate event processing needs, such as financial transactions or fraud detection.
Info
There is no need to use `forEachBatchMode: true` in Continuous trigger mode, as the data is fetched in near real-time.
Example
version: v1
name: read-kafka
type: workflow
tags:
  - read-kafka
description: this job reads data from kafka
workflow:
  dag:
    - name: sample-read-kafka
      title: read kafka sample data
      spec:
        stack: flare:6.0
        compute: runnable-default
        stackSpec:
          driver:
            coreLimit: 2000m
            cores: 1
            memory: 1000m
          executor:
            coreLimit: 2000m
            cores: 1
            instances: 1
            memory: 2000m
          job:
            explain: true
            streaming:
              batchMode: true
              triggerMode: Continuous # options: ProcessingTime | Continuous | AvailableNow | Once
              triggerDuration: 1 seconds
              checkpointLocation: dataos://lakehouse:stream/stream_cl1?acl=rw
            inputs:
              - name: sample_data
                dataset: dataos://stkafka:default/tmdc?acl=rw
                format: kafkajson
                isStream: true
                options:
                  startingOffsets: earliest
            logLevel: INFO
            outputs:
              - name: sample_data
                dataset: dataos://lakehouse:stream/as_stream?acl=rw
                format: Iceberg
                options:
                  saveMode: append
Kafka-specific configurations¶
In Kafka, every message is structured as a key-value pair:
- The key helps determine which partition the message is routed to, ensuring ordering of messages with the same key.
- The value contains the actual data payload that is processed downstream.
Even if the producer does not explicitly provide a key (null), Kafka maintains the key-value message structure.
When consumed in Flare streaming, messages from Kafka arrive in their native key and value structure, but both fields are received in raw binary (byte[]) format.
This raw format is not human-readable and must be explicitly deserialized by the developer inside Flare.
The most common and straightforward approach to deserialize these fields is to use the `CAST` operation during transformations, converting the key and value into a readable string format.
Example transformation
steps:
  - sequence:
      - name: final
        sql: |
          SELECT
            *,
            CAST(key AS STRING) AS key_string,
            CAST(value AS STRING) AS value_string
          FROM
            kafka
Click here to view the full manifest file
version: v1
name: kafka-lakehouse-if-stream-flare
type: workflow
tags:
  - Tier.Gold
description: This workflow is responsible for getting insight finder insights data from kafka and persisting it to the lakehouse
workflow:
  title: Insight finder raw insights
  dag:
    - name: kafka-lakehouse-if-stream-flare
      description: This workflow is responsible for getting insight finder insights data from kafka and persisting it to the lakehouse
      title: Insight finder raw insights
      spec:
        tags:
          - Tier.Gold
        stack: flare
        compute: query-default
        stackSpec:
          job:
            explain: true
            streaming:
              triggerMode: Continuous
              checkpointLocation: dataos://lakehouse:sys01/if_kafka_raw/if_kafka_raw_ip_chkpt
            showPreviewLines: 20
            inputs:
              - name: kafka
                dataset: dataos://kafkadepot:none/if-raw-test?acl=rw
                format: KAFKAJSON
                isStream: true
                options:
                  startingOffsets: earliest
                  # failOnDataLoss: false
            logLevel: INFO
            outputs:
              - name: final
                dataset: dataos://lakehouse:raw/if_raw_data?acl=rw
                format: Iceberg
                description: The kafka test depot
                tags:
                  - Tier.Gold
                options:
                  saveMode: append
                  iceberg:
                    properties:
                      write.format.default: parquet
                      write.metadata.compression-codec: gzip
                      write.metadata.previous-versions-max: "10"
                      history.expire.max-snapshot-age-ms: "7200000"
                title: Device Audit Record Source Dataset
            steps:
              - sequence:
                  - name: final
                    sql: |
                      SELECT
                        *,
                        CAST(key AS STRING) AS key_string,
                        CAST(value AS STRING) AS value_string
                      FROM
                        kafka