Single-Run Workflow
A single-run Workflow represents a one-time execution of a sequence of jobs. It does not include a `schedule` section.
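For contrast, a scheduled Workflow carries a `schedule` section alongside the `dag`. The sketch below is illustrative and not part of the sample that follows; the cron value and job name are assumptions:

```yaml
workflow:
  schedule:
    cron: '*/10 * * * *' # illustrative: run every 10 minutes
  dag:
    - name: scheduled-job # hypothetical job name
      spec:
        stack: flare:5.0
        compute: runnable-default
```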
Code Snippet
The following code snippet illustrates a Workflow with two jobs. The first job runs on the Flare Stack: it reads city data from the icebase depot in batch mode and writes it back to the icebase depot. Once the first job completes its execution, the second job starts and performs profiling on the data, again using the Flare Stack.
Sample Single-Run Workflow manifest
workflow.yml
version: v1
name: wf-city
type: workflow
owner: iamgroot
tags:
  - Tier.Gold
  - mpulse.altamed
description: The "wf-city" Workflow is a data pipeline that ingests city data from icebase to icebase for the AltaMed healthcare provider. It involves stages such as data ingestion, transformation, and profiling.
workflow:
  title: City Data Pipeline
  dag:
    - name: city-data-ingestion
      title: City Dimension Ingester
      description: The job ingests city data from dropzone into raw zone
      spec:
        tags:
          - Connect
          - City
        stack: flare:5.0 # The job gets executed upon the Flare Stack, so it's a Flare Job
        compute: runnable-default
        # Flare Stack-specific Section
        stackSpec:
          driver:
            coreLimit: 1100m
            cores: 1
            memory: 1048m
          job:
            explain: true # The job section contains explain, logLevel, inputs, outputs, and steps
            logLevel: INFO
            inputs:
              - name: city_connect
                query: SELECT
                  *,
                  date_format (NOW(), 'yyyyMMddHHmm') AS version1,
                  NOW() AS ts_city1
                  FROM
                  icebase.retail.city
                options:
                  SSL: "true"
                  driver: "io.trino.jdbc.TrinoDriver"
                  cluster: "system"
            outputs:
              - name: city_connect
                dataset: dataos://icebase:retail/city01?acl=rw
                format: Iceberg
                description: City data ingested from retail city
                tags:
                  - retail
                  - city
                options:
                  saveMode: overwrite
    - name: icebase-city-profiling
      title: Profiler City01
      description: The job performs profiling on city01 data
      spec:
        envs:
          DISABLE_RAW_FILE_SYSTEM_PERMISSION_SET: "true"
        tags:
          - Fides
          - City
          - has_profile
        stack: flare:5.0
        compute: runnable-default
        title: City Profile
        persistentVolume: # Define Persistent Volume
          name: persistent-v
          directory: fides
        stackSpec:
          driver:
            coreLimit: 2400m
            cores: 2
            memory: 3072m
          executor:
            coreLimit: 2400m
            cores: 2
            instances: 1
            memory: 4096m
          job:
            explain: true
            logLevel: WARN
            inputs:
              - name: profile_city
                dataset: dataos://icebase:retail/city01 # Dataset Name
                format: iceberg
            profile:
              # basic | intermediate | advanced
              level: basic
              filters:
                - type: expression
                  expression: "state_code='AL'" # Filter Expression
          sparkConf:
            - spark.sql.shuffle.partitions: 10
            - spark.default.parallelism: 10
      dependencies:
        - city-data-ingestion
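The ordering between the two jobs comes from the `dependencies` field on the second job: a job in the `dag` starts only after every job it lists has completed. Below is a stripped-down sketch of the same wiring; the job specs are elided for brevity, so this skeleton alone is not a runnable manifest:

```yaml
workflow:
  dag:
    - name: city-data-ingestion
      spec:
        stack: flare:5.0
        compute: runnable-default
    - name: icebase-city-profiling
      spec:
        stack: flare:5.0
        compute: runnable-default
      dependencies: # starts only after city-data-ingestion completes
        - city-data-ingestion
```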
Here are the corresponding workflow and profiling manifests for a hands-on walkthrough:
Sample Workflows
workflow.yml
# Resource Section
# This is an icebase-to-icebase workflow, so the inputs and outputs are specified differently.
version: v1
name: wf-tmdc-01
type: workflow
tags:
  - Connect
  - City
description: The job ingests city data from dropzone into raw zone
# Workflow-specific Section
workflow:
  title: Connect City
  dag:
    # Job 1 Specific Section
    - name: wf-job1 # Job 1 name
      title: City Dimension Ingester
      description: The job ingests city data from dropzone into raw zone
      spec:
        tags:
          - Connect
          - City
        stack: flare:5.0 # The job gets executed upon the Flare Stack, so it's a Flare Job
        compute: runnable-default
        # Flare Stack-specific Section
        stackSpec:
          driver:
            coreLimit: 1100m
            cores: 1
            memory: 1048m
          job:
            explain: true # The job section contains explain, logLevel, inputs, outputs, and steps
            logLevel: INFO
            inputs:
              - name: city_connect
                query: SELECT
                  *,
                  date_format (NOW(), 'yyyyMMddHHmm') AS version1,
                  NOW() AS ts_city1
                  FROM
                  icebase.retail.city
                # dataset: dataos://icebase:retail/city
                # format: Iceberg
                options:
                  SSL: "true"
                  driver: "io.trino.jdbc.TrinoDriver"
                  cluster: "system"
                # schemaPath: dataos://thirdparty01:none/schemas/avsc/city.avsc # schemaPath is not necessary for icebase-to-icebase
            outputs:
              - name: city_connect
                dataset: dataos://icebase:retail/city01?acl=rw
                format: Iceberg
                description: City data ingested from retail city
                tags:
                  - retail
                  - city
                options:
                  saveMode: overwrite
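As the commented-out `dataset` and `schemaPath` lines above hint, a Flare input can address its source either through a SQL query executed on a cluster or directly by dataset address. A sketch of the two styles against the same source (the `city_direct` name is hypothetical):

```yaml
inputs:
  # Style 1: query-based input, routed through a cluster
  - name: city_connect
    query: SELECT * FROM icebase.retail.city
    options:
      cluster: "system"
  # Style 2: direct dataset-address input (no query, no cluster needed)
  - name: city_direct # hypothetical input name
    dataset: dataos://icebase:retail/city
    format: Iceberg
```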
profiling.yaml
version: v1
name: profiler-raw-city-01
type: workflow
tags:
  - Fides
  - City
  - has_profile
description: The job performs profiling on top of city data
workflow:
  title: City Profiler
  dag:
    - name: profiling-city01
      title: Profiler City01
      description: The job performs profiling on city01 data
      spec:
        envs:
          DISABLE_RAW_FILE_SYSTEM_PERMISSION_SET: "true"
        tags:
          - Fides
          - City
          - has_profile
        stack: flare:5.0
        compute: runnable-default
        title: City Profile
        persistentVolume: # Define Persistent Volume
          name: persistent-v
          directory: fides
        stackSpec:
          driver:
            coreLimit: 2400m
            cores: 2
            memory: 3072m
          executor:
            coreLimit: 2400m
            cores: 2
            instances: 1
            memory: 4096m
          job:
            explain: true
            logLevel: WARN
            inputs:
              - name: profile_city
                dataset: dataos://icebase:retail/city01 # Dataset Name
                format: iceberg
            profile:
              # basic | intermediate | advanced
              level: basic
              filters:
                - type: expression
                  expression: "state_code='AL'" # Filter Expression
          sparkConf:
            - spark.sql.shuffle.partitions: 10
            - spark.default.parallelism: 10
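To try these out, apply the ingestion manifest first and the profiling manifest only after `wf-tmdc-01` finishes, since `profiling-city01` reads the `city01` dataset that the first Workflow writes. Assuming the standard DataOS CLI, each manifest can be applied with `dataos-ctl apply -f <manifest-file>`.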