Source code for pyflare.sdk.writers.iceberg_writer

import ast
import base64
import logging
import time
import traceback
import warnings
from typing import List, Tuple

import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.readwriter import DataFrameWriterV2

from pyflare.sdk.config import constants
from pyflare.sdk.config.constants import S3_ICEBERG_FILE_IO
from pyflare.sdk.config.write_config import WriteConfig
from pyflare.sdk.utils import pyflare_logger, generic_utils
from pyflare.sdk.writers.file_writer import FileOutputWriter

warnings.filterwarnings("ignore", message=".*version-hint.text.*")


def _generate_iceberg_catalog_conf(catalog_name: str, depot_base_path: str) -> List[Tuple[str, str]]:
    """
    Generates a list of key-value pairs for the Iceberg configuration.

    :param catalog_name: The catalog name for the configuration.
    :param depot_base_path: The base path for the warehouse.
    :return: List of tuples with configuration keys and values.
    """
    return [
        (f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog"),
        (f"spark.sql.catalog.{catalog_name}.type", "hadoop"),
        (f"spark.sql.catalog.{catalog_name}.warehouse", depot_base_path)
    ]
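
# Example (illustrative only, with a hypothetical catalog name and warehouse path):
# _generate_iceberg_catalog_conf("salesdepot_1700000000000000", "s3a://my-bucket/warehouse")
# returns
#
#     [
#         ("spark.sql.catalog.salesdepot_1700000000000000", "org.apache.iceberg.spark.SparkCatalog"),
#         ("spark.sql.catalog.salesdepot_1700000000000000.type", "hadoop"),
#         ("spark.sql.catalog.salesdepot_1700000000000000.warehouse", "s3a://my-bucket/warehouse"),
#     ]
#
# which temp_spark_conf() applies via spark.conf.set and cleanup_temp_spark_conf() later unsets.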


class IcebergOutputWriter(FileOutputWriter):
    """
    A class for writing data to the iceberg format using PySpark.
    """

    def __init__(self, write_config: WriteConfig):
        super().__init__(write_config)
        self.log = pyflare_logger.get_pyflare_logger(name=__name__)
        self.depot_suffix = "_" + str(int(time.time() * 1e6))

    def write(self, df: DataFrame):
        """
        Writes the given DataFrame to the iceberg format.

        If the "merge" option is specified in the write_config's extra_options, the DataFrame
        is merged into an existing table using a temporary view. Otherwise, the DataFrame is
        written to the iceberg format with the given options.

        Parameters:
            df (DataFrame): The DataFrame to write.

        Returns:
            None
        """
        try:
            self.temp_spark_conf()
            if "merge" in self.write_config.extra_options:
                # append a unique suffix so the catalog properties are registered temporarily
                # and can be removed from the spark conf after the write
                depot = self.write_config.depot_details.get("depot") + self.depot_suffix
                collection = self.write_config.depot_details.get("collection")
                dataset = self.write_config.depot_details.get("dataset")
                view_name = f"{depot}_{collection}_{dataset}_{int(time.time() * 1e9)}"
                df.createOrReplaceTempView(view_name)
                self.spark.sql(self.__merge_into_query(view_name, depot, collection, dataset))
            else:
                spark_options = self.write_config.spark_options
                table_properties = self.write_config.extra_options.get("table_properties", {})
                io_format = self.write_config.io_format
                dataset_path = generic_utils.get_dataset_path(self.write_config, self.depot_suffix)
                df_writer = df.writeTo(dataset_path).using(io_format)
                if spark_options:
                    df_writer = df_writer.options(**spark_options)
                # DataFrameWriterV2.tableProperty takes a single (property, value) pair,
                # so each table property is applied individually
                for property_key, property_value in table_properties.items():
                    df_writer = df_writer.tableProperty(property_key, property_value)
                df_writer = self.__process_partition_conf(df_writer)
                self.__write_mode(df_writer)
        except Exception:
            logging.error("Error writing dataframe to iceberg table: %s", traceback.format_exc())
        finally:
            self.cleanup_temp_spark_conf()
            logging.debug("iceberg catalog conf cleanup successful")
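
    # Illustrative sketch (assumed option shape, not part of the SDK contract): a merge
    # write is driven entirely by write_config.extra_options, for example
    #
    #     write_config.extra_options = {
    #         "merge": {
    #             "onClause": "target.order_id = source.order_id",
    #             "whenClause": "WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *",
    #         }
    #     }
    #
    # which write() routes through __merge_into_query() below; without a "merge" key the
    # DataFrame is written via DataFrameWriterV2 with any "table_properties" and "partition"
    # options applied. The clause values above are hypothetical.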

    def write_stream(self):
        """
        Writes the given DataFrame to the iceberg format in streaming mode.

        :return: None
        """
        pass

    def get_conf(self):
        """
        Returns spark configuration required for iceberg as per underlying filesystem.

        :return: Spark configuration required for iceberg as per underlying filesystem.
        :rtype: list of tuples
        """
        return getattr(self, f"_{self.write_config.depot_type()}_{self.write_config.io_format}")()
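
    # For example (hypothetical values): with write_config.depot_type() == "s3" and
    # write_config.io_format == "iceberg", get_conf() resolves to self._s3_iceberg()
    # via getattr; an abfss depot would resolve to self._abfss_iceberg() instead.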

    def _abfss_iceberg(self):
        """
        Returns spark configuration required for iceberg on Azure Blob Storage.

        :return: Spark configuration required for iceberg on Azure Blob Storage.
        :rtype: list of tuples
        """
        iceberg_conf = []
        iceberg_conf.extend(generic_utils.get_abfss_spark_conf(self.write_config))
        return iceberg_conf

    def _s3_iceberg(self):
        """
        Returns spark configuration required for iceberg on AWS S3.

        :return: Spark configuration required for iceberg on AWS S3.
        :rtype: list of tuples
        """
        iceberg_conf = [S3_ICEBERG_FILE_IO]
        iceberg_conf.extend(generic_utils.get_s3_spark_conf(self.write_config))
        return iceberg_conf

    def _gcs_iceberg(self):
        """
        Returns spark configuration required for iceberg on Google Cloud Storage.

        :return: Spark configuration required for iceberg on Google Cloud Storage.
        :rtype: list of tuples
        """
        iceberg_conf = []
        iceberg_conf.extend(generic_utils.get_gcs_spark_conf(self.write_config))
        return iceberg_conf

    def __process_partition_conf(self, df_writer: DataFrameWriterV2) -> DataFrameWriterV2:
        """
        Processes the partition configuration for the given DataFrameWriterV2 instance.

        :param df_writer: The DataFrameWriterV2 instance to process.
        :type df_writer: DataFrameWriterV2
        :return: The processed DataFrameWriterV2 instance.
        :rtype: DataFrameWriterV2
        """
        partition_column_list = []
        for temp_dict in self.write_config.extra_options.get("partition", []):
            partition_scheme: str = temp_dict.get("type", "").casefold()
            partition_column: str = temp_dict.get("column", "")
            if partition_scheme in ["year", "month", "day", "hour"]:
                self.log.info(f"partition scheme: {partition_scheme}, partition column: {partition_column}")
                # year/month/day/hour map to the plural transform functions F.years/F.months/F.days/F.hours
                partition_column_list.append(getattr(F, f"{partition_scheme}s")(partition_column))
            elif partition_scheme == "bucket":
                bucket_count: int = temp_dict.get("bucket_count", 8)
                self.log.info(
                    f"partition scheme: {partition_scheme}, partition column: {partition_column}, "
                    f"bucket_count: {bucket_count}")
                self.log.info(f"F.bucket({bucket_count}, {partition_column})")
                partition_column_list.append(F.bucket(bucket_count, F.col(partition_column)))
            elif partition_scheme == "identity":
                self.log.info(f"partition column: {partition_column}")
                partition_column_list.append(F.col(partition_column))
            else:
                self.log.warning(f"Invalid partition scheme: {partition_scheme}")
        if partition_column_list:
            df_writer = df_writer.partitionedBy(*partition_column_list)
        return df_writer

    def __write_mode(self, df: DataFrameWriterV2):
        """
        Sets the write mode for the given DataFrameWriterV2 instance based on the write_config's mode.

        :param df: The DataFrameWriterV2 instance to set the write mode for.
        :type df: DataFrameWriterV2
        """
        if self.write_config.mode in ["create", "overwrite", "write"]:
            df.createOrReplace()
        elif self.write_config.mode in ["overwriteByPartition"]:
            df.overwritePartitions()
        else:
            df.append()

    def __merge_into_query(self, source_view: str, depot, collection, dataset):
        """
        Generates a merge query for merging the given source view into the specified dataset.

        :param source_view: The name of the source view to merge.
        :type source_view: str
        :param depot: The name of the depot.
        :type depot: str
        :param collection: The name of the collection.
        :type collection: str
        :param dataset: The name of the dataset.
        :type dataset: str
        :return: The merge query.
        :rtype: str
        """
        merge_clauses = self.write_config.extra_options.get("merge", {})
        query = f"MERGE INTO {depot}.{collection}.{dataset} as target \n"
        query += f"USING (select * from {source_view}) as source \n"
        query += f"ON {merge_clauses.get('onClause', '')} \n"
        query += f"{merge_clauses.get('whenClause', '')} \n"
        return query
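
    # Illustrative sketch (assumed option shape, hypothetical column names): a partition
    # spec such as
    #
    #     write_config.extra_options["partition"] = [
    #         {"type": "day", "column": "event_ts"},
    #         {"type": "bucket", "column": "customer_id", "bucket_count": 16},
    #         {"type": "identity", "column": "country"},
    #     ]
    #
    # is translated by __process_partition_conf above into
    # partitionedBy(F.days("event_ts"), F.bucket(16, F.col("customer_id")), F.col("country")),
    # while a "merge" option with onClause/whenClause entries becomes the MERGE INTO
    # statement built by __merge_into_query.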

    def temp_spark_conf(self):
        """
        Generates a temporary Spark configuration for iceberg.

        :return: The temporary Spark configuration.
        :rtype: list of tuples
        """
        dataset_absolute_path = self.write_config.depot_absolute_path()
        iceberg_conf = _generate_iceberg_catalog_conf(self.write_config.depot_name() + self.depot_suffix,
                                                      dataset_absolute_path)
        for key, value in iceberg_conf:
            self.spark.conf.set(key, value)
        return iceberg_conf

    def cleanup_temp_spark_conf(self):
        """
        Cleans up the temporary Spark configuration for iceberg.

        :return: None
        """
        dataset_absolute_path = self.write_config.depot_absolute_path()
        iceberg_conf = _generate_iceberg_catalog_conf(self.write_config.depot_name() + self.depot_suffix,
                                                      dataset_absolute_path)
        for key, _ in iceberg_conf:
            self.spark.conf.unset(key)
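
    # Lifecycle sketch (hypothetical depot name "salesdepot"): write() registers the temporary
    # catalog before writing and always removes it afterwards, roughly equivalent to
    #
    #     spark.conf.set("spark.sql.catalog.salesdepot_<suffix>", "org.apache.iceberg.spark.SparkCatalog")
    #     # ... write the DataFrame ...
    #     spark.conf.unset("spark.sql.catalog.salesdepot_<suffix>")
    #
    # so even a failed write cleans up the temporary catalog configuration in the finally block.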