Data Quality Jobs (Assertions)¶
Case Scenario¶
In this scenario, we define assertions to evaluate the quality of data. To learn more, refer to the Data Quality Jobs documentation.
Different Ways to Define Assertions¶
Assertions can be classified into two categories: standalone assertions and pre-sink assertions.
Standalone Assertions¶
In a Flare Workflow, assertions on pre-existing datasets can be specified by declaring an assertions section distinct from the inputs section in the YAML definition, as shown in the Flare Workflow below. Note that since check and metric information is stored in separate locations, the corresponding metadata must be updated individually; as a result, two separate Toolbox Workflow runs must be executed, one for metrics and the other for checks.
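For orientation, the assertions block has the following general shape; the column name, filter, metric, and threshold below are illustrative placeholders rather than part of the example that follows:

assertions:
  - column: <column_name>            # column of the input dataset to test
    filter: <boolean_expression>     # optional; restricts the rows being tested
    tests:
      - <metric> <operator> <threshold>   # e.g. missing_count < 100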
Code Snippets¶
Flare Workflow
version: v1 # Version
name: mtrx-chks-odr-enr-01 # Name of the Workflow
type: workflow
tags:
  - Metrics
  - Checks
description: The job performs metrics calculations and checks on order enriched data
workflow:
  title: Metrics and checks
  dag:
    - name: metrics-chks-order-enrich
      title: Metrics and checks
      description: The job performs metrics calculations and checks on order enriched data
      spec:
        stack: flare:5.0
        compute: runnable-default
        tags:
          - Metrics
        title: Metrics and checks
        description: The job performs metrics calculations and checks on order enriched data
        stackSpec:
          driver:
            coreLimit: 3000m
            cores: 2
            memory: 4000m
          executor:
            coreLimit: 6000m
            cores: 2
            instances: 1
            memory: 10000m
          job:
            explain: true
            logLevel: INFO
            # validate single input
            inputs:
              - name: source
                dataset: dataos://icebase:retail/orders_enriched
                format: iceberg
            # override outputs, steps with specific template
            assertions:
              - column: order_amount
                tests:
                  - avg > 1000.00
                  - max < 1000
                  - max > 1000
                  - distinct_count > 100
                  - missing_count < 100
                  - missing_percentage < 0.5
              - column: order_amount
                filter: brand_name == 'Urbane'
                tests:
                  - avg > 500
                  - distinct_count > 100
                  - missing_count < 100
              - column: brand_name
                validFormat:
                  regex: Awkward
                tests:
                  - invalid_count < 5
                  - invalid_percentage < 0.1
              - sql: |
                  SELECT
                    AVG(order_amount) AS avg_order_amount,
                    MAX(order_amount) AS max_order_amount
                  FROM source
                  WHERE brand_name = 'Awkward Styles'
                tests:
                  - avg_order_amount > 1000
                  - max_order_amount < 1000
            sparkConf:
              - spark.serializer: org.apache.spark.serializer.KryoSerializer
              - spark.sql.shuffle.partitions: "10"
              - spark.memory.storageFraction: "0.1"
              - spark.memory.fraction: "0.1"
              - spark.shuffle.memoryFraction: "0.2"
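Note that in a SQL-based assertion, the column aliases defined in the SELECT clause (avg_order_amount and max_order_amount above) become the identifiers referenced in the tests list.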
Toolbox Workflow (for checks)
version: v1
name: dataos-tool-checks
type: workflow
tags:
  - Metrics
  - Checks
description: This workflow runs the data tool to update metadata for quality checks
workflow:
  dag:
    - name: checks-tool
      spec:
        stack: toolbox
        stackSpec:
          dataset: dataos://icebase:sys01/quality_checks?acl=rw
          action:
            name: set_version
            value: latest
Toolbox Workflow (for metrics)
version: v1
name: dataos-tool-metrics
type: workflow
tags:
  - Metrics
  - Checks
description: This workflow runs the data tool to update metadata for quality metrics
workflow:
  dag:
    - name: tool-metrics
      spec:
        stack: toolbox
        stackSpec:
          dataset: dataos://icebase:sys01/quality_metrics?acl=rw
          action:
            name: set_version
            value: latest
Pre-Sink Assertions¶
Starting from Flare version 4.0, assertions can be integrated directly into the data transformation process within the outputs section, eliminating the need for standalone data quality jobs. These pre-sink checks apply to datasets generated by Flare, enabling row-level checks on output columns so that erroneous data is not written to the target.
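As a minimal sketch (the dataset address and column name here are illustrative placeholders), a pre-sink assertion attaches to an individual output entry:

outputs:
  - name: <output_name>
    dataset: dataos://<depot>:<collection>/<dataset>?acl=rw
    format: iceberg
    assertions:                 # evaluated before data is written to this output
      - column: <column_name>
        tests:
          - missing_percentage < 0.5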
Code Snippet¶
To use this functionality, define pre-sink assertions using the following YAML syntax:
version: v1 # Version
name: pre-sink-assertion-workflow # Name of the Workflow
type: workflow # Resource Type (here it's workflow)
tags: # Tags
  - Assertion
title: Pre-Sink Assertions # Title of the Workflow
description: |
  The purpose of this workflow is to define pre-sink assertions.
workflow: # Workflow Section
  dag: # Directed Acyclic Graph (DAG)
    - name: pre-sink-assertion-job # Name of the Job
      title: Pre-Sink Job # Title of the Job
      description: |
        The purpose of this job is to define pre-sink assertions.
      spec: # Specs
        tags: # Tags
          - Assertions
          - Quality
        stack: flare:5.0 # Flare Stack Version (here it's 5.0)
        compute: runnable-default # Compute
        stackSpec: # Flare Section
          job: # Job Section
            explain: true # Explain
            logLevel: INFO # Log Level
            showPreviewLines: 2
            inputs: # Inputs Section
              - name: sanity_city_input # Name of the Input Dataset
                dataset: dataos://thirdparty01:none/city # Input Dataset UDL
                format: csv # Input Dataset Format
                schemaPath: dataos://thirdparty01:none/schemas/avsc/city.avsc # Schema Path
            steps: # Steps Section
              - sequence: # Sequence
                  - name: cities # Transformation Step Name
                    doc: Pick all columns from cities and add version as yyyyMMddHHmm formatted timestamp. # Documentation
                    sql: |
                      SELECT
                        *,
                        date_format(now(), 'yyyyMMddHHmm') AS version,
                        now() AS ts_city
                      FROM
                        sanity_city_input
            outputs: # Outputs Section
              - name: cities # Output Dataset Name
                dataset: dataos://sanityazurealok01:retail/city_csv?acl=rw # Output Dataset UDL
                format: csv # Output Dataset Format
                options: # Options
                  saveMode: overwrite
                  partitionBy:
                    - version
                tags: # Tags
                  - CSV
                title: Azure csv # Title
                description: Azure csv # Description
              - name: cities # Output Dataset Name
                dataset: dataos://sanityazurealok01:retail/city_json?acl=rw # Output Dataset UDL
                format: json # Format
                options: # Options
                  saveMode: overwrite
                  partitionBy:
                    - version
                tags: # Tags
                  - JSON
                title: Azure json # Title
                description: Azure json # Description
              - name: cities # Output Dataset Name
                dataset: dataos://sanityazurealok01:retail/city_parquet?acl=rw # Output Dataset UDL
                format: parquet # Output Dataset Format
                options: # Options
                  saveMode: overwrite
                  partitionBy:
                    - version
                tags: # Tags
                  - Parquet
                title: Azure parquet
                description: Azure parquet
                # Pre-sink Assertions
                assertions: # Assertions Section
                  - column: zip_code # Column Name
                    tests: # Tests
                      - avg < 100
                      - max < 1000
                      - max > 100
                      - distinct_count > 10
                      - missing_count < 100
                      - missing_percentage < 0.5
                  - column: zip_code # Column Name
                    filter: state_code == 'AL' # Filter Condition
                    tests: # Tests
                      - avg > 500
                  - column: city_name # Column Name
                    validFormat:
                      regex: Prattville
                    tests: # Tests
                      - invalid_count < 5
                      - invalid_percentage < 0.1
                  - sql: |
                      SELECT
                        AVG(zip_code) AS avg_zip_code,
                        MAX(zip_code) AS max_zip_code
                      FROM cities
                      WHERE state_code = 'AL'
                    tests: # Tests
                      - avg_zip_code > 3600
                      - max_zip_code < 36006
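Because these assertions are evaluated before the sink, a failing test prevents the erroneous records from being written to the target dataset. As in the standalone example, the aliases defined in the SQL assertion (avg_zip_code and max_zip_code) are the identifiers referenced in its tests list.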