
Orchestrating Multiple Workflows from a Single Workflow

This section demonstrates how to orchestrate multiple Workflows by referencing their separate YAML files from a single master Workflow YAML file. This approach lets you break a complex data processing task into manageable pieces.

Code Snippet

The code snippet below shows the master Workflow, or super DAG (super_dag.yaml), which references two separate Workflows (stored in workflow.yaml and profile.yaml) by specifying each file's path within the file attribute.

super_dag.yaml
version: v1
name: wf-city
type: workflow
owner: aayushisolanki
tags:
  - Tier.Gold
  - mpulse.altamed
description: The "wf-city" Workflow is a data pipeline focused on ingesting city data from icebase to icebase for the AltaMed healthcare provider. It involves stages such as data ingestion, transformation, and profiling.
workflow:
  title: City Data Pipeline
  dag: 
   - name: city-data-ingestion
     file: /home/iamgroot/resources/workflow/workflow.yaml
     retry:
       count: 2
       strategy: "OnFailure"

   - name: icebase-city-profiling
     file: /home/iamgroot/resources/workflow/profile.yaml
     retry:
       count: 2
       strategy: "OnFailure"
     dependencies:
       - city-data-ingestion
workflow.yaml
# Resource Section
# This is an icebase-to-icebase workflow, hence its inputs and outputs are specified differently.
version: v1 
name: wf-tmdc-01 
type: workflow 
tags:
  - Connect
  - City
description: The job ingests city data from dropzone into raw zone

# Workflow-specific Section
workflow:
  title: Connect City
  dag: 

# Job 1 Specific Section
    - name: wf-job1 # Job 1 name
      title: City Dimension Ingester
      description: The job ingests city data from dropzone into raw zone
      spec:
        tags:
          - Connect
          - City
        stack: flare:5.0 # The job gets executed upon the Flare Stack, so it's a Flare Job
        compute: runnable-default

        # Flare Stack-specific Section
        stackSpec:
          driver:
            coreLimit: 1100m
            cores: 1
            memory: 1048m
          job:
            explain: true  # The job section contains explain, logLevel, inputs, outputs, and steps
            logLevel: INFO

            inputs:
              - name: city_connect
                query: SELECT
                        *,
                        date_format (NOW(), 'yyyyMMddHHmm') AS version1,
                        NOW() AS ts_city1
                      FROM
                        icebase.retail.city 
                # dataset: dataos://icebase:retail/city
                # format: Iceberg
                options: 
                  SSL: "true"
                  driver: "io.trino.jdbc.TrinoDriver"
                  cluster: "system"

                # schemaPath: dataos://thirdparty01:none/schemas/avsc/city.avsc  # schema path is not necessary for icebase-to-icebase

            outputs:
              - name: city_connect
                dataset: dataos://icebase:retail/city01?acl=rw
                format: Iceberg
                description: City data ingested from retail city
                tags:
                  - retail
                  - city
                options:
                  saveMode: overwrite
profile.yaml
# Resource Section
# This is an icebase-to-icebase workflow, hence its inputs and outputs are specified differently.
version: v1 
name: wf-tmdc-01 
type: workflow 
tags:
  - Connect
  - City
description: The job ingests city data from dropzone into raw zone

# Workflow-specific Section
workflow:
  title: Connect City
  dag: 

# Job 1 Specific Section
    - name: wf-job1 # Job 1 name
      title: City Dimension Ingester
      description: The job ingests city data from dropzone into raw zone
      spec:
        tags:
          - Connect
          - City
        stack: flare:5.0 # The job gets executed upon the Flare Stack, so it's a Flare Job
        compute: runnable-default

        # Flare Stack-specific Section
        stackSpec:
          driver:
            coreLimit: 1100m
            cores: 1
            memory: 1048m
          job:
            explain: true  # The job section contains explain, logLevel, inputs, outputs, and steps
            logLevel: INFO

            inputs:
              - name: city_connect
                query: SELECT
                        *,
                        date_format (NOW(), 'yyyyMMddHHmm') AS version1,
                        NOW() AS ts_city1
                      FROM
                        icebase.retail.city 
                # dataset: dataos://icebase:retail/city
                # format: Iceberg
                options: 
                  SSL: "true"
                  driver: "io.trino.jdbc.TrinoDriver"
                  cluster: "system"

                # schemaPath: dataos://thirdparty01:none/schemas/avsc/city.avsc  # schema path is not necessary for icebase-to-icebase

            outputs:
              - name: city_connect
                dataset: dataos://icebase:retail/city01?acl=rw
                format: Iceberg
                description: City data ingested from retail city
                tags:
                  - retail
                  - city
                options:
                  saveMode: overwrite

Implementation Flow

  • Save the above code snippets into separate YAML files.

  • Then specify the path (relative or absolute) of workflow.yaml and profile.yaml in the file attribute of the master file, super_dag.yaml.

  • Apply the super_dag.yaml file from the CLI, as shown below.
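
A minimal sketch of the apply step, assuming super_dag.yaml is in the current working directory (only the file path is required):

  dataos-ctl apply -f super_dag.yaml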

When you apply the super_dag.yaml file from the CLI, it runs the Workflow defined in workflow.yaml first and the one in profile.yaml second, since the second depends on the first. The Workflow within workflow.yaml writes the data from the icebase.retail.city dataset to icebase.retail.city01. Once that is done, the second Workflow executes and profiles the same data from icebase.retail.city01. Both processing tasks are completed by applying just one file.
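
To confirm that the ingestion step runs before the profiling step, you can inspect the runtime of the master Workflow. The command below is a sketch assuming the DataOS CLI's get runtime form; the workspace name public is a placeholder and may differ in your environment:

  dataos-ctl get runtime -w public -t workflow -n wf-city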