How to ensure high data quality in Lakehouse Storage using the Write-Audit-Publish pattern?¶
Ensuring the highest standard of data quality is imperative for extracting valuable insights. Compromises in data quality at any stage within a data engineering workflow can detrimentally impact downstream analyses, including Business Intelligence (BI) and predictive analytics.
Take, for instance, an Extract, Transform, and Load (ETL) process. This process involves moving data from an operational system, such as a relational database, to an analytical system like DataOS Lakehouse storage (Icebase Depot), for utilization in BI reporting or ad-hoc analysis. If the original data contains duplicates or inconsistencies, or if such issues arise during the ETL process and are not resolved before the data is integrated into the production analytics environment, the integrity of insights derived could be compromised, leading to potentially erroneous decisions. This underscores the paramount importance of maintaining data quality throughout data engineering processes. Ensuring data accuracy, consistency, and reliability is not merely theoretical but a practical imperative.
The Write-Audit-Publish (WAP) pattern offers a structured method to guarantee data quality. This approach can be broken down into three key stages:
- Write: Initially, data is extracted from its sources and stored in a designated non-production or staged area. This step serves to protect production data from potential quality issues.
- Audit: In this stage, the staged data is subjected to comprehensive validation checks. These checks may include the examination of null values, identification of duplicates, verification of data types, and ensuring data integrity.
- Publish: Once the data has passed validation, it is seamlessly integrated into production tables. This ensures that end-users either have access to the entire, updated dataset or none of it, preserving data integrity.
Below, we'll explore how to implement this framework within the context of DataOS Lakehouse Storage (alternatively known as Icebase Depot) using Iceberg tables’ branching capability.
WAP Using Iceberg’s Branching Capability¶
Although there is more than one way to implement WAP in Iceberg, we will focus on an approach using Iceberg’s table-level branching capabilities.
Branching in Iceberg works similarly to Git branches, where you can create local branches from the production table (the main branch) to carry out isolated data work on the table. The core Iceberg component behind a branch is the snapshot, which describes the state of an Iceberg table at a certain point in time. Branches are named references to these snapshots.
Problem Statement¶
Let’s imagine that product sales data from an e-commerce company is regularly extracted from operational systems, such as relational database management systems (Postgres, MySQL, etc.), and is then loaded into the company’s cloud data lake (AWS S3, GCS, WASBS, ABFSS). The data engineering team is responsible for creating and maintaining Iceberg tables in the data lake, which serve as the base datasets for different production applications such as BI reports and predictive models.
How can they ensure the quality of the data before pushing it into the production environment?
Solution Approach¶
The solution involves utilizing Iceberg's branching capability to implement the WAP pattern as follows:
- Create a New Branch: Initiate the process by creating a new branch (let’s say stage) using the DataOS CLI, off the production Iceberg table (the main branch). This branch acts as a working area that is isolated from the main production data, ensuring no disruptions to the main data flow.
- Ingest New Records: Load the new or updated records onto this newly created branch using the Flare Stack. This step is crucial for staging the data, allowing for thorough validation before it is considered for production.
- Conduct Data Validation: Perform rigorous data validation checks on the staged data using the Soda Stack. These checks might include verifying data accuracy, consistency, completeness, and ensuring it meets all predefined quality standards.
- Publish to Main Branch: Upon successful validation, merge the new data from the stage branch into the main production branch using the cherrypick-snapshot command of the DataOS CLI. This step ensures that only quality-assured data is added to the production environment.
- Handle Validation Failures: If the data fails any validation checks, the branch can be dropped. You can also set up Monitors and Pagers on these checks for notification. This allows the issues to be rectified and the process to be reattempted without any impact on the production data.
Prerequisites¶
- Operational Data Source - Postgres Depot
- Analytical Data Source - Lakehouse Storage (Icebase Depot)
- Dataset - dataos://icebase:retail/customer
Create a branch¶
First, let’s create a new branch named stage from the existing Iceberg dataset. This branch will act as the staging area for the new data.
Use the create-branch command to create a new branch within a dataset at a specified location within DataOS Lakehouse Storage (Icebase):
# Create a Branch on the dataset called 'stage'
dataos-ctl dataset create-branch -a dataos://icebase:retail/customer -b stage
List Branch¶
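This command lists the branches present on a dataset at a specified location in DataOS. The subcommand and flags below are an assumption modeled on the create-branch command above, so verify them with dataos-ctl dataset --help:
# List the branches on the dataset (subcommand name is an assumption)
dataos-ctl dataset list-branch -a dataos://icebase:retail/customer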
List Branch Snapshots¶
This command lists all snapshots associated with a specific branch within a dataset at a specified location in DataOS.
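A sketch of the invocation, assuming the dataset address flag works as in create-branch and that a branch flag selects the branch to inspect (both flags are assumptions; confirm with dataos-ctl dataset --help):
# List snapshots recorded on the 'stage' branch (flags are assumptions)
dataos-ctl dataset snapshots -a dataos://icebase:retail/customer -b stage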
Write the data¶
For ETL, we will be using the Flare Stack. A couple of preparatory steps are required before we begin using the WAP method for ingesting new data.
Flare job configuration¶
The configuration of the Flare job involves specifying input sources and customizing output settings to direct the data specifically to the desired branch. Here is a detailed breakdown of how to configure and run an ETL job using the Flare Stack:
- Input Configuration: Start by configuring the input to read data from a Postgres Depot. To know more about the configuration, refer to the link: Flare Postgres Read configuration.
- Output Configuration: The output needs special attention to ensure data is written to the correct branch. Set the branch attribute in the Flare configuration to direct the writing action specifically to the stage branch.
Below is a sample manifest file for an ETL job that ingests new records into a specific staging branch of an Iceberg dataset:
name: lakehouse-storage-write
version: v1
type: workflow
tags:
  - lakehouse
  - storage
  - icebase
  - depot
  - write
description: The job ingests data from operational source depots to DataOS Lakehouse storage (or Icebase depot) in the staging branch
workflow:
  dag:
    - name: source-read-lakehouse-write
      title: Write to Lakehouse Storage from source Depot
      description: The job ingests customer data from the source depot into the staging branch
      spec:
        tags:
          - Connect
          - City
        stack: flare:5.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            inputs:
              - name: source_data
                dataset: dataos://postgresdepot:retail/customer
                format: postgres
                options:
                  driver: org.postgresql.Driver
            logLevel: INFO
            outputs:
              - name: output01
                dataset: dataos://icebase:retail/customer
                format: ICEBERG
                options:
                  extraOptions:
                    branch: stage
                  saveMode: append
            steps:
              - sequence:
                  - name: output01
                    sql: SELECT * FROM source_data
Remember that this branch acts as a standalone variant of the production data: any action taken on it affects only the branch, not the main one. You can validate the production table by querying the record count on the main branch. The query should return the same count we had originally, telling us the main branch is unchanged:
--- Count of Records on the Main Branch
SELECT COUNT(*) as total_records FROM catalog.db.table VERSION AS OF 'main'
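For comparison, the same count against the stage branch should now include the newly appended records. A minimal sketch, reusing the placeholder catalog.db.table name from the query above:
--- Count of Records on the Stage Branch
SELECT COUNT(*) as total_records FROM catalog.db.table VERSION AS OF 'stage'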
Audit the data¶
After writing the new data into the isolated branch, stage, it is essential to ensure that this new dataset stands up to the organization’s quality standards. The audit phase acts as a checkpoint where we subject our data to rigorous evaluation, ensuring its fitness for purpose. We will perform a few basic data quality checks for this exercise using the DataOS Soda Stack.
name: soda-workflow-sample-v01
version: v1
type: workflow
tags:
  - workflow
  - soda-checks
description: Customer
workspace: public
workflow:
  dag:
    - name: soda-job-v1
      description: This is a sample job for the Soda DataOS SDK
      title: Soda Sample Test Job
      spec:
        stack: soda:1.0
        logLevel: INFO
        compute: runnable-default
        resources:
          requests:
            cpu: 1000m
            memory: 250Mi
          limits:
            cpu: 1000m
            memory: 250Mi
        stackSpec:
          inputs:
            - dataset: dataos://icebase:retail/customer
              options:
                engine: minerva
                clusterName: miniature
                branchName: stage
              checks:
                - row_count between 10 and 1000
                - row_count between (10 and 55)
                - missing_count(birthdate) = 0
                - invalid_percent(phone_number) < 1 %:
                    valid format: phone number
                - invalid_count(number_of_children) < 0:
                    valid min: 0
                    valid max: 6
                - min(age) > 30:
                    filter: marital_status = 'Married'
                - duplicate_count(phone_number) = 0
                - row_count same as city
                - duplicate_count(customer_index) > 10
                - duplicate_percent(customer_index) < 0.10
                - failed rows:
                    samples limit: 70
                    fail condition: age < 18 and age >= 50
                - failed rows:
                    fail query: |
                      SELECT DISTINCT customer_index
                      FROM customer as customer
                - freshness(ts_customer) < 1d
                - freshness(ts_customer) < 5d
                - max(age) <= 100
                - max_length(first_name) = 8
                - values in (occupation) must exist in city (city_name)
                - schema:
                    name: Confirm that required columns are present
                    warn:
                      when required column missing: [first_name, last_name]
                    fail:
                      when required column missing:
                        - age
                        - no_phone
Applying fixes¶
At this stage, you can take several actions, such as communicating the anomalies to the relevant stakeholders, reviewing the ETL job, determining the origin of the anomalies, and applying quick fixes.
Now our branch has new validated records, and we’ve shipped off the invalid records to our stakeholders, who can fix them for later backfilling.
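If the audit had failed outright, another option (as noted in the solution approach) is to drop the stage branch and rerun the pipeline once the source issues are resolved. A sketch, assuming the CLI offers a delete-branch counterpart to create-branch (the subcommand name is an assumption):
# Drop the 'stage' branch after a failed audit (subcommand name is an assumption)
dataos-ctl dataset delete-branch -a dataos://icebase:retail/customer -b stage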
Publish the changes¶
The final operation in the WAP pattern is to publish the changes and make the data available to the production environment so that downstream applications can use it. The publish operation in Iceberg is made possible by the cherry-pick procedure. In DataOS, the cherrypick-snapshot procedure produces a new snapshot based on a previous one while preserving the original without any changes or deletions. For our use case, we can select the snapshot at the head of the stage branch to form a new snapshot on main. What sets cherry-picking apart is that it is a metadata-only operation: the actual datafiles remain untouched and only the metadata references are altered. As a result, we make the new data available in the production table without relocating any datafiles. One limitation to note is that cherry-picking applies to a single commit. To run this procedure, we need to provide the snapshot ID as an argument. Let’s find the snapshot ID associated with stage by querying a metadata table called refs, meaning references:
You can rerun the list branch snapshots command from earlier, or you can query the 'refs' metadata table from the DataOS Workbench app.
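A minimal sketch of such a query, reusing the placeholder catalog.db.table name from the earlier count query (the exact metadata-table path may differ depending on the query engine):
--- List branch and tag references along with their current snapshot IDs
SELECT * FROM catalog.db.table.refs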
This will return our list of references (branches and tags) along with the current snapshot ID for each branch, which is the information we need. Now let’s execute the cherrypick-snapshot procedure:
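A sketch of the CLI invocation, modeled on the create-branch command shown earlier; the flag used to pass the snapshot ID is an assumption, so confirm the exact syntax with dataos-ctl dataset --help:
# Cherry-pick the stage branch's snapshot onto the main branch (flags are assumptions)
dataos-ctl dataset cherrypick-snapshot -a dataos://icebase:retail/customer -i <snapshot-id>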
Once the operation runs successfully, our main branch’s current snapshot will be made the same as the current stage snapshot. This means we have made the newly inserted records in stage available to the main branch for production usage. If we now query the record count on main and stage, we should see that they are identical:
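A minimal sketch of the comparison, again reusing the placeholder catalog.db.table name from the earlier queries:
--- Counts on both branches should now match
SELECT COUNT(*) as total_records FROM catalog.db.table VERSION AS OF 'main';
SELECT COUNT(*) as total_records FROM catalog.db.table VERSION AS OF 'stage';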
In this use case, we reviewed how to leverage the WAP data quality pattern in Apache Iceberg to address the challenges of dealing with data quality at scale. With WAP, before committing data to the production environment, there’s a structured mechanism to write, assess for quality concerns, and finalize or discard the data. This method preserves the reliability of the data, ensuring that what drives business decisions is accurate, consistent, and free from anomalies.