Source code for pyflare.sdk.writers.jdbc_writer

from pyflare.sdk.config.write_config import WriteConfig
from pyflare.sdk.utils import pyflare_logger
from pyflare.sdk.writers.writer import Writer


[docs]class JDBCOutputWriter(Writer): def __init__(self, write_config: WriteConfig): super().__init__(write_config) self.log = pyflare_logger.get_pyflare_logger(name=__name__)
[docs] def write(self, df): # self.resolve_write_format() if self.write_config.is_stream: return self.write_stream() spark_options = getattr(self, f"_{self.write_config.io_format}_write_options")() # df = self.spark.sql(f"select * from {self.view_name}") df.write.options(**spark_options).format("jdbc").mode(self.write_config.mode).save()
[docs] def write_stream(self): pass
[docs] def get_conf(self): return {}
def _jdbc_write_options(self): pass def _postgres_write_options(self): return self._postgresql_write_options() def _postgresql_write_options(self): spark_options = self.write_config.spark_options secrets = self.write_config.depot_details.get("secrets") connection = self.write_config.connection() if not self.write_config.driver: self.write_config.driver = "org.postgresql.Driver" postgres_write_options = { "url": connection.get("url", "").split("?currentSchema=")[0], "dbtable": f"{self.write_config.collection()}.{self.write_config.dataset_name()}", "user": secrets.get("username", ""), "password": secrets.get("password", ""), "driver": self.write_config.driver } if spark_options: postgres_write_options.update(spark_options) self.log.info(f"Merged options: {postgres_write_options}") return postgres_write_options def _redshift_write_options(self): spark_options = self.write_config.spark_options secrets = self.write_config.depot_details.get("secrets") connection = self.write_config.connection() if not self.write_config.driver: self.write_config.driver = "com.amazon.redshift.jdbc.Driver" redshift_write_options = { "url": connection.get("url", ""), "dbtable": f"{self.write_config.collection()}.{self.write_config.dataset_name()}", "user": secrets.get("username", ""), "password": secrets.get("password", ""), "driver": self.write_config.driver } if spark_options: redshift_write_options.update(spark_options) self.log.info(f"Merged options: {redshift_write_options}") return redshift_write_options def _sqlserver_write_options(self): pass def _oracle_write_options(self): pass def _mysql_write_options(self): spark_options = self.write_config.spark_options secrets = self.write_config.depot_details.get("secrets") connection = self.write_config.connection() if not self.write_config.driver: self.write_config.driver = "com.mysql.jdbc.Driver" mysql_write_options = { "url": connection.get("url", ""), "dbtable": f"{self.read_config.collection()}.{self.read_config.dataset_name()}", "user": secrets.get("username", ""), "password": secrets.get("password", ""), "driver": self.write_config.driver } self.log.warn(f"mysql_write_options: {mysql_write_options}") self.log.debug(f"mysql_write_options: {mysql_write_options}") if spark_options: mysql_write_options.update(spark_options) self.log.debug(f"Merged options: {mysql_write_options}") return mysql_write_options