
easyfabric.load_data_bronze

logging

datetime

DataFrame

config

MSG_BRONZE_CHANGE_DETECT_FAILED

MSG_BRONZE_CLEANUP_CHECKING

MSG_BRONZE_CLEANUP_DELETE_FAILED

MSG_BRONZE_CLEANUP_DELETING

MSG_BRONZE_CLEANUP_EVALUATING

MSG_BRONZE_CLEANUP_FOUND

MSG_BRONZE_CLEANUP_KEEPING

MSG_BRONZE_CLEANUP_KEEPING_INVALID_DATE

MSG_BRONZE_CLEANUP_KEEPING_NO_MOD_TIME

MSG_BRONZE_CLEANUP_KEEPING_TRACKING

MSG_BRONZE_CLEANUP_NO_CONFIGS

MSG_BRONZE_CLEANUP_PATH_ERROR

MSG_BRONZE_CLEANUP_PATH_NOT_FOUND

MSG_BRONZE_CLEANUP_SKIP_CONN

MSG_BRONZE_CLEANUP_SKIP_INACTIVE

MSG_BRONZE_CLEANUP_STARTED

MSG_BRONZE_CLEANUP_SUMMARY

MSG_BRONZE_DFLOADER_SETTINGS

MSG_BRONZE_DFLOADER_START

MSG_BRONZE_ERROR

MSG_BRONZE_EXCEPTION

MSG_BRONZE_FILE_PROGRESS

MSG_BRONZE_FILES_UNCHANGED

MSG_BRONZE_LOAD_START

MSG_BRONZE_LOADING_FILES

MSG_BRONZE_LOADING_HISTORY

MSG_BRONZE_LOADSKIP

MSG_BRONZE_MIDBRONZE_SET

MSG_BRONZE_NO_CONN_TYPE

MSG_BRONZE_NO_FILES

MSG_BRONZE_NO_PREV_TIMESTAMP

MSG_BRONZE_NOACTION

MSG_BRONZE_NOT_ACTIVE

MSG_BRONZE_NOT_CONFIGURED

MSG_BRONZE_POSTBRONZE_SET

MSG_BRONZE_PREBRONZE_SET

MSG_BRONZE_PRECHECK_FAILED

MSG_BRONZE_SNAPSHOT_FAILED

MSG_BRONZE_SOURCE_UNCHANGED

MSG_CONFIG_NOT_INITIALIZED

MSG_CORRELATION_NO_ROWS

MSG_COUNT_ERROR

MSG_FILETYPE_NONE

MSG_HISTORY_WRITE_ERROR

MSG_STALE_FILE

MSG_SYSTEM_STOP_AT_ERROR

MSG_UNSUPPORTED_FILETYPE

ConfigManager

LoadConfig

ObjectInfo

TableConfig

initialize_config

FabricChecksumMismatchError

FabricConfigError

FabricIntegrityError

FabricLoadError

FabricStaleFileError

get_log_file_path

init_logging

log_segment

save_log_file_to_table

get_spark

check_source_unchanged

dataframe_to_bronze

dataframe_to_bronze_old

load_csvwithschema_bronze

load_dataframe_bronze_history

load_json_bronze

load_notebook_bronze

load_notebook_midbronze

load_notebook_postbronze

load_notebook_prebronze

load_parquet_bronze

load_xlsx_bronze

load_xml_bronze

pull_files

refresh_table

truncate_bronze_table

get_tracker_file_path

has_files_changed

load_previous_snapshot

save_snapshot

ViolationRegistry

layer

run

def run(tablefile: str, config_manager: ConfigManager = None) -> str | None

Runs the bronze loader for a specified table configuration: pulls files from the source, processes them, and loads them into the bronze layer.

This function initializes the table configuration based on the specified table file and checks its active state for the bronze process. If the table's configuration specifies pre-processing or post-processing notebooks, they are executed accordingly. Files from the source are pulled, processed, and loaded into the bronze layer based on their specified file types. Supported file types include CSV, JSON, XML, Parquet, and Notebook. The function also handles exceptions and ensures logs are saved correctly.

Arguments:

  • tablefile str - Path to the YAML file representing a table's configuration.
  • config_manager ConfigManager - An instance of ConfigManager used for accessing the application's configuration settings.

Returns:

  • str | None - A message indicating the outcome of the loading process, such as the number of files loaded or an error message in case of failure.

Raises:

  • Exception - If the config_manager is not initialized, no active configuration can be found for the table, or the filetype is unsupported.
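The per-filetype dispatch described above can be pictured with a small, self-contained sketch. It does not use easyfabric: the loader functions, the `LOADERS` table, and `load_files` are hypothetical stand-ins, and the real loaders take their own arguments and write to the bronze layer.

```python
from typing import Callable, Optional

# Hypothetical stand-ins for the per-filetype loaders (not easyfabric functions).
def _load_csv(path: str) -> str:
    return f"csv:{path}"

def _load_json(path: str) -> str:
    return f"json:{path}"

def _load_parquet(path: str) -> str:
    return f"parquet:{path}"

# Map a configured filetype to the function that handles it.
LOADERS = {
    "csv": _load_csv,
    "json": _load_json,
    "parquet": _load_parquet,
}

def load_files(filetype: Optional[str], files: list) -> str:
    """Send each pulled file to the loader registered for its filetype."""
    if filetype is None:
        raise ValueError("no filetype configured")
    loader: Optional[Callable[[str], str]] = LOADERS.get(filetype.lower())
    if loader is None:
        raise ValueError(f"unsupported filetype: {filetype}")
    if not files:
        return "no files to load"
    for path in files:
        loader(path)
    return f"{len(files)} file(s) loaded"
```

A lookup table keeps the supported types in one place, so an unsupported filetype fails before any file is processed.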

dataframeloader

def dataframeloader(data_frame: DataFrame,
                    load_config: LoadConfig,
                    table_config: TableConfig,
                    config_manager: ConfigManager = None) -> str | None

Loads a DataFrame into a specified data platform table using the provided configuration and manager.

This function loads the DataFrame using the load configuration, the table configuration, and the application's configuration manager. It sets up logging, checks that the required parameters are initialized, and applies layer-specific settings (e.g., for the bronze layer). Exceptions are logged, and the configuration settings determine whether processing stops when an error occurs.

Arguments:

  • data_frame DataFrame - The data to be loaded into the specified table.
  • load_config LoadConfig - Contains configuration for the loading process, including destination table.
  • table_config TableConfig - Holds table-specific settings, e.g., table name identifiers and layers.
  • config_manager ConfigManager - Manages and validates application-level configurations.

Returns:

  • str | None - Message indicating the result of the DataFrame loading process, including the target table name and error details if applicable.

Raises:

  • Exception - If the destination table name is missing from LoadConfig.
  • Exception - If the ConfigManager is not properly initialized.
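The precondition checks and the stop-on-error behaviour described above might look roughly like the sketch below. It is a stand-alone illustration, not the library's code: `DemoLoadConfig`, `DemoSettings`, the field names `destination_table`, `initialized`, and `stop_at_error`, and the `writer` callback are all hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class DemoLoadConfig:
    # Hypothetical stand-in for LoadConfig.
    destination_table: Optional[str] = None

@dataclass
class DemoSettings:
    # Hypothetical stand-in for the relevant ConfigManager state.
    initialized: bool = True
    stop_at_error: bool = False

def load_frame(rows: list, load_config: DemoLoadConfig,
               settings: Optional[DemoSettings],
               writer: Callable[[str, list], None]) -> str:
    """Validate inputs, write the rows, and report the outcome as a message."""
    if settings is None or not settings.initialized:
        raise RuntimeError("config manager is not initialized")
    if not load_config.destination_table:
        raise ValueError("destination table name is missing")
    try:
        writer(load_config.destination_table, rows)
    except Exception as exc:
        if settings.stop_at_error:
            raise  # propagate so the caller stops processing
        return f"error loading {load_config.destination_table}: {exc}"
    return f"loaded {len(rows)} row(s) into {load_config.destination_table}"
```

With `stop_at_error` disabled, a failed write is reported in the returned message; with it enabled, the exception propagates to the caller.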

cleanup_bronze_files

def cleanup_bronze_files(config_manager: ConfigManager = None) -> dict

Cleans up old files and partitions in the Bronze Lakehouse according to the configuration settings at the Table, Connection, or global Lakehouse level.

Arguments:

  • config_manager ConfigManager - An instance of ConfigManager.

Returns:

  • dict - A dictionary containing metrics of the cleanup: deleted_files, bytes_deleted, and skipped_objects.
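As a rough illustration of how a layered setting and the returned metrics could fit together, the sketch below resolves a value across the three levels and tallies a cleanup plan. Several things here are assumptions, not documented behaviour: that the setting is an age limit in days, that the most specific level wins, that objects without a modification time are skipped, and that the three metrics are integer counts. Both function names are hypothetical.

```python
from typing import Optional

def resolve_retention_days(table: Optional[int],
                           connection: Optional[int],
                           lakehouse: Optional[int]) -> Optional[int]:
    """Return the first configured value, most specific level first (assumed order)."""
    for value in (table, connection, lakehouse):
        if value is not None:
            return value
    return None

def plan_cleanup(files: list, retention_days: Optional[int], today: int) -> dict:
    """Tally what a cleanup would delete.

    files holds (name, size_bytes, modified_day) tuples; modified_day may be None.
    """
    metrics = {"deleted_files": 0, "bytes_deleted": 0, "skipped_objects": 0}
    for _name, size, modified in files:
        if retention_days is None or modified is None:
            # Without a limit or a modification time, nothing can be decided.
            metrics["skipped_objects"] += 1
            continue
        if today - modified > retention_days:
            metrics["deleted_files"] += 1
            metrics["bytes_deleted"] += size
    return metrics
```

Files that are still within the limit are kept and do not appear in any of the three counters.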