
Data ingestion and transformation

A Depot can also be used for data ingestion and transformation through the Flare Stack. To learn more about the Flare Stack, refer to the Flare Stack documentation.

For example:

The manifest below defines a Workflow, wf-vendor-insights, that runs a Flare job to ingest vendor insights data from the poss3 Depot and write a transformed version to the Lakehouse Depot. Its single DAG node, dg-vendor-insights, reads a CSV dataset from poss3 as the input promotional_data and writes the output transformed_data as an Iceberg table in Lakehouse. The node runs on the runnable-default Compute, and the driver and the single executor instance are each configured with 1 core (coreLimit 1200m) and 1024m of memory. The transformation step runs an SQL query that selects key vendor attributes, including organization details, contact information, certifications, and category identifiers, and adds a row number computed with row_number() OVER (ORDER BY id). The result is written in overwrite mode as an Iceberg table that stores its data files as Parquet and compresses its table metadata with gzip.

version: v1
name: wf-vendor-insights
type: workflow
tags:
    - vendor.insights
    - Tier.Gold
description: The job is to ingest vendor-insights from poss3 into Lakehouse.
workflow:
  title: promotional
  dag:
    - name: dg-vendor-insights
      description: The job is to ingest vendor-insights from poss3 into Lakehouse.
      title: organization data
      spec:
        tags:
          - vendor.insights
        stack: flare:6.0
        compute: runnable-default
        stackSpec:
          driver:
            coreLimit: 1200m
            cores: 1
            memory: 1024m
          executor:
            coreLimit: 1200m
            cores: 1
            instances: 1
            memory: 1024m
          job:
            explain: true
            inputs:
              - name: promotional_data
                dataset: dataos://poss3:promo_effectiveness/target_count02.csv?acl=rw
                format: csv
                options:
                  inferSchema: true

            logLevel: INFO
            outputs:
              - name: transformed_data
                dataset: dataos://lakehouse:promo_effectiveness/vendor_subscription_insight?acl=rw
                format: Iceberg
                description: The job is to ingest vendor data from S3 into Lakehouse.
                tags:
                  - vendor.insights
                options:
                  saveMode: overwrite
                  iceberg:
                    properties:
                      write.format.default: parquet
                      write.metadata.compression-codec: gzip
                title: vendor data

            steps:
              - sequence:
                  - name: transformed_data
                    sql: >
                      select
                      messagekey,
                      id,
                      organization_id,
                      unique_entity_id,
                      state_of_incorporation,
                      business_type,
                      no_dbe_designations,
                      emergency_disaster,
                      emergency_health,
                      emergency_other,
                      created_at,
                      updated_at,
                      doing_business_as,
                      doing_business_as_keyword,
                      ein,
                      duns,
                      name,
                      organization_name_keyword,
                      website,
                      address1,
                      address2,
                      city,
                      country_code,
                      state,
                      zip_code,
                      billing_address1,
                      billing_address2,
                      billing_city,
                      billing_country_code,
                      billing_state,
                      billing_zip_code,
                      description,
                      phone_country,
                      phone,
                      phone_ext,
                      fax_phone_country,
                      fax_phone,
                      fax_phone_ext,
                      categories_ids,
                      categories_codes,
                      nigp_categories,
                      naics_categories,
                      unspsc_categories,
                      users_emails,
                      users_ids,
                      subscribed_governments_ids,
                      certification_ids,
                      self_reported_certifications,
                      verified_certifications,
                      vendor_list_ids,
                      private_vendor_lists,
                      public_vendor_lists,
                      ethnicities,
                      row_number() over (order by id) as row_num
                      from promotional_data
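To see what the row_number() OVER (ORDER BY id) expression in the transformation step produces, the following Python sketch runs the same window function against an in-memory SQLite table. This is an illustration only: SQLite stands in for Flare's Spark SQL engine, the three-column table and its sample rows are hypothetical, and only a small subset of the manifest's columns is modelled. It assumes a Python build whose bundled SQLite is version 3.25 or later, the release that added window functions.

```python
import sqlite3

# Hypothetical sample rows standing in for promotional_data.
# Only three of the manifest's columns (id, organization_id, name) are modelled.
SAMPLE_ROWS = [
    (30, "org-c", "Gamma LLC"),
    (10, "org-a", "Alpha Inc"),
    (20, "org-b", "Beta Corp"),
]


def number_rows(rows):
    """Apply the Flare step's window function to rows in an in-memory SQLite table."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE promotional_data (id INTEGER, organization_id TEXT, name TEXT)"
        )
        conn.executemany("INSERT INTO promotional_data VALUES (?, ?, ?)", rows)
        # row_number() OVER (ORDER BY id) assigns 1, 2, 3, ... in ascending id order.
        # The outer ORDER BY only makes the printed order deterministic; the
        # Flare query has no outer ORDER BY, so its output row order is not guaranteed.
        cursor = conn.execute(
            """
            SELECT id, organization_id, name,
                   row_number() OVER (ORDER BY id) AS row_num
            FROM promotional_data
            ORDER BY row_num
            """
        )
        return cursor.fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    for row in number_rows(SAMPLE_ROWS):
        print(row)
```

Running the script prints the three rows in ascending id order with row_num values 1, 2, and 3, regardless of the order in which they were inserted. Two behaviours carry over to Spark: rows that share an id value receive distinct row numbers in no guaranteed order, and because the window has no PARTITION BY clause, Spark moves all rows into a single partition to compute it, which can be slow on large inputs.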