Source code for pyflare.sdk.writers.elasticsearch_writer

from pyflare.sdk.config.constants import ELASTIC_SEARCH_IO_FORMAT
from pyflare.sdk.config.write_config import WriteConfig
from pyflare.sdk.utils import pyflare_logger
from pyflare.sdk.writers.writer import Writer


[docs]class ElasticSearchOutputWriter(Writer): def __init__(self, write_config: WriteConfig): super().__init__(write_config) self.log = pyflare_logger.get_pyflare_logger(name=__name__)
[docs] def write(self, df): # self.resolve_write_format() if self.write_config.is_stream: return self.write_stream() spark_options = self.write_config.spark_options # df = self.spark.sql(f"select * from {self.view_name}") df.write.format(ELASTIC_SEARCH_IO_FORMAT).mode(self.write_config.mode).options(**spark_options).save()
[docs] def write_stream(self): pass
[docs] def get_conf(self): es_spark_options = { "es.nodes": self.write_config.depot_details.get('connection', {}).get('nodes', "")[0], "es.resource": self.write_config.dataset_name(), "es.nodes.wan.only": True, "es.net.http.auth.user": self.write_config.depot_details.get('secrets', {}).get('username', ""), "es.net.http.auth.pass": self.write_config.depot_details.get('secrets', {}).get('password', ""), "es.nodes.discovery": False } self.write_config.spark_options = es_spark_options return list()