List of Flare Functions

add_column

Function Description
The add_column function adds a new column. The supported types are string, boolean, byte, short, int, long, float, double, decimal, date, and timestamp.
functions:
  - name: add_column 
    column: {{new_column_name}} 
    value: {{some_value}} 
    type: {{int}}
add_column.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:6.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: transactions_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM transactions_connect LIMIT 1

                    functions:    
                      - name: add_column 
                        column: transactions
                        value: 8 
                        type: int

Output

| transactions |
|--------------|
|      8       |
|      8       |

Note

In the event that the specified data type is not among the supported options, the add_column function will return an Invalid datatype found error.

any_date

Function Description
The any_date function converts a string to date type, parsing the string based on the rule provided. With the default rule, it converts a date column given in any format, such as yyyy.mm.dd, dd.mm.yyyy, dd/MM/yy, yyyy/mm/dd, and others (18 different formats in total), into yyyy-mm-dd format. The rule should be a regular expression.
functions:
  - name: any_date 
    column: {{date_string_column_name}} 
    asColumn: {{column_name}} 
    rule: 
      - "【(?<year>\\d{4})\\W{1}(?<month>\\d{1,2})\\W{1}(?<day>\\d{1,2})[^\\d]? \\W*(?:at )?(?<hour>\\d{1,2}):(?<minute>\\d{1,2})(?::(?<second>\\d{1,2}))?(?:[.,](?<ns>\\d{1,9}))?(?<zero>z)?】"
any_date.yaml
  version: v1
  name: wf-policy-details-data-ingestion
  type: workflow
  tags:
    - Tier.Gold
    - Domain.Sales
  description: The workflow transforms and cleans data from the source, ensuring it aligns with Lakehouse schema standards. After transformations, the data is ingested into the Lakehouse, enabling reliable access for analytics and downstream applications.
  workflow:
    title: Policy Details Data Ingestion 
    dag:
      - name: wf-policy-details-ingestion
        title: Policy Details Data Ingestion Process
        description: The DAG for Data Transformation and Ingestion automates the process of transforming and cleaning data before loading it into the target system, ensuring it is ready for analysis and reporting.
        spec:
          tags:
            - Domain.Sales
          stack: flare:6.0
          compute: runnable-default
          stackSpec:
            driver:
              coreLimit: 1050m
              cores: 1
              memory: 1200m
            executor:
              coreLimit: 1400m
              cores: 1
              instances: 1
              memory: 1600m 
            job:
              explain: true  
              inputs:
                - name: input_data
                  dataset: dataos://lakehouse:retail/customers?acl=rw
                  format: Iceberg

              logLevel: INFO
              outputs:
                - name: finalDf
                  dataset: dataos://lakehouse:sample/customers?acl=rw
                  format: Iceberg

                  options:
                    saveMode: overwrite
                    iceberg:                              
                      properties:
                        write.format.default: parquet
                        write.metadata.compression-codec: snappy
                  title: Policy Details Data Transformations

              steps:
                - sequence:
                    - name: finalDf
                      sql: SELECT * FROM input_data

                      functions:
                        - name: any_date
                          column: END_YEAR
                          asColumn: year_end

Output

+--------------------+----------+
|            END_YEAR|  year_end|
+--------------------+----------+
|2020-03-01T05:07:38Z|2020-03-01|
|2021-03-07T05:07:38Z|2021-03-07|
+--------------------+----------+

Note

You can provide explicit regex rules for the column passed in the any_date function. Without an explicit regex, the function will use the default rule.

any_timestamp

Function Description
The any_timestamp function converts a string to timestamp type, parsing the string based on the rule provided. The output timestamp reflects the specified timezone. The rule should be a regular expression.
functions:
  - name: any_timestamp 
    column: {{datetime_string_column_name}} 
    asColumn: {{column_name}} 
    timezone: {{Asia/Kolkata}}
    rules:
      - "【(?<year>\\d{4})\\W{1}(?<month>\\d{1,2})\\W{1}(?<day>\\d{1,2})[^\\d]? \\W*(?:at )?(?<hour>\\d{1,2}):(?<minute>\\d{1,2})(?::(?<second>\\d{1,2}))?(?:[.,](?<ns>\\d{1,9}))?(?<zero>z)?】"

any_timestamp.yaml
  version: v1
  name: standalone-read-bigquery
  type: workflow
  tags:
    - standalone
    - readJob
    - bigquery
  description: Sample job
  workflow:
    dag:
      - name: city-bigquery-read-01
        title: Sample Transaction Data Ingester
        description: The job ingests customer data from Bigquery to file source
        spec:
          tags:
            - standalone
            - readJob
            - bigquery
          stack: flare:3.0
          compute: runnable-default
          tier: connect
          flare:
            job:
              explain: true
              logLevel: INFO

              inputs: # Read from Bigquery
                - name: transactions_connect
                  inputType: bigquery
                  bigquery:
                    table: transactions_standalone_09_nov
                    projected: <project-id>
                    dataset: <dataset-name>

              outputs: # Write to Local System
                - name: finalDf
                  outputType: file
                  file:
                    schemaName: retail
                    tableName: transactions_standalone_readbq02
                    format: Iceberg
                    warehousePath: /data/examples/dataout/bigquerydata
                    options:
                      saveMode: append

              steps:
                - sequence:
                    - name: finalDf
                      sql: SELECT * FROM transactions_connect LIMIT 1

                      functions:    
                        - name: any_timestamp 
                          column: END_YEAR
                          asColumn: END_YEAR_ASIA 
                          timezone: Asia/Kolkata 
                          rules:
                            - "(?<year>\\d{4})\\W{1}(?<month>\\d{1,2})\\W{1}(?<day>\\d{1,2})[^\\d]?\\W*(?:at )?(?<hour>\\d{1,2}):(?<minute>\\d{1,2})(?::(?<second>\\d{1,2}))?(?:[.,](?<ns>\\d{1,9}))?(?<zero>z)?"

Output

+--------------------+-------------------+
|            END_YEAR|      END_YEAR_ASIA|
+--------------------+-------------------+
|2020-03-01T05:07:38Z|2020-03-01 10:37:38|
|2021-03-07T05:07:38Z|2021-03-07 10:37:38|
+--------------------+-------------------+
For 'timezone': America/New_York:

+--------------------+-------------------+
|            END_YEAR|   END_YEAR_AMERICA|
+--------------------+-------------------+
|2020-03-01T05:07:38Z|2020-03-01 00:07:38|
|2021-03-07T05:07:38Z|2021-03-07 00:07:38|
+--------------------+-------------------+

Note

You can provide explicit regex rules for the column passed in the any_timestamp function. Without an explicit regex, the function will use the default rule.

change_case

Function Description
The change_case function alters the values in a column to a specified case (lowercase, uppercase, or titlecase). It does not affect the column name but modifies the data within the column. For example, if the value is 'John Doe' and the function is applied with uppercase, the value changes to 'JOHN DOE'.
  functions:
    - name: change_case 
      case: {{lower/upper/title}} 
      column: {{asin}} 

change_case.yaml
  version: v1
  name: wf-flare-functions-change-case-04
  type: workflow
  tags:
    - dataos:resources:workflow
  description: The workflow transforms and cleans data from the source, ensuring it aligns with Lakehouse schema standards. After transformations, the data is ingested into the Lakehouse, enabling reliable access for analytics and downstream applications.
  workflow:
    title: Policy Details Data Ingestion 
    dag:
      - name: wf-policy-details-ingestion
        title: Policy Details Data Ingestion Process
        description: The DAG for Data Transformation and Ingestion automates the process of transforming and cleaning data before loading it into the target system, ensuring it is ready for analysis and reporting.
        spec:
          tags:
            - Domain.Sales
          stack: flare:6.0
          compute: runnable-default
          stackSpec:
            driver:
              coreLimit: 1050m
              cores: 1
              memory: 1200m
            executor:
              coreLimit: 1400m
              cores: 1
              instances: 1
              memory: 1600m 
            job:
              explain: true  
              inputs:
                - name: input_data
                  dataset: dataos://lakehouse:retail/customer?acl=rw
                  format: Iceberg

              logLevel: INFO
              outputs:
                - name: finalDf
                  dataset: dataos://lakehouse:sample/flare_function_testing?acl=rw
                  format: Iceberg
                  options:
                    saveMode: overwrite
                    iceberg:                              
                      properties:
                        write.format.default: parquet
                        write.metadata.compression-codec: snappy
                  title: Policy Details Data Transformations

              steps:
                - sequence:
                    - name: finalDf
                      sql: SELECT * FROM input_data

                      functions:

                        - name: change_case 
                          case: lower
                          column: patient
Output

The values in the patient column are displayed in lowercase:

+--------------------+--------------------+
|             patient|            MEMBERID|
+--------------------+--------------------+
|b9c610cd-28a6-463...|bca22051-b39b-759...|
|b9c610cd-28a6-463...|bca22051-b39b-759...|
+--------------------+--------------------+

change_column_case

Function Description
The change_column_case function changes all column names to either lowercase or uppercase, based on the value of case (lower/upper).
  functions:
    - name: change_column_case 
      case: {{upper}} 
change_column_case.yaml
  version: v1
  name: standalone-read-bigquery
  type: workflow
  tags:
    - standalone
    - readJob
    - bigquery
  description: Sample job
  workflow:
    dag:
      - name: city-bigquery-read-01
        title: Sample Transaction Data Ingester
        description: The job ingests customer data from Bigquery to file source
        spec:
          tags:
            - standalone
            - readJob
            - bigquery
          stack: flare:3.0
          compute: runnable-default
          tier: connect
          flare:
            job:
              explain: true
              logLevel: INFO

              inputs: # Read from Bigquery
                - name: transactions_connect
                  inputType: bigquery
                  bigquery:
                    table: transactions_standalone_09_nov
                    projected: <project-id>
                    dataset: <dataset-name>

              outputs: # Write to Local System
                - name: finalDf
                  outputType: file
                  file:
                    schemaName: retail
                    tableName: transactions_standalone_readbq02
                    format: Iceberg
                    warehousePath: /data/examples/dataout/bigquerydata
                    options:
                      saveMode: append

              steps:
                - sequence:
                    - name: finalDf
                      sql: SELECT * FROM transactions_connect LIMIT 1

                      functions:
                        - name: change_column_case 
                          case: upper

Output

+--------------------+--------------------+
|             PATIENT|            MEMBERID|
+--------------------+--------------------+
|b9c610cd-28a6-463...|bca22051-b39b-759...|
|b9c610cd-28a6-463...|bca22051-b39b-759...|
+--------------------+--------------------+

cleanse_column_names

Function Description
The cleanse_column_names function sanitizes column names, following these rules:
• Trims leading and trailing spaces
• Lowercases the column name
• Replaces any character that is not one of [A-Z][a-z][0-9] or _ with an underscore (_)
  functions:
    - name: cleanse_column_names
cleanse_column_names.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: transactions_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM transactions_connect LIMIT 1

                    functions:
                      - name: cleanse_column_names

Before

+----------------------+----------------------+
|          PATIENT     |       MEMBERID       |
+----------------------+----------------------+
|b9c610cd-28a6-463...  |bca22051-b39b-759...  |
|b9c610cd-28a6-463...  |bca22051-b39b-759...  |
+----------------------+----------------------+
After

+----------------------+----------------------+
|patient               |memberid              |
+----------------------+----------------------+
|b9c610cd-28a6-463...  |bca22051-b39b-759...  |
|b9c610cd-28a6-463...  |bca22051-b39b-759...  |
+----------------------+----------------------+

columns_replace

Function Description
The columns_replace function alters column names in bulk using a sed expression.
    functions:
      - name: columns_replace
        sedExpression: 's/^string_/new_/g' #any column that starts with string_ will be replaced by new_

columns_replace.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: transactions_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      # any column name starting with stay will be renamed to start with stayed
                      - name: columns_replace
                        sedExpression: 's/^stay/stayed/g'
Before

+--------------------+---------------+---------+
|             PATIENT|num_days_stay  |stay_days|
+--------------------+---------------+---------+
|b9c610cd-28a6-463...|            -23|     null|
|b9c610cd-28a6-463...|              6|     null|
+--------------------+---------------+---------+
After

+--------------------+---------------+-----------+
|             PATIENT|num_days_stayed|stayed_days|
+--------------------+---------------+-----------+
|b9c610cd-28a6-463...|            -23|       null|
|b9c610cd-28a6-463...|              6|       null|
+--------------------+---------------+-----------+

copy

Function Description
The copy function copies values from a source column into a destination column.
  functions:
    - name: copy
      fromColumn: {{source_column_name}}
      toColumn: {{destination_column_name}}

copy.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: transactions_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM transactions_connect LIMIT 1

                    functions:
                      - name: copy
                        fromColumn: PAYER
                        toColumn: SECONDARY_PAYER
Before

+--------------------+---------------+
|               PAYER|SECONDARY_PAYER|
+--------------------+---------------+
|7c4411ce-02f1-39b...|           null|
|7c4411ce-02f1-39b...|           null|
+--------------------+---------------+
After

+--------------------+--------------------+
|               PAYER|SECONDARY_PAYER     |
+--------------------+--------------------+
|7c4411ce-02f1-39b...|7c4411ce-02f1-39b...|
|7c4411ce-02f1-39b...|7c4411ce-02f1-39b...|
+--------------------+--------------------+

cut_character

Function Description
The cut_character function selects parts of a string value, accepting standard cut options.
functions: 
  - name: cut_character 
    column: {{title}} 
    asColumn: {{title_prefix}} 
    startIndex: {{1}} 
    endIndex: {{5}}
cut_character.yaml
  version: v1
  name: standalone-read-bigquery
  type: workflow
  tags:
    - standalone
    - readJob
    - bigquery
  description: Sample job
  workflow:
    dag:
      - name: city-bigquery-read-01
        title: Sample Transaction Data Ingester
        description: The job ingests customer data from Bigquery to file source
        spec:
          tags:
            - standalone
            - readJob
            - bigquery
          stack: flare:3.0
          compute: runnable-default
          tier: connect
          flare:
            job:
              explain: true
              logLevel: INFO

              inputs: # Read from Bigquery
                - name: transactions_connect
                  inputType: bigquery
                  bigquery:
                    table: transactions_standalone_09_nov
                    projected: <project-id>
                    dataset: <dataset-name>

              outputs: # Write to Local System
                - name: finalDf
                  outputType: file
                  file:
                    schemaName: retail
                    tableName: transactions_standalone_readbq02
                    format: Iceberg
                    warehousePath: /data/examples/dataout/bigquerydata
                    options:
                      saveMode: append

              steps:
                - sequence:
                    - name: finalDf
                      sql: SELECT * FROM patients

                      functions:
                        - name: cut_character 
                          column: BIRTHDATE
                          asColumn: birth_year
                          startIndex: 1
                          endIndex: 4

Output

+----------+----------+
| BIRTHDATE|birth_year|
+----------+----------+
|2019-02-17|      2019|
|2005-07-04|      2005|
+----------+----------+

decode

Function Description
The decode function decodes a column value as one of base32, base64, or hex following RFC-4648.
functions: 
  - name: decode 
    algo: base64 
    column: col1 
    asColumn: new_col1

decode.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: transactions_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM transactions_connect 

                    functions:
                      - name: decode 
                        algo: base32
                        column: ZIP_ENCODED
                        asColumn: ZIP_DECODED
Before

+-----------+
|ZIP_ENCODED|
+-----------+
|   GIYTENBU|
|   GIYTENBU|
+-----------+
After

+-------------+
|  ZIP_DECODED|
+-------------+
|        21244|
|        21244|
+-------------+

diff_date

Function Description
The diff_date function calculates the difference between two date columns.
functions: 
  - name: diff_date 
    columnA: col_a 
    columnB: col_b 
    asColumn: col_diff_date

diff_date.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: patients_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: patients_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: diff_date
                        columnA: END_YEAR
                        columnB: START_YEAR
                        asColumn: Days_Stayed
Output

+--------------------+--------------------+-----------+
|          START_YEAR|            END_YEAR|Days_Stayed|
+--------------------+--------------------+-----------+
|2019-02-24T05:07:38Z|2020-03-01T05:07:38Z|        371|
|2020-03-01T05:07:38Z|2021-03-07T05:07:38Z|        371|
+--------------------+--------------------+-----------+

drop

Function Description
The drop function is used to drop a list of columns.
functions:
  - name: drop
    columns:
      - column_1 
      - column_2

drop.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: drop
                        columns:
                          - ZIP
                          - STATE_HEADQUARTERED
Before

|-- Id: string (nullable = true)
|-- NAME: string (nullable = true)
|-- ADDRESS: string (nullable = true)
|-- CITY: string (nullable = true)
|-- STATE_HEADQUARTERED: string (nullable = true)
|-- ZIP: string (nullable = true)
|-- PHONE: string (nullable = true)
After

|-- Id: string (nullable = true)
|-- NAME: string (nullable = true)
|-- ADDRESS: string (nullable = true)
|-- CITY: string (nullable = true)
|-- PHONE: string (nullable = true)

drop_duplicates

Function Description
The drop_duplicates function removes duplicate rows based on the specified list of columns.
functions:
  - name: drop_duplicates
    columns:
      - column_1 
      - column_2

drop_duplicates.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: drop_duplicates
                        columns:
                          - "CITY"
Before

+------------+
|        CITY|
+------------+
|   Baltimore|
|   Baltimore|
|   Baltimore|
|  Louisville|
|     Chicago|
|  Minnetonka|
|    Hartford|
|  Bloomfield|
|Indianapolis|
|        null|
+------------+
After

+------------+
|        CITY|
+------------+
|  Louisville|
|Indianapolis|
|        null|
|     Chicago|
|  Minnetonka|
|  Bloomfield|
|    Hartford|
|   Baltimore|
+------------+

encode

Function Description
The encode function encodes a column value as one of base32, base64, or hex following RFC-4648.
functions:
  - name: encode
    algo: {{base64}} 
    column: {{col1}} 
    asColumn: {{new_col1}}

encode.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: encode
                        algo: base64
                        column: ZIP_DECODED
                        asColumn: ZIP_ENCODED
Before

+-------------+
|  ZIP_DECODED|
+-------------+
|        21244|
|        21244|
+-------------+
After

+-----------+
|ZIP_ENCODED|
+-----------+
|   GIYTENBU|
|   GIYTENBU|
+-----------+

epoch_to_timestamp

Function Description
The epoch_to_timestamp function converts an epoch string to timestamp format.
functions:
  - name: epoch_to_timestamp
    column: epoch_column
    asColumn: date_column

epoch_to_timestamp.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: epoch_to_timestamp
                        column: epoch_column
                        asColumn: date_column
Output

+------------+-------------------+
|epoch_column|        date_column|
+------------+-------------------+
|  1709541360|2024-03-04 08:36:00|
|  1709541360|2024-03-04 08:36:00|
+------------+-------------------+

fill_null_or_empty

Function Description
The fill_null_or_empty function fills a column value with a fixed value if it is either null or empty (""). If the column does not exist, the function will fail. The defaultValue can only be of type string.
functions:
  - name: fill_null_or_empty
    columns:
      column1: {{value1}}
      column2: {{value2}}
fill_null_or_empty.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: fill_null_or_empty
                        columns:
                          PREFIX: not known

Before

+---------+
|   PREFIX|
+---------+
|   null  |
|   Mr.   |
+---------+

After

+---------+
|   PREFIX|
+---------+
|not known|
|    Mr.  |
+---------+

find_and_replace

Function Description
The find_and_replace function transforms string column values using a "sed"-like expression to find and replace text within the same column.
functions:
  - name: find_and_replace
    column: title
    sedExpression: "s/regex/replacement/g"

find_and_replace.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                        - name: find_and_replace
                          column: OWNERNAME
                          sedExpression: "s/^Adalberto/Albert/g"
Before

+--------------------+
|           OWNERNAME|
+--------------------+
| Damon455 Langosh790|
|Adalberto916 Wuns...|
|Adalberto916 Wuns...|
| Larissa293 Jerde200|
+--------------------+
After

+-------------------+
|          OWNERNAME|
+-------------------+
|Damon455 Langosh790|
|Albert916 Wunsch504|
|Albert916 Wunsch504|
|Larissa293 Jerde200|
+-------------------+

flatten

Function Description
The flatten function separates the elements in a repeated field into individual records. This function is useful for the flexible exploration of repeated data. To maintain the association between each flattened value and the other fields in the record, the flatten function copies all of the other columns into each new record.
Note: Use flatten_outer when the array has null values and you want records of the root with null in the flattened columns.
functions:
  - name: flatten
    column: array_holding_column
    asColumn: new_column_name

flatten.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: flatten
                        column: discount
                        asColumn: discounts
Output

root
|-- discount: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- discount_amount: long (nullable = true)
|    |    |-- discount_percent: long (nullable = true)
|    |    |-- discount_sequence_number: long (nullable = true)
|    |    |-- discount_type: string (nullable = true)
|-- discounts: struct (nullable = true)
|    |-- discount_amount: long (nullable = true)
|    |-- discount_percent: long (nullable = true)
|    |-- discount_sequence_number: long (nullable = true)
|    |-- discount_type: string (nullable = true)

format_date

Function Description
The format_date function allows custom patterns for date-time formatting.
functions:
  - name: format_date
    column: date_column
    format: "yyyy-MM-dd'T'HH:mm:ss"
format_date.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: format_date
                        column: START_YEAR
                        format: "yyyy/MM/dd'T' HH:mm:ss"

Before

+--------------------+
|          START_YEAR|
+--------------------+
|2019-02-24T05:07:38Z|
|2020-03-01T05:07:38Z|
+--------------------+
After

+--------------------+
|          START_YEAR|
+--------------------+
|2019/02/24T 05: 0...|
|2020/03/01T 05: 0...|
+--------------------+

format_unix_date

Function Description
The format_unix_date function allows custom patterns for date-time formatting of Unix epoch values.
functions:
  - name: format_unix_date
    column: {{unix_epoch_column}}
    format: {{"yyyy-MM-dd'T'HH:mm:ss"}}

format_unix_date.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: format_unix_date
                        column: epoch_column
                        format: "yyyy-MM-dd'T'HH:mm:ss"
Before:

+------------+
|epoch_column|
+------------+
|   178976646|
|   178976646|
+------------+
After:

+-------------------+
|       epoch_column|
+-------------------+
|1975-09-03T11:44:06|
|1975-09-03T11:44:06|
+-------------------+

generate_uuid

Function Description
The generate_uuid function generates a universally unique identifier (UUID) for each record.
functions:
  - name: generate_uuid
    asColumn: {{column_01}}

generate_uuid.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: generate_uuid
                        asColumn: UUID_COLUMN
Output

+--------------------+
|         UUID_COLUMN|
+--------------------+
|0b74fc7f-5043-477...|
|bbe0c644-6361-441...|
+--------------------+

hash

Function Description
The hash function generates a message digest. The column is replaced with the digest created using the supplied algorithm. The resulting column is of type string.
functions:
  - name: hash
    column: column_to_hash
    algo: MD5 | SHA-1 | SHA-256 | SHA-384 | SHA-512

hash.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: hash
                        column: PHONE
                        algo: MD5
Before:

+--------------+
|    PHONE     |
+--------------+
|1-877-267-2323|
|1-800-633-4227|
+--------------+
After:

+--------------------+
|       PHONE        |
+--------------------+
|75cb2eeea30b8b577...|
|d71c4d3eae0755df6...|
+--------------------+

increment_variable

Function Description
The increment_variable function increments the value of the variable that is local to the input record being processed.
functions:
  - name: increment_variable
    column: column_01
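
A minimal step sketch showing increment_variable inside a sequence (the input name and the column name running_total are assumptions for illustration, not taken from the source):

steps:
  - sequence:
      - name: finalDf
        sql: SELECT * FROM input_data

        functions:
          - name: increment_variable
            column: running_total # increment the value of running_total for each record processed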

mask_number

Function Description
The mask_number function applies substitution masking on the column values.
The 'column' specifies the name of an existing column to be masked.
The 'pattern' is a substitution pattern to be used to mask the column values.
Substitution masking is generally used for masking credit card or social security numbers. This is fixed masking, where the pattern is applied on a fixed-length string.
These rules are used for the pattern:
• Use of # will include the digit from that position.
• Use x or any other character to mask the digit at that position.
• E.g., for SSN '000-00-0000' and pattern 'XXX-XX-####', the output would be: XXX-XX-0000
functions:
  - name: mask_number
    column: {{column01}}
    pattern: {{XXX-XX-####}}

mask_number.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: mask_number
                        column: SSN
                        pattern: XXX-XX-####
Before:

+-----------+
|    SSN    |
+-----------+
|999-65-3251|
|999-49-3323|
+-----------+
After:

+-----------+
|    SSN    |
+-----------+
|XXX-XX-3251|
|XXX-XX-3323|
+-----------+

merge

Function Description
The merge function merges two or more columns into a third column, specified as asColumn. The values in the third column are the values from the specified columns, delimited by the specified separator.
functions:
  - name: merge
    separator: "__"
    columns:
      - first_name
      - last_name
    asColumn: full_name
merge.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients

                    functions:
                      - name: merge
                        separator: " "
                        columns:
                          - FIRST
                          - LAST
                        asColumn: full_name
Output

+------+-------+-------------+
| FIRST|   LAST|    full_name|
+------+-------+-------------+
| Damon|Langosh|Damon Langosh|
|   Thi| Wunsch|   Thi Wunsch|
+------+-------+-------------+

parse_as_json

Function Description
The parse_as_json function is for parsing a JSON object. The function can operate on String or JSON Object types. It requires a Spark schema (in JSON form) to parse the JSON back into a dataframe.
functions:
  - name: parse_as_json
    column: json_string_column_name
    asColumn: column_name
    sparkSchema: "<spark_schema_json>"
    avroSchema: "<avro_schema>"
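
A minimal PySpark sketch of the same idea using from_json with an explicit schema; the json_string column and schema below are made-up examples:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('{"city": "Chicago", "zip": 60601}',)], ["json_string"])
schema = StructType([
    StructField("city", StringType()),
    StructField("zip", IntegerType()),
])
# Parse the JSON string back into a nested struct column
df.withColumn("parsed", from_json("json_string", schema)).show(truncate=False)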

parse_html

Function Description
parse_html The parse_html function converts an HTML-coded string to a plain string without any HTML tags. Here, asColumn is an optional parameter, in case you wish to create a separate column for the processed data; otherwise, the processed data replaces the original column on which the function is performed. The function works using the jsoup library. More details about this library can be found here: https://github.com/jhy/jsoup
functions:
  - name: parse_html
    column: questionText
    asColumn: parsedText
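
Flare performs the parsing with jsoup on the JVM; purely to illustrate the effect, here is a regex-based PySpark sketch that strips tags (jsoup additionally handles entities and malformed HTML):

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("<p>What is <b>Flare</b>?</p>",)], ["questionText"])
# Crude tag stripping, for illustration only
df.withColumn("parsedText", regexp_replace("questionText", "<[^>]+>", "")).show(truncate=False)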

pivot

Function Description
pivot The pivot function is used to pivot/rotate the data from one DataFrame/Dataset column into multiple columns (transform row to columns).
Here, values and approach are optional parameters. Also, aggregateExpression requires an alias with the same name as the column used inside the aggregate function (sum, count, avg, etc.).
values can be used to keep only the listed values of column as output columns after the pivot.
approach can be set to “two-phase” to run an optimized version of the query on large datasets.
functions:
  - name: pivot
    groupBy:
      - {{"Product"}}
      - "{{Country"}}
    column: {{"Country"}}
    values:
      - {{"USA"}}
      - {{"Mexico"}}
      - {{"India"}}
    aggregateExpression: {{"sum(Amount) as Amount"}}
    approach: {{"two-phase"}}

pivot.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: pivot
                        groupBy:
                          - "Product"
                          - "Country"
                        column: "Country"
                        values:
                          - "USA"
                          - "Mexico"
                          - "India"
                        aggregateExpression: "sum(Amount) as Amount"
                        approach: "two-phase"
Output

| ADDRESS           | STATE_HEADQUARTERED | null | Baltimore  | Bloomfield | Chicago | Hartford  | Indianapolis | Louisville | Minnetonka  |
|-------------------|---------------------|------|------------|------------|---------|-----------|--------------|------------|------------|
| Security Blvd     | MD                  | null | 2.309825E7 | null       | null    | null      | null         | null       | null       |
| Virginia Ave      | IN                  | null | null       | null       | null    | null      | 1.598388E8   | null       | null       |
| Farmington Ave    | CT                  | null | null       | null       | null    | 1.51906E8 | null         | null       | null       |
| Healthcare Lane   | MN                  | null | null       | null       | null    | null      | null         | null       | 1.441036E8 |

Example 2: with values specified (values is an optional parameter)

+--------------------+-------------------+------+-----+
|             ADDRESS|STATE_HEADQUARTERED|Mexico|India|
+--------------------+-------------------+------+-----+
|       Security Blvd|                 MD|  null| null|
|                null|               null|  null| null|
|        Virginia Ave|                 IN|  null| null|
|      Farmington Ave|                 CT|  null| null|
|     Healthcare Lane|                 MN|  null| null|
+--------------------+-------------------+------+-----+
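
For reference, the same rotation in plain PySpark (illustrative; Flare's two-phase approach adds its own optimizations). Supplying the values list up front also spares Spark the extra pass it otherwise needs to discover the distinct values of column:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as sum_

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("Banana", "USA", 1000), ("Banana", "India", 400), ("Carrot", "USA", 1500)],
    ["Product", "Country", "Amount"],
)
# Explicit values skip the distinct-value scan over Country
df.groupBy("Product").pivot("Country", ["USA", "Mexico", "India"]) \
    .agg(sum_("Amount").alias("Amount")).show()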

rename

Function Description
rename The rename function will change the name of a supplied column to a new column name.
functions:
  - name: rename
    column: column_name
    asColumn: new_column_name
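
In Spark terms this is a withColumnRenamed; a minimal sketch with made-up column names:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("Chicago",)], ["CITY"])
# Rename CITY to payers_city; all other columns are untouched
df.withColumnRenamed("CITY", "payers_city").printSchema()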

rename_all

Function Description
rename_all The rename_all function changes the names of the supplied columns (the keys) to the corresponding new names (the values).
functions:
  - name: rename_all
    columns:
      column1: new_column1
      column2: new_column2

rename_all.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: rename_all
                        columns:
                          CITY: payers_city
                          Id: payers_id
Before

+--------------------+----------+
|                  Id|      CITY|
+--------------------+----------+
|d47b3510-2895-3b7...|Louisville|
|6e2f1a2d-27bd-370...|   Chicago|
+--------------------+----------+
After

+--------------------+-----------+
|           payers_id|payers_city|
+--------------------+-----------+
|d47b3510-2895-3b7...| Louisville|
|6e2f1a2d-27bd-370...|    Chicago|
+--------------------+-----------+

select

Function Description
select The select function keeps only the specified columns in the record. This is the opposite of the drop function's behaviour.
functions:
  - name: select
    columns:
      - column_01
      - column_02
      - column_03
      - column_04
      - column_05
      - column_06

select.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: select
                        columns:
                          - customer_index
                          - discount
Output

+--------------+--------------------+
|customer_index|            discount|
+--------------+--------------------+
|          1971|[{null, null, 1, ...|
|          1516|[{45, 15, 1, 15% ...|
+--------------+--------------------+
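
This maps directly onto Spark's select; a minimal sketch (the other column is a made-up stand-in for everything that gets dropped):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1971, "15%", "x")], ["customer_index", "discount", "other"])
# Keep only the listed columns; 'other' is dropped
df.select("customer_index", "discount").show()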

set_column

Function Description
set_column The set_column function sets the value of the supplied column to the specified value for every record.
functions:
  - name: set_column
    column: my_col_name
    value: "some value here"
set_column.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: set_column
                        column: customer_index
                        value: "iamgroot"

set_type

Function Description
set_type The set_type function converts the data type of a column. Here, type can be any of the Spark data types, e.g., int, string, long, or double.
functions:
  - name: set_type
    columns:
      column1: type
      column2: type

set_type.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: set_type
                        columns:
                          HEALTHCARE_EXPENSES: float
                          HEALTHCARE_COVERAGE: float
Before

|-- HEALTHCARE_EXPENSES: string (nullable = true)
|-- HEALTHCARE_COVERAGE: string (nullable = true)
After

|-- HEALTHCARE_EXPENSES: float (nullable = true)
|-- HEALTHCARE_COVERAGE: float (nullable = true)
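
The conversion corresponds to a Spark cast; a minimal sketch (illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("9039.16", "1500.00")],
                           ["HEALTHCARE_EXPENSES", "HEALTHCARE_COVERAGE"])
# Cast both string columns to float
for c in ["HEALTHCARE_EXPENSES", "HEALTHCARE_COVERAGE"]:
    df = df.withColumn(c, col(c).cast("float"))
df.printSchema()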

set_variable

Function Description
set_variable The set_variable function evaluates the supplied expression and stores the result in the specified column.
functions:
  - name: set_variable
    column: some_new_column
    expression: "ROUND(AVG(src_bytes), 2)"

set_variable.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: set_variable
                        column: HEALTHCARE_EXPENSES_ROUNDED
                        expression: ROUND(HEALTHCARE_EXPENSES, 0)
Output

+-------------------+---------------------------+
|HEALTHCARE_EXPENSES|HEALTHCARE_EXPENSES_ROUNDED|
+-------------------+---------------------------+
|           9039.164|                     9039.0|
|           402723.4|                   402723.0|
+-------------------+---------------------------+
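
Since the expression is standard Spark SQL, a rough PySpark equivalent uses expr (illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(9039.164,), (402723.4,)], ["HEALTHCARE_EXPENSES"])
# Evaluate the SQL expression and store the result in a new column
df.withColumn("HEALTHCARE_EXPENSES_ROUNDED",
              expr("ROUND(HEALTHCARE_EXPENSES, 0)")).show()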

snake_case

Function Description
snake_case The snake_case function converts column names from camel case to snake case. It is applicable only to batch DataFrames/jobs.
functions:
  - name: snake_case
snake_case.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: snake_case

Before

|-- COVERED_ENCOUNTERS: string (nullable = true)
|-- UNCOVERED_ENCOUNTERS: string (nullable = true)
|-- COVERED_MEDICATIONS: string (nullable = true)
|-- UNCOVERED_MEDICATIONS: string (nullable = true)
|-- COVERED_PROCEDURES: string (nullable = true)
After

|-- covered_encounters: string (nullable = true)
|-- uncovered_encounters: string (nullable = true)
|-- covered_medications: string (nullable = true)
|-- uncovered_medications: string (nullable = true)
|-- covered_procedures: string (nullable = true)
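
A rough sketch of the renaming rule (illustrative, not Flare's exact algorithm): insert an underscore before each interior capital of a camel-case name, then lower-case everything; already-delimited names such as the ones above simply fold to lower case.

import re

def to_snake(name: str) -> str:
    # camelCase -> camel_case; COVERED_ENCOUNTERS -> covered_encounters
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name).lower()

print(to_snake("coveredEncounters"))   # covered_encounters
print(to_snake("COVERED_ENCOUNTERS"))  # covered_encounters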

split_email

Function Description
split_email The split_email function splits/parses an email ID into its two constituent parts: account and domain. After splitting the email address stored in the column specified by the column property, the directive creates two new columns, appended to the original column, named column_account and column_domain. If the email address cannot be parsed correctly, the additional columns are still generated, but they are set to null for the parts that could not be parsed.
functions:
  - name: split_email
    column: email_column
split_email.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: split_email
                        column: Email
+------------------+-------------+------------+
|             Email|Email_account|Email_domain|
+------------------+-------------+------------+
|iamgroot@gmail.com|     iamgroot|   gmail.com|
|iamgroot@gmail.com|     iamgroot|   gmail.com|
+------------------+-------------+------------+
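
In plain PySpark this amounts to a split on the @ character (illustrative; the Flare directive additionally nulls out whichever parts cannot be parsed):

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("iamgroot@gmail.com",)], ["Email"])
parts = split(col("Email"), "@")
# Append the account and domain parts as new columns
df.withColumn("Email_account", parts.getItem(0)) \
  .withColumn("Email_domain", parts.getItem(1)).show()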

split_url

Function Description
split_url The split_url function splits a URL into its constituent parts: protocol, authority, host, port, path, filename, and query. Upon splitting the URL, the directive creates seven new columns by appending suffixes to the original column name: column_protocol, column_authority, column_host, column_port, column_path, column_filename, and column_query. If the URL cannot be parsed correctly, an exception is thrown. If the URL column does not exist, columns with null values are added to the record.
functions:
  - name: split_url
    column: column_with_url_content
split_url.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: split_url
                        column: URL
+-----------------+------------+-------------+--------+
|              URL|URL_protocol|URL_authority|URL_port|
+-----------------+------------+-------------+--------+
|www.spiderman.com|        null|         null|    null|
|www.spiderman.com|        null|         null|    null|
+-----------------+------------+-------------+--------+
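
The nulls above are expected: www.spiderman.com carries no scheme, so it cannot be parsed as a complete URL. Spark SQL's parse_url shows the same behaviour, as in this illustrative sketch:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("www.spiderman.com",),
                            ("https://www.spiderman.com/home?q=1",)], ["URL"])
# Extract URL parts; scheme-less strings come back null
for part in ["PROTOCOL", "AUTHORITY", "HOST", "QUERY"]:
    df = df.withColumn(f"URL_{part.lower()}", expr(f"parse_url(URL, '{part}')"))
df.show(truncate=False)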

swap

Function Description
swap The swap function swaps the column names of two columns.
functions:
  - name: swap
    columnA: col_1
    columnB: col_2

swap.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: swap
                        columnA: Id
                        columnB: ZIP
Before

+--------------------+-----+
|                  Id|  ZIP|
+--------------------+-----+
|b3221cfc-24fb-339...|21244|
|7caa7254-5050-3b5...|21244|
|7c4411ce-02f1-39b...|21244|
|d47b3510-2895-3b7...|40018|
+--------------------+-----+
After

+-----+--------------------+
|   Id|                 ZIP|
+-----+--------------------+
|21244|b3221cfc-24fb-339...|
|21244|7caa7254-5050-3b5...|
|21244|7c4411ce-02f1-39b...|
|40018|d47b3510-2895-3b7...|
+-----+--------------------+
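
Spark has no single swap primitive, so the usual trick is a three-step rename through a temporary name to avoid a collision; a minimal sketch (illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("b3221cfc", "21244")], ["Id", "ZIP"])
# Swap the two column names via a temporary placeholder
df = (df.withColumnRenamed("Id", "_tmp")
        .withColumnRenamed("ZIP", "Id")
        .withColumnRenamed("_tmp", "ZIP"))
df.show()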

trim

Function Description
trim The trim function trims whitespace from both sides, the left side, or the right side of the string values it is applied to. Supply the method as trim, ltrim, or rtrim.
functions:
  - name: trim
    column: col_001
    method: trim
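
The three methods map onto Spark's trim, ltrim, and rtrim functions; a minimal sketch (illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, ltrim, rtrim, col

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("  padded  ",)], ["col_001"])
# method: trim -> both sides, ltrim -> left only, rtrim -> right only
df.select(trim(col("col_001")).alias("trim"),
          ltrim(col("col_001")).alias("ltrim"),
          rtrim(col("col_001")).alias("rtrim")).show()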

unfurl

Function Description
unfurl The unfurl function uses the supplied expression to pull columns out of a nested JSON attribute, e.g., columnName.*.
functions:
  - name: unfurl
    expression: explode(col01)  # deconstructs an array
  - name: unfurl
    expression: col02.*  # deconstructs a struct
unfurl.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: unfurl
                        expression: explode(discount) as discount01

Before

|-- discount: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- discount_amount: long (nullable = true)
|    |    |-- discount_percent: long (nullable = true)
|    |    |-- discount_sequence_number: long (nullable = true)
|    |    |-- discount_type: string (nullable = true)
After

|-- discount01: struct (nullable = true)
|    |-- discount_amount: long (nullable = true)
|    |-- discount_percent: long (nullable = true)
|    |-- discount_sequence_number: long (nullable = true)
|    |-- discount_type: string (nullable = true)
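
In PySpark terms, explode(...) flattens an array into one row per element, while struct.* pulls a struct's fields up to top-level columns; a minimal sketch with made-up fields:

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import explode

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [Row(discount=[Row(discount_percent=15, discount_type="seasonal")])]
)
# One output row per array element, exposed as struct column discount01
exploded = df.select(explode("discount").alias("discount01"))
# Pull the struct's fields up to top-level columns
exploded.select("discount01.*").show()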

unpivot

Function Description
unpivot The unpivot function is the reverse of the pivot function: it rotates column values into row values.
functions:
  - name: unpivot
    columns: # Use "*" to select all columns other than the pivotColumns
      - {{USA}} or "*"
    pivotColumns: # Use "*" to select all columns other than those listed in columns
      - {{Col1}}
      - {{Col2}}
      - {{Col3}}
    keyColumnName: {{Country}}
    valueColumnName: {{Amount}}

unpivot.yaml
version: v1
name: standalone-read-bigquery
type: workflow
tags:
  - standalone
  - readJob
  - bigquery
description: Sample job
workflow:
  dag:
    - name: city-bigquery-read-01
      title: Sample Transaction Data Ingester
      description: The job ingests customer data from Bigquery to file source
      spec:
        tags:
          - standalone
          - readJob
          - bigquery
        stack: flare:3.0
        compute: runnable-default
        tier: connect
        flare:
          job:
            explain: true
            logLevel: INFO

            inputs: # Read from Bigquery
              - name: patients_connect
                inputType: bigquery
                bigquery:
                  table: transactions_standalone_09_nov
                  projected: <project-id>
                  dataset: <dataset-name>

            outputs: # Write to Local System
              - name: finalDf
                outputType: file
                file:
                  schemaName: retail
                  tableName: transactions_standalone_readbq02
                  format: Iceberg
                  warehousePath: /data/examples/dataout/bigquerydata
                  options:
                    saveMode: append

            steps:
              - sequence:
                  - name: finalDf
                    sql: SELECT * FROM patients_connect

                    functions:
                      - name: unpivot
                        columns: "*"
                        pivotColumns:
                          - Col1
                          - Col2
                          - Col3
                        keyColumnName: Country
                        valueColumnName: Amount
Before (pivoted)

+--------------------+-------------------+------+-----+
|             ADDRESS|STATE_HEADQUARTERED|Mexico|India|
+--------------------+-------------------+------+-----+
|  7500 Security Blvd|                 MD|  null| null|
|                null|               null|  null| null|
|    220 Virginia Ave|                 IN|  null| null|
|  151 Farmington Ave|                 CT|  null| null|
|9800 Healthcare Lane|                 MN|  null| null|
+--------------------+-------------------+------+-----+
After (unpivoted)

+------------------+-------------------+-------+------+
|           ADDRESS|STATE_HEADQUARTERED|Country|Amount|
+------------------+-------------------+-------+------+
|7500 Security Blvd|                 MD|  India|  null|
|7500 Security Blvd|                 MD| Mexico|  null|
|              null|               null|  India|  null|
|              null|               null| Mexico|  null|
|  220 Virginia Ave|                 IN|  India|  null|
+------------------+-------------------+-------+------+

Note

Make sure the columns in the provided list have the same data type; otherwise, an error is thrown. If the column names contain hyphens or spaces, clean them before passing * in columns.
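
In Spark SQL the same rotation is typically written with stack, which also makes the data-type constraint in the note concrete: every (label, value) pair feeds the same output column, so the values must share one type. An illustrative sketch:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("7500 Security Blvd", 10.0, 20.0)],
                           ["ADDRESS", "Mexico", "India"])
# stack(2, ...) emits two rows per input row: one per (label, value) pair
df.select("ADDRESS",
          expr("stack(2, 'Mexico', Mexico, 'India', India) as (Country, Amount)")
          ).show()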
