pyflare.sdk.core package
Submodules
pyflare.sdk.core.dataos_input module
- class pyflare.sdk.core.dataos_input.DataOSInput(name, parsed_inputs, spark, apikey, is_stream=None, source_format=None, driver=None, query=None, options=None)
Bases: object
- process_inputs() → Tuple[Any, Any]
Read a dataset from the source with the supplied parameters and create a temp view with the name passed in the dataos_source decorator.
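A minimal sketch of driving the class directly; DataOSInput is normally driven by the dataos_source decorator, and the parsed_inputs dict, spark session, and api_token below are assumed to come from an already-configured SparkSessionBuilder (see session_builder below):

    # Hypothetical direct use; argument values are illustrative.
    dataos_input = DataOSInput(
        name="dataos://lakehouse:retail/city",   # depot address to read
        parsed_inputs=parsed_inputs,             # assumed: from a configured session
        spark=spark,                             # assumed: an active SparkSession
        apikey=api_token,                        # assumed: DataOS API token
        source_format="iceberg",
    )
    # Reads the dataset, registers the temp view, and returns a
    # Tuple[Any, Any] whose contents this listing leaves unspecified.
    result = dataos_input.process_inputs()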
pyflare.sdk.core.dataos_output module
pyflare.sdk.core.decorator module
pyflare.sdk.core.minerva_input module
- class pyflare.sdk.core.minerva_input.MinervaInput(name, parsed_inputs, spark, apikey, cluster_name, source_format='jdbc', driver='io.trino.jdbc.TrinoDriver', query=None, options=None)
Bases: object
- process_inputs() → DataFrame
Run a query on Minerva and store the result as a temp view with the name passed in the dataos_source decorator.
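A minimal sketch of direct use; the view name and query text are illustrative, and parsed_inputs, spark, and api_token are assumed to come from a configured session as above:

    minerva = MinervaInput(
        name="city_sample",                      # temp view to register
        parsed_inputs=parsed_inputs,             # assumed: from a configured session
        spark=spark,                             # assumed: an active SparkSession
        apikey=api_token,                        # assumed: DataOS API token
        cluster_name="system",
        query="SELECT * FROM icebase.retail.city LIMIT 10",  # illustrative query
    )
    df = minerva.process_inputs()                # returns a pyspark.sql.DataFrame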
pyflare.sdk.core.session_builder module
- class pyflare.sdk.core.session_builder.SparkSessionBuilder(log_level: str)[source]
Bases: object
- api_token: str = ''
- dataos_fqdn: str = ''
- load_default_spark_conf() → SparkSessionBuilder [source]
- log_level: str = 'INFO'
- logger: Logger = None
- parsed_inputs: dict = {}
- parsed_outputs: dict = {}
- spark: SparkSession = None
- spark_conf = []
- with_depot(depot_name: str, acl: str = 'r') → SparkSessionBuilder [source]
- with_readers(reader_address_list) → SparkSessionBuilder [source]
- with_spark_conf(conf) → SparkSessionBuilder [source]
- with_writers(writer_address_list) → SparkSessionBuilder [source]
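A minimal sketch chaining the fluent setters above; the shape of the conf argument and the 'rw' acl are assumptions, and the final session-creation step is not part of this listing:

    builder = (
        SparkSessionBuilder(log_level="INFO")
        .load_default_spark_conf()
        .with_spark_conf([("spark.sql.shuffle.partitions", "8")])  # assumed shape: list of (key, value) pairs
        .with_depot(depot_name="lakehouse", acl="rw")              # documented default acl is 'r'
    )
    # How the configured builder yields its SparkSession (see the spark
    # attribute above) is not covered by this listing.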
- pyflare.sdk.core.session_builder.g_dataos_token: str
- pyflare.sdk.core.session_builder.g_inputs: dict
- pyflare.sdk.core.session_builder.g_outputs: dict
- pyflare.sdk.core.session_builder.load(name, format, driver=None, query=None, options=None)[source]
Read a dataset from the source.
- Parameters:
name (str) – Depot address of the source.
format (str) – Read format.
driver (str) – Driver needed to read from the source (optional).
query (str) – Query to execute (optional).
options (dict) – Additional Spark and source properties (optional).
- Returns:
A Spark DataFrame with governed data.
- Return type:
pyspark.sql.DataFrame
- Raises:
PyflareReadException – If the dataset does not exist or read access fails.
Examples
Iceberg:
    read_options = {
        'compression': 'gzip',
        'iceberg': {
            'table_properties': {
                'read.split.target-size': 134217728,
                'read.split.metadata-target-size': 33554432
            }
        }
    }
    load(name="dataos://lakehouse:retail/city", format="iceberg", options=read_options)
JDBC:
    import datetime

    read_options = {
        'compression': 'gzip',
        'partitionColumn': 'last_update',
        'lowerBound': datetime.datetime(2008, 1, 1),
        'upperBound': datetime.datetime(2009, 1, 1),
        'numPartitions': 6
    }
    load(name="dataos://sanitypostgres:public/city", format="postgresql",
         driver="org.postgresql.Driver", options=read_options)
- pyflare.sdk.core.session_builder.minerva_input(name, query, cluster_name='system', driver='io.trino.jdbc.TrinoDriver', options=None)[source]
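No docstring is published for this helper; by analogy with MinervaInput above, a hedged sketch of registering a Minerva query as an input (whether it also returns the DataFrame is not stated here, so the sketch ignores the return value):

    # Assumed usage: run a query on the 'system' cluster and expose the
    # result under the given name; the query text is illustrative.
    minerva_input(
        name="top_cities",
        query="SELECT city_name, count(*) AS n FROM icebase.retail.city GROUP BY city_name",
        cluster_name="system",
    )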
- pyflare.sdk.core.session_builder.refresh_global_data(spark_session_builder: SparkSessionBuilder)[source]
- pyflare.sdk.core.session_builder.save(name: str, dataframe, format: Optional[str] = None, mode='append', driver=None, options=None)[source]
Write the transformed dataset to the output sink.
- Parameters:
name (str) – Depot address of the output sink.
dataframe (pyspark.sql.DataFrame) – The DataFrame to write.
format (str) – Output format (e.g., iceberg, parquet).
mode (str) – Write mode (default is “append”).
driver (str) – Driver to use for the sink (optional).
options (dict) – Additional write configuration (optional).
- Raises:
PyflareWriteException – If the dataset does not exist or write access fails.
Example
write_options = { "compression": "gzip", "iceberg": { "table_properties": { "write.format.default": "parquet", "write.parquet.compression-codec": "gzip", "write.metadata.previous-versions-max": 3, "parquet.page.write-checksum.enabled": "false" }, "partition": [ {"type": "months", "column": "ts_city"}, {"type": "bucket", "column": "city_id", "bucket_count": 8}, {"type": "identity", "column": "city_name"} ] } } save(name="dataos://lakehouse:sdk/city", format="iceberg", mode="append", options=write_options)
- pyflare.sdk.core.session_builder.spark: SparkSession