easyfabric.load_data_bronze
logging
datetime
DataFrame
config
MSG_BRONZE_CHANGE_DETECT_FAILED
MSG_BRONZE_CLEANUP_CHECKING
MSG_BRONZE_CLEANUP_DELETE_FAILED
MSG_BRONZE_CLEANUP_DELETING
MSG_BRONZE_CLEANUP_EVALUATING
MSG_BRONZE_CLEANUP_FOUND
MSG_BRONZE_CLEANUP_KEEPING
MSG_BRONZE_CLEANUP_KEEPING_INVALID_DATE
MSG_BRONZE_CLEANUP_KEEPING_NO_MOD_TIME
MSG_BRONZE_CLEANUP_KEEPING_TRACKING
MSG_BRONZE_CLEANUP_NO_CONFIGS
MSG_BRONZE_CLEANUP_PATH_ERROR
MSG_BRONZE_CLEANUP_PATH_NOT_FOUND
MSG_BRONZE_CLEANUP_SKIP_CONN
MSG_BRONZE_CLEANUP_SKIP_INACTIVE
MSG_BRONZE_CLEANUP_STARTED
MSG_BRONZE_CLEANUP_SUMMARY
MSG_BRONZE_DFLOADER_SETTINGS
MSG_BRONZE_DFLOADER_START
MSG_BRONZE_ERROR
MSG_BRONZE_EXCEPTION
MSG_BRONZE_FILE_PROGRESS
MSG_BRONZE_FILES_UNCHANGED
MSG_BRONZE_LOAD_START
MSG_BRONZE_LOADING_FILES
MSG_BRONZE_LOADING_HISTORY
MSG_BRONZE_LOADSKIP
MSG_BRONZE_MIDBRONZE_SET
MSG_BRONZE_NO_CONN_TYPE
MSG_BRONZE_NO_FILES
MSG_BRONZE_NO_PREV_TIMESTAMP
MSG_BRONZE_NOACTION
MSG_BRONZE_NOT_ACTIVE
MSG_BRONZE_NOT_CONFIGURED
MSG_BRONZE_POSTBRONZE_SET
MSG_BRONZE_PREBRONZE_SET
MSG_BRONZE_PRECHECK_FAILED
MSG_BRONZE_SNAPSHOT_FAILED
MSG_BRONZE_SOURCE_UNCHANGED
MSG_CONFIG_NOT_INITIALIZED
MSG_CORRELATION_NO_ROWS
MSG_COUNT_ERROR
MSG_FILETYPE_NONE
MSG_HISTORY_WRITE_ERROR
MSG_STALE_FILE
MSG_SYSTEM_STOP_AT_ERROR
MSG_UNSUPPORTED_FILETYPE
ConfigManager
LoadConfig
ObjectInfo
TableConfig
initialize_config
FabricChecksumMismatchError
FabricConfigError
FabricIntegrityError
FabricLoadError
FabricStaleFileError
get_log_file_path
init_logging
log_segment
save_log_file_to_table
get_spark
check_source_unchanged
dataframe_to_bronze
dataframe_to_bronze_old
load_csvwithschema_bronze
load_dataframe_bronze_history
load_json_bronze
load_notebook_bronze
load_notebook_midbronze
load_notebook_postbronze
load_notebook_prebronze
load_parquet_bronze
load_xlsx_bronze
load_xml_bronze
pull_files
refresh_table
truncate_bronze_table
get_tracker_file_path
has_files_changed
load_previous_snapshot
save_snapshot
ViolationRegistry
layer
run
def run(tablefile: str, config_manager: ConfigManager = None) -> str | None
Runs the bronze loader process for a specified table configuration: pulls files from the source, processes them, and loads them into the bronze layer.
This function initializes the table configuration based on the specified table file and checks its active state for the bronze process. If the table's configuration specifies pre-processing or post-processing notebooks, they are executed accordingly. Files from the source are pulled, processed, and loaded into the bronze layer based on their specified file types. Supported file types include CSV, JSON, XML, Parquet, and Notebook. The function also handles exceptions and ensures logs are saved correctly.
Arguments:
tablefile (str) - Path to the YAML file representing a table's configuration.
config_manager (ConfigManager) - An instance of ConfigManager used for accessing the application's configuration settings.
Returns:
str - A message indicating the outcome of the loading process, such as the number of files loaded or an error message in case of failure.
Raises:
Exception - If the config_manager is not initialized, no active configuration can be found for the table, or the filetype is unsupported.
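The file-type dispatch described for run can be pictured with a minimal, self-contained sketch. It does not call the real easyfabric loaders: LOADERS, load_files, and the stub loader functions are hypothetical stand-ins, and the error raised for an unsupported file type only mirrors the Raises entry above.

```python
from __future__ import annotations

from typing import Callable


# Hypothetical stand-ins for the real loaders; each only reports what it would load.
def _load_csv(path: str) -> str:
    return f"csv:{path}"


def _load_json(path: str) -> str:
    return f"json:{path}"


def _load_parquet(path: str) -> str:
    return f"parquet:{path}"


# Assumed mapping from file type to loader; the real module may dispatch differently.
LOADERS: dict[str, Callable[[str], str]] = {
    "csv": _load_csv,
    "json": _load_json,
    "parquet": _load_parquet,
}


def load_files(filetype: str | None, paths: list[str]) -> str:
    """Load every path with the loader for `filetype` and return a summary message."""
    if filetype is None:
        raise ValueError("no file type configured")
    loader = LOADERS.get(filetype.lower())
    if loader is None:
        raise ValueError(f"unsupported file type: {filetype}")
    loaded = [loader(path) for path in paths]
    return f"{len(loaded)} file(s) loaded"
```

A lookup table keeps the supported types in one place, so an unknown type fails early with a clear message instead of falling through a chain of conditionals.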
dataframeloader
def dataframeloader(data_frame: DataFrame,
load_config: LoadConfig,
table_config: TableConfig,
config_manager: ConfigManager = None) -> str | None
Loads a DataFrame into a specified data platform table using the provided configuration and manager.
This function handles the loading operation by using detailed configurations for the DataFrame, table, and the application configuration manager. It sets up logging, ensures required parameters are initialized, and supports specific settings for different layers (e.g., bronze layer). The function handles exception logging and provides mechanisms to stop processing upon encountering errors based on configuration settings.
Arguments:
data_frame (DataFrame) - The data to be loaded into the specified table.
load_config (LoadConfig) - Contains configuration for the loading process, including the destination table.
table_config (TableConfig) - Holds table-specific settings, e.g., table name identifiers and layers.
config_manager (ConfigManager) - Manages and validates application-level configurations.
Returns:
str - Message indicating the result of the DataFrame loading process, including the target table name and error details if applicable.
Raises:
Exception - If the destination table name is missing from LoadConfig.
Exception - If the ConfigManager is not properly initialized.
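The precondition checks and the stop-on-error behaviour described for dataframeloader might look roughly like the sketch below. Everything in it is an assumption made for illustration: SketchLoadConfig, its destination_table field, the writer callback, and the config_ready and stop_at_error flags are hypothetical and are not part of the easyfabric API.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class SketchLoadConfig:
    # Hypothetical stand-in for LoadConfig; the real field names may differ.
    destination_table: str | None = None


def load_rows(
    rows: list[dict],
    load_config: SketchLoadConfig,
    writer: Callable[[str, list[dict]], None],
    config_ready: bool = True,
    stop_at_error: bool = True,
) -> str:
    """Validate the inputs, write the rows, and return a result message."""
    if not config_ready:
        raise RuntimeError("configuration manager is not initialized")
    if not load_config.destination_table:
        raise ValueError("destination table name is missing from the load config")

    table = load_config.destination_table
    try:
        writer(table, rows)
    except Exception as exc:
        if stop_at_error:
            raise  # assumed behaviour: abort the run when stop-at-error is enabled
        return f"Error loading {table}: {exc}"
    return f"Loaded {len(rows)} row(s) into {table}"
```

Passing the writer in as a callback keeps the sketch independent of Spark, so the validation and error-handling paths can be exercised without a cluster.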
cleanup_bronze_files
def cleanup_bronze_files(config_manager: ConfigManager = None) -> dict
Cleans up old files and partitions in the Bronze Lakehouse according to the configuration settings at the table, connection, or global Lakehouse level.
Arguments:
config_manager (ConfigManager) - An instance of ConfigManager.
Returns:
dict - A dictionary containing metrics of the cleanup: deleted_files, bytes_deleted, and skipped_objects.
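As a rough illustration of how a level-based setting and the returned metrics could fit together, the sketch below resolves a retention period and tallies what a cleanup pass would remove. It rests on several assumptions that the documentation does not confirm: that the setting is a retention period in days, that the most specific level wins (table, then connection, then Lakehouse), and that files without a modification time count as skipped. The helper names resolve_retention_days and plan_cleanup are hypothetical, and no files are actually deleted.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def resolve_retention_days(
    table: int | None = None,
    connection: int | None = None,
    lakehouse: int | None = None,
) -> int | None:
    """Return the first configured value, most specific level first (assumed precedence)."""
    for value in (table, connection, lakehouse):
        if value is not None:
            return value
    return None


def plan_cleanup(
    files: list[tuple[str, int, datetime | None]],
    retention_days: int | None,
    now: datetime,
) -> dict:
    """Tally which (path, size_bytes, modified) entries fall outside the retention window."""
    metrics = {"deleted_files": 0, "bytes_deleted": 0, "skipped_objects": 0}
    if retention_days is None:
        # Nothing configured at any level: keep everything.
        metrics["skipped_objects"] = len(files)
        return metrics

    cutoff = now - timedelta(days=retention_days)
    for _path, size, modified in files:
        # Keep files that are recent or whose modification time is unknown.
        if modified is None or modified >= cutoff:
            metrics["skipped_objects"] += 1
            continue
        metrics["deleted_files"] += 1
        metrics["bytes_deleted"] += size
    return metrics
```

Keeping files with an unknown modification time is the conservative choice here, since deleting on missing metadata could remove data that is still needed.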