Source code for pyflare.sdk.readers.iceberg_reader
import ast
from pyflare.sdk.config.constants import S3_ICEBERG_FILE_IO
from pyflare.sdk.config.read_config import ReadConfig
from pyflare.sdk.readers.file_reader import FileInputReader
from pyflare.sdk.utils import pyflare_logger, generic_utils
class IcebergInputReader(FileInputReader):
    """
    `IcebergInputReader` is a Python class designed to facilitate data retrieval from an Iceberg source
    within the SDK. Batch reads are supported via `read()`; streaming reads are not implemented yet.
    Extends : FileInputReader class
    Methods:
    ---------
    - `read()`: Reads data from the Iceberg source in batch mode.
    - `read_stream()`: Placeholder for streaming reads; currently not implemented and returns ``None``.
    - `get_conf()`: Returns the Spark configuration required for Iceberg for the underlying filesystem.
    - Other methods inherited from FileInputReader.
    Notes:
    -------
    Ensure that the necessary dependencies and configurations for Iceberg and the SDK are set up before using this class.
    For more information about Iceberg, refer to the Iceberg documentation: https://iceberg.apache.org/
    For more information about the SDK, refer to the SDK documentation.
    """

    def __init__(self, read_config: ReadConfig):
        """
        :param read_config: Read configuration passed through to ``FileInputReader``.
        """
        super().__init__(read_config)
        self.log = pyflare_logger.get_pyflare_logger(name=__name__)

    def read(self):
        """
        Read data from the Iceberg source in batch mode.

        Uses ``read_config.io_format`` as the Spark data source format, applies
        ``read_config.spark_options`` as reader options when present, and loads
        ``read_config.dataset_absolute_path()``.
        ``self.spark`` is presumably provided by ``FileInputReader`` (not visible here).

        :return: Batch data read from the Iceberg source.
        :rtype: pyspark.sql.dataframe.DataFrame
        """
        spark_options = self.read_config.spark_options
        io_format = self.read_config.io_format
        dataset_path = self.read_config.dataset_absolute_path()
        # `spark.read` is a reader/builder, not a DataFrame; the DataFrame comes from `load`.
        reader = self.spark.read
        # Skip `options()` entirely when no options are configured (None or empty mapping).
        if spark_options:
            reader = reader.options(**spark_options)
        return reader.format(io_format).load(dataset_path)

    def read_stream(self):
        """
        Read data from the Iceberg source in streaming mode.

        Not implemented yet: no read is performed and ``None`` is returned.
        A warning is logged so callers are not handed ``None`` silently.

        :return: Always ``None`` until streaming support is implemented.
        :rtype: None
        """
        self.log.warning("read_stream is not implemented for IcebergInputReader; returning None")
        return None

    def get_conf(self):
        """
        Return the Spark configuration required for Iceberg for the underlying filesystem.

        Dispatches to the private builder named ``_<depot_type>_<io_format>``
        (for example ``_s3_iceberg``, ``_abfss_iceberg`` or ``_gcs_iceberg``).

        :return: Spark configuration required for Iceberg for the underlying filesystem.
        :rtype: list of tuples
        :raises AttributeError: If no builder exists for the configured depot type and io format.
        """
        depot_type = self.read_config.depot_type()
        io_format = self.read_config.io_format
        builder_name = f"_{depot_type}_{io_format}"
        self.log.debug(f"calling : {builder_name}")
        builder = getattr(self, builder_name, None)
        if builder is None:
            # Keep AttributeError (what the bare getattr raised before) but say what is unsupported.
            raise AttributeError(
                f"{type(self).__name__} has no Spark conf builder '{builder_name}': "
                f"depot type '{depot_type}' with io format '{io_format}' is not supported"
            )
        return builder()

    def _abfss_iceberg(self):
        """Spark conf for Iceberg on ABFSS: the generic ABFSS conf from ``generic_utils``, as a list."""
        return list(generic_utils.get_abfss_spark_conf(self.read_config))

    def _s3_iceberg(self):
        """Spark conf for Iceberg on S3: ``S3_ICEBERG_FILE_IO`` first, then the generic S3 conf."""
        return [S3_ICEBERG_FILE_IO, *generic_utils.get_s3_spark_conf(self.read_config)]

    def _gcs_iceberg(self):
        """Spark conf for Iceberg on GCS: the generic GCS conf from ``generic_utils``, as a list."""
        return list(generic_utils.get_gcs_spark_conf(self.read_config))