Syncing SODA Queries Data to Lakehouse¶
When SODA Stack executes quality checks and profiling scans, it generates and runs SQL queries against your datasets. These queries are logged in the SODA stream at systemstreams:soda/quality_profile_results_03. This guide demonstrates how to extract, aggregate, and sync query execution data to your Lakehouse for analysis and monitoring.
Warning
The stream address systemstreams:soda/quality_profile_results_03 may vary depending on the environment. Contact the DBA or a DataOS Operator to confirm the correct stream address.
By syncing queries data to Lakehouse, you can:
- Monitor SQL query execution patterns across all SODA scans.
- Track query success and failure rates by dataset.
- Debug SODA check failures by analyzing failed queries.
- Understand query volume and performance trends.
- Identify datasets with frequent query issues.
- Optimize SODA check configurations based on query patterns.
Understanding SODA Queries Data¶
SODA quality checks and profiling operations translate into SQL queries that execute against your data sources. For example (the check and profiling snippets below are illustrative sketches, not taken from the stream):
Quality Check:
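# illustrative SodaCL check (assumed example, not from the source)
checks for customer_table:
  - missing_count(email) = 0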
Generated SQL Query:
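-- illustrative SQL that such a missing_count check might generate
SELECT COUNT(*) FROM customer_table WHERE email IS NULL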
Profiling Request:
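# illustrative profiling configuration (assumed example)
profile columns:
  columns:
    - customer_table.customer_id
    - customer_table.email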
Generated SQL Queries:
SELECT AVG(customer_id), STDDEV(customer_id), MIN(customer_id), MAX(customer_id) FROM customer_table
SELECT COUNT(DISTINCT email), COUNT(*) FROM customer_table WHERE email IS NULL
Each SODA scan execution generates multiple queries stored in the queries array. This workflow extracts these queries, normalizes them, tracks their success/failure status, and aggregates statistics for monitoring.
Workflow Architecture¶
The queries sync workflow follows a streamlined transformation pipeline:
SODA Stream (Raw Data)
↓
Transformation with two CTEs (Common Table Expressions) and a final aggregation:
├─ CTE 1: Explode & Normalize Queries
├─ CTE 2: Determine Query Status
└─ Final SELECT: Aggregate Query Statistics
↓
Lakehouse (Iceberg Table)
Complete Workflow Manifest¶
name: queries-data-workflow
version: v1
type: workflow
workflow:
schedule:
cron: '15 11 * * *'
concurrencyPolicy: Forbid
dag:
- name: queries-data
spec:
stack: flare:7.0
compute: runnable-default
stackSpec:
job:
explain: true
logLevel: INFO
inputs:
- name: soda
dataset: dataos://systemstreams:soda/quality_profile_results_03
isStream: false
options:
startingOffsets: earliest
outputs:
- name: final
dataset: dataos://icebase:sandbox/queries_data?acl=rw
format: Iceberg
options:
saveMode: append
sort:
mode: partition
columns:
- name: depot
order: desc
- name: collection
order: desc
- name: dataset
order: desc
iceberg:
partitionSpec:
- type: identity
column: depot
- type: identity
column: collection
- type: identity
column: dataset
steps:
- sequence:
- name: final
sql: |
WITH exploded AS (
SELECT
a.dataset,
a.depot,
a.collection,
a.clustername,
a.branchname,
a.username,
to_timestamp(a.datatimestamp, "yyyy-MM-dd'T'HH:mm:ssXXX") AS event_time,
lower(regexp_replace(q.sql, '\\s+', ' ')) AS sql_norm,
q.exception AS exception
FROM soda a
LATERAL VIEW OUTER explode(a.queries) qv AS q
WHERE q.sql IS NOT NULL
),
query_status AS (
SELECT
dataset,
depot,
collection,
clustername,
branchname,
username,
event_time,
sql_norm,
MAX(CASE WHEN exception IS NOT NULL AND trim(exception) <> '' THEN 1 ELSE 0 END) AS has_failure
FROM exploded
GROUP BY
dataset, depot, collection, clustername, branchname, username, event_time, sql_norm
)
SELECT
clustername,
depot,
collection,
dataset,
username,
event_time,
COUNT(*) AS total_queries,
SUM(CASE WHEN has_failure = 0 THEN 1 ELSE 0 END) AS successful_queries,
SUM(CASE WHEN has_failure = 1 THEN 1 ELSE 0 END) AS failed_queries
FROM query_status
GROUP BY
dataset, depot, collection, clustername, branchname, username, event_time
ORDER BY
event_time DESC,
total_queries DESC
Configuration Breakdown¶
Input Configuration¶
inputs:
- name: soda
dataset: dataos://systemstreams:soda/quality_profile_results_03
isStream: false
options:
startingOffsets: earliest
Key Points:
- Same Source: Reads from the same SODA stream as quality checks and profiling workflows
- Batch Mode: `isStream: false` processes all historical query data
- Complete Data: `startingOffsets: earliest` ensures all queries since stream inception are included
Output Configuration¶
outputs:
- name: final
dataset: dataos://icebase:sandbox/queries_data?acl=rw
format: Iceberg
options:
saveMode: append
sort:
mode: partition
columns:
- name: depot
order: desc
- name: collection
order: desc
- name: dataset
order: desc
iceberg:
partitionSpec:
- type: identity
column: depot
- type: identity
column: collection
- type: identity
column: dataset
Key Points:
- `saveMode: append`: Adds new query statistics to existing data (ideal for incremental monitoring)
- Partitioning: Three-level partitioning by depot → collection → dataset for efficient querying
- Sorting: Within each partition, data is sorted for optimal read performance
Transformation Pipeline Explained¶
Complex Query Aggregation
This step uses a single SQL statement with two Common Table Expressions (CTEs) followed by a final aggregation to extract and aggregate query data.
CTE 1: Explode and Normalize Queries:
WITH exploded AS (
SELECT
a.dataset,
a.depot,
a.collection,
a.clustername,
a.branchname,
a.username,
to_timestamp(a.datatimestamp, "yyyy-MM-dd'T'HH:mm:ssXXX") AS event_time,
lower(regexp_replace(q.sql, '\\s+', ' ')) AS sql_norm,
q.exception AS exception
FROM soda a
LATERAL VIEW OUTER explode(a.queries) qv AS q
WHERE q.sql IS NOT NULL
)
Extract and normalize individual queries from the nested queries array.
Key Operations:
- `LATERAL VIEW OUTER explode()`
  - `explode(a.queries)`: Converts the `queries` array into separate rows (one row per query)
  - `LATERAL VIEW`: Enables referencing both the original row (`a.*`) and exploded elements (`q.*`)
  - `OUTER`: Preserves rows even if the `queries` array is NULL or empty
  - Result: If a SODA scan executed 5 queries, this creates 5 rows
- Timestamp Parsing
  - Converts the ISO 8601 timestamp string to a proper timestamp type
  - Format: `"2024-01-15T10:30:00+05:30"` → `TIMESTAMP '2024-01-15 10:30:00'`
  - Enables time-based filtering and aggregation
- SQL Normalization
  - Why normalize? The same logical query may differ only in whitespace or casing, e.g. `SELECT COUNT(*) FROM customer`, `select count(*)  from customer`, or a multi-line variant
  - Step 1: `regexp_replace(q.sql, '\\s+', ' ')` replaces runs of spaces, tabs, and newlines with a single space
  - Step 2: `lower()` converts the text to lowercase
  - Result: all of the variants above become `select count(*) from customer`
  - Benefit: Enables grouping identical queries regardless of formatting
- Exception Extraction
  - Each query object has an `exception` field
  - It contains the error message if the query failed, and is NULL if the query succeeded
  - Used to determine query success/failure status
Example Input/Output:
Input (SODA Stream Record):
{
"depot": "icebase",
"collection": "retail",
"dataset": "customer",
"datatimestamp": "2024-01-15T10:30:00Z",
"queries": [
{
"sql": "SELECT COUNT(*) FROM customer",
"exception": null
},
{
"sql": "SELECT AVG(age), STDDEV(age) FROM customer",
"exception": null
},
{
"sql": "SELECT COUNT(DISTINCT email) FROM customer",
"exception": "Syntax error at line 1"
}
]
}
Output (exploded CTE - 3 rows):
depot | collection | dataset | event_time | sql_norm | exception
---------|------------|----------|----------------------|-----------------------------------------------|---------------------------
icebase | retail | customer | 2024-01-15 10:30:00 | select count(*) from customer | NULL
icebase | retail | customer | 2024-01-15 10:30:00 | select avg(age), stddev(age) from customer | NULL
icebase | retail | customer | 2024-01-15 10:30:00 | select count(distinct email) from customer | Syntax error at line 1
CTE 2: Determine Query Status:
query_status AS (
SELECT
dataset,
depot,
collection,
clustername,
branchname,
username,
event_time,
sql_norm,
MAX(CASE WHEN exception IS NOT NULL AND trim(exception) <> '' THEN 1 ELSE 0 END) AS has_failure
FROM exploded
GROUP BY
dataset, depot, collection, clustername, branchname, username, event_time, sql_norm
)
Group by normalized SQL to determine if any execution of that query failed.
Key Operations:
- Failure Detection Logic
  - Checks two conditions:
    - `exception IS NOT NULL`: the exception field has a value
    - `trim(exception) <> ''`: the exception is not an empty string (after trimming whitespace)
  - Returns 1 if the query has an exception, 0 if it succeeded
  - `MAX()` aggregation: If the same query runs multiple times in a scan, it is marked as failed if ANY execution failed
- Grouping by Normalized SQL
  - Groups identical queries (same `sql_norm`) from the same scan execution
  - Enables detecting whether the same query was retried or executed multiple times
  - Preserves context (who ran it, when, and on which dataset)
Example Scenario:
Input (exploded CTE):
dataset | event_time | sql_norm | exception
---------|----------------------|-----------------------------------|---------------------------
customer | 2024-01-15 10:30:00 | select count(*) from customer | NULL
customer | 2024-01-15 10:30:00 | select count(*) from customer | NULL (retry)
customer | 2024-01-15 10:30:00 | select avg(age) from customer | NULL
customer | 2024-01-15 10:30:00 | select sum(revenue) from customer | Division by zero
customer | 2024-01-15 10:30:00 | select sum(revenue) from customer | Division by zero (retry)
Output (query_status CTE):
dataset | event_time | sql_norm | has_failure
---------|----------------------|-----------------------------------|-------------
customer | 2024-01-15 10:30:00 | select count(*) from customer | 0
customer | 2024-01-15 10:30:00 | select avg(age) from customer | 0
customer | 2024-01-15 10:30:00 | select sum(revenue) from customer | 1
- The `count(*)` query that ran twice is grouped into one row with `has_failure = 0` (both executions succeeded)
- The `sum(revenue)` query that failed twice is grouped into one row with `has_failure = 1` (at least one execution failed)
Final Aggregation:
SELECT
clustername,
depot,
collection,
dataset,
username,
event_time,
COUNT(*) AS total_queries,
SUM(CASE WHEN has_failure = 0 THEN 1 ELSE 0 END) AS successful_queries,
SUM(CASE WHEN has_failure = 1 THEN 1 ELSE 0 END) AS failed_queries
FROM query_status
GROUP BY
dataset, depot, collection, clustername, branchname, username, event_time
ORDER BY
event_time DESC,
total_queries DESC
Aggregate query execution statistics per SODA scan (by dataset, user, time).
Key Operations:
- Count Total Queries
  - `COUNT(*)` counts the distinct normalized queries executed in a scan
  - Each row in `query_status` represents one unique query
  - Example: If a scan ran 5 different queries, `total_queries = 5`
- Count Successful Queries
  - Sums queries where `has_failure = 0` (no exceptions)
  - Example: If 3 out of 5 queries succeeded, `successful_queries = 3`
- Count Failed Queries
  - Sums queries where `has_failure = 1` (had exceptions)
  - Example: If 2 out of 5 queries failed, `failed_queries = 2`
- Grouping by Scan Execution
  - Each unique combination of the GROUP BY columns represents one SODA scan execution
  - Aggregates all queries from that scan into summary statistics
- Ordering Results
  - Latest scans first (`event_time DESC`)
  - Within the same time, scans with more queries first (`total_queries DESC`)
  - Helps quickly identify recent or query-heavy scan executions
Example Final Output:
Input (query_status CTE):
depot | collection | dataset | username | event_time | sql_norm | has_failure
---------|------------|----------|----------|----------------------|-----------------------------------|-------------
icebase | retail | customer | john_doe | 2024-01-15 10:30:00 | select count(*) from customer | 0
icebase | retail | customer | john_doe | 2024-01-15 10:30:00 | select avg(age) from customer | 0
icebase | retail | customer | john_doe | 2024-01-15 10:30:00 | select stddev(age) from customer | 0
icebase | retail | customer | john_doe | 2024-01-15 10:30:00 | select sum(revenue) from customer | 1
icebase | retail | customer | john_doe | 2024-01-15 10:30:00 | select max(price) from customer | 1
Output (Final aggregation):
depot | collection | dataset | username | event_time | total_queries | successful_queries | failed_queries
---------|------------|----------|----------|----------------------|---------------|--------------------|-----------------
icebase | retail | customer | john_doe | 2024-01-15 10:30:00 | 5 | 3 | 2
Interpretation:
- This SODA scan executed 5 distinct queries
- 3 queries succeeded (count, avg, stddev)
- 2 queries failed (sum, max)
- Query success rate = 3/5 = 60%
Output Schema¶
The final output table contains the following columns:
| Column | Type | Description |
|---|---|---|
| `clustername` | String | Cluster where the SODA scan was executed (e.g., minervaprod, themisprod) |
| `depot` | String | Data source or depot identifier where queries were executed (e.g., icebase, snowflake) |
| `collection` | String | Schema or database name containing the dataset |
| `dataset` | String | Table or dataset name against which queries were executed |
| `username` | String | DataOS user who triggered the SODA scan |
| `event_time` | Timestamp | When the SODA scan was executed (parsed from `datatimestamp`) |
| `total_queries` | Long | Total number of distinct SQL queries executed during the scan |
| `successful_queries` | Long | Number of queries that completed successfully (without exceptions) |
| `failed_queries` | Long | Number of queries that encountered errors or exceptions |
Calculated Metrics¶
You can derive additional metrics from the output:
-- Query success rate
SELECT
*,
ROUND(successful_queries * 100.0 / total_queries, 2) AS success_rate_percent,
ROUND(failed_queries * 100.0 / total_queries, 2) AS failure_rate_percent
FROM dataos://icebase:sandbox/queries_data
Sample Output¶
clustername | depot | collection | dataset | username | event_time | total_queries | successful_queries | failed_queries
-------------|----------|------------|----------|----------|----------------------|---------------|--------------------|-----------------
minervaprod | icebase | retail | customer | john_doe | 2024-01-15 10:30:00 | 12 | 11 | 1
themisprod | icebase | retail | orders | jane_doe | 2024-01-15 09:15:00 | 8 | 8 | 0
minervaprod | snowflake| sales | revenue | john_doe | 2024-01-14 16:45:00 | 15 | 13 | 2
themisprod | icebase | retail | products | bot_user | 2024-01-14 14:20:00 | 6 | 5 | 1
Insights from Sample Data:
- Row 1: Customer dataset scan executed 12 queries with 1 failure (91.7% success rate)
- Row 2: Orders dataset scan had perfect execution (100% success rate)
- Row 3: Revenue dataset scan had 2 query failures (86.7% success rate); investigation needed
- Row 4: Products dataset scan had 1 failure (83.3% success rate)
Customization Guide¶
Customization Guide for syncing Queries Data
Adjust Save Mode
Choose between append and overwrite:
options:
saveMode: append # Add new data incrementally
# OR
saveMode: overwrite # Replace all data on each run
Recommendation: Use append for continuous monitoring, overwrite for testing or periodic full refresh.
Change Output Location
Update the dataset path:
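For example (the analytics collection below is an illustrative target):

outputs:
  - name: final
    dataset: dataos://icebase:analytics/queries_data?acl=rw
    format: Iceberg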
Enable Scheduling
Configure the schedule:
workflow:
schedule:
cron: '15 11 * * *' # Daily at 11:15 AM
# cron: '0 */4 * * *' # Every 4 hours
# cron: '0 2 * * 1' # Weekly on Monday at 2 AM
concurrencyPolicy: Forbid # Prevent concurrent runs
Add Column Filtering
Filter unnecessary columns in the pass-through stage:
- name: dropped_columns
sql: SELECT * FROM soda
functions:
- name: drop
columns:
- __eventTime
- __key
- __producer_name
- __messageProperties
- __publishTime
- __topic
Add Time-based Filtering
Process only recent data:
- name: soda_raw
sql: |
SELECT * FROM dropped_columns
WHERE to_timestamp(datatimestamp, "yyyy-MM-dd'T'HH:mm:ssXXX") >= current_date() - INTERVAL 7 DAYS
Capture Actual SQL Text
Store actual SQL queries (not just counts):
- name: final
sql: |
WITH exploded AS (
-- ... (same as before)
)
SELECT
clustername,
depot,
collection,
dataset,
username,
event_time,
sql_norm, -- Include normalized SQL
COUNT(*) AS execution_count,
MAX(CASE WHEN exception IS NOT NULL THEN 1 ELSE 0 END) AS has_failure,
MAX(exception) AS sample_exception -- Include exception message
FROM exploded
GROUP BY clustername, depot, collection, dataset, username, event_time, sql_norm
ORDER BY event_time DESC
Note: This creates more granular output (one row per query instead of per scan) but provides deeper insights.
Best Practices¶
Best Practices for syncing Queries Data
1. Use Append Mode for Monitoring
For continuous monitoring, use saveMode: append to build historical trend data:
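outputs:
  - name: final
    dataset: dataos://icebase:sandbox/queries_data?acl=rw
    format: Iceberg
    options:
      saveMode: append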
Add deduplication logic if needed:
SELECT DISTINCT * FROM dataos://icebase:sandbox/queries_data
WHERE event_time >= current_date() - INTERVAL 30 DAYS
2. Schedule After SODA Scans
Schedule this workflow to run after your SODA check workflows complete:
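For example, if your SODA scans finish by 11:00 AM (illustrative timing):

workflow:
  schedule:
    cron: '15 11 * * *'   # runs after the assumed scan window
    concurrencyPolicy: Forbid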
3. Leverage Partitioning
Always filter by partition columns for optimal performance:
-- GOOD: Uses partitioning
SELECT * FROM queries_data
WHERE depot = 'icebase' AND collection = 'retail' AND dataset = 'customer'
-- BAD: Full table scan
SELECT * FROM queries_data
WHERE username = 'john_doe'
4. Create Alerting Rules
Set up alerts for high failure rates:
-- Alert when failure rate exceeds 10%
SELECT
depot,
collection,
dataset,
failed_queries,
total_queries,
ROUND(failed_queries * 100.0 / total_queries, 2) AS failure_rate
FROM dataos://icebase:sandbox/queries_data
WHERE event_time >= current_timestamp() - INTERVAL 1 DAY
AND failed_queries * 100.0 / total_queries > 10.0
5. Monitor Query Volume Growth
Track query volume to optimize SODA check configurations:
-- Identify checks generating too many queries
SELECT
depot,
collection,
dataset,
AVG(total_queries) AS avg_queries_per_scan
FROM dataos://icebase:sandbox/queries_data
WHERE event_time >= current_date() - INTERVAL 7 DAYS
GROUP BY depot, collection, dataset
HAVING AVG(total_queries) > 20 -- Threshold
ORDER BY avg_queries_per_scan DESC
6. Correlate with Check Results
Join with quality check results to understand impact:
-- Correlate failed queries with failed checks
SELECT
q.depot,
q.collection,
q.dataset,
q.event_time,
q.failed_queries,
c.check_outcome,
c.check_definition
FROM dataos://icebase:sandbox/queries_data q
JOIN dataos://icebase:sys01/slo_quality_checks_a c
ON q.depot = c.depot
AND q.collection = c.collection
AND q.dataset = c.dataset
AND DATE(q.event_time) = DATE(c.timestamp)
WHERE q.failed_queries > 0
AND c.check_outcome = 'failed'
Troubleshooting¶
Issue: No Data in Output Table
Workflow succeeds but output table is empty
Solution:
Check 1: Verify queries exist in source
SELECT
dataset,
size(queries) AS query_count
FROM dataos://systemstreams:soda/quality_profile_results_03
LIMIT 10
Check 2: Verify WHERE clause
-- The workflow filters WHERE q.sql IS NOT NULL
-- Check if queries have SQL text
SELECT
dataset,
queries
FROM dataos://systemstreams:soda/quality_profile_results_03
WHERE queries IS NOT NULL AND size(queries) > 0
LIMIT 5
Check 3: Check timestamp parsing
-- Verify datatimestamp format
SELECT
datatimestamp,
to_timestamp(datatimestamp, "yyyy-MM-dd'T'HH:mm:ssXXX") AS parsed_timestamp
FROM dataos://systemstreams:soda/quality_profile_results_03
LIMIT 10
Issue: Incorrect Query Counts
Query counts don't match expected values
Solution:
Debug: Check the exploded CTE. Add an intermediate output step to inspect the exploded queries:
- name: debug_exploded
sql: |
SELECT
dataset,
depot,
collection,
event_time,
sql_norm,
exception
FROM (
SELECT
a.dataset,
a.depot,
a.collection,
to_timestamp(a.datatimestamp, "yyyy-MM-dd'T'HH:mm:ssXXX") AS event_time,
lower(regexp_replace(q.sql, '\\s+', ' ')) AS sql_norm,
q.exception AS exception
FROM soda_raw a
LATERAL VIEW OUTER explode(a.queries) qv AS q
WHERE q.sql IS NOT NULL
)
ORDER BY event_time DESC
LIMIT 100
Output this to a separate table to inspect the intermediate results.
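For example, a sketch of an additional output for the debug step (the queries_debug path is illustrative):

outputs:
  - name: debug_exploded
    dataset: dataos://icebase:sandbox/queries_debug?acl=rw
    format: Iceberg
    options:
      saveMode: overwrite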
Issue: LATERAL VIEW Syntax Error
Spark SQL error about LATERAL VIEW syntax
Solution:
- Ensure you're using the Flare stack (Spark SQL), not another query engine
- Verify the `LATERAL VIEW OUTER explode()` syntax:
-- Correct syntax
LATERAL VIEW OUTER explode(a.queries) qv AS q
-- Common mistake (missing the OUTER keyword and the qv table alias)
LATERAL VIEW explode(a.queries) AS q
Issue: Timestamp Parsing Errors
Error about invalid timestamp format
Solution:
Check the actual datatimestamp format in your stream:
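SELECT datatimestamp
FROM dataos://systemstreams:soda/quality_profile_results_03
LIMIT 5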
Adjust the format string if needed:
-- If format is: 2024-01-15T10:30:00Z
to_timestamp(a.datatimestamp, "yyyy-MM-dd'T'HH:mm:ss'Z'")
-- If format is: 2024-01-15T10:30:00+05:30
to_timestamp(a.datatimestamp, "yyyy-MM-dd'T'HH:mm:ssXXX")
-- If format is: 2024-01-15 10:30:00
to_timestamp(a.datatimestamp, "yyyy-MM-dd HH:mm:ss")
Issue: Duplicate Records
Same scan appears multiple times with identical statistics
Solution:
Option 1: Use overwrite mode
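options:
  saveMode: overwrite   # replaces table contents on each run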
Option 2: Add deduplication
SELECT DISTINCT
clustername,
depot,
collection,
dataset,
username,
event_time,
total_queries,
successful_queries,
failed_queries
FROM dataos://icebase:sandbox/queries_data
Issue: High Failure Rates
Most queries showing as failed
Solution:
Investigate exceptions: Modify the final SQL to capture exception details:
WITH exploded AS (
SELECT
a.dataset,
a.depot,
lower(regexp_replace(q.sql, '\\s+', ' ')) AS sql_norm,
q.exception AS exception
FROM soda_raw a
LATERAL VIEW OUTER explode(a.queries) qv AS q
WHERE q.sql IS NOT NULL
)
SELECT
depot,
dataset,
sql_norm,
exception,
COUNT(*) AS occurrence_count
FROM exploded
WHERE exception IS NOT NULL AND trim(exception) <> ''
GROUP BY depot, dataset, sql_norm, exception
ORDER BY occurrence_count DESC
LIMIT 20
This helps identify the most common query failures.