
Case Scenario

Batch Jobs

Batch jobs are used when all changed datasets need to be recomputed in every run, ensuring consistent end-to-end performance on each occasion.

Simple Batch Jobs follow a straightforward process that involves:

  1. Reading data from a specified set of depots.
  2. Applying transformations to the data.
  3. Writing the transformed data to another set of depots.
Case Scenario

The code snippet below demonstrates a Workflow involving a single Flare batch job that reads the input dataset from the thirdparty01 depot, performs transformations using the Flare Stack, and stores the output dataset in the bqdepot depot.

**Code Snippet**
name: bq-write-01
version: v1
type: workflow
tags:
  - bq
  - City
title: Write bq
workflow:
  dag:
    - name: city-write-bq-01
      title: City write bq
      description: This job reads data from Azure and writes to BigQuery
      spec:
        tags:
          - Connect
          - City
        stack: flare:5.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            inputs:
              - name: city_connect
                dataset: dataos://thirdparty01:none/city
                format: csv
                schemaPath: dataos://thirdparty01:none/schemas/avsc/city.avsc
            logLevel: INFO
            outputs:
              - name: finalDf
                dataset: dataos://bqdepot:dev/city?acl=rw
                format: bigquery
                options:
                  saveMode: overwrite
                  bigquery:
                    temporaryBucket: tmdc-development-new
            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM city_connect LIMIT 10
For depots, with the exception of those supporting the Iceberg file format with a Hadoop catalog type, dataset metadata is automatically surfaced in Metis. For depots that use the Iceberg file format with a Hadoop catalog type, however, the metadata version needs to be updated manually, either using the Toolbox Stack or the set-metadata command on the DataOS CLI. Once the metadata is updated, it becomes discoverable and accessible through the Metis UI.
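For reference, a minimal Toolbox Workflow for such a manual metadata update might look like the sketch below. It mirrors the scheduled example shown later on this page, minus the schedule; the Workflow name and the dataset address are placeholders to be replaced with your own.

version: v1
name: dataos-tool-city # placeholder Workflow name
type: workflow
workflow:
  dag:
    - name: dataos-tool-city-job # placeholder job name
      spec:
        stack: toolbox # Toolbox Stack
        compute: runnable-default
        stackSpec:
          dataset: dataos://icebase:sample/city?acl=rw # placeholder dataset address
          action:
            name: set_version # update the metadata pointer
            value: latest # point to the latest metadata version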

Stream Jobs

Flare Stream Jobs offer an effective solution when incoming data needs to be processed continuously in real time. However, Stream Jobs should be created with caution and reserved for cases with strict latency requirements, typically demanding a processing time of less than a minute, as they may incur higher computing costs.

Case Scenario

The following code snippet illustrates a Workflow involving a Flare Stream Job that reads data from the thirdparty01 depot in a streaming format and writes it to the eventhub depot. During this process, all intermediate streams of data batches are stored at the location specified in the checkpointLocation attribute.

**Code Snippet**
version: v1
name: write-eventhub-b-02
type: workflow
tags:
  - eventhub
  - write
description: This job reads data from thirdparty and writes to eventhub
workflow:
  dag:
    - name: eventhub-write-b-02
      title: write data to eventhub
      description: write data to eventhub
      spec:
        tags:
          - Connect
        stack: flare:5.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            streaming:
              checkpointLocation: /tmp/checkpoints/devd01
              forEachBatchMode: "true"
            inputs:
              - name: input
                dataset: dataos://thirdparty01:none/city
                format: csv
                schemaPath: dataos://thirdparty01:none/schemas/avsc/city.avsc

            logLevel: INFO
            outputs:
              - name: finalDf
                dataset: dataos://eventhub:default/eventhub01?acl=rw
                format: Eventhub

            steps:
              - sequence:
                - name: finalDf
                  sql: SELECT * FROM input
In the context of output depots, metadata is automatically surfaced in Metis for all depots except those supporting the Iceberg file format with a Hadoop catalog type. For such depots, the metadata version must be updated manually using the Toolbox Stack. If the metadata is needed only once the transformation has finished and the entire dataset has been written to the output depot, run the Toolbox Stack a single time at the end of the transformation. Alternatively, if metadata is required at a specific cadence, schedule a Workflow that runs the Toolbox Stack. The code snippet below illustrates a sample scheduled Workflow that updates the metadata pointer using the Toolbox Stack for output depots with the Iceberg file format and a Hadoop catalog type.
version: v1
name: dataos-tool-random-user
type: workflow
workflow:
  schedule:
    cron: '*/5 * * * *'
  dag:
    - name: dataos-tool-job
      spec:
        stack: toolbox
        compute: runnable-default
        stackSpec:
          dataset: dataos://icebase:kafka/random_users_icebase01?acl=rw
          action:
            name: set_version
            value: latest
Once the metadata is updated, it becomes discoverable and accessible through the Metis UI.

Incremental Jobs

Incremental Jobs compute only the rows or files of data that have changed since the last build, reducing overall computation and significantly decreasing end-to-end latency compared to batch jobs. They are suitable for processing event data and datasets with frequent changes. Compute costs for incremental jobs can also be lower than for batch jobs when dealing with high-scale datasets, as the amount of actual computation is minimized. By processing only new data, incremental jobs eliminate the need to redo analysis on large datasets where most of the information remains unchanged. For case scenarios on Incremental Jobs, refer to here.

Incremental Job

Data Transformation

Read and write from Iceberg branch

Data Profiling Jobs

Data Quality Jobs (Assertions)

Compression

Merge Data

Enrichment

Merge Into Functionality

Partitioning

Partition Evolution

Data Replay

Concurrent Writes

Query Dataset for Job in Progress

Bucketing

Caching

Job Optimization by Tuning

Column Tagging

Data Syndication

Syndication

Flare Actions

The functionality below is supported only in the DataOS-managed depot, Icebase.

Delete from Dataset

Expire Snapshots

Remove Orphans

Rewrite Manifest Files

Rewrite Orphans

The remove_orphans action cleans up orphan files older than a specified time period. This action may take a long time to finish if there are many files in the data and metadata directories. It is recommended to run it periodically, though it usually does not need to be executed often.

Case Scenario

The following code snippet demonstrates removing orphan files within Iceberg tables in a DataOS Depot using the `remove_orphans` action. The action requires the inputDf dataset as an input, defined as dataos://icebase:actions/random_users_data in Iceberg format. It also accepts options such as the `olderThan` parameter, a timestamp in Unix epoch format; orphan files older than this timestamp are removed.
version: v1 
name: orphans 
type: workflow 
tags: 
  - orphans
workflow: 
  title: Remove orphan files 
  dag: 
    - name: orphans 
      title: Remove orphan files 
      spec: 
        tags: 
          - orphans
        stack: flare:5.0 
        compute: runnable-default 
        stackSpec: 
          job: 
            explain: true 
            logLevel: INFO 
            inputs: 
              - name: inputDf 
                dataset: dataos://icebase:actions/random_users_data 
                format: Iceberg 
            actions: # Flare Action
              - name: remove_orphans # Action Name
                input: inputDf # Input Dataset Name
                options: # Options
                  olderThan: "1674201289720" # Timestamp in Unix Format

Rewrite Dataset

The rewrite_dataset action provided by DataOS allows for the parallel compaction of data files in Iceberg tables using Flare. This action efficiently reduces the size of data files to meet the specified target file size in bytes, as defined in the YAML configuration.

Case Scenario

The following code snippet demonstrates the compression of Iceberg data files for a given input dataset, `inputDf`, stored in a DataOS Depot. The compression process rewrites the data files to the specified target size in bytes, set by the `target-file-size-bytes` property.
version: v1 
name: rewrite 
type: workflow 
tags: 
  - Rewrite
workflow: 
  title: Compress iceberg data files 
  dag: 
    - name: rewrite 
      title: Compress iceberg data files 
      spec: 
        tags: 
          - Rewrite
        stack: flare:5.0 
        compute: runnable-default 
        stackSpec: 
          job: 
            explain: true 
            logLevel: INFO 
            inputs: 
              - name: inputDf 
                dataset: dataos://icebase:actions/random_users_data?acl=rw
                format: Iceberg 
            actions: # Flare Action
              - name: rewrite_dataset # Name of the action
                input: inputDf # Input Dataset Name 
                options: # Options
                  properties: # Properties
                    "target-file-size-bytes": "2048" # Target File Size in Bytes