Merge Into Query
Merge Into functionality is used when you need to update values in an existing table with Flare based on a certain condition.
MERGE INTO allows you to perform an upsert operation (update or insert) between a target table and a source table. The operation works as follows:

- Update: If the condition matches an existing row in the target table, the record is updated.
- Insert: If the condition does not match, a new row is inserted into the target table.

The target table is thus updated from the source table based on a set of conditions. This is a row-level update where the rows to change are identified by the ON clause, as illustrated below.
Spark 3 added support for MERGE INTO queries that can express row-level updates.
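As a minimal sketch (assuming the `whenClause` string is passed through verbatim to Spark's MERGE INTO statement; the columns `id` and `value` are hypothetical), an upsert that updates matched rows and inserts unmatched ones could look like this:

```yaml
merge:
  # target (old) and source (new) rows are matched on this key
  onClause: "old.id = new.id"
  # matched rows are updated; source rows with no match are inserted as new rows
  whenClause: >
    MATCHED THEN UPDATE SET old.value = new.value
    WHEN NOT MATCHED THEN INSERT *
```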
Case Scenario
Let's say we have written a city dataset (i.e., `city_merge`) that holds all the information about the city, and the city data has now changed.
In the city data (source table), the information for `zip_code = 36006` has changed.
| Column Name | Old Value | New Value |
|---|---|---|
| city_name | Billingsley | abc |
| county_name | Autauga | abcd |
| state_code | AL | ab |
| state_name | Alabama | abcde |
Now we want to update only the records where `zip_code = 36006`.
Existing city table
City table with updated records (using the MERGE INTO function)
How to define merge in Flare
Syntax
```yaml
outputOptions:
  saveMode: overwrite
  iceberg:
    merge:
      onClause: "old.old_table_column_a = new.new_table_column_a"
      whenClause: "MATCHED THEN UPDATE SET old.old_table_column_b = new.new_table_column_b"
```
`onClause`: Multiple conditions can be used in the ON clause; they must be separated with the `AND` operator. `OR` cannot be used to combine conditions.

`whenClause`: Executed when the `onClause` condition matches; additional conditions can be referred to within it.

**old**: Refers to the existing dataset in the Lakehouse.

**new**: Refers to the data frame with which you are updating the existing dataset.
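For example, to match rows on more than one column (a short sketch reusing the city columns from this page), the conditions in the `onClause` are joined with `AND`:

```yaml
merge:
  # multiple match conditions joined with AND; OR is not supported
  onClause: "old.city_id = new.city_id AND old.zip_code = new.zip_code"
  # runs only for the rows where the onClause matched
  whenClause: "MATCHED THEN UPDATE SET old.city_name = new.city_name"
```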
Info
MERGE INTO requires the sparkConf below; it is mandatory for the merge into function to work.
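This is the same setting shown in the Flare 5.0 manifest at the end of this page:

```yaml
sparkConf: # spark configuration
  - spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions # mandatory for merge into to work
```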
Info
The Flare `sequence` below is only meant to create updated sample data (with new `city_name`, `county_name`, `state_code`, and `state_name` values) for `zip_code = 36006`.
**Update set using multiple columns**

```yaml
  - sequence: # data transformation steps using Spark SQL
      - name: main
        sql: select * from input where zip_code = "36006" # Selecting only zip code (36006)
      - name: new_city # Updating table value for zip code (36006)
        sql: select city_id, zip_code, 'abc' as city_name, 'abcd' as county_name, 'ab' as state_code, 'abcde' as state_name from main
```
**Update set using a single column**

```yaml
merge:
  onClause: "old.city_id = new.city_id"
  whenClause: "MATCHED THEN UPDATE SET old.state_name = new.state_name"
```
Flare workflow for merge into functionality
```yaml
version: v1
name: iceberg-merge-job-011 #workflow name
type: workflow
tags:
- Connect
- merge
- iceberg
description: Merges city data into Iceberg table based on zip_code
workflow:
title: Merge City Data into Iceberg
dag:
- name: merge-iceberg-city
title: Merging city dataset
description: Performs a MERGE INTO operation on Iceberg table using zip_code
spec:
tags:
- merge
- iceberg
stack: flare:6.0
compute: runnable-default
envs:
DISABLE_HADOOP_PATH_CHECKS: "true"
stackSpec:
job:
explain: true
logLevel: INFO
inputs:
- name: input01
dataset: dataos://lakehouse:retail/city
format: iceberg
# schemaPath: dataos://lakehouse:default/schemas/city.avsc
options:
saveMode: append
outputs:
- name: city_merge_output
dataset: dataos://lakehouse:syndicate/city_merge
format: iceberg
options:
saveMode: overwrite
merge:
onClause: "old.zip_code = new.zip_code"
whenClause: >
MATCHED THEN UPDATE SET
old.city_name = new.city_name,
old.county_name = new.county_name,
old.state_code = new.state_code,
old.state_name = new.state_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
steps:
- sequence:
- name: city_source
sql: select * from input01 where zip_code = "36006" # Selecting only zip code (36006)
- name: city_merge_output # Updating table value for zip code (36006)
                  sql: select city_id, zip_code, 'abc' as city_name, 'abcd' as county_name, 'ab' as state_code, 'abcde' as state_name from city_source
```
Tip
If you are using Flare 5.0, we recommend specifying the sparkConf in the Flare manifest file, as in the Note below.
Note
```yaml
version: v1
name: iceberg-merge-job-011 #workflow name
type: workflow
tags:
- Connect
- merge
- iceberg
description: Merges city data into Iceberg table based on zip_code
workflow:
title: Merge City Data into Iceberg
dag:
- name: merge-iceberg-city
title: Merging city dataset
description: Performs a MERGE INTO operation on Iceberg table using zip_code
spec:
tags:
- merge
- iceberg
stack: flare:5.0
compute: runnable-default
envs:
DISABLE_HADOOP_PATH_CHECKS: "true"
stackSpec:
job:
explain: true
logLevel: INFO
inputs:
- name: input01
dataset: dataos://lakehouse:retail/city
format: iceberg
# schemaPath: dataos://lakehouse:default/schemas/city.avsc
options:
saveMode: append
outputs:
- name: city_merge_output
dataset: dataos://lakehouse:syndicate/city_merge
format: iceberg
options:
saveMode: overwrite
merge:
onClause: "old.zip_code = new.zip_code"
whenClause: >
MATCHED THEN UPDATE SET
old.city_name = new.city_name,
old.county_name = new.county_name,
old.state_code = new.state_code,
old.state_name = new.state_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
steps:
- sequence:
- name: city_source
sql: select * from input01 where zip_code = "36006" # Selecting only zip code (36006)
- name: city_merge_output # Updating table value for zip code (36006)
                  sql: select city_id, zip_code, 'abc' as city_name, 'abcd' as county_name, 'ab' as state_code, 'abcde' as state_name from city_source
sparkConf: # spark configuration
- spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions # mandatory for merge into function to work
```