pyflare.sdk.core package

Submodules

pyflare.sdk.core.dataos_input module

class pyflare.sdk.core.dataos_input.DataOSInput(name, parsed_inputs, spark, apikey, is_stream=None, source_format=None, driver=None, query=None, options=None)

Bases: object

process_inputs() → Tuple[Any, Any]

Read a dataset from the source with the supplied parameters and create a temp view with the name passed to the dataos_source decorator.

pyflare.sdk.core.dataos_output module

class pyflare.sdk.core.dataos_output.DataOSOutput(name, dataframe, parsed_outputs, spark, apikey, is_stream=None, sink_format=None, mode=None, driver=None, options=None)

Bases: object

process_outputs()

Write the transformed dataset to the sink with the parameters supplied to the dataos_sink decorator.

pyflare.sdk.core.decorator module

pyflare.sdk.core.minerva_input module

class pyflare.sdk.core.minerva_input.MinervaInput(name, parsed_inputs, spark, apikey, cluster_name, source_format='jdbc', driver='io.trino.jdbc.TrinoDriver', query=None, options=None)

Bases: object

process_inputs() → DataFrame

Run the query on Minerva and store the result as a temp view with the name passed to the dataos_source decorator.

pyflare.sdk.core.session_builder module

class pyflare.sdk.core.session_builder.SparkSessionBuilder(log_level: str)[source]

Bases: object

add_reader_instance(depot_name, format_list: list)[source]
add_writer_instance(depot_name, format_list: list)[source]
api_token: str = ''
build_session() → SparkSession[source]
dataos_fqdn: str = ''
load_default_spark_conf() → SparkSessionBuilder[source]
log_level: str = 'INFO'
logger: Logger = None
parsed_inputs: dict = {}
parsed_outputs: dict = {}
spark: SparkSession = None
spark_conf = []
with_dataos_fqdn(dataos_fqdn: str)[source]
with_depot(depot_name: str, acl: str = 'r') → SparkSessionBuilder[source]
with_readers(reader_address_list) → SparkSessionBuilder[source]
with_spark_conf(conf) → SparkSessionBuilder[source]
with_user_apikey(apikey: str)[source]
with_writers(writer_address_list) → SparkSessionBuilder[source]
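
Example

A minimal sketch of configuring and building a session with SparkSessionBuilder. The FQDN, API key, and depot name below are placeholders; the call order shown is one reasonable arrangement, and only methods documented above to return SparkSessionBuilder are chained.

from pyflare.sdk.core.session_builder import SparkSessionBuilder

# Placeholder values; replace with your DataOS FQDN, API key, and depot name.
builder = SparkSessionBuilder(log_level="INFO")
builder.with_dataos_fqdn("example.dataos.app")
builder.with_user_apikey("<api-key>")

spark = (
    builder
    .load_default_spark_conf()
    .with_depot(depot_name="lakehouse", acl="r")
    .build_session()
)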
pyflare.sdk.core.session_builder.g_dataos_token: str
pyflare.sdk.core.session_builder.g_inputs: dict
pyflare.sdk.core.session_builder.g_outputs: dict
pyflare.sdk.core.session_builder.load(name, format, driver=None, query=None, options=None)[source]

Read a dataset from the source.

Parameters:
  • name (str) – Depot address of the source.

  • format (str) – Read format.

  • driver (str) – Driver needed to read from the source (optional).

  • query (str) – Query to execute (optional).

  • options (dict) – Additional Spark and source properties (optional).

Returns:

A Spark DataFrame with governed data.

Return type:

pyspark.sql.DataFrame

Raises:

PyflareReadException – If the dataset does not exist or read access fails.

Examples

Iceberg:

read_options = {
    'compression': 'gzip',
    'iceberg': {
        'table_properties': {
            'read.split.target-size': 134217728,
            'read.split.metadata-target-size': 33554432
        }
    }
}

load(name="dataos://lakehouse:retail/city", format="iceberg", options=read_options)

JDBC:

import datetime

read_options = {
    'compression': 'gzip',
    'partitionColumn': 'last_update',
    'lowerBound': datetime.datetime(2008, 1, 1),
    'upperBound': datetime.datetime(2009, 1, 1),
    'numPartitions': 6
}

load(name="dataos://sanitypostgres:public/city", format="postgresql",
     driver="org.postgresql.Driver", options=read_options)
pyflare.sdk.core.session_builder.minerva_input(name, query, cluster_name='system', driver='io.trino.jdbc.TrinoDriver', options=None)[source]
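
Example

A hedged sketch of running a query through minerva_input. The catalog, schema, and table in the query are placeholders, and assigning the result to a DataFrame is an assumption that mirrors MinervaInput.process_inputs() above; per that description, the result is also registered as a temp view under the supplied name.

# Placeholder query and names; the DataFrame return value mirrors
# MinervaInput.process_inputs() and is assumed, not documented here.
city_df = minerva_input(
    name="city_view",
    query="SELECT city_id, city_name FROM icebase.retail.city LIMIT 100",
    cluster_name="system"
)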
pyflare.sdk.core.session_builder.refresh_global_data(spark_session_builder: SparkSessionBuilder)[source]
pyflare.sdk.core.session_builder.save(name: str, dataframe, format: Optional[str] = None, mode='append', driver=None, options=None)[source]

Write the transformed dataset to the output sink.

Parameters:
  • name (str) – Depot address of the output sink.

  • dataframe (pyspark.sql.DataFrame) – The DataFrame to write.

  • format (str) – Output format (e.g., iceberg, parquet).

  • mode (str) – Write mode (default is “append”).

  • driver (str) – Driver to use for the sink (optional).

  • options (dict) – Additional write configuration (optional).

Raises:

PyflareWriteException – If the dataset does not exist or write access fails.

Example

write_options = {
    "compression": "gzip",
    "iceberg": {
        "table_properties": {
            "write.format.default": "parquet",
            "write.parquet.compression-codec": "gzip",
            "write.metadata.previous-versions-max": 3,
            "parquet.page.write-checksum.enabled": "false"
        },
        "partition": [
            {"type": "months", "column": "ts_city"},
            {"type": "bucket", "column": "city_id", "bucket_count": 8},
            {"type": "identity", "column": "city_name"}
        ]
    }
}

save(name="dataos://lakehouse:sdk/city", format="iceberg",
     mode="append", options=write_options)
pyflare.sdk.core.session_builder.spark: SparkSession
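
A hedged end-to-end sketch tying the module together: build a session, load a governed dataset, apply a trivial transformation, and save the result. The FQDN, API key, depot addresses, filter column, and acl="rw" value are placeholders or assumptions, not prescribed values.

from pyflare.sdk.core.session_builder import SparkSessionBuilder, load, save

# Placeholder FQDN, API key, and depot; acl="rw" is assumed to grant read-write access.
builder = SparkSessionBuilder(log_level="INFO")
builder.with_dataos_fqdn("example.dataos.app")
builder.with_user_apikey("<api-key>")
spark = builder.with_depot(depot_name="lakehouse", acl="rw").build_session()

city_df = load(name="dataos://lakehouse:retail/city", format="iceberg")
filtered_df = city_df.filter(city_df["city_name"].isNotNull())  # placeholder column

save(name="dataos://lakehouse:sdk/city_filtered", dataframe=filtered_df,
     format="iceberg", mode="append")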

Module contents