Source code for pyflare.sdk.readers.fastbase_reader

import ast

from pyflare.sdk.config.read_config import ReadConfig
from pyflare.sdk.readers.reader import Reader
from pyflare.sdk.utils import pyflare_logger


class FastBaseInputReader(Reader):
    """Reader that loads a dataset through Spark using Pulsar options.

    ``read`` dispatches on ``read_config.io_format`` to a private
    ``_read_<io_format>`` method; this class provides ``_read_pulsar``.
    """

    # Template for the Spark/Pulsar options: the text of a Python list of
    # ``(option_name, option_value)`` pairs. ``{serviceUrl}``, ``{adminUrl}``,
    # ``{Apikey}`` and ``{dataset}`` are placeholders filled in by
    # ``_read_pulsar``.
    PULSAR_Options = (
        '[ ("service.url", "{serviceUrl}"), '
        '("admin.url", "{adminUrl}"), '
        '("pulsar.admin.authPluginClassName", '
        '"org.apache.pulsar.client.impl.auth.AuthenticationToken"), '
        '("pulsar.admin.authParams","token:{Apikey}"), '
        '("pulsar.client.authPluginClassName",'
        '"org.apache.pulsar.client.impl.auth.AuthenticationToken"), '
        '("pulsar.client.authParams","token:{Apikey}"), '
        '("topic","persistent://public/default/{dataset}") ]'
    )

    def __init__(self, read_config: ReadConfig):
        """Initialise the base reader and attach a module-named logger.

        :param read_config: configuration describing what to read.
        """
        super().__init__(read_config)
        self.log = pyflare_logger.get_pyflare_logger(name=__name__)

    def read(self):
        """Read the configured input.

        Streaming configs are delegated to ``read_stream``; otherwise the
        method named ``_read_<io_format>`` is looked up on this instance and
        called.

        :return: whatever the selected reader method returns.
        :raises AttributeError: if no ``_read_<io_format>`` method exists for
            the configured format.
        """
        if self.read_config.is_stream:
            return self.read_stream()

        io_format = self.read_config.io_format
        reader = getattr(self, f"_read_{io_format}", None)
        if reader is None:
            # Same exception type a failed attribute lookup would raise, but
            # with a message that names the actual problem.
            raise AttributeError(
                f"{type(self).__name__} does not support io_format "
                f"{io_format!r} (no '_read_{io_format}' method)"
            )
        return reader()

    def read_stream(self):
        """Streaming read hook; this implementation does nothing.

        :return: ``None``.
        """
        return

    def get_conf(self):
        """Return the configuration entries contributed by this reader.

        :return: an empty list.
        """
        return []

    def _read_pulsar(self):
        """Load the configured dataset via Spark with Pulsar options.

        The option template is parsed first and the placeholders are
        substituted afterwards, so values containing quotes, backslashes or
        newlines are passed through verbatim instead of breaking (or being
        reinterpreted by) the literal parser. Entries from
        ``read_config.spark_options`` override the templated options.

        :return: the object returned by Spark's ``load()``.
        """
        # Imported at call time rather than module import time.
        # NOTE(review): presumably to avoid a circular import and/or to pick
        # up the token after the session has been built - confirm.
        from pyflare.sdk.core.session_builder import g_dataos_token

        connection = self.read_config.connection()
        substitutions = {
            "serviceUrl": connection.get("serviceUrl"),
            "adminUrl": connection.get("adminUrl"),
            "Apikey": g_dataos_token,
            "dataset": self.read_config.dataset_name(),
        }

        def fill(item):
            # Only strings can carry placeholders; leave other literals alone.
            return item.format(**substitutions) if isinstance(item, str) else item

        # Parse the constant template BEFORE substituting, so runtime values
        # never pass through the Python-literal parser.
        pulsar_options = {
            fill(name): fill(value)
            for name, value in ast.literal_eval(self.PULSAR_Options)
        }

        spark_options = self.read_config.spark_options
        if spark_options:
            pulsar_options.update(spark_options)

        io_format = self.read_config.io_format
        df = self.spark.read.options(**pulsar_options).format(io_format).load()
        return df