Syncing SODA Quality Check Results to Lakehouse

SODA Stack executes data quality checks on your datasets and publishes the results to the DataOS stream at systemstreams:soda/quality_profile_results_03. This guide demonstrates how to extract, transform, and sync quality check results to your Lakehouse for analysis, monitoring, and reporting.

Warning

The stream address systemstreams:soda/quality_profile_results_03 may vary depending on the environment. Contact the DBA or a DataOS Operator to confirm the correct stream address.

By syncing quality check results to Lakehouse, you can:

  • Track data quality trends over time across all datasets.
  • Build comprehensive data quality dashboards and scorecards.
  • Monitor check outcomes (pass/fail/warning) for SLA compliance.
  • Analyze metric values to understand data quality patterns.
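
For example, once the results land in Lakehouse, a daily pass-rate trend for SLA tracking is a single query. The following is a sketch; it assumes the output table this workflow creates:

SELECT
  DATE(timestamp) AS check_date,
  dataset,
  ROUND(100.0 * SUM(CASE WHEN check_outcome = 'passed' THEN 1 ELSE 0 END) / COUNT(*), 2) AS pass_rate_pct
FROM dataos://icebase:sys01/slo_quality_checks
GROUP BY DATE(timestamp), dataset
ORDER BY check_date, dataset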

Understanding Quality Check Data

SODA quality checks validate data against defined rules and constraints. Each check execution generates:

  • Check Definition: The specific quality rule being validated (e.g., "row_count between 100 and 1000")
  • Check Outcome: Result status (passed, failed, warning)
  • Metrics: Measured values (e.g., actual row count, missing count, duplicate count)
  • Category: Type of quality check (Completeness, Accuracy, Validity, Uniqueness, Freshness, Schema)
  • Context: Dataset identification (depot, collection, dataset, column)

This workflow extracts these elements from the nested SODA stream format and transforms them into a clean, queryable table structure.

Workflow Architecture

The quality check sync workflow follows a five-stage transformation pipeline:

SODA Stream (Raw Data)
[Step 1] Drop Unnecessary Columns
[Step 2] Extract & Flatten Check Information
[Step 3] Extract & Flatten Metrics Information
[Step 4] Join Checks with Metrics
[Step 5] Extract Category & Final Transformation
Lakehouse (Iceberg Table)

Complete Workflow Manifest

# Quality Check Events Workflow
# Purpose: Process and aggregate quality check results from SODA stream into Lakehouse
name: quality-check-events-sync
version: v1
type: workflow
workflow:
  # Schedule workflow to run daily at 17:45
  schedule:
    cron: '45 17 * * *'
    concurrencyPolicy: Forbid
  dag:
    - name: dg-quality-cm-data
      spec:
        stack: flare:7.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            # Input configuration - Read quality check results from SODA
            inputs:
              - name: soda
                dataset: dataos://systemstreams:soda/quality_profile_results_03
                isStream: false
                options:
                  startingOffsets: earliest
            logLevel: INFO
            # Output configuration - Write processed results to Iceberg
            outputs:
              - name: final
                dataset: dataos://icebase:sys01/slo_quality_checks?acl=rw
                format: Iceberg
                options:
                  saveMode: overwrite
                  checkpointLocation: dataos://icebase:sys01/checkpoints/quality-checks/v1000?acl=rw
                  # Configure sorting by depot, collection, and dataset
                  sort:
                    mode: partition
                    columns:
                        - name: depot
                          order: desc
                        - name: collection
                          order: desc
                        - name: dataset
                          order: desc
                  # Configure Iceberg partitioning
                  iceberg:
                    partitionSpec:
                      - type: identity
                        column: depot
                      - type: identity
                        column: collection
                      - type: identity
                        column: dataset
            # Data processing pipeline
            steps:
              - sequence:
                  # Step 1: Remove system and unnecessary columns
                  - name: dropped_columns
                    sql: SELECT * FROM soda
                    functions:
                      - name: drop
                        columns:
                          - __eventTime
                          - __key
                          - __producer_name
                          - __messageProperties
                          - __publishTime
                          - __topic
                          - automatedMonitoringChecks
                          - clusterName
                          - dataosResourceId
                          - defaultDataSource
                          - definitionName
                          - engine
                          - hasFailures
                          - hasErrors
                          - hasWarnings
                          - logs
                          - metadata
                          - profiling
                          - queries
                          - runId
                      - name: cleanse_column_names

                  # Step 2: Extract and process check information
                  - name: checks_extracted
                    sql: SELECT * FROM dropped_columns
                    functions:
                      - name: unfurl
                        expression: explode(checks) as checks_
                      - name: drop
                        columns:
                          - checks
                      - name: rename_all
                        columns:
                          metrics: metrics_value
                      - name: unfurl
                        expression: checks_.*
                      - name: drop
                        columns:
                          - checks_
                      - name: unfurl
                        expression: explode(metrics) as metrics_
                      - name: drop
                        columns:
                          - metrics

                  # Step 3: Extract and process metrics information
                  - name: metrics_extracted
                    sql: SELECT dataos_run_id, metrics_value FROM checks_extracted
                    functions:
                      - name: unfurl
                        expression: explode(metrics_value) as metrics_value_
                      - name: drop
                        columns:
                          - metrics_value
                      - name: unfurl
                        expression: metrics_value_.*
                      - name: drop
                        columns:
                          - metrics_value_

                  # Step 4: Join checks and metrics data
                  - name: joined_checks_metrics
                    sql: |
                      SELECT
                        ce.dataos_run_id,
                        ce.job_name,
                        ce.scan_start_timestamp as timestamp,
                        ce.user_name,
                        ce.depot,
                        ce.collection,
                        ce.dataset,
                        ce.column,
                        ce.name as check_definition,
                        me.metric_name,
                        me.value as metric_value,
                        ce.outcome as check_outcome,
                        ce.resource_attributes as resource_attributes
                      FROM checks_extracted ce
                      LEFT JOIN metrics_extracted me
                        ON ce.dataos_run_id = me.dataos_run_id
                        AND ce.metrics_ = me.identity

                  # Step 5: Final transformation with category extraction
                  - name: final
                    sql: |
                      SELECT
                        dataos_run_id,
                        job_name,
                        timestamp,
                        user_name,
                        depot,
                        collection,
                        dataset,
                        column,
                        check_definition,
                        resource_attributes,
                        metric_name,
                        metric_value,
                        check_outcome,
                        attribute['value'] AS category
                      FROM joined_checks_metrics
                      LATERAL VIEW explode(resource_attributes) AS attribute
                      WHERE attribute['name'] = 'category'

Configuration Breakdown

Input Configuration

inputs:
  - name: soda
    dataset: dataos://systemstreams:soda/quality_profile_results_03
    isStream: false
    options:
      startingOffsets: earliest

Key Points:

  • isStream: false: Processes all available data in batch mode for complete historical view
  • startingOffsets: earliest: Reads all quality check results from the beginning of the stream

Use startingOffsets: latest to process only new check results incrementally, or set isStream: true for continuous real-time processing (with an appropriate checkpoint location).

Output Configuration

outputs:
  - name: final
    dataset: dataos://icebase:sys01/slo_quality_checks?acl=rw
    format: Iceberg
    options:
      saveMode: overwrite
      checkpointLocation: dataos://icebase:sys01/checkpoints/quality-checks/v1000?acl=rw
      sort:
        mode: partition
        columns:
          - name: depot
            order: desc
          - name: collection
            order: desc
          - name: dataset
            order: desc
      iceberg:
        partitionSpec:
          - type: identity
            column: depot
          - type: identity
            column: collection
          - type: identity
            column: dataset

Key Points:

  • saveMode: overwrite: Replaces all data on each run (use append for incremental updates)
  • checkpointLocation: Tracks processing progress; essential for exactly-once processing guarantees
  • Sorting: Data is sorted within partitions by depot → collection → dataset for optimized query performance
  • Partitioning: Three-level hierarchical partitioning enables efficient filtering:
    • Filter by depot → scans only relevant data source partitions
    • Filter by collection → scans only relevant schema/database partitions
    • Filter by dataset → scans only relevant table partitions
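
For example, a query constrained on all three partition columns prunes the scan down to a single partition. This is a sketch against the output table, using the sample values shown later in this guide:

SELECT check_definition, check_outcome, metric_value
FROM dataos://icebase:sys01/slo_quality_checks
WHERE depot = 'icebase'
  AND collection = 'retail'
  AND dataset = 'customer'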

Transformation Pipeline

Step 1: Drop Unnecessary Columns

- name: dropped_columns
  sql: SELECT * FROM soda
  functions:
    - name: drop
      columns:
        - __eventTime              # Stream event timestamp
        - __key                    # Stream message key
        - __producer_name          # Producer identifier
        - __messageProperties      # Message metadata
        - __publishTime            # Stream publish time
        - __topic                  # Pulsar topic name
        - automatedMonitoringChecks  # Automated monitoring data
        - clusterName              # Execution cluster
        - dataosResourceId         # DataOS resource ID
        - defaultDataSource        # Data source config
        - definitionName           # Check definition file
        - engine                   # Query engine used
        - hasFailures              # Boolean flag (redundant)
        - hasErrors                # Boolean flag (redundant)
        - hasWarnings              # Boolean flag (redundant)
        - logs                     # Execution logs
        - metadata                 # Additional metadata
        - profiling                # Profiling data (not needed for checks)
        - queries                  # SQL queries executed
        - runId                    # Run identifier
    - name: cleanse_column_names

This step removes columns that are not needed for quality check analysis:

  • Stream Metadata (__eventTime, __key, __topic, etc.): System-level metadata from the Pulsar stream
  • Execution Metadata (clusterName, engine, runId, etc.): Job execution details that don't add value to check analysis
  • Redundant Flags (hasFailures, hasErrors, hasWarnings): Boolean flags that are derived from check outcomes
  • Profiling Data (profiling): This workflow focuses on checks, not profiling statistics
  • Verbose Data (logs, queries, metadata): Detailed execution information that clutters the output

The cleanse_column_names function standardizes column naming conventions (e.g., converting camelCase to snake_case).
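
For instance, based on the column names referenced later in this pipeline, the renaming presumably behaves like the following (an assumed illustration, not an exhaustive mapping):

-- Assumed examples of cleanse_column_names output:
--   scanStartTimestamp -> scan_start_timestamp
--   dataosRunId        -> dataos_run_id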

Step 2: Extract and Process Check Information

- name: checks_extracted
  sql: SELECT * FROM dropped_columns
  functions:
    - name: unfurl
      expression: explode(checks) as checks_
    - name: drop
      columns:
        - checks
    - name: rename_all
      columns:
        metrics: metrics_value
    - name: unfurl
      expression: checks_.*
    - name: drop
      columns:
        - checks_
    - name: unfurl
      expression: explode(metrics) as metrics_
    - name: drop
      columns:
        - metrics

This is the core transformation that flattens the nested check structure. Let's break it down:

Explode Checks Array

- name: unfurl
  expression: explode(checks) as checks_
  • One SODA scan can execute multiple quality checks
  • explode(checks) creates one row per check execution
  • Example: If a scan runs 5 checks, this creates 5 rows
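
In plain Spark SQL, this unfurl is roughly equivalent to the following (an illustrative sketch, not the exact code Flare generates):

-- One output row per element of the checks array
SELECT *, explode(checks) AS checks_
FROM dropped_columns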

Rename Metrics Column

- name: rename_all
  columns:
    metrics: metrics_value
  • The root-level metrics column holds the actual metric values, while each check's inner metrics field holds metric identifiers
  • Renaming the root-level column to metrics_value before unfurling the check object avoids a naming collision between the two
  • This allows us to later join the identifiers with the actual metric values

Unfurl Check Object

- name: unfurl
  expression: checks_.*
  • Extracts all fields from the checks_ object into separate columns
  • Creates columns like: name, outcome, column, resource_attributes, and the check's inner metrics field (the identifier array)
  • Example Output Columns:
    • name: "row_count between 100 and 1000"
    • outcome: "passed"
    • column: "customer_id"
    • resource_attributes: [{name: "category", value: "Accuracy"}]

Explode Metrics Array

- name: unfurl
  expression: explode(metrics) as metrics_
  • The check's metrics field is an array of metric identifiers (strings)
  • Each identifier references an entry in the root-level metrics array (renamed to metrics_value)
  • explode(metrics) creates additional rows if a check uses multiple metrics
  • Example: A check using both "row_count" and "missing_count" creates 2 rows

The result is a flat table where each row represents one check execution for one metric identifier.
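
Because each named step is visible to the SQL of later steps, you can spot-check this intermediate by selecting from it in a scratch step. A sketch, with column names as produced by the unfurl above:

SELECT name, outcome, column, metrics_
FROM checks_extracted
LIMIT 10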

Step 3: Extract and Process Metrics Information

- name: metrics_extracted
  sql: SELECT dataos_run_id, metrics_value FROM checks_extracted
  functions:
    - name: unfurl
      expression: explode(metrics_value) as metrics_value_
    - name: drop
      columns:
        - metrics_value
    - name: unfurl
      expression: metrics_value_.*
    - name: drop
      columns:
        - metrics_value_

This step extracts the actual metric values from the metrics_value array (which was originally the metrics array from the root level, not from checks). In the SODA stream structure:

  • checks[].metrics: Contains metric identifiers (strings like "dataset-row_count")
  • metrics[]: Contains actual metric values (objects with metric_name, value, identity)

We need to extract both and join them by their identity/identifier.

Explode Metrics Array

- name: unfurl
  expression: explode(metrics_value) as metrics_value_
  • Returns to the root-level metrics array (renamed to metrics_value in Step 2)
  • Creates one row per metric object

Unfurl Metric Object

- name: unfurl
  expression: metrics_value_.*

Extracts fields from each metric object:

  • metric_name: Name of the metric (e.g., "row_count", "missing_count")
  • value: The actual measured value (e.g., 1523, 45)
  • identity: Unique identifier to link with checks (e.g., "customer_table-row_count")

The result is a separate table with metric details (metric_name, value, identity) linked by dataos_run_id.
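
The metrics side can be spot-checked the same way (a sketch):

SELECT dataos_run_id, metric_name, value, identity
FROM metrics_extracted
LIMIT 10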

Step 4: Join Checks and Metrics Data

- name: joined_checks_metrics
  sql: |
    SELECT
      ce.dataos_run_id,
      ce.job_name,
      ce.scan_start_timestamp as timestamp,
      ce.user_name,
      ce.depot,
      ce.collection,
      ce.dataset,
      ce.column,
      ce.name as check_definition,
      me.metric_name,
      me.value as metric_value,
      ce.outcome as check_outcome,
      ce.resource_attributes as resource_attributes
    FROM checks_extracted ce
    LEFT JOIN metrics_extracted me
      ON ce.dataos_run_id = me.dataos_run_id
      AND ce.metrics_ = me.identity

This critical step joins the check information with the actual metric values.

ON ce.dataos_run_id = me.dataos_run_id    -- Same scan execution
AND ce.metrics_ = me.identity              -- Matching metric identifier

Example:

  • Check Row: check_definition="row_count > 100", metrics_="customer-row_count", outcome="passed"
  • Metric Row: identity="customer-row_count", metric_name="row_count", value=1523
  • Joined Result: check_definition="row_count > 100", metric_name="row_count", metric_value=1523, check_outcome="passed"

Some checks may not have associated metrics (e.g., schema checks, freshness checks that don't compute numeric values). LEFT JOIN ensures we keep all checks even if metrics are missing.

This step gives a unified table with check definitions, their outcomes, and the actual metric values measured.
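
A quick way to quantify how many checks carried no metric is to count the NULL rows the LEFT JOIN preserves (a sketch):

SELECT COUNT(*) AS checks_without_metrics
FROM joined_checks_metrics
WHERE metric_name IS NULL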

Step 5: Extract Category and Final Transformation

- name: final
  sql: |
    SELECT
      dataos_run_id,
      job_name,
      timestamp,
      user_name,
      depot,
      collection,
      dataset,
      column,
      check_definition,
      resource_attributes,
      metric_name,
      metric_value,
      check_outcome,
      attribute['value'] AS category
    FROM joined_checks_metrics
    LATERAL VIEW explode(resource_attributes) AS attribute
    WHERE attribute['name'] = 'category'

This final step extracts the quality check category from the nested resource_attributes array.

Understanding resource_attributes:

resource_attributes: [
  {name: "category", value: "Completeness"},
  {name: "title", value: "Ensure no missing birth dates"},
  {name: "custom_tag", value: "critical"}
]

LATERAL VIEW explode() Explained:

LATERAL VIEW explode(resource_attributes) AS attribute

  • explode() creates one row per attribute in the array
  • LATERAL VIEW allows us to reference both the original row and the exploded elements
  • Each attribute becomes available as attribute['name'] and attribute['value']

Filtering for Category:

WHERE attribute['name'] = 'category'
  • Keeps only the row where attribute name is "category"
  • Extracts the category value (Completeness, Accuracy, Validity, Uniqueness, Freshness, Schema)

The final transformation step provides a clean, flat table with all check details, including the quality check category. Note that the WHERE filter keeps only rows whose resource_attributes include a category entry; checks without one are dropped (see the Extract Additional Attributes customization and the Missing Categories troubleshooting entry below for alternatives).

Output Schema

The final output table contains the following columns:

Column              | Type      | Description
--------------------|-----------|------------------------------------------------------------------------------------------
dataos_run_id       | String    | Unique identifier for the SODA scan execution; links all checks from the same run
job_name            | String    | Name of the SODA job or workflow that executed the quality checks
timestamp           | Timestamp | When the quality check scan was executed (from scan_start_timestamp)
user_name           | String    | DataOS user who triggered the quality check job
depot               | String    | Data source or depot identifier where the dataset resides (e.g., icebase, snowflake)
collection          | String    | Schema or database name containing the dataset being checked
dataset             | String    | Table or dataset name that was validated by the quality check
column              | String    | Column name being checked (NULL for dataset-level checks like row_count)
check_definition    | String    | The quality check rule definition (e.g., "row_count between 100 and 1000", "missing_count(email) = 0")
resource_attributes | Array     | Full array of check attributes including category, title, and custom tags
metric_name         | String    | Name of the metric measured during the check (e.g., "row_count", "missing_count", "duplicate_count")
metric_value        | Double    | The actual measured value for the metric (e.g., 1523 rows, 45 missing values)
check_outcome       | String    | Result of the quality check: "passed", "failed", or "warning"
category            | String    | Quality dimension category: Completeness, Accuracy, Validity, Uniqueness, Freshness, or Schema

Sample Output

dataos_run_id | job_name          | timestamp           | user_name | depot   | collection | dataset  | column      | check_definition                    | metric_name     | metric_value | check_outcome | category
--------------|-------------------|---------------------|-----------|---------|------------|----------|-------------|-------------------------------------|-----------------|--------------|---------------|-------------
run_20240115  | check-retail-data | 2024-01-15 10:30:00 | john_doe  | icebase | retail     | customer | NULL        | row_count between 100 and 1000      | row_count       | 852.0        | passed        | Accuracy
run_20240115  | check-retail-data | 2024-01-15 10:30:00 | john_doe  | icebase | retail     | customer | email       | missing_count(email) = 0            | missing_count   | 12.0         | failed        | Completeness
run_20240115  | check-retail-data | 2024-01-15 10:30:00 | john_doe  | icebase | retail     | customer | email       | duplicate_count(email) = 0          | duplicate_count | 5.0          | failed        | Uniqueness
run_20240115  | check-retail-data | 2024-01-15 10:30:00 | john_doe  | icebase | retail     | customer | age         | invalid_count(age) = 0              | invalid_count   | 0.0          | passed        | Validity
run_20240115  | check-retail-data | 2024-01-15 10:30:00 | john_doe  | icebase | retail     | customer | phone       | invalid_percent(phone) < 1%         | invalid_percent | 0.5          | passed        | Validity

Insights from Sample Data:

  • Row 1: Dataset has 852 rows, within expected range (100-1000) ✓
  • Row 2: Email column has 12 missing values, violating the completeness check ✗
  • Row 3: Email column has 5 duplicate values, violating the uniqueness check ✗
  • Row 4: Age column has no invalid values, passing validation ✓
  • Row 5: Phone column has 0.5% invalid values, within 1% threshold ✓
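
A per-category rollup over the synced table makes such patterns easy to spot (a sketch):

SELECT
  category,
  COUNT(*) AS total_checks,
  SUM(CASE WHEN check_outcome = 'failed' THEN 1 ELSE 0 END) AS failed_checks
FROM dataos://icebase:sys01/slo_quality_checks
GROUP BY category
ORDER BY failed_checks DESC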

Customization Guide


Adjust Save Mode

Choose between full refresh and incremental updates:

options:
  saveMode: overwrite   # Full refresh - replaces all data
  # OR
  saveMode: append      # Incremental - adds new data

When to Use Each:

  • overwrite: Simple to maintain, ensures no duplicates, good for smaller datasets or complete reprocessing
  • append: More efficient for large historical datasets, requires deduplication logic

Change Output Location

Update the dataset path to your preferred location:

outputs:
  - name: final
    dataset: dataos://icebase:your_collection/your_table_name?acl=rw
    options:
      checkpointLocation: dataos://icebase:your_collection/checkpoints/quality-checks?acl=rw

Enable Scheduling

Configure the schedule so the sync runs automatically:

workflow:
  schedule:
    cron: '45 17 * * *'              # Daily at 5:45 PM
    # cron: '0 */6 * * *'            # Every 6 hours
    # cron: '0 8 * * 1'              # Weekly on Monday at 8 AM
    concurrencyPolicy: Forbid         # Prevent overlapping runs

Scheduling Best Practices:

  • Align with your SODA check execution frequency
  • Run after SODA checks complete (with appropriate delay)
  • Use concurrencyPolicy: Forbid to prevent concurrent executions

Add Custom Partitioning

Customize partitioning based on your query patterns:

iceberg:
  partitionSpec:
    - type: identity
      column: depot
    - type: identity
      column: collection
    - type: days            # Time-based partitioning
      column: timestamp
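
With day-based partitioning on timestamp, time-bounded queries prune old partitions automatically, for example (a sketch, assuming the partition spec above is applied):

SELECT dataset, check_outcome, COUNT(*) AS checks
FROM dataos://icebase:sys01/slo_quality_checks
WHERE timestamp >= current_date() - INTERVAL 30 DAYS
GROUP BY dataset, check_outcome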

Filter Specific Data

Add filters to process only specific datasets or check results:

steps:
  - sequence:
      - name: filtered_data
        sql: |
          SELECT * FROM soda
          WHERE depot IN ('icebase', 'snowflake')
            AND scan_start_timestamp >= current_date() - INTERVAL 7 DAYS

Extract Additional Attributes

Extract more attributes from resource_attributes:

- name: final
  sql: |
    SELECT
      dataos_run_id,
      job_name,
      timestamp,
      depot,
      collection,
      dataset,
      column,
      check_definition,
      metric_name,
      metric_value,
      check_outcome,
      MAX(CASE WHEN attribute['name'] = 'category' THEN attribute['value'] END) as category,
      MAX(CASE WHEN attribute['name'] = 'title' THEN attribute['value'] END) as check_title,
      MAX(CASE WHEN attribute['name'] = 'priority' THEN attribute['value'] END) as priority
    FROM joined_checks_metrics
    LATERAL VIEW explode(resource_attributes) AS attribute
    WHERE attribute['name'] IN ('category', 'title', 'priority')
    GROUP BY dataos_run_id, job_name, timestamp, depot, collection, dataset,
            column, check_definition, metric_name, metric_value, check_outcome

Best Practices


Start with Batch Processing

Begin with isStream: false and startingOffsets: earliest to understand the data structure and volume, then optimize for streaming if needed.

Use Appropriate Save Modes

  • Development/Testing: Use saveMode: overwrite for simplicity
  • Production (Small Data): Use saveMode: overwrite with scheduled runs
  • Production (Large Data): Use saveMode: append with deduplication logic

Leverage Partitioning

Always include partition columns (depot, collection, dataset) in your WHERE clauses:

-- GOOD: Uses partitioning
SELECT * FROM quality_checks
WHERE depot = 'icebase' AND collection = 'retail' AND dataset = 'customer'

-- BAD: Full table scan
SELECT * FROM quality_checks
WHERE column = 'email'

Monitor Workflow Execution

  • Set up alerts for workflow failures
  • Monitor data freshness (time lag between check execution and sync)
  • Track row counts and data volume growth
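
A freshness check can be as simple as measuring the lag behind the latest synced result (a sketch):

SELECT
  max(timestamp) AS latest_synced_check,
  (unix_timestamp(current_timestamp()) - unix_timestamp(max(timestamp))) / 3600.0 AS hours_behind
FROM dataos://icebase:sys01/slo_quality_checks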

Create Materialized Views

For frequently accessed queries, create materialized views:

-- Daily quality summary view
CREATE TABLE quality_summary_daily AS
SELECT
  DATE(timestamp) as check_date,
  depot,
  collection,
  dataset,
  category,
  COUNT(*) as total_checks,
  SUM(CASE WHEN check_outcome = 'passed' THEN 1 ELSE 0 END) as passed_checks,
  SUM(CASE WHEN check_outcome = 'failed' THEN 1 ELSE 0 END) as failed_checks
FROM dataos://icebase:sys01/slo_quality_checks
GROUP BY DATE(timestamp), depot, collection, dataset, category

Implement Retention Policies

Archive or delete old data to manage storage costs:

-- Delete data older than 90 days
DELETE FROM dataos://icebase:sys01/slo_quality_checks
WHERE timestamp < current_date() - INTERVAL 90 DAYS

Manage the Checkpoint Location

The checkpointLocation is critical for exactly-once processing:

  • Use a unique path for each workflow version
  • Never delete checkpoint data while the workflow is active
  • Change the checkpoint path when making breaking schema changes

Troubleshooting

Issue: Workflow Fails with "Cannot resolve column"

Error message about unresolved columns during transformation

Solution:

  • Verify column names match the actual SODA stream structure
  • Check if cleanse_column_names is changing column names unexpectedly
  • Use explain: true to see the query plan and identify the problematic column:

job:
  explain: true  # Enable to see the full query plan

Issue: Duplicate Records in Output

Same check results appear multiple times

Solution:

Option 1: Use overwrite mode

options:
  saveMode: overwrite

Option 2: Add deduplication (for append mode)

-- Add ROW_NUMBER() to keep only latest
SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY dataos_run_id, check_definition, metric_name
      ORDER BY timestamp DESC
    ) as rn
  FROM dataos://icebase:sys01/slo_quality_checks
) WHERE rn = 1

Issue: Missing Categories in Output

Some checks have NULL category values

Solution:

  • Verify that SODA checks have a category attribute defined
  • Check whether resource_attributes contains the category:

-- Debug query to see all attributes
SELECT
  check_definition,
  resource_attributes
FROM joined_checks_metrics
WHERE depot = 'your_depot'
LIMIT 10

  • If the category is stored under a different attribute name, update the WHERE clause:

WHERE attribute['name'] = 'your_category_field_name'

Issue: Join Produces No Results

joined_checks_metrics table is empty or has NULL metric values

Solution:

  • Verify that metrics_ column in checks_extracted matches identity column in metrics_extracted
  • Add a diagnostic query to check the join keys:

-- Check join key values
SELECT DISTINCT metrics_ FROM checks_extracted LIMIT 10;
SELECT DISTINCT identity FROM metrics_extracted LIMIT 10;

-- Check if they match
SELECT
  ce.metrics_ as check_metric_id,
  me.identity as metric_identity,
  ce.dataos_run_id
FROM checks_extracted ce
LEFT JOIN metrics_extracted me
  ON ce.dataos_run_id = me.dataos_run_id
  AND ce.metrics_ = me.identity
WHERE me.identity IS NULL
LIMIT 10;

Issue: Checkpoint Location Errors

Error about checkpoint location being invalid or inaccessible

Solution:

  • Ensure checkpoint location has ?acl=rw permissions
  • Use a unique checkpoint path for each workflow
  • If changing the workflow significantly, use a new checkpoint path:

checkpointLocation: dataos://icebase:sys01/checkpoints/quality-checks/v1001?acl=rw

Issue: LATERAL VIEW Syntax Error

SQL syntax error on LATERAL VIEW statement

Solution:

  • Ensure you're using Spark SQL syntax (Flare stack supports this)
  • Verify the array column is not NULL:

-- Add NULL check
FROM joined_checks_metrics
LATERAL VIEW explode(
  COALESCE(resource_attributes, array())  -- Handle NULL arrays
) AS attribute
WHERE attribute['name'] = 'category'