Source code for commons.utils.helper

import base64
import configparser
import json
import urllib
from urllib.parse import urlparse

from commons.utils import constants


[docs]def get_absolute_url(address_info): depot_type = str(address_info.get("type")).upper() connection = address_info.get("connection") if depot_type == "ABFSS": return connection.get("abfssUrl") elif depot_type == "WASBS": return connection.get("wasbsUrl") elif depot_type == "GCS": return connection.get("gcsUrl") elif depot_type == "S3": return connection.get("s3Url") elif depot_type == "FILE": return connection.get("url") raise Exception("Unsupported depot type: ", depot_type)
[docs]def decode_base64(s): s = str(s).strip() try: return base64.b64decode(s).decode('utf-8') except Exception: raise Exception("Invalid base64 string: {}".format(s))
[docs]def get_content(key: str, secret_res: dict): if secret_res.get("data", None) is None: raise Exception("Key : '%s' not found in secrets" % "data") for item in secret_res['data']: if item.get("key", None) is None: raise Exception("Key : '%s' not found in secrets" % "key") if item["key"] == key: if item.get("base64Value", None) is None: raise Exception("Key : '%s' not found in secrets" % "base64Value") encoded_gcs_conf = item["base64Value"] decoded_gcs_conf = decode_base64(encoded_gcs_conf) return str(decoded_gcs_conf) raise Exception("Key: {0} not found in response".format(key))
[docs]def get_properties(decoded_str: str): result = {} for line in decoded_str.splitlines(): if not line: continue # Filter out blank lines key_value = line.split("=", 1) if len(key_value) == 2: result[key_value[0]] = key_value[1] return result
[docs]def write_file(file_name: str, content: str): file = open(file_name, "w") file.write(content) file.close()
[docs]def to_url(url_string: str) -> str: try: parsed_url = urlparse(url_string) return parsed_url.geturl() except ValueError as e: raise AssertionError(f"Malformed URL: {url_string}") from e
[docs]def decode_url(dataset_name: str) -> str: return urllib.parse.unquote(dataset_name)
[docs]def encode_url(dataset_name: str) -> str: return urllib.parse.quote(dataset_name)
[docs]def encode_url_if_not(url: str) -> str: try: decode_url(url) return url except ValueError: return encode_url(url)
[docs]def get_value_or_throw(key: str, data: dict): if data[key] is None: raise Exception("Key: {0} not found in abfss properties".format(key)) return data[key]
[docs]def get_value_or_null(key: str, data: dict) -> str: if key in data: return data[key] else: return None
[docs]def convert_json_string_to_properties(json_string: str) -> dict: return json.loads(json_string)
[docs]def normalize_base_url(base_url): # Check if the base_url does not end with a slash if not base_url.endswith('/'): # Append a trailing slash to the base_url base_url = base_url + '/' return base_url