Job Optimization by Tuning¶
This document walks through tuning a Flare job for performance. Nine configurations ("methods") vary the driver and executor resources along with key Spark properties, and each run's total duration is recorded so the settings can be compared directly. The changes target resource allocation, memory usage, and execution speed.
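For quick comparison, the table below summarizes the nine methods detailed on this page (driver settings also vary; see each method). Durations are as reported by the workflow runtime.

| Method | Executors (instances × cores) | Executor memory | spark.sql.shuffle.partitions | spark.default.parallelism | Total Duration |
|--------|-------------------------------|-----------------|------------------------------|---------------------------|----------------|
| 1      | 3 × 3                         | 8000m           | 450                          | 300                       | 8.13           |
| 2      | 3 × 3                         | 6000m           | 450                          | 300                       | 7.33           |
| 3      | 3 × 3                         | 6000m           | Spark default                | Spark default             | 6.25           |
| 4      | 4 × 2                         | 6000m           | 150                          | 130                       | 6.04           |
| 5      | 4 × 2                         | 6000m           | 150                          | 130                       | 6.05           |
| 6      | 3 × 2                         | 5000m           | 100                          | 80                        | 6.11           |
| 7      | 2 × 3                         | 5000m           | 100                          | 80                        | 6.38           |
| 8      | 2 × 3                         | 4800m           | 75                           | 60                        | 6.00           |
| 9      | 2 × 3                         | 4800m           | 50                           | 45                        | 6.33           |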
Code Snippet¶
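The baseline manifest below reads TPCH tables from Snowflake, enriches customer orders through a sequence of Spark SQL steps, and writes an Iceberg dataset to the Lakehouse. The methods that follow vary only the driver, executor, and sparkConf sections; the job definition stays the same.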
version: v1
name: wf-orders-enriched-data
type: workflow
tags:
- customers
- orders
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
workflow:
title: Customer Order Enriched Data
dag:
- name: orders-enriched-data
title: Customer Order Enriched Data
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
spec:
tags:
- customers
- orders
stack: flare:6.0
compute: runnable-default
stackSpec:
driver:
coreLimit: 2000m
cores: 2
memory: 2000m
executor:
coreLimit: 2200m
cores: 3
instances: 2
memory: 4800m
job:
explain: true
inputs:
- name: region
dataset: dataos://snowflake:TPCH_SF10/REGION
- name: customer
dataset: dataos://snowflake:TPCH_SF10/CUSTOMER
- name: lineitem
dataset: dataos://snowflake:TPCH_SF10/LINEITEM
incremental:
context: incrinputs
sql: select * from incrinputs where l_commitdate between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-31'
- name: end_time
sql: select '1998-08-29'
- name: nation
dataset: dataos://snowflake:TPCH_SF10/NATION
- name: orders
dataset: dataos://snowflake:TPCH_SF10/ORDERS
incremental:
context: incrinputs
sql: select * from incrinputs where O_ORDERDATE between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-01'
- name: end_time
sql: select '1998-07-30'
- name: part
dataset: dataos://snowflake:TPCH_SF10/PART
- name: partsupp
dataset: dataos://snowflake:TPCH_SF10/PARTSUPP
- name: suppliers
dataset: dataos://snowflake:TPCH_SF10/SUPPLIER
logLevel: INFO
outputs:
- name: last_5_tans_each_cust
dataset: dataos://lakehouse:sandbox/last_5_tans_each_cust?acl=rw
format: Iceberg
description: This dataset contains enriched customer order records.
tags:
- clickstream
options:
saveMode: append
sort:
mode: partition
columns:
- name: O_ORDERDATE
order: desc
iceberg:
partitionSpec:
- type: identity
column: event_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
partitionSpec:
- type: month
column: O_ORDERDATE
name: month
- type: identity
column: user_agent
title: Customer Order Enriched Data
steps:
- sequence:
# Dimensions: join the small nation/region lookup onto customers.
- name: nation_region
sql: select * from nation left join region on n_regionkey = r_regionkey
- name: cust_repart
# REPARTITION redistributes rows on the join key ahead of the join.
sql: SELECT /*+ REPARTITION(c_nationkey) */ * FROM customer
- name: customer_nation_region # customer_final
# MAPJOIN broadcasts the small nation_region table to every executor.
sql: select /*+ MAPJOIN(nation_region) */ * from cust_repart left join nation_region on c_nationkey = n_nationkey
- name: repart_customer_nation_region
sql: select /*+ REPARTITION(100,c_custkey) */ * from customer_nation_region
# Supply side: co-partition partsupp before joining suppliers and parts.
- name: ps_repart
sql: select /*+ REPARTITION(ps_suppkey,ps_PARTKEY) */ * from partsupp
- name: partsupp_supp
sql: select * from ps_repart left join suppliers on ps_suppkey = s_suppkey
- name: partsupp_supp_part
sql: select * from partsupp_supp left join part on ps_partkey = p_partkey
# Orders: rank each customer's orders by date and keep the most recent.
- name: filtered_order
sql: select *, row_number() over (partition by o_custkey order by o_orderdate desc) as ranked from orders
- name: latest_order
# REPARTITION_BY_RANGE range-partitions the ranked orders into 100 partitions.
sql: select /*+ REPARTITION_BY_RANGE(100,o_orderkey) */ * from filtered_order where ranked <= 2
- name: order_cust
sql: select * from latest_order left join repart_customer_nation_region on o_custkey = c_custkey
- name: repart_order_cust
sql: select /*+ REPARTITION(100,o_orderkey) */ * from order_cust
- name: line_repart
sql: select * from lineitem
- name: order_cust_line
sql: select * from repart_order_cust right join line_repart on o_orderkey = l_orderkey
# Re-shuffle on the supplier/part keys to align with partsupp_supp_part.
- name: repart_order_cust_line
sql: select /*+ REPARTITION(100,l_suppkey,l_partkey)*/ * from order_cust_line
- name: last_5_tans_each_cust
sql: select * from repart_order_cust_line left join partsupp_supp_part on s_suppkey = l_suppkey and p_partkey = l_partkey
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m # max table size AQE will broadcast in a join
- spark.sql.shuffle.partitions: 50 # partition count for shuffle stages in joins and aggregations
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true # prune partitions at runtime using join-side filters
- spark.serializer: org.apache.spark.serializer.KryoSerializer # faster, more compact serialization than the Java default
- spark.default.parallelism: 45 # default partition count for RDD operations
- spark.sql.broadcastTimeout: 15 # seconds to wait for a broadcast join before failing
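As a reference for the resource blocks used throughout this page, the sketch below annotates the baseline driver and executor settings with the Spark-on-Kubernetes properties they are assumed to map to. This is based on common Spark operator conventions, not Flare-specific documentation; exact property names may differ across Flare versions.

driver:
  cores: 2          # spark.driver.cores: CPU cores requested for the driver pod
  coreLimit: 2000m  # spark.kubernetes.driver.limit.cores: hard CPU limit (millicores)
  memory: 2000m     # spark.driver.memory: Spark memory string, i.e. 2000 MiB
executor:
  instances: 2      # spark.executor.instances: number of executor pods
  cores: 3          # spark.executor.cores: CPU cores per executor
  coreLimit: 2200m  # spark.kubernetes.executor.limit.cores: hard CPU limit per executor pod
  memory: 4800m     # spark.executor.memory: heap available to each executor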
Method: 1¶
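Method 1 scales resources up aggressively: the driver core limit rises to 3800m and its memory doubles to 4000m, each of the 3 executors gets a 7500m core limit and 8000m of memory, and the Spark settings are raised to match (20m broadcast threshold, 450 shuffle partitions, parallelism 300, 200-second broadcast timeout).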
driver:
coreLimit: 3800m
cores: 2
memory: 4000m
executor:
coreLimit: 7500m
cores: 3
instances: 3
memory: 8000m
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 20m
- spark.sql.shuffle.partitions: 450
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 300
- spark.sql.broadcastTimeout: 200
Total Duration: 8.13
RUNTIME | PROGRESS | STARTED | FINISHED
------------|----------|---------------------------|----------------------------
succeeded | 8/8 | 2022-08-10T22:36:43+05:30 | 2022-08-10T22:44:16+05:30
The full manifest for this method:
version: v1
name: wf-orders-enriched-data
type: workflow
tags:
- customers
- orders
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
workflow:
title: Customer Order Enriched Data
dag:
- name: orders-enriched-data
title: Customer Order Enriched Data
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
spec:
tags:
- customers
- orders
stack: flare:6.0
compute: runnable-default
stackSpec:
driver:
coreLimit: 3800m
cores: 2
memory: 4000m
executor:
coreLimit: 7500m
cores: 3
instances: 3
memory: 8000m
job:
explain: true
inputs:
- name: region
dataset: dataos://snowflake:TPCH_SF10/REGION
- name: customer
dataset: dataos://snowflake:TPCH_SF10/CUSTOMER
- name: lineitem
dataset: dataos://snowflake:TPCH_SF10/LINEITEM
incremental:
context: incrinputs
sql: select * from incrinputs where l_commitdate between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-31'
- name: end_time
sql: select '1998-08-29'
- name: nation
dataset: dataos://snowflake:TPCH_SF10/NATION
- name: orders
dataset: dataos://snowflake:TPCH_SF10/ORDERS
incremental:
context: incrinputs
sql: select * from incrinputs where O_ORDERDATE between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-01'
- name: end_time
sql: select '1998-07-30'
- name: part
dataset: dataos://snowflake:TPCH_SF10/PART
- name: partsupp
dataset: dataos://snowflake:TPCH_SF10/PARTSUPP
- name: suppliers
dataset: dataos://snowflake:TPCH_SF10/SUPPLIER
logLevel: INFO
outputs:
- name: last_5_tans_each_cust
dataset: dataos://lakehouse:sandbox/last_5_tans_each_cust?acl=rw
format: Iceberg
description: This dataset contains enriched customer order records.
tags:
- clickstream
options:
saveMode: append
sort:
mode: partition
columns:
- name: O_ORDERDATE
order: desc
iceberg:
partitionSpec:
- type: identity
column: event_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
partitionSpec:
- type: month
column: O_ORDERDATE
name: month
- type: identity
column: user_agent
title: Customer Order Enriched Data
steps:
- sequence:
- name: nation_region
sql: select * from nation left join region on n_regionkey = r_regionkey
- name: cust_repart
sql: SELECT /*+ REPARTITION(c_nationkey) */ * FROM customer
- name: customer_nation_region # customer_final
sql: select /*+ MAPJOIN(nation_region) */ * from cust_repart left join nation_region on c_nationkey = n_nationkey
- name: repart_customer_nation_region
sql: select /*+ REPARTITION(100,c_custkey) */ * from customer_nation_region
- name: ps_repart
sql: select /*+ REPARTITION(ps_suppkey,ps_PARTKEY) */ * from partsupp
- name: partsupp_supp
sql: select * from ps_repart left join suppliers on ps_suppkey = s_suppkey
- name: partsupp_supp_part
sql: select * from partsupp_supp left join part on ps_partkey = p_partkey
# Order
- name: filtered_order
sql: select *, row_number() over (partition by o_custkey order by o_orderdate desc) as ranked from orders
- name: latest_order
sql: select /*+ REPARTITION_BY_RANGE(100,o_orderkey) */ * from filtered_order where ranked <= 2
- name: order_cust
sql: select * from latest_order left join repart_customer_nation_region on o_custkey = c_custkey
- name: repart_order_cust
sql: select /*+ REPARTITION(100,o_orderkey) */ * from order_cust
- name: line_repart
sql: select * from lineitem
- name: order_cust_line
sql: select * from repart_order_cust right join line_repart on o_orderkey = l_orderkey
- name: repart_order_cust_line
sql: select /*+ REPARTITION(100,l_suppkey,l_partkey)*/ * from order_cust_line
- name: last_5_tans_each_cust
sql: select * from repart_order_cust_line left join partsupp_supp_part on s_suppkey = l_suppkey and p_partkey = l_partkey
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 20m
- spark.sql.shuffle.partitions: 450
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 300
- spark.sql.broadcastTimeout: 200
Method: 2¶
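Method 2 keeps Method 1's Spark settings but trims executor resources to a 3000m core limit and 6000m of memory per executor.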
driver:
coreLimit: 3800m
cores: 2
memory: 2000m
executor:
coreLimit: 3000m
cores: 3
instances: 3
memory: 6000m
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 20m
- spark.sql.shuffle.partitions: 450
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 300
- spark.sql.broadcastTimeout: 200
Total Duration: 7.33
RUNTIME | PROGRESS | STARTED | FINISHED
------------|----------|---------------------------|----------------------------
succeeded | 8/8 | 2022-08-10T22:50:41+05:30 | 2022-08-10T22:57:39+05:30
The full manifest for this method:
version: v1
name: wf-orders-enriched-data
type: workflow
tags:
- customers
- orders
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
workflow:
title: Customer Order Enriched Data
dag:
- name: orders-enriched-data
title: Customer Order Enriched Data
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
spec:
tags:
- customers
- orders
stack: flare:6.0
compute: runnable-default
stackSpec:
driver:
coreLimit: 3800m
cores: 2
memory: 2000m
executor:
coreLimit: 3000m
cores: 3
instances: 3
memory: 6000m
job:
explain: true
inputs:
- name: region
dataset: dataos://snowflake:TPCH_SF10/REGION
- name: customer
dataset: dataos://snowflake:TPCH_SF10/CUSTOMER
- name: lineitem
dataset: dataos://snowflake:TPCH_SF10/LINEITEM
incremental:
context: incrinputs
sql: select * from incrinputs where l_commitdate between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-31'
- name: end_time
sql: select '1998-08-29'
- name: nation
dataset: dataos://snowflake:TPCH_SF10/NATION
- name: orders
dataset: dataos://snowflake:TPCH_SF10/ORDERS
incremental:
context: incrinputs
sql: select * from incrinputs where O_ORDERDATE between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-01'
- name: end_time
sql: select '1998-07-30'
- name: part
dataset: dataos://snowflake:TPCH_SF10/PART
- name: partsupp
dataset: dataos://snowflake:TPCH_SF10/PARTSUPP
- name: suppliers
dataset: dataos://snowflake:TPCH_SF10/SUPPLIER
logLevel: INFO
outputs:
- name: last_5_tans_each_cust
dataset: dataos://lakehouse:sandbox/last_5_tans_each_cust?acl=rw
format: Iceberg
description: This dataset contains enriched customer order records.
tags:
- clickstream
options:
saveMode: append
sort:
mode: partition
columns:
- name: O_ORDERDATE
order: desc
iceberg:
partitionSpec:
- type: identity
column: event_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
partitionSpec:
- type: month
column: O_ORDERDATE
name: month
- type: identity
column: user_agent
title: Customer Order Enriched Data
steps:
- sequence:
- name: nation_region
sql: select * from nation left join region on n_regionkey = r_regionkey
- name: cust_repart
sql: SELECT /*+ REPARTITION(c_nationkey) */ * FROM customer
- name: customer_nation_region # customer_final
sql: select /*+ MAPJOIN(nation_region) */ * from cust_repart left join nation_region on c_nationkey = n_nationkey
- name: repart_customer_nation_region
sql: select /*+ REPARTITION(100,c_custkey) */ * from customer_nation_region
- name: ps_repart
sql: select /*+ REPARTITION(ps_suppkey,ps_PARTKEY) */ * from partsupp
- name: partsupp_supp
sql: select * from ps_repart left join suppliers on ps_suppkey = s_suppkey
- name: partsupp_supp_part
sql: select * from partsupp_supp left join part on ps_partkey = p_partkey
# Order
- name: filtered_order
sql: select *, row_number() over (partition by o_custkey order by o_orderdate desc) as ranked from orders
- name: latest_order
sql: select /*+ REPARTITION_BY_RANGE(100,o_orderkey) */ * from filtered_order where ranked <= 2
- name: order_cust
sql: select * from latest_order left join repart_customer_nation_region on o_custkey = c_custkey
- name: repart_order_cust
sql: select /*+ REPARTITION(100,o_orderkey) */ * from order_cust
- name: line_repart
sql: select * from lineitem
- name: order_cust_line
sql: select * from repart_order_cust right join line_repart on o_orderkey = l_orderkey
- name: repart_order_cust_line
sql: select /*+ REPARTITION(100,l_suppkey,l_partkey)*/ * from order_cust_line
- name: last_5_tans_each_cust
sql: select * from repart_order_cust_line left join partsupp_supp_part on s_suppkey = l_suppkey and p_partkey = l_partkey
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 20m
- spark.sql.shuffle.partitions: 450
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 300
- spark.sql.broadcastTimeout: 200
Method: 3¶
In this approach, the configuration settings are left at their default values, allowing Apache Spark to automatically manage the execution and optimization of tasks.
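With no sparkConf block, Spark's own defaults apply. In recent Spark releases these include spark.sql.shuffle.partitions = 200, a 10 MB auto-broadcast threshold, and adaptive query execution coalescing shuffle partitions at runtime; the exact defaults depend on the Spark version bundled with the Flare stack.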
driver:
coreLimit: 3800m
cores: 2
memory: 2000m
executor:
coreLimit: 2000m
cores: 3
instances: 3
memory: 6000m
# Spark configuration is left at its defaults
The full manifest for this method:
version: v1
name: wf-orders-enriched-data
type: workflow
tags:
- customers
- orders
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
workflow:
title: Customer Order Enriched Data
dag:
- name: orders-enriched-data
title: Customer Order Enriched Data
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
spec:
tags:
- customers
- orders
stack: flare:6.0
compute: runnable-default
stackSpec:
driver:
coreLimit: 3800m
cores: 2
memory: 2000m
executor:
coreLimit: 2000m
cores: 3
instances: 3
memory: 6000m
job:
explain: true
inputs:
- name: region
dataset: dataos://snowflake:TPCH_SF10/REGION
- name: customer
dataset: dataos://snowflake:TPCH_SF10/CUSTOMER
- name: lineitem
dataset: dataos://snowflake:TPCH_SF10/LINEITEM
incremental:
context: incrinputs
sql: select * from incrinputs where l_commitdate between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-31'
- name: end_time
sql: select '1998-08-29'
- name: nation
dataset: dataos://snowflake:TPCH_SF10/NATION
- name: orders
dataset: dataos://snowflake:TPCH_SF10/ORDERS
incremental:
context: incrinputs
sql: select * from incrinputs where O_ORDERDATE between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-01'
- name: end_time
sql: select '1998-07-30'
- name: part
dataset: dataos://snowflake:TPCH_SF10/PART
- name: partsupp
dataset: dataos://snowflake:TPCH_SF10/PARTSUPP
- name: suppliers
dataset: dataos://snowflake:TPCH_SF10/SUPPLIER
logLevel: INFO
outputs:
- name: last_5_tans_each_cust
dataset: dataos://lakehouse:sandbox/last_5_tans_each_cust?acl=rw
format: Iceberg
description: This dataset contains enriched customer order records.
tags:
- clickstream
options:
saveMode: append
sort:
mode: partition
columns:
- name: O_ORDERDATE
order: desc
iceberg:
partitionSpec:
- type: identity
column: event_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
partitionSpec:
- type: month
column: O_ORDERDATE
name: month
- type: identity
column: user_agent
title: Customer Order Enriched Data
steps:
- sequence:
- name: nation_region
sql: select * from nation left join region on n_regionkey = r_regionkey
- name: cust_repart
sql: SELECT /*+ REPARTITION(c_nationkey) */ * FROM customer
- name: customer_nation_region # customer_final
sql: select /*+ MAPJOIN(nation_region) */ * from cust_repart left join nation_region on c_nationkey = n_nationkey
- name: repart_customer_nation_region
sql: select /*+ REPARTITION(100,c_custkey) */ * from customer_nation_region
- name: ps_repart
sql: select /*+ REPARTITION(ps_suppkey,ps_PARTKEY) */ * from partsupp
- name: partsupp_supp
sql: select * from ps_repart left join suppliers on ps_suppkey = s_suppkey
- name: partsupp_supp_part
sql: select * from partsupp_supp left join part on ps_partkey = p_partkey
# Order
- name: filtered_order
sql: select *, row_number() over (partition by o_custkey order by o_orderdate desc) as ranked from orders
- name: latest_order
sql: select /*+ REPARTITION_BY_RANGE(100,o_orderkey) */ * from filtered_order where ranked <= 2
- name: order_cust
sql: select * from latest_order left join repart_customer_nation_region on o_custkey = c_custkey
- name: repart_order_cust
sql: select /*+ REPARTITION(100,o_orderkey) */ * from order_cust
- name: line_repart
sql: select * from lineitem
- name: order_cust_line
sql: select * from repart_order_cust right join line_repart on o_orderkey = l_orderkey
- name: repart_order_cust_line
sql: select /*+ REPARTITION(100,l_suppkey,l_partkey)*/ * from order_cust_line
- name: last_5_tans_each_cust
sql: select * from repart_order_cust_line left join partsupp_supp_part on s_suppkey = l_suppkey and p_partkey = l_partkey
Total Duration: 6.25
RUNTIME | PROGRESS | STARTED | FINISHED
------------|----------|---------------------------|----------------------------
succeeded | 8/8 | 2022-08-10T23:24:06+05:30 | 2022-08-10T23:30:31+05:30
Method: 4¶
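Method 4 switches to more, smaller executors (4 instances with 2 cores each) and moderates the Spark settings: a 10m broadcast threshold, 150 shuffle partitions, and parallelism 130.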
driver:
coreLimit: 3800m
cores: 2
memory: 2000m
executor:
coreLimit: 2000m
cores: 2
instances: 4
memory: 6000m
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 150
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 130
- spark.sql.broadcastTimeout: 30
Total Duration: 6.04
RUNTIME | PROGRESS | STARTED | FINISHED
------------|----------|---------------------------|----------------------------
succeeded | 8/8 | 2022-08-11T10:41:29+05:30 | 2022-08-11T10:47:33+05:30
The full manifest for this method:
version: v1
name: wf-orders-enriched-data
type: workflow
tags:
- customers
- orders
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
workflow:
title: Customer Order Enriched Data
dag:
- name: orders-enriched-data
title: Customer Order Enriched Data
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
spec:
tags:
- customers
- orders
stack: flare:6.0
compute: runnable-default
stackSpec:
driver:
coreLimit: 3800m
cores: 2
memory: 2000m
executor:
coreLimit: 2000m
cores: 2
instances: 4
memory: 6000m
job:
explain: true
inputs:
- name: region
dataset: dataos://snowflake:TPCH_SF10/REGION
- name: customer
dataset: dataos://snowflake:TPCH_SF10/CUSTOMER
- name: lineitem
dataset: dataos://snowflake:TPCH_SF10/LINEITEM
incremental:
context: incrinputs
sql: select * from incrinputs where l_commitdate between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-31'
- name: end_time
sql: select '1998-08-29'
- name: nation
dataset: dataos://snowflake:TPCH_SF10/NATION
- name: orders
dataset: dataos://snowflake:TPCH_SF10/ORDERS
incremental:
context: incrinputs
sql: select * from incrinputs where O_ORDERDATE between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-01'
- name: end_time
sql: select '1998-07-30'
- name: part
dataset: dataos://snowflake:TPCH_SF10/PART
- name: partsupp
dataset: dataos://snowflake:TPCH_SF10/PARTSUPP
- name: suppliers
dataset: dataos://snowflake:TPCH_SF10/SUPPLIER
logLevel: INFO
outputs:
- name: last_5_tans_each_cust
dataset: dataos://lakehouse:sandbox/last_5_tans_each_cust?acl=rw
format: Iceberg
description: This dataset contains enriched customer order records.
tags:
- clickstream
options:
saveMode: append
sort:
mode: partition
columns:
- name: O_ORDERDATE
order: desc
iceberg:
partitionSpec:
- type: identity
column: event_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
partitionSpec:
- type: month
column: O_ORDERDATE
name: month
- type: identity
column: user_agent
title: Customer Order Enriched Data
steps:
- sequence:
- name: nation_region
sql: select * from nation left join region on n_regionkey = r_regionkey
- name: cust_repart
sql: SELECT /*+ REPARTITION(c_nationkey) */ * FROM customer
- name: customer_nation_region # customer_final
sql: select /*+ MAPJOIN(nation_region) */ * from cust_repart left join nation_region on c_nationkey = n_nationkey
- name: repart_customer_nation_region
sql: select /*+ REPARTITION(100,c_custkey) */ * from customer_nation_region
- name: ps_repart
sql: select /*+ REPARTITION(ps_suppkey,ps_PARTKEY) */ * from partsupp
- name: partsupp_supp
sql: select * from ps_repart left join suppliers on ps_suppkey = s_suppkey
- name: partsupp_supp_part
sql: select * from partsupp_supp left join part on ps_partkey = p_partkey
# Order
- name: filtered_order
sql: select *, row_number() over (partition by o_custkey order by o_orderdate desc) as ranked from orders
- name: latest_order
sql: select /*+ REPARTITION_BY_RANGE(100,o_orderkey) */ * from filtered_order where ranked <= 2
- name: order_cust
sql: select * from latest_order left join repart_customer_nation_region on o_custkey = c_custkey
- name: repart_order_cust
sql: select /*+ REPARTITION(100,o_orderkey) */ * from order_cust
- name: line_repart
sql: select * from lineitem
- name: order_cust_line
sql: select * from repart_order_cust right join line_repart on o_orderkey = l_orderkey
- name: repart_order_cust_line
sql: select /*+ REPARTITION(100,l_suppkey,l_partkey)*/ * from order_cust_line
- name: last_5_tans_each_cust
sql: select * from repart_order_cust_line left join partsupp_supp_part on s_suppkey = l_suppkey and p_partkey = l_partkey
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 150
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 130
- spark.sql.broadcastTimeout: 30
Method: 5¶
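Method 5 is identical to Method 4 except that the driver core limit drops from 3800m to 2000m.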
driver:
coreLimit: 2000m
cores: 2
memory: 2000m
executor:
coreLimit: 2000m
cores: 2
instances: 4
memory: 6000m
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 150
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 130
- spark.sql.broadcastTimeout: 30
Total Duration: 6.05
RUNTIME | PROGRESS | STARTED | FINISHED
------------|----------|---------------------------|----------------------------
succeeded | 8/8 | 2022-08-11T22:38:39+05:30 | 2022-08-11T22:44:44+05:30
The full manifest for this method:
version: v1
name: wf-orders-enriched-data
type: workflow
tags:
- customers
- orders
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
workflow:
title: Customer Order Enriched Data
dag:
- name: orders-enriched-data
title: Customer Order Enriched Data
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
spec:
tags:
- customers
- orders
stack: flare:6.0
compute: runnable-default
stackSpec:
driver:
coreLimit: 2000m
cores: 2
memory: 2000m
executor:
coreLimit: 2000m
cores: 2
instances: 4
memory: 6000m
job:
explain: true
inputs:
- name: region
dataset: dataos://snowflake:TPCH_SF10/REGION
- name: customer
dataset: dataos://snowflake:TPCH_SF10/CUSTOMER
- name: lineitem
dataset: dataos://snowflake:TPCH_SF10/LINEITEM
incremental:
context: incrinputs
sql: select * from incrinputs where l_commitdate between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-31'
- name: end_time
sql: select '1998-08-29'
- name: nation
dataset: dataos://snowflake:TPCH_SF10/NATION
- name: orders
dataset: dataos://snowflake:TPCH_SF10/ORDERS
incremental:
context: incrinputs
sql: select * from incrinputs where O_ORDERDATE between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-01'
- name: end_time
sql: select '1998-07-30'
- name: part
dataset: dataos://snowflake:TPCH_SF10/PART
- name: partsupp
dataset: dataos://snowflake:TPCH_SF10/PARTSUPP
- name: suppliers
dataset: dataos://snowflake:TPCH_SF10/SUPPLIER
logLevel: INFO
outputs:
- name: last_5_tans_each_cust
dataset: dataos://lakehouse:sandbox/last_5_tans_each_cust?acl=rw
format: Iceberg
description: This dataset contains enriched customer order records.
tags:
- clickstream
options:
saveMode: append
sort:
mode: partition
columns:
- name: O_ORDERDATE
order: desc
iceberg:
partitionSpec:
- type: identity
column: event_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
partitionSpec:
- type: month
column: O_ORDERDATE
name: month
- type: identity
column: user_agent
title: Customer Order Enriched Data
steps:
- sequence:
- name: nation_region
sql: select * from nation left join region on n_regionkey = r_regionkey
- name: cust_repart
sql: SELECT /*+ REPARTITION(c_nationkey) */ * FROM customer
- name: customer_nation_region # customer_final
sql: select /*+ MAPJOIN(nation_region) */ * from cust_repart left join nation_region on c_nationkey = n_nationkey
- name: repart_customer_nation_region
sql: select /*+ REPARTITION(100,c_custkey) */ * from customer_nation_region
- name: ps_repart
sql: select /*+ REPARTITION(ps_suppkey,ps_PARTKEY) */ * from partsupp
- name: partsupp_supp
sql: select * from ps_repart left join suppliers on ps_suppkey = s_suppkey
- name: partsupp_supp_part
sql: select * from partsupp_supp left join part on ps_partkey = p_partkey
# Order
- name: filtered_order
sql: select *, row_number() over (partition by o_custkey order by o_orderdate desc) as ranked from orders
- name: latest_order
sql: select /*+ REPARTITION_BY_RANGE(100,o_orderkey) */ * from filtered_order where ranked <= 2
- name: order_cust
sql: select * from latest_order left join repart_customer_nation_region on o_custkey = c_custkey
- name: repart_order_cust
sql: select /*+ REPARTITION(100,o_orderkey) */ * from order_cust
- name: line_repart
sql: select * from lineitem
- name: order_cust_line
sql: select * from repart_order_cust right join line_repart on o_orderkey = l_orderkey
- name: repart_order_cust_line
sql: select /*+ REPARTITION(100,l_suppkey,l_partkey)*/ * from order_cust_line
- name: last_5_tans_each_cust
sql: select * from repart_order_cust_line left join partsupp_supp_part on s_suppkey = l_suppkey and p_partkey = l_partkey
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 150
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 130
- spark.sql.broadcastTimeout: 30
Method: 6¶
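Method 6 scales down to 3 executors with 5000m of memory each and lowers shuffle partitions and parallelism to 100 and 80.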
driver:
coreLimit: 2000m
cores: 2
memory: 2000m
executor:
coreLimit: 2000m
cores: 2
instances: 3
memory: 5000m
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 100
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 80
- spark.sql.broadcastTimeout: 30
Total Duration: 6.11
RUNTIME | PROGRESS | STARTED | FINISHED
------------|----------|---------------------------|----------------------------
succeeded | 8/8 | 2022-08-11T22:46:25+05:30 | 2022-08-11T22:52:36+05:30
The full manifest for this method:
version: v1
name: wf-orders-enriched-data
type: workflow
tags:
- customers
- orders
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
workflow:
title: Customer Order Enriched Data
dag:
- name: orders-enriched-data
title: Customer Order Enriched Data
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
spec:
tags:
- customers
- orders
stack: flare:6.0
compute: runnable-default
stackSpec:
driver:
coreLimit: 2000m
cores: 2
memory: 2000m
executor:
coreLimit: 2000m
cores: 2
instances: 3
memory: 5000m
job:
explain: true
inputs:
- name: region
dataset: dataos://snowflake:TPCH_SF10/REGION
- name: customer
dataset: dataos://snowflake:TPCH_SF10/CUSTOMER
- name: lineitem
dataset: dataos://snowflake:TPCH_SF10/LINEITEM
incremental:
context: incrinputs
sql: select * from incrinputs where l_commitdate between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-31'
- name: end_time
sql: select '1998-08-29'
- name: nation
dataset: dataos://snowflake:TPCH_SF10/NATION
- name: orders
dataset: dataos://snowflake:TPCH_SF10/ORDERS
incremental:
context: incrinputs
sql: select * from incrinputs where O_ORDERDATE between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-01'
- name: end_time
sql: select '1998-07-30'
- name: part
dataset: dataos://snowflake:TPCH_SF10/PART
- name: partsupp
dataset: dataos://snowflake:TPCH_SF10/PARTSUPP
- name: suppliers
dataset: dataos://snowflake:TPCH_SF10/SUPPLIER
logLevel: INFO
outputs:
- name: last_5_tans_each_cust
dataset: dataos://lakehouse:sandbox/last_5_tans_each_cust?acl=rw
format: Iceberg
description: This dataset contains enriched customer order records.
tags:
- clickstream
options:
saveMode: append
sort:
mode: partition
columns:
- name: O_ORDERDATE
order: desc
iceberg:
partitionSpec:
- type: identity
column: event_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
partitionSpec:
- type: month
column: O_ORDERDATE
name: month
- type: identity
column: user_agent
title: Customer Order Enriched Data
steps:
- sequence:
- name: nation_region
sql: select * from nation left join region on n_regionkey = r_regionkey
- name: cust_repart
sql: SELECT /*+ REPARTITION(c_nationkey) */ * FROM customer
- name: customer_nation_region # customer_final
sql: select /*+ MAPJOIN(nation_region) */ * from cust_repart left join nation_region on c_nationkey = n_nationkey
- name: repart_customer_nation_region
sql: select /*+ REPARTITION(100,c_custkey) */ * from customer_nation_region
- name: ps_repart
sql: select /*+ REPARTITION(ps_suppkey,ps_PARTKEY) */ * from partsupp
- name: partsupp_supp
sql: select * from ps_repart left join suppliers on ps_suppkey = s_suppkey
- name: partsupp_supp_part
sql: select * from partsupp_supp left join part on ps_partkey = p_partkey
# Order
- name: filtered_order
sql: select *, row_number() over (partition by o_custkey order by o_orderdate desc) as ranked from orders
- name: latest_order
sql: select /*+ REPARTITION_BY_RANGE(100,o_orderkey) */ * from filtered_order where ranked <= 2
- name: order_cust
sql: select * from latest_order left join repart_customer_nation_region on o_custkey = c_custkey
- name: repart_order_cust
sql: select /*+ REPARTITION(100,o_orderkey) */ * from order_cust
- name: line_repart
sql: select * from lineitem
- name: order_cust_line
sql: select * from repart_order_cust right join line_repart on o_orderkey = l_orderkey
- name: repart_order_cust_line
sql: select /*+ REPARTITION(100,l_suppkey,l_partkey)*/ * from order_cust_line
- name: last_5_tans_each_cust
sql: select * from repart_order_cust_line left join partsupp_supp_part on s_suppkey = l_suppkey and p_partkey = l_partkey
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 100
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 80
- spark.sql.broadcastTimeout: 30
Method: 7¶
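Method 7 keeps Method 6's Spark settings but runs 2 larger executors (3 cores each) instead of 3 two-core ones.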
driver:
coreLimit: 2000m
cores: 2
memory: 2000m
executor:
coreLimit: 2200m
cores: 3
instances: 2
memory: 5000m
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 100
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 80
- spark.sql.broadcastTimeout: 30
Total Duration: 6.38
RUNTIME | PROGRESS | STARTED | FINISHED
------------|----------|---------------------------|----------------------------
succeeded | 8/8 | 2022-08-11T22:58:41+05:30 | 2022-08-11T23:04:39+05:30
The full manifest for this method:
version: v1
name: wf-orders-enriched-data
type: workflow
tags:
- customers
- orders
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
workflow:
title: Customer Order Enriched Data
dag:
- name: orders-enriched-data
title: Customer Order Enriched Data
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
spec:
tags:
- customers
- orders
stack: flare:6.0
compute: runnable-default
stackSpec:
driver:
coreLimit: 2000m
cores: 2
memory: 2000m
executor:
coreLimit: 2200m
cores: 3
instances: 2
memory: 5000m
job:
explain: true
inputs:
- name: region
dataset: dataos://snowflake:TPCH_SF10/REGION
- name: customer
dataset: dataos://snowflake:TPCH_SF10/CUSTOMER
- name: lineitem
dataset: dataos://snowflake:TPCH_SF10/LINEITEM
incremental:
context: incrinputs
sql: select * from incrinputs where l_commitdate between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-31'
- name: end_time
sql: select '1998-08-29'
- name: nation
dataset: dataos://snowflake:TPCH_SF10/NATION
- name: orders
dataset: dataos://snowflake:TPCH_SF10/ORDERS
incremental:
context: incrinputs
sql: select * from incrinputs where O_ORDERDATE between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-01'
- name: end_time
sql: select '1998-07-30'
- name: part
dataset: dataos://snowflake:TPCH_SF10/PART
- name: partsupp
dataset: dataos://snowflake:TPCH_SF10/PARTSUPP
- name: suppliers
dataset: dataos://snowflake:TPCH_SF10/SUPPLIER
logLevel: INFO
outputs:
- name: last_5_tans_each_cust
dataset: dataos://lakehouse:sandbox/last_5_tans_each_cust?acl=rw
format: Iceberg
description: This dataset contains enriched customer order records.
tags:
- clickstream
options:
saveMode: append
sort:
mode: partition
columns:
- name: O_ORDERDATE
order: desc
iceberg:
partitionSpec:
- type: identity
column: event_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
partitionSpec:
- type: month
column: O_ORDERDATE
name: month
- type: identity
column: user_agent
title: Customer Order Enriched Data
steps:
- sequence:
- name: nation_region
sql: select * from nation left join region on n_regionkey = r_regionkey
- name: cust_repart
sql: SELECT /*+ REPARTITION(c_nationkey) */ * FROM customer
- name: customer_nation_region # customer_final
sql: select /*+ MAPJOIN(nation_region) */ * from cust_repart left join nation_region on c_nationkey = n_nationkey
- name: repart_customer_nation_region
sql: select /*+ REPARTITION(100,c_custkey) */ * from customer_nation_region
- name: ps_repart
sql: select /*+ REPARTITION(ps_suppkey,ps_PARTKEY) */ * from partsupp
- name: partsupp_supp
sql: select * from ps_repart left join suppliers on ps_suppkey = s_suppkey
- name: partsupp_supp_part
sql: select * from partsupp_supp left join part on ps_partkey = p_partkey
# Order
- name: filtered_order
sql: select *, row_number() over (partition by o_custkey order by o_orderdate desc) as ranked from orders
- name: latest_order
sql: select /*+ REPARTITION_BY_RANGE(100,o_orderkey) */ * from filtered_order where ranked <= 2
- name: order_cust
sql: select * from latest_order left join repart_customer_nation_region on o_custkey = c_custkey
- name: repart_order_cust
sql: select /*+ REPARTITION(100,o_orderkey) */ * from order_cust
- name: line_repart
sql: select * from lineitem
- name: order_cust_line
sql: select * from repart_order_cust right join line_repart on o_orderkey = l_orderkey
- name: repart_order_cust_line
sql: select /*+ REPARTITION(100,l_suppkey,l_partkey)*/ * from order_cust_line
- name: last_5_tans_each_cust
sql: select * from repart_order_cust_line left join partsupp_supp_part on s_suppkey = l_suppkey and p_partkey = l_partkey
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 100
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 80
- spark.sql.broadcastTimeout: 30
Method: 8¶
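Method 8 trims executor memory to 4800m and lowers shuffle partitions and parallelism to 75 and 60, with a 15-second broadcast timeout. This configuration produced the fastest run of the nine.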
driver:
coreLimit: 2000m
cores: 2
memory: 2000m
executor:
coreLimit: 2200m
cores: 3
instances: 2
memory: 4800m
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 75
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 60
- spark.sql.broadcastTimeout: 15
Total Duration: 6.00
RUNTIME | PROGRESS | STARTED | FINISHED
------------|----------|---------------------------|----------------------------
succeeded | 8/8 | 2022-08-11T23:07:17+05:30 | 2022-08-11T23:13:17+05:30
The full manifest for this method:
version: v1
name: wf-orders-enriched-data
type: workflow
tags:
- customers
- orders
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
workflow:
title: Customer Order Enriched Data
dag:
- name: orders-enriched-data
title: Customer Order Enriched Data
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
spec:
tags:
- customers
- orders
stack: flare:6.0
compute: runnable-default
stackSpec:
driver:
coreLimit: 2000m
cores: 2
memory: 2000m
executor:
coreLimit: 2200m
cores: 3
instances: 2
memory: 4800m
job:
explain: true
inputs:
- name: region
dataset: dataos://snowflake:TPCH_SF10/REGION
- name: customer
dataset: dataos://snowflake:TPCH_SF10/CUSTOMER
- name: lineitem
dataset: dataos://snowflake:TPCH_SF10/LINEITEM
incremental:
context: incrinputs
sql: select * from incrinputs where l_commitdate between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-31'
- name: end_time
sql: select '1998-08-29'
- name: nation
dataset: dataos://snowflake:TPCH_SF10/NATION
- name: orders
dataset: dataos://snowflake:TPCH_SF10/ORDERS
incremental:
context: incrinputs
sql: select * from incrinputs where O_ORDERDATE between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-01'
- name: end_time
sql: select '1998-07-30'
- name: part
dataset: dataos://snowflake:TPCH_SF10/PART
- name: partsupp
dataset: dataos://snowflake:TPCH_SF10/PARTSUPP
- name: suppliers
dataset: dataos://snowflake:TPCH_SF10/SUPPLIER
logLevel: INFO
outputs:
- name: last_5_tans_each_cust
dataset: dataos://lakehouse:sandbox/last_5_tans_each_cust?acl=rw
format: Iceberg
description: This dataset contains enriched customer order records.
tags:
- clickstream
options:
saveMode: append
sort:
mode: partition
columns:
- name: O_ORDERDATE
order: desc
iceberg:
partitionSpec:
- type: identity
column: event_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
partitionSpec:
- type: month
column: O_ORDERDATE
name: month
- type: identity
column: user_agent
title: Customer Order Enriched Data
steps:
- sequence:
- name: nation_region
sql: select * from nation left join region on n_regionkey = r_regionkey
- name: cust_repart
sql: SELECT /*+ REPARTITION(c_nationkey) */ * FROM customer
- name: customer_nation_region # customer_final
sql: select /*+ MAPJOIN(nation_region) */ * from cust_repart left join nation_region on c_nationkey = n_nationkey
- name: repart_customer_nation_region
sql: select /*+ REPARTITION(100,c_custkey) */ * from customer_nation_region
- name: ps_repart
sql: select /*+ REPARTITION(ps_suppkey,ps_PARTKEY) */ * from partsupp
- name: partsupp_supp
sql: select * from ps_repart left join suppliers on ps_suppkey = s_suppkey
- name: partsupp_supp_part
sql: select * from partsupp_supp left join part on ps_partkey = p_partkey
# Order
- name: filtered_order
sql: select *, row_number() over (partition by o_custkey order by o_orderdate desc) as ranked from orders
- name: latest_order
sql: select /*+ REPARTITION_BY_RANGE(100,o_orderkey) */ * from filtered_order where ranked <= 2
- name: order_cust
sql: select * from latest_order left join repart_customer_nation_region on o_custkey = c_custkey
- name: repart_order_cust
sql: select /*+ REPARTITION(100,o_orderkey) */ * from order_cust
- name: line_repart
sql: select * from lineitem
- name: order_cust_line
sql: select * from repart_order_cust right join line_repart on o_orderkey = l_orderkey
- name: repart_order_cust_line
sql: select /*+ REPARTITION(100,l_suppkey,l_partkey)*/ * from order_cust_line
- name: last_5_tans_each_cust
sql: select * from repart_order_cust_line left join partsupp_supp_part on s_suppkey = l_suppkey and p_partkey = l_partkey
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 75
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 60
- spark.sql.broadcastTimeout: 15
Method: 9¶
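Method 9 keeps Method 8's resources but reverts the Spark settings to the baseline values shown at the top of this page (50 shuffle partitions, parallelism 45).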
driver:
coreLimit: 2000m
cores: 2
memory: 2000m
executor:
coreLimit: 2200m
cores: 3
instances: 2
memory: 4800m
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 50
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 45
- spark.sql.broadcastTimeout: 15
Total Duration: 6.33
RUNTIME | PROGRESS | STARTED | FINISHED
------------|----------|---------------------------|----------------------------
succeeded | 8/8 | 2022-08-11T23:51:59+05:30 | 2022-08-11T23:57:52+05:30
The full manifest for this method:
version: v1
name: wf-orders-enriched-data
type: workflow
tags:
- customers
- orders
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
workflow:
title: Customer Order Enriched Data
dag:
- name: orders-enriched-data
title: Customer Order Enriched Data
description: The workflow enriches customer order data from Snowflake TPCH tables and writes the result to the Lakehouse.
spec:
tags:
- customers
- orders
stack: flare:6.0
compute: runnable-default
stackSpec:
driver:
coreLimit: 2000m
cores: 2
memory: 2000m
executor:
coreLimit: 2200m
cores: 3
instances: 2
memory: 4800m
job:
explain: true
inputs:
- name: region
dataset: dataos://snowflake:TPCH_SF10/REGION
- name: customer
dataset: dataos://snowflake:TPCH_SF10/CUSTOMER
- name: lineitem
dataset: dataos://snowflake:TPCH_SF10/LINEITEM
incremental:
context: incrinputs
sql: select * from incrinputs where l_commitdate between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-31'
- name: end_time
sql: select '1998-08-29'
- name: nation
dataset: dataos://snowflake:TPCH_SF10/NATION
- name: orders
dataset: dataos://snowflake:TPCH_SF10/ORDERS
incremental:
context: incrinputs
sql: select * from incrinputs where O_ORDERDATE between '$|start_time|' AND '$|end_time|'
keys:
- name: start_time
sql: select '1998-07-01'
- name: end_time
sql: select '1998-07-30'
- name: part
dataset: dataos://snowflake:TPCH_SF10/PART
- name: partsupp
dataset: dataos://snowflake:TPCH_SF10/PARTSUPP
- name: suppliers
dataset: dataos://snowflake:TPCH_SF10/SUPPLIER
logLevel: INFO
outputs:
- name: last_5_tans_each_cust
dataset: dataos://lakehouse:sandbox/last_5_tans_each_cust?acl=rw
format: Iceberg
description: This dataset contains enriched customer order records.
tags:
- clickstream
options:
saveMode: append
sort:
mode: partition
columns:
- name: O_ORDERDATE
order: desc
iceberg:
partitionSpec:
- type: identity
column: event_name
properties:
write.format.default: parquet
write.metadata.compression-codec: gzip
partitionSpec:
- type: month
column: O_ORDERDATE
name: month
- type: identity
column: user_agent
title: Customer Order Enriched Data
steps:
- sequence:
- name: nation_region
sql: select * from nation left join region on n_regionkey = r_regionkey
- name: cust_repart
sql: SELECT /*+ REPARTITION(c_nationkey) */ * FROM customer
- name: customer_nation_region # customer_final
sql: select /*+ MAPJOIN(nation_region) */ * from cust_repart left join nation_region on c_nationkey = n_nationkey
- name: repart_customer_nation_region
sql: select /*+ REPARTITION(100,c_custkey) */ * from customer_nation_region
- name: ps_repart
sql: select /*+ REPARTITION(ps_suppkey,ps_PARTKEY) */ * from partsupp
- name: partsupp_supp
sql: select * from ps_repart left join suppliers on ps_suppkey = s_suppkey
- name: partsupp_supp_part
sql: select * from partsupp_supp left join part on ps_partkey = p_partkey
# Order
- name: filtered_order
sql: select *, row_number() over (partition by o_custkey order by o_orderdate desc) as ranked from orders
- name: latest_order
sql: select /*+ REPARTITION_BY_RANGE(100,o_orderkey) */ * from filtered_order where ranked <= 2
- name: order_cust
sql: select * from latest_order left join repart_customer_nation_region on o_custkey = c_custkey
- name: repart_order_cust
sql: select /*+ REPARTITION(100,o_orderkey) */ * from order_cust
- name: line_repart
sql: select * from lineitem
- name: order_cust_line
sql: select * from repart_order_cust right join line_repart on o_orderkey = l_orderkey
- name: repart_order_cust_line
sql: select /*+ REPARTITION(100,l_suppkey,l_partkey)*/ * from order_cust_line
- name: last_5_tans_each_cust
sql: select * from repart_order_cust_line left join partsupp_supp_part on s_suppkey = l_suppkey and p_partkey = l_partkey
sparkConf:
- spark.sql.adaptive.autoBroadcastJoinThreshold: 10m
- spark.sql.shuffle.partitions: 50
- spark.sql.optimizer.dynamicPartitionPruning.enabled: true
- spark.serializer: org.apache.spark.serializer.KryoSerializer
- spark.default.parallelism: 45
- spark.sql.broadcastTimeout: 15
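Takeaways¶
Across the nine runs, adding resources did not translate into speed: the most heavily provisioned configuration (Method 1, 450 shuffle partitions, 8000m executors) was the slowest at 8.13, while the leanest tuned configuration (Method 8, 75 shuffle partitions, 4800m executors) was the fastest at 6.00, slightly ahead of plain Spark defaults (Method 3, 6.25). For a job of this size, a reasonable approach is to start from defaults and scale spark.sql.shuffle.partitions and spark.default.parallelism to the data volume rather than simply enlarging the cluster.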