Partitioning¶
Partitioning is a technique that improves query performance by grouping similar rows together when data is written. Because data is organized into partitions based on specific criteria, queries can skip irrelevant partitions and read only the data they need.
This use case demonstrates how partitioning works in Flare when writing data to Iceberg and to non-Iceberg formats (e.g., Parquet and Kafka) to optimize query performance.
Iceberg partitioning¶
Iceberg supports partitioning on timestamp granularities such as year, month, day, and hour, as well as on categorical columns (e.g., vendor ID) with data types such as string, integer, and long (double may also be supported, but this has not been verified). Partitioning groups related rows together, which speeds up queries.
Iceberg produces partition values by taking a column value and, optionally, transforming it. For example, it can convert a timestamp value into a date and extract components such as the year, month, day, or hour.
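For instance, a day transform can be declared directly in the partition spec. The sketch below is illustrative only; the column name date_col follows the examples further down, and the partition name daily is an assumption:

partitionSpec:
  - type: day        # Iceberg derives the date (year-month-day) from the timestamp value
    column: date_col # timestamp column the date is derived from
    name: daily      # illustrative partition name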
Non-Iceberg partitioning (e.g., Parquet, Kafka)¶
In non-Iceberg formats (such as Parquet), partitions are explicit: the partition appears as a separate column in the table that must be supplied in every write operation. For example, you need to provide the year, month, and hour columns explicitly, because the transformation from a timestamp column is not automatic.
For non-Iceberg formats such as Parquet, you must therefore provide a categorical column as the partitioning criterion, as sketched below.
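As an illustrative sketch (the input name ny_taxi and the column pickup_datetime are assumptions, not taken from the examples below), the partition columns can be derived in a transformation step before the Parquet write and then listed under partitionBy:

steps:
  - sequence:
      - name: ny_taxi_with_partitions
        # Derive explicit partition columns, since Parquet does not transform timestamps automatically
        sql: >
          SELECT *,
                 year(pickup_datetime)  AS year,
                 month(pickup_datetime) AS month
          FROM ny_taxi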
Implementation details¶
The following examples demonstrate the use of various partitioning modes.
Iceberg¶
- Partition using type identity
    - Integer values
    - String values
- Partition using type timestamp
    - Year
    - Month
    - Day
    - Hour
Example 1: Partitioning by identity is used when a categorical column (e.g., vendor_id) groups the data. With the identity type, there is no need to specify the 'name' property for the partition.
partitionSpec:
  - type: identity    # supported types: identity, year, month, day, hour
    column: vendor_id # categorical (string) column to partition by
Example 2: In this example, partitioning is done by year using a timestamp column (date_col). Iceberg will automatically extract the year from the timestamp.
partitionSpec:
  - type: year       # year transform on a timestamp column
    column: date_col # timestamp column the year is derived from
    name: year       # name for the partition field
Example 3: This example demonstrates nested partitioning, where data is first partitioned by vendor_id (identity type) and then by year (timestamp type). The identity partition must come first in the partition specification.
partitionSpec:
  - type: identity    # identity partition on a categorical (string) column
    column: vendor_id
  - type: year        # year transform on a timestamp column
    column: date_col
    name: year
Parquet¶
- Partition using type identity
Example 1: Partitioning is done by identity on the vendor_id column.
- sequence:
    - name: ny_taxi_ts
      outputType: Parquet
      outputOptions:
        saveMode: overwrite
        partitionBy:
          - vendor_id
Outcomes¶
The files are stored in folders based on the partition criteria you define; you can view them in Workbench or at the storage location.
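For example, a Parquet write partitioned by vendor_id typically produces folders named after the column value. The layout below is illustrative; the output folder and file names will differ in practice:

<output-folder>/
├── vendor_id=1/
│   └── part-00000-....parquet
└── vendor_id=2/
    └── part-00000-....parquet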
Code files¶
Note: When specifying the partition criterion for Iceberg, the 'partitionSpec' property must be defined as a child of the 'iceberg' property in the sink/output section; otherwise, partitioning will not be performed as expected.
version: v1
name: workflow-ny-customer-partitioned-03
type: workflow
tags:
  - Connect
  - NY-Taxi
workspace: curriculum
description: The job ingests customer data from small files and writes it with partitioning on social_class and birthdate
workflow:
  title: Connect NY Taxi
  dag:
    - name: nytaxi
      title: NY-taxi data ingester
      description: The job ingests NY-Taxi data from the dropzone into the raw zone
      spec:
        tags:
          - Connect
          - NY-Taxi
        stack: flare:6.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            inputs:
              - name: customer
                dataset: dataos://icebase:retail/customer?acl=r
                format: Iceberg
                isStream: false
            logLevel: INFO
            outputs:
              - name: ts_customer
                dataset: dataos://icebase:sample/partioning03?acl=rw
                format: Iceberg
                description: This is a customer dataset
                options:
                  saveMode: overwrite
                  iceberg:
                    properties:
                      write.format.default: parquet
                      write.metadata.compression-codec: gzip
                    partitionSpec:
                      - type: identity      # identity partition on a categorical column
                        column: social_class
                      - type: year          # year transform on a date/timestamp column
                        column: birthdate
                        name: yearly
                tags:
                  - Connect
                  - customer
                title: Customer Data Partitioned
            steps:
              - sequence:
                  - name: ts_customer
                    sql: SELECT * FROM customer;
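Assuming the DataOS CLI is available and configured, a workflow file like this is typically applied as follows (the file name is illustrative):

dataos-ctl apply -f workflow-ny-customer-partitioned-03.yaml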