Bucketing¶
Bucketing is an optimization technique that helps prevent the shuffling and sorting of data during compute-heavy operations such as joins. Based on the bucketing columns we specify, data is distributed into a fixed number of buckets.
Bucketing vs Partitioning¶
Bucketing is similar to partitioning, but in partitioning we create a directory for each partition value. In bucketing, we create a fixed number of equal-sized buckets, and data is distributed across these buckets by hashing the value of the bucketing column.
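For illustration, the same partitionSpec block can combine both approaches: an identity entry writes one directory per distinct column value (partitioning), while a bucket entry hashes the column value into a fixed number of buckets. A minimal sketch, reusing the category and seg_id fields from the nested-bucket example later in this section:

partitionSpec:
  - type: identity      # directory-style partition per distinct category value
    column: category
  - type: bucket        # rows placed by hashing seg_id into 20 buckets
    column: seg_id
    numBuckets: 20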
When to use bucketing?¶
Bucketing is helpful in the following scenarios:
- When joins are performed between dimension tables that contain the primary keys used for joining (see the sketch after this list).
- When join operations are performed between small and large tables.
- When the data is heavily skewed, or when faster joins are needed on a cluster, bucketing can be used to improve performance.
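When bucketing is meant to speed up joins, the bucket column is normally the join key itself, so that matching rows land in the same bucket and can be joined with less shuffling. A minimal sketch, assuming a hypothetical customer_id join key and an example bucket count:

partitionSpec:
  - type: bucket          # bucket on the join key (customer_id is illustrative)
    column: customer_id
    numBuckets: 16        # example bucket count; tune to data volume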
Configurations¶
partitionSpec:
  - type: bucket              # select type bucket
    column: week_year_column  # bucketing column
    numBuckets: 2             # number of buckets
Code Snippets¶
Simple Bucketing¶
Flare Workflow
version: v1
name: wf-sample
type: workflow
workflow:
  dag:
    - name: sample
      spec:
        stack: flare:5.0
        compute: runnable-default
        stackSpec:
          driver:
            coreLimit: 12000m
            cores: 2
            memory: 12000m
          executor:
            coreLimit: 12000m
            cores: 4
            instances: 3
            memory: 22000m
          job:
            inputs:
              - name: input
                dataset: dataos://thirdparty01:analytics/survey_unpivot/unpivot_data.csv
                format: csv
            logLevel: INFO
            outputs:
              - name: clustered_records
                dataset: dataos://icebase:sample/unpivot_data_02?acl=rw
                format: Iceberg
                description: unpivotdata
                options:
                  saveMode: overwrite
                  sort:
                    mode: global
                    columns:
                      - name: week_year_column
                        order: desc
                  iceberg:
                    properties:
                      write.format.default: parquet
                      write.metadata.compression-codec: gzip
                    # Bucketing
                    partitionSpec:
                      - type: bucket                # bucket
                        column: week_year_column    # bucketing column
                        numBuckets: 2               # number of buckets
                title: unpivot data
            steps:
              - sequence:
                  - name: select_all_column
                    sql: SELECT * FROM input
                    functions:
                      - name: cleanse_column_names
                      - name: unpivot
                        columns:
                          - "*"
                        pivotColumns:
                          - week_year
                        keyColumnName: week_year_column
                        valueColumnName: values_columns
                  - name: clustered_records
                    sql: SELECT * FROM select_all_column CLUSTER BY week_year_column
          sparkConf:
            - spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Toolbox Workflow
version: v1
name: datatool-wf-sample
type: workflow
workflow:
  dag:
    - name: dataos-tool-simple-bucket
      spec:
        stack: toolbox
        compute: runnable-default
        stackSpec:
          dataset: dataos://icebase:sample/unpivot_data_02
          action:
            name: set_version
            value: latest
Big Data-Nested Bucket¶
Flare Workflow
version: v1
name: wf-sample-02
type: workflow
workflow:
  dag:
    - name: sample
      spec:
        stack: flare:5.0
        compute: runnable-default
        stackSpec:
          driver:
            coreLimit: 2200m
            cores: 2
            memory: 2000m
          executor:
            coreLimit: 3300m
            cores: 3
            instances: 1
            memory: 6000m
          job:
            inputs:
              - name: input
                dataset: dataos://icebase:retail/city
                format: iceberg
            logLevel: INFO
            outputs:
              - name: random_data
                dataset: dataos://icebase:sample/bucket_large_data_02?acl=rw
                format: Iceberg
                options:
                  saveMode: overwrite
                  iceberg:
                    properties:
                      write.format.default: parquet
                      write.metadata.compression-codec: gzip
                    partitionSpec:
                      - type: identity
                        column: category
                      - type: bucket          # bucket
                        column: seg_id        # bucketing column
                        numBuckets: 20        # number of buckets
                      - type: day
                        column: ts
                        name: day_partitioned
                title: unpivot data
            steps:
              - sequence:
                  - name: random_data
                    sql: |
                      SELECT
                        uuid,
                        seg_id,
                        ts,
                        CASE WHEN cat > 0.67 THEN
                          'A'
                        WHEN cat > 0.33 THEN
                          'B'
                        ELSE
                          'C'
                        END AS category
                      FROM (
                        SELECT
                          explode(rand_value) AS uuid,
                          cast(random() * 100 AS int) AS seg_id,
                          add_months(CURRENT_TIMESTAMP, cast(random() * 100 AS int)) AS ts,
                          random() AS cat
                        FROM (
                          SELECT
                            sequence(1, 10000000) AS rand_value))
          sparkConf:
            - spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Toolbox Workflow
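A matching Toolbox workflow for the nested-bucket output would follow the same pattern as the one shown earlier; the workflow and job names below are illustrative, and the dataset address points at the output written above:

version: v1
name: datatool-wf-sample-02            # illustrative name
type: workflow
workflow:
  dag:
    - name: dataos-tool-nested-bucket  # illustrative job name
      spec:
        stack: toolbox
        compute: runnable-default
        stackSpec:
          dataset: dataos://icebase:sample/bucket_large_data_02
          action:
            name: set_version
            value: latest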