
Merge Into Query

Merge Into is used when you need to update values in an existing table with Flare, based on a given condition.

MERGE INTO performs an upsert (update or insert) from a source table into a target table. The operation works as follows:

  • Update: If a row in the target table satisfies the match condition, that row is updated.

  • Insert: If no row in the target table matches, the source row is inserted into the target table (in standard MERGE INTO this requires a `NOT MATCHED THEN INSERT` action).

  • Matching is row-level: the ON clause decides which target rows each source row matches, and only those rows are changed.
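The update/insert rule above can be modeled in a few lines of plain Python. This is a toy, in-memory sketch of MERGE INTO semantics for a merge that has both a matched-update and a not-matched-insert action; it is not how Flare or Iceberg execute the operation, and the function name `merge_into` and the list-of-dicts row format are illustrative assumptions.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def merge_into(
    target: List[Row],
    source: List[Row],
    on: Callable[[Row, Row], bool],
    update_cols: List[str],
) -> List[Row]:
    """Toy in-memory upsert modeled on MERGE INTO.

    WHEN MATCHED: copy `update_cols` from the source row into every matching target row.
    WHEN NOT MATCHED: append the source row to the target.
    """
    result = [dict(row) for row in target]  # copy, so the caller's rows are untouched
    inserts: List[Row] = []
    for new in source:
        matched = False
        for old in result:  # only original target rows are candidates for a match
            if on(old, new):
                matched = True
                for col in update_cols:
                    old[col] = new[col]
        if not matched:
            inserts.append(dict(new))
    return result + inserts


target = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
source = [{"id": 2, "name": "B"}, {"id": 3, "name": "c"}]
merged = merge_into(target, source, on=lambda old, new: old["id"] == new["id"], update_cols=["name"])
print(merged)  # [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'B'}, {'id': 3, 'name': 'c'}]
```

Inserted rows are collected separately and appended at the end, so a source row can never match a row that was itself inserted by the same merge.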

Spark 3 added support for MERGE INTO queries that can express row-level updates.

Case Scenario

Suppose we have written a city dataset (city_merge) that contains information about each city, and the city data has since changed.

In the city data (the source table), the information for zip_code = 36006 has changed:

| Column name | Old value | New value |
|---|---|---|
| city_name | Billingsley | abc |
| county_name | Autauga | abcd |
| state_code | AL | ab |
| state_name | Alabama | abcde |

We want to apply this change only to the records with zip_code 36006.

Existing city table.


City table after the update (using MERGE INTO).


How to define a merge in Flare

Syntax

```yaml
outputOptions:
  saveMode: overwrite
  iceberg:
    merge:
      onClause: "old.old_table_column_a = new.new_table_column_a"
      whenClause: "MATCHED THEN UPDATE SET old.old_table_column_b = new.new_table_column_b"
```

onClause: The condition used to match rows in the existing table with rows in the new data. Multiple conditions can be combined with the AND operator; OR is not supported.

whenClause: The action to perform on rows that satisfy the onClause condition, for example `MATCHED THEN UPDATE SET ...`. Columns from both old and new can be referenced in it.

**old**: Refers to the existing dataset in the Lakehouse.

**new**: Refers to the DataFrame whose data you are using to update the existing dataset.
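To see how the two clauses and the `old`/`new` aliases fit together, here is a hypothetical Python helper that assembles them into a Spark SQL `MERGE INTO` string and rejects `OR` in the ON condition. The helper name `build_merge_sql` and the `AS old`/`AS new` aliasing are assumptions for illustration; this is not a statement Flare is documented to generate.

```python
import re


def build_merge_sql(target: str, source: str, on_clause: str, when_clause: str) -> str:
    """Hypothetical: combine Flare-style onClause/whenClause into one MERGE INTO string."""
    # Conditions may be joined with AND; OR is not allowed in onClause.
    if re.search(r"\bor\b", on_clause, flags=re.IGNORECASE):
        raise ValueError("onClause supports AND between conditions, not OR")
    # Collapse newlines and extra spaces, e.g. from a YAML folded (>) whenClause.
    when = " ".join(when_clause.split())
    # `old` aliases the existing Lakehouse table, `new` the incoming data.
    return f"MERGE INTO {target} AS old USING {source} AS new ON {on_clause} WHEN {when}"


print(build_merge_sql(
    "city_merge",
    "new_city",
    "old.city_id = new.city_id",
    "MATCHED THEN UPDATE SET old.state_name = new.state_name",
))
# MERGE INTO city_merge AS old USING new_city AS new ON old.city_id = new.city_id WHEN MATCHED THEN UPDATE SET old.state_name = new.state_name
```

The word-boundary regex flags `OR` as a keyword but not column names that merely contain the letters, such as `order_id`.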

Info

MERGE INTO requires the following sparkConf; it is mandatory for the merge to work.

```yaml
sparkConf:  # Spark configuration
  - spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions  # mandatory for MERGE INTO
```
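If you generate or lint manifests programmatically, a pre-flight check like the hypothetical one below can catch a missing extension before the job runs. It assumes the manifest has already been parsed (for example with a YAML loader) so that sparkConf is a list of single-key mappings, as in the snippet above. Spark reads `spark.sql.extensions` as a comma-separated list of class names, so the check splits on commas.

```python
from typing import Dict, List

ICEBERG_EXTENSIONS = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"


def has_iceberg_extensions(spark_conf: List[Dict[str, str]]) -> bool:
    """Hypothetical check: does a parsed sparkConf list enable Iceberg's SQL extensions?"""
    for entry in spark_conf:  # each entry is a single-key mapping, as in the manifest
        value = entry.get("spark.sql.extensions")
        if value is None:
            continue
        # spark.sql.extensions may list several extension classes separated by commas.
        if ICEBERG_EXTENSIONS in (name.strip() for name in value.split(",")):
            return True
    return False


conf = [{"spark.sql.extensions": ICEBERG_EXTENSIONS}]
print(has_iceberg_extensions(conf))  # True
```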

Info

The following Flare sequence only creates the updated sample data (new city_name, county_name, state_code and state_name) for zip_code = 36006.

```yaml
# Source data: multiple columns changed for zip_code 36006
- sequence:     # data transformation steps using Spark SQL
    - name: main
      sql: select * from input where zip_code = "36006"   # select only zip code 36006
    - name: new_city  # new values for zip code 36006
      sql: select city_id, zip_code, 'abc' as city_name, 'abcd' as county_name, 'ab' as state_code, 'abcde' as state_name from main
```

```yaml
# Update set using a single column
merge:
  onClause: "old.city_id = new.city_id"
  whenClause: "MATCHED THEN UPDATE SET old.state_name = new.state_name"
```
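The effect of the single-column whenClause can be checked with a small, hypothetical Python model: even though `new_city` carries four changed columns, only `state_name` is copied into the matched row. The `update_matched` function and the `city_id` value `"1"` are illustrative assumptions; the other values come from the table in the case scenario.

```python
from typing import Dict, List

Row = Dict[str, str]


def update_matched(old_rows: List[Row], new_rows: List[Row], key: str, set_cols: List[str]) -> List[Row]:
    """Model of: ON old.<key> = new.<key> WHEN MATCHED THEN UPDATE SET old.<col> = new.<col>."""
    new_by_key = {row[key]: row for row in new_rows}
    result = []
    for old in old_rows:
        updated = dict(old)  # copy so the input list is left unchanged
        new = new_by_key.get(old[key])
        if new is not None:
            for col in set_cols:  # only the columns named in UPDATE SET change
                updated[col] = new[col]
        result.append(updated)
    return result


old = [{"city_id": "1", "zip_code": "36006", "city_name": "Billingsley",
        "county_name": "Autauga", "state_code": "AL", "state_name": "Alabama"}]
new = [{"city_id": "1", "zip_code": "36006", "city_name": "abc",
        "county_name": "abcd", "state_code": "ab", "state_name": "abcde"}]

single = update_matched(old, new, key="city_id", set_cols=["state_name"])
print(single[0]["state_name"], single[0]["city_name"])  # abcde Billingsley
```

Passing all four column names in `set_cols` reproduces the multi-column update used in the workflow below.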

Flare workflow for MERGE INTO

```yaml
version: v1
name: iceberg-merge-job-011 # workflow name
type: workflow
tags:
  - Connect
  - merge
  - iceberg
description: Merges city data into Iceberg table based on zip_code
workflow:
  title: Merge City Data into Iceberg
  dag:
    - name: merge-iceberg-city
      title: Merging city dataset
      description: Performs a MERGE INTO operation on Iceberg table using zip_code
      spec:
        tags:
          - merge
          - iceberg
        stack: flare:6.0
        compute: runnable-default
        envs:
          DISABLE_HADOOP_PATH_CHECKS: "true"
        stackSpec:
          job:
            explain: true
            logLevel: INFO
            inputs:
              - name: input01
                dataset: dataos://lakehouse:retail/city
                format: iceberg
                # schemaPath: dataos://lakehouse:default/schemas/city.avsc
                options:
                  saveMode: append
            outputs:
              - name: city_merge_output
                dataset: dataos://lakehouse:syndicate/city_merge
                format: iceberg
                options:
                  saveMode: overwrite
                  merge:
                    onClause: "old.zip_code = new.zip_code"
                    whenClause: >
                      MATCHED THEN UPDATE SET
                      old.city_name = new.city_name,
                      old.county_name = new.county_name,
                      old.state_code = new.state_code,
                      old.state_name = new.state_name
                  properties:
                    write.format.default: parquet
                    write.metadata.compression-codec: gzip
            steps:
              - sequence:
                  - name: city_source
                    sql: select * from input01 where zip_code = "36006"   # select only zip code 36006
                  - name: city_merge_output  # new values for zip code 36006
                    sql: select city_id, zip_code, 'abc' as city_name, 'abcd' as county_name, 'ab' as state_code, 'abcde' as state_name from city_source
```
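What the workflow does to the data can be sketched in plain Python: the two steps filter `input01` to zip_code 36006 and replace four columns, then the merge copies those columns into every `city_merge` row with the same zip_code. This is a hypothetical in-memory model, not Flare's execution path, and the second sample row is made up. It also raises an error when one target row matches several source rows, because standard MERGE INTO semantics reject that case; keep this in mind when matching on a column that may not be unique, such as zip_code.

```python
from typing import Dict, List

Row = Dict[str, str]

# Literal values assigned by the city_merge_output step.
NEW_VALUES = {"city_name": "abc", "county_name": "abcd", "state_code": "ab", "state_name": "abcde"}


def run_steps(input01: List[Row]) -> List[Row]:
    """city_source keeps zip 36006; city_merge_output overwrites four columns."""
    city_source = [r for r in input01 if r["zip_code"] == "36006"]
    return [{"city_id": r["city_id"], "zip_code": r["zip_code"], **NEW_VALUES} for r in city_source]


def merge_on_zip(existing: List[Row], incoming: List[Row]) -> List[Row]:
    """ON old.zip_code = new.zip_code WHEN MATCHED THEN UPDATE SET the four columns."""
    by_zip: Dict[str, List[Row]] = {}
    for row in incoming:
        by_zip.setdefault(row["zip_code"], []).append(row)
    merged = []
    for old in existing:
        row = dict(old)  # copy; unmatched rows pass through unchanged
        matches = by_zip.get(old["zip_code"], [])
        if len(matches) > 1:
            raise ValueError(f"zip_code {old['zip_code']} matched by {len(matches)} source rows")
        if matches:
            for col in NEW_VALUES:
                row[col] = matches[0][col]
        merged.append(row)
    return merged


city = [
    {"city_id": "1", "zip_code": "36006", "city_name": "Billingsley",
     "county_name": "Autauga", "state_code": "AL", "state_name": "Alabama"},
    {"city_id": "2", "zip_code": "36008", "city_name": "Placeholder",
     "county_name": "Placeholder", "state_code": "AL", "state_name": "Alabama"},
]
result = merge_on_zip(city, run_steps(city))
print(result[0]["city_name"], result[1]["city_name"])  # abc Placeholder
```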

Tip

If you are using Flare 5.0, we recommend adding the sparkConf to the Flare manifest file.

Note

```yaml
version: v1
name: iceberg-merge-job-011 # workflow name
type: workflow
tags:
  - Connect
  - merge
  - iceberg
description: Merges city data into Iceberg table based on zip_code
workflow:
  title: Merge City Data into Iceberg
  dag:
    - name: merge-iceberg-city
      title: Merging city dataset
      description: Performs a MERGE INTO operation on Iceberg table using zip_code
      spec:
        tags:
          - merge
          - iceberg
        stack: flare:5.0
        compute: runnable-default
        envs:
          DISABLE_HADOOP_PATH_CHECKS: "true"
        stackSpec:
          job:
            explain: true
            logLevel: INFO
            inputs:
              - name: input01
                dataset: dataos://lakehouse:retail/city
                format: iceberg
                # schemaPath: dataos://lakehouse:default/schemas/city.avsc
                options:
                  saveMode: append
            outputs:
              - name: city_merge_output
                dataset: dataos://lakehouse:syndicate/city_merge
                format: iceberg
                options:
                  saveMode: overwrite
                  merge:
                    onClause: "old.zip_code = new.zip_code"
                    whenClause: >
                      MATCHED THEN UPDATE SET
                      old.city_name = new.city_name,
                      old.county_name = new.county_name,
                      old.state_code = new.state_code,
                      old.state_name = new.state_name
                  properties:
                    write.format.default: parquet
                    write.metadata.compression-codec: gzip
            steps:
              - sequence:
                  - name: city_source
                    sql: select * from input01 where zip_code = "36006"   # select only zip code 36006
                  - name: city_merge_output  # new values for zip code 36006
                    sql: select city_id, zip_code, 'abc' as city_name, 'abcd' as county_name, 'ab' as state_code, 'abcde' as state_name from city_source
          sparkConf:  # Spark configuration
            - spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions  # mandatory for MERGE INTO to work
```