Partitioning
Partitioning speeds up queries by grouping similar rows together at write time.
This use case examines how Flare applies partitioning when writing data to Iceberg and non-Iceberg data stores (Parquet, Kafka) to improve query performance. It covers partition keys of different data types for the Iceberg and Parquet formats.
Iceberg can partition timestamps at year, month, day, and hour granularity. It can also use a categorical column of type string, integer, or long (double is still to be verified) to store related rows together and speed up queries; vendor ID plays that role in this example.
Iceberg produces partition values by taking a column value and optionally transforming it. For timestamp columns, it derives the year, month, day, or hour from the value itself.
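To make the transforms concrete, here is a minimal Python sketch of how year, month, day, and hour partition values can be derived from a timestamp. It follows the Iceberg table spec's definition of these transforms as counts of whole units since 1970-01-01 UTC. The function name `partition_value` and the sample timestamp are illustrative only; this is not Iceberg's or Flare's actual code.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def partition_value(ts: datetime, transform: str) -> int:
    """Return the integer partition value for a timezone-aware UTC timestamp."""
    if transform == "year":
        return ts.year - 1970                            # whole years since 1970
    if transform == "month":
        return (ts.year - 1970) * 12 + (ts.month - 1)    # whole months since 1970-01
    if transform == "day":
        return (ts - EPOCH).days                         # whole days since 1970-01-01
    if transform == "hour":
        return int((ts - EPOCH).total_seconds() // 3600)  # whole hours since epoch
    raise ValueError(f"unsupported transform: {transform}")


# Hypothetical pickup time, used only for illustration.
pickup = datetime(2019, 3, 15, 13, 45, tzinfo=timezone.utc)
for transform in ("year", "month", "day", "hour"):
    print(transform, partition_value(pickup, transform))
```

Rows whose timestamps map to the same value land in the same partition, which is why no separate year or month column has to be written. Tools commonly render these ordinals in readable form, such as 2019 or 2019-03.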
In a non-Iceberg data sink, partitions are explicit: they appear as separate columns in the table and must be supplied in every write operation. For example, you must provide the year, month, and hour columns yourself, because they are not derived automatically from the timestamp column.
For non-Iceberg formats such as Parquet, you must provide a categorical column as the partition criterion.
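The explicit-column requirement can be illustrated with a small Python sketch. It adds year, month, and hour columns to a row and builds the `column=value` folder path that Spark-style writers produce for partitioned output. The helper names and the sample row are hypothetical, and the assumption that a Flare Parquet sink lays out folders this way is not confirmed by this page.

```python
from datetime import datetime


def add_partition_columns(row: dict, ts_column: str = "date_col") -> dict:
    """Return a copy of the row with explicit year, month, and hour columns."""
    ts = row[ts_column]
    return {**row, "year": ts.year, "month": ts.month, "hour": ts.hour}


def partition_path(row: dict, partition_by: list) -> str:
    """Build a Hive-style folder path (assumed layout) from the partition columns."""
    return "/".join(f"{col}={row[col]}" for col in partition_by)


# Hypothetical row; in the workflow this would come from the NY-Taxi input.
row = {"vendor_id": 2, "date_col": datetime(2019, 3, 15, 13, 45)}
enriched = add_partition_columns(row)
print(partition_path(enriched, ["vendor_id", "year", "month", "hour"]))
# prints: vendor_id=2/year=2019/month=3/hour=13
```

In a Flare job, the equivalent derivation would be done in the SQL sequence before the sink, with the derived columns then listed as partition columns.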
Implementation details
The following examples demonstrate the various partitioning modes.
Iceberg
- Partition using type identity
  - Integer values
  - String values
- Partition using type timestamp
  - Year
  - Month
  - Day
  - Hour
Example 1: Partitioning on identity, using the vendor_id column. The name property is optional when the partition type is identity.
partitionSpec:
  - type: identity      # options tested: identity, year, month, day, hour
    column: vendor_id   # columns used: vendor_id and one string column for identity; date_col for the rest
Example 2: Partitioning on year, using the date_col column of type timestamp.
partitionSpec:
  - type: year          # options tested: identity, year, month, day, hour
    column: date_col    # columns used: vendor_id and one string column for identity; date_col (timestamp) for the rest
    name: year
Example 3: Nested partitioning on (identity, year). The identity partition on vendor_id should be listed first so that it forms the first level.
partitionSpec:
  - type: identity
    column: vendor_id   # columns used: vendor_id and one string column for identity; date_col for the rest
  - type: year          # options tested: identity, year, month, day, hour
    column: date_col
    name: year
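The effect of the field order in a nested spec can be sketched in Python. The snippet groups a few made-up rows by (vendor_id, year) in the same order as the spec in Example 3, so vendor_id forms the outer level and year the inner one. The rows and the `spec` list are hypothetical and only mimic the idea; they are not how Flare or Iceberg store the spec.

```python
from collections import defaultdict
from datetime import datetime

# Made-up rows for illustration only.
rows = [
    {"vendor_id": "1", "date_col": datetime(2018, 12, 31, 23, 0)},
    {"vendor_id": "1", "date_col": datetime(2019, 1, 1, 0, 30)},
    {"vendor_id": "2", "date_col": datetime(2019, 6, 1, 8, 0)},
    {"vendor_id": "1", "date_col": datetime(2019, 7, 4, 12, 0)},
]

# Same order as the partitionSpec: identity(vendor_id) first, then year(date_col).
spec = [
    ("vendor_id", lambda r: r["vendor_id"]),
    ("year", lambda r: r["date_col"].year),
]

# Count how many rows fall into each nested partition.
counts = defaultdict(int)
for r in rows:
    key = tuple(fn(r) for _, fn in spec)
    counts[key] += 1

for key in sorted(counts):
    print(key, counts[key])
```

Reversing the two entries in `spec` would make year the outer level instead, which changes how rows are grouped under each top-level folder.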
Parquet
- Partition using type identity
Example 1: Partitioning is done on identity by taking the vendor_id column.
- sink:
    - sequenceName: ny_taxi_ts
      datasetName: ny_taxi_parquet_06
      outputName: output01
      outputType: Parquet
      outputOptions:
        saveMode: overwrite
        partitionBy:
          - vendor_id
Outcomes
Files are stored in folders based on the defined partition criterion. You can view them in Workbench or in the storage location.
Code files
Note: When specifying the partition criterion for Iceberg, define the 'partitionSpec' property as a child of 'iceberg' in the sink section. Otherwise, partitioning will not be performed as intended.
version: v1beta1
name: workflow-ny-taxi-partitioned-vendor
type: workflow
tags:
  - Connect
  - NY-Taxi
description: The job ingests NY-Taxi data small files and write with partitioning on vendor_id
workflow:
  title: Connect NY Taxi
  dag:
    - name: nytaxi
      title: NY-taxi data ingester
      description: The job ingests NY-Taxi data from dropzone into raw zone
      spec:
        tags:
          - Connect
          - NY-Taxi
        stack: flare:1.0
        flare:
          job:
            explain: true
            inputs:
              - name: ny_taxi
                dataset: dataos://thirdparty01:none/ny-taxi-data?acl=r
                format: json
                isStream: false
            logLevel: INFO
            outputs:
              - name: output01
                depot: dataos://icebase:raw01?acl=rw
            steps:
              - sink:
                  - sequenceName: ny_taxi_ts
                    datasetName: ny_taxi_07
                    outputName: output01
                    outputType: Iceberg
                    outputOptions:
                      saveMode: overwrite
                      iceberg:
                        properties:
                          write.format.default: parquet
                          write.metadata.compression-codec: gzip
                        partitionSpec:
                          - type: identity      # options tested: string, integer, long
                            column: vendor_id   # columns used - identity (vendor_id, one string column)
                          - type: year          # options tested: identity, year, month, day, hour
                            column: date_col    # columns used - identity (vendor_id, one string column) & for rest date_col
                            name: year
                    tags:
                      - Connect
                      - NY-Taxi
                    title: NY-Taxi Data Partitioned Vendor
                sequence:
                  - name: ny_taxi_changed_dateformat
                    sql: select *, to_timestamp(pickup_datetime/1000) as date_col from ny_taxi
                  - name: ny_taxi_ts
                    sql: SELECT *, date_format(now(), 'yyyyMMddHHmm') as version, now() as ts_ny_taxi FROM ny_taxi_changed_dateformat
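The two SQL steps in the sequence can be mirrored in plain Python to show what they compute. This sketch assumes that pickup_datetime holds epoch milliseconds, which the division by 1000 suggests but the page does not state. The function names and the sample value are hypothetical, and UTC is used here for determinism, whereas the SQL engine would apply its own session time zone.

```python
from datetime import datetime, timezone


def to_timestamp_from_millis(ms: int) -> datetime:
    """Mirror to_timestamp(pickup_datetime/1000): epoch milliseconds to a timestamp."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def version_stamp(now: datetime) -> str:
    """Mirror date_format(now(), 'yyyyMMddHHmm'): a minute-resolution version string."""
    return now.strftime("%Y%m%d%H%M")


# Hypothetical pickup_datetime value in epoch milliseconds.
date_col = to_timestamp_from_millis(1552657500000)
print(date_col.isoformat())     # prints: 2019-03-15T13:45:00+00:00
print(version_stamp(date_col))  # prints: 201903151345
```

The resulting date_col is the timestamp column that the year partition in the sink is derived from.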