Data Quality Jobs (Assertions)

Case Scenario

In this scenario, we define assertions to evaluate the quality of data. To learn more about Data Quality Jobs, refer to the Data Quality Jobs documentation.

Different Ways to Define Assertions

Assertions fall into two categories: standalone assertions, which run against datasets that have already been written (sinked), and pre-sink assertions, which run before a dataset is written to its target.
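As a rough sketch of the difference, the two fragments below show where each kind of assertion sits in a Flare job. They are abbreviated from the full examples on this page and are not complete workflows. The placement of the standalone assertions key next to inputs under job is an assumption inferred from the indentation of the full example.

```yaml
# Standalone (assumed placement): assertions sit under job, next to inputs,
# and evaluate a dataset that already exists.
stackSpec:
  job:
    inputs:
      - name: source
        dataset: dataos://icebase:retail/orders_enriched
        format: iceberg
    assertions:
      - column: order_amount
        tests:
          - missing_count < 100
---
# Pre-sink: assertions sit inside an entry of the outputs section
# and evaluate the data before it is written to the target.
stackSpec:
  job:
    outputs:
      - name: cities
        dataset: dataos://sanityazurealok01:retail/city_parquet?acl=rw
        format: parquet
        assertions:
          - column: zip_code
            tests:
              - missing_count < 100
```

In both placements the assertion entries themselves use the same keys (column, filter, tests, sql); only the parent section changes.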

Standalone Assertions

In a Flare Workflow, assertions on pre-existing datasets are specified by declaring an assertions section, separate from the inputs section, in the YAML definition. The Flare Workflow YAML below shows this. Because check and metric information is stored in separate locations, the corresponding metadata must be updated individually: two separate Toolbox Workflow runs must be executed, one for metrics and one for checks.

Code Snippets

Flare Workflow

version: v1 # Version
name: mtrx-chks-odr-enr-01 # Name of the Workflow
type: workflow
tags:
  - Metrics
  - Checks
description: The job performs metrics calculations and checks on order enriched data
workflow:
  title: Metrics and checks
  dag:
    - name: metrics-chks-order-enrich
      title: Metrics and checks
      description: The job performs metrics calculations and checks on order enriched data
      spec:
        stack: flare:5.0
        compute: runnable-default
        tags:
          - Metrics
        title: Metrics and checks
        description: The job performs metrics calculations and checks on order enriched data
        stackSpec:
          driver:
            coreLimit: 3000m
            cores: 2
            memory: 4000m
          executor:
            coreLimit: 6000m
            cores: 2
            instances: 1
            memory: 10000m
          job:
            explain: true
            logLevel: INFO
            #validate single input
            inputs:
              - name: source
                dataset: dataos://icebase:retail/orders_enriched
                format: iceberg
            #override outputs, steps with specific template
            assertions:
              - column: order_amount
                tests:
                  - avg > 1000.00
                  - max < 1000
                  - max > 1000
                  - distinct_count > 100
                  - missing_count < 100
                  - missing_percentage < 0.5

              - column: order_amount
                filter: brand_name == 'Urbane'
                tests:
                  - avg > 500
                  - distinct_count > 100
                  - missing_count < 100

              - column: brand_name
                validFormat:
                  regex: Awkward
                tests:
                  - invalid_count < 5
                  - invalid_percentage < 0.1

              - sql: |
                  SELECT
                    AVG(order_amount) AS avg_order_amount,
                    MAX(order_amount) AS max_order_amount
                  FROM source
                  WHERE brand_name = 'Awkward Styles'
                tests:
                  - avg_order_amount > 1000
                  - max_order_amount < 1000

          sparkConf:
            - spark.serializer: org.apache.spark.serializer.KryoSerializer
            - spark.sql.shuffle.partitions: "10"
            - spark.memory.storageFraction: "0.1"
            - spark.memory.fraction: "0.1"
            - spark.shuffle.memoryFraction: "0.2"

Toolbox Workflow (for checks)

version: v1
name: dataos-tool-checks
type: workflow
tags:
  - Metrics
  - Checks
description: This workflow is for data tool of customer demo quality
workflow:
  dag:
    - name: checks-tool
      spec:
        stack: toolbox
        stackSpec:
          dataset: dataos://icebase:sys01/quality_checks?acl=rw
          action:
            name: set_version
            value: latest

Toolbox Workflow (for metrics)

version: v1
name: dataos-tool-metrics
type: workflow
tags:
  - Metrics
  - Checks
description: This workflow is for the data tool
workflow:
  dag:
    - name: tool-metrics
      spec:
        stack: toolbox
        stackSpec:
          dataset: dataos://icebase:sys01/quality_metrics?acl=rw
          action:
            name: set_version
            value: latest

Pre-Sink Assertions

Starting from Flare version 4.0, assertions can be integrated directly into the data transformation process, within the outputs section, which removes the need for standalone data quality jobs. These pre-sink checks apply to datasets generated by Flare: row-level checks on columns are declared in the outputs section and evaluated before the write, so that erroneous data is not written to the target.

Code Snippet

To use this functionality, pre-sink assertions can be defined using the following YAML syntax:

version: v1 # Version
name: pre-sink-assertion-workflow # Name of the Workflow
type: workflow # Resource Type (here it's workflow)
tags: # Tags
  - Assertion
title: Pre-Sink Assertions # Title of the workflow
description: |
  The purpose of this workflow is to define pre-sink assertions.
workflow: # Workflow Section
  dag: # Directed Acyclic Graph (DAG)
    - name: pre-sink-assertion-job # Name of the Job
      title: Pre-Sink Job # Title of the Job
      description: |
        The purpose of this job is to define pre-sink assertions.
      spec: # Specs
        tags: # Tags
          - Assertions
          - Quality
        stack: flare:5.0 # Flare Stack Version (here it's 5.0)
        compute: runnable-default # Compute
        stackSpec: # Flare Section
          job: # Job Section
            explain: true # Explain
            logLevel: INFO # Loglevel
            showPreviewLines: 2 
            inputs: # Inputs Section
              - name: sanity_city_input # Name of the Input Dataset
                dataset: dataos://thirdparty01:none/city # Input Dataset UDL
                format: csv # Input Dataset Format 
                schemaPath: dataos://thirdparty01:none/schemas/avsc/city.avsc # Schema Path

            steps: # Steps Section
              - sequence: # Sequence
                  - name: cities # Transformation Step Name
                    doc: Pick all columns from cities and add version as yyyyMMddHHmm formatted timestamp. # Documentation
                    sql: |
                      SELECT
                        *,
                        date_format(now(), 'yyyyMMddHHmm') AS version,
                        now() AS ts_city
                      FROM sanity_city_input

            outputs: # Outputs Section
              - name: cities # Output Dataset Name
                dataset: dataos://sanityazurealok01:retail/city_csv?acl=rw # Output Dataset UDL
                format: csv # Output Dataset Format 
                options: # Options
                  saveMode: overwrite
                  partitionBy:
                    - version
                tags: # Tags
                  - CSV
                title: Azure csv # Title
                description: Azure csv # Description
              - name: cities # Name of Output Dataset
                dataset: dataos://sanityazurealok01:retail/city_json?acl=rw # Output Dataset UDL
                format: json # Format
                options: # Options
                  saveMode: overwrite
                  partitionBy:
                    - version
                tags: # Tags
                  - JSON
                title: Azure json # Title
                description: Azure json # Description
              - name: cities # Output Dataset Name
                dataset: dataos://sanityazurealok01:retail/city_parquet?acl=rw # Output Dataset UDL
                format: parquet # Output Dataset Format
                options: # Options
                  saveMode: overwrite
                  partitionBy:
                    - version
                tags: # Tags
                  - Parquet
                title: Azure parquet
                description: Azure parquet
                # Pre-sink Assertions
                assertions: # Assertions Section
                  - column: zip_code # Column Name
                    tests: # Tests
                      - avg < 100
                      - max < 1000
                      - max > 100
                      - distinct_count > 10
                      - missing_count < 100
                      - missing_percentage < 0.5
                  - column: zip_code # Column Name
                    filter: state_code == 'AL' # Filter Condition
                    tests: # Tests
                      - avg > 500
                  - column: city_name # Column Name
                    validFormat: # Valid Format
                      regex: Prattville
                    tests: # Tests
                      - invalid_count < 5
                      - invalid_percentage < 0.1
                  - sql: |
                      SELECT
                        AVG(zip_code) AS avg_zip_code,
                        MAX(zip_code) AS max_zip_code
                      FROM products
                      WHERE state_code == 'AL'
                    tests: # Tests
                      - avg_zip_code > 3600
                      - max_zip_code < 36006