Source code for pyflare.sdk.readers.bigquery_reader

import base64
import json
import tempfile

from pyflare.sdk.config.read_config import ReadConfig
from pyflare.sdk.readers.file_reader import Reader
from pyflare.sdk.utils import pyflare_logger, generic_utils


class BigqueryInputReader(Reader):

    def __init__(self, read_config: ReadConfig):
        super().__init__(read_config)
        self.log = pyflare_logger.get_pyflare_logger(name=__name__)
    def read(self):
        spark_options = self.read_config.spark_options
        io_format = self.read_config.io_format
        dataset_path = "{}.{}.{}".format(
            self.read_config.spark_options.get("parentProject", ""),
            self.read_config.collection(),
            self.read_config.dataset_name()
        )
        if spark_options:
            df = self.spark.read.options(**spark_options).format(io_format).load(dataset_path)
        else:
            df = self.spark.read.format(io_format).load(dataset_path)
        return df
    def read_stream(self):
        pass
    def get_conf(self):
        # depot_name = self.read_config.depot_details['depot']
        # secret_file_path = f"{depot_name}_secrets_file_path"
        # keyfile_path = self.read_config.depot_details.get("secrets", {}).get(secret_file_path, "")
        connection_details = self.read_config.depot_details.get("connection", {})
        secrets = self.read_config.depot_details.get("secrets", {})
        gcp_secrets_content = json.dumps(secrets)
        with tempfile.NamedTemporaryFile(delete=False, mode="w") as temp_secrets_file:
            temp_secrets_file.write(gcp_secrets_content)
            temp_secrets_file_path = temp_secrets_file.name
        bigquery_spark_option = {
            "parentProject": connection_details.get("project", "")
        }
        self.read_config.spark_options = bigquery_spark_option
        bigquery_conf = [
            ("spark.hadoop.google.cloud.auth.service.account.json.keyfile", temp_secrets_file_path),
            ("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
            ("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
        ]
        return bigquery_conf
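
For orientation, a minimal usage sketch of how these pieces fit together. Everything below is illustrative: the ReadConfig construction, the presence of the spark-bigquery connector on the classpath, and the direct reader.spark assignment are assumptions for this sketch, not the SDK's documented entry point (the SDK's own session helpers would normally handle this wiring).

from pyspark.sql import SparkSession

from pyflare.sdk.config.read_config import ReadConfig
from pyflare.sdk.readers.bigquery_reader import BigqueryInputReader

read_config = ReadConfig(...)  # hypothetical: normally built by the SDK from depot details
reader = BigqueryInputReader(read_config)

# get_conf() returns (key, value) pairs for GCS/BigQuery auth and also populates
# read_config.spark_options with parentProject, which read() relies on, so it is
# called before the session is built and before read().
builder = SparkSession.builder.appName("bigquery-read-example")
for key, value in reader.get_conf():
    builder = builder.config(key, value)
spark = builder.getOrCreate()

reader.spark = spark  # assumption: the base Reader exposes the active session as self.spark
df = reader.read()    # loads <parentProject>.<collection>.<dataset_name> with the configured io_format
df.show()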