Case Scenario¶
Batch Jobs¶
Batch jobs are used when all the changed datasets need to be recomputed in every run, ensuring consistent end-to-end performance on each run.
Simple Batch Jobs follow a straightforward process that involves:
- Reading data from a specified set of depots.
- Applying transformations to the data.
- Writing the transformed data to another set of depots.
Case Scenario
The code snippet below demonstrates a Workflow involving a single Flare batch job that reads the input dataset from the `thirdparty01` depot, performs a transformation using the Flare stack, and stores the output dataset in the `bqdepot` depot.
**Code Snippet**
name: bq-write-01
version: v1
type: workflow
tags:
  - bq
  - City
title: Write bq
workflow:
  dag:
    - name: city-write-bq-01
      title: City write bq
      description: This job reads data from azure and writes to bq
      spec:
        tags:
          - Connect
          - City
        stack: flare:5.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            inputs:
              - name: city_connect
                dataset: dataos://thirdparty01:none/city
                format: csv
                schemaPath: dataos://thirdparty01:none/schemas/avsc/city.avsc
            logLevel: INFO
            outputs:
              - name: finalDf
                dataset: dataos://bqdepot:dev/city?acl=rw
                format: bigquery
                options:
                  saveMode: overwrite
                  bigquery:
                    temporaryBucket: tmdc-development-new
            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM city_connect LIMIT 10
The metadata of the output dataset can be updated by executing the `set-metadata` command on the DataOS CLI. Once the metadata is updated, it becomes discoverable and accessible through the Metis UI.
Stream Jobs¶
When incoming data must be processed continuously in real time, Flare Stream Jobs offer an effective solution. However, Stream Jobs should be created with caution and reserved for cases with strict latency requirements, typically demanding processing times of less than a minute, as they may incur higher computing costs.
Case Scenario
The following code snippet illustrates a Workflow involving a Flare Stream Job that reads data from the `thirdparty01` depot in a streaming format and writes it to the `eventhub` depot. During this process, all intermediate streams of data batches are stored at the location specified in the `checkpointLocation` attribute.
**Code Snippet**
version: v1
name: write-eventhub-b-02
type: workflow
tags:
  - eventhub
  - write
description: This job reads data from thirdparty and writes to eventhub
workflow:
  dag:
    - name: eventhub-write-b-02
      title: write data to eventhub
      description: write data to eventhub
      spec:
        tags:
          - Connect
        stack: flare:5.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            streaming:
              checkpointLocation: /tmp/checkpoints/devd01
              forEachBatchMode: "true"
            inputs:
              - name: input
                dataset: dataos://thirdparty01:none/city
                format: csv
                schemaPath: dataos://thirdparty01:none/schemas/avsc/city.avsc
            logLevel: INFO
            outputs:
              - name: finalDf
                dataset: dataos://eventhub:default/eventhub01?acl=rw
                format: Eventhub
            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM input
Incremental Jobs¶
Incremental Jobs compute only the rows or files of data that have changed since the last build, reducing overall computation and latency. They are suitable for processing event data and datasets with frequent changes. Compared to batch jobs, incremental jobs significantly decrease end-to-end latency, and their compute costs can be lower on high-scale datasets because the amount of actual computation is minimized. By processing only new data, they eliminate the need to redo analysis on large datasets where most information remains unchanged. For case scenarios on Incremental Jobs, refer here.
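Conceptually, an incremental run replaces a full-table scan with a filter on a persisted watermark, so only rows added or changed since the previous run are read. The SQL below is a minimal sketch of that pattern; the table, column, and watermark names are illustrative placeholders, not part of any Flare schema.

-- Illustrative sketch only: source_events, updated_at, and :last_watermark are placeholders.
SELECT *
FROM   source_events
WHERE  updated_at > :last_watermark;   -- rows added or changed since the previous run
-- After a successful run, persist MAX(updated_at) as the new watermark
-- so the next run resumes where this one stopped.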
Data Transformation¶
- Read and write from Iceberg branch
- Data Quality Jobs (Assertions)
- Query Dataset for Job in Progress
Data Syndication¶
Flare Actions¶
The functionality below is supported only in the DataOS-managed depot, Icebase.
Rewrite Orphans¶
The `remove_orphans` action cleans up orphan files older than a specified time period. This action may take a long time to finish if there are many files in the data and metadata directories. It is recommended to execute it periodically, although it may not need to run often.
Case Scenario
The following code snippet demonstrates removing orphan files within Iceberg tables in a DataOS Depot using the `remove_orphans` action. The action takes the `inputDf` dataset as its input; this dataset is defined as `dataos://icebase:actions/random_users_data` and is in Iceberg format. The `olderThan` option specifies the cutoff timestamp in Unix epoch format; orphan files older than this timestamp are removed.
**Code Snippet**
version: v1
name: orphans
type: workflow
tags:
  - orphans
workflow:
  title: Remove orphan files
  dag:
    - name: orphans
      title: Remove orphan files
      spec:
        tags:
          - orphans
        stack: flare:5.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            logLevel: INFO
            inputs:
              - name: inputDf
                dataset: dataos://icebase:actions/random_users_data
                format: Iceberg
            actions: # Flare Action
              - name: remove_orphans # Action Name
                input: inputDf # Input Dataset Name
                options: # Options
                  olderThan: "1674201289720" # Timestamp in Unix Format
Rewrite Dataset¶
The `rewrite_dataset` action provided by DataOS allows for the parallel compaction of data files in Iceberg tables using Flare. It rewrites the data files so that they meet the target file size in bytes specified in the YAML configuration.
Case Scenario
The following code snippet demonstrates compacting the Iceberg data files of the input dataset `inputDf`, stored in a DataOS Depot, using the `rewrite_dataset` action. The data files are rewritten to the target file size in bytes specified by the `target-file-size-bytes` property.
**Code Snippet**
version: v1
name: rewrite
type: workflow
tags:
  - Rewrite
workflow:
  title: Compress iceberg data files
  dag:
    - name: rewrite
      title: Compress iceberg data files
      spec:
        tags:
          - Rewrite
        stack: flare:5.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            logLevel: INFO
            inputs:
              - name: inputDf
                dataset: dataos://icebase:actions/random_users_data?acl=rw
                format: Iceberg
            actions: # Flare Action
              - name: rewrite_dataset # Name of the action
                input: inputDf # Input Dataset Name
                options: # Options
                  properties: # Properties
                    "target-file-size-bytes": "2048" # Target File Size in Bytes