Load Bronze
The load_data_bronze module is the entry point for extracting data from source systems and landing it in the Bronze layer. It handles multiple file formats (CSV, JSON, XML, etc.) and ensures that raw data is captured with minimal transformation while managing pre- and post-processing notebook hooks. The module includes intelligent file change detection to skip unnecessary loads and validation checks to ensure data quality.
Overview
The bronze loader performs the following workflow:
- Source Check (optional): For Azure Blob sources, checks blob metadata without downloading to determine if any files have changed
- File Pulling: Downloads files from the configured source system
- Freshness Validation: Verifies that source files are not stale based on configurable age thresholds
- Change Detection (optional): Compares current files against a snapshot to skip loading if nothing has changed
- Data Loading: Processes files according to their file type and loads into Bronze tables
- History Tracking (optional): Maintains history records and validates that file changes correlate with data changes
Key Features
Skip-if-Unchanged (skipifsourceunchanged)
This feature prevents unnecessary bronze loads when source files have not changed. It works in two stages:
Stage 1: Azure Blob Pre-Check (for Azure Blob sources)
- Compares blob metadata (MD5 hash and size) without downloading files
- If all blobs match previous snapshot, entire load is skipped
- Significantly reduces bandwidth and processing time
Stage 2: File Change Detection (after file pull)
- Compares pulled files against previously saved snapshot
- Uses MD5 hash or file size for comparison
- Skips load if no changes detected
Enable this feature by setting skipifsourceunchanged: true in your table configuration.
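The Stage 2 comparison can be sketched as follows. This is a minimal illustration, not the module's actual implementation; the helper names `file_fingerprint` and `source_unchanged` are hypothetical.

```python
import hashlib
from pathlib import Path

def file_fingerprint(path: str) -> dict:
    """Compute MD5 hash and size for a local file (hypothetical helper)."""
    data = Path(path).read_bytes()
    return {"md5": hashlib.md5(data).hexdigest(), "size": len(data)}

def source_unchanged(current: dict, snapshot: dict) -> bool:
    """Return True when the current pull matches the saved snapshot.

    Both arguments map file name -> fingerprint dict. A new, removed,
    or modified file counts as a change, so the load proceeds."""
    if current.keys() != snapshot.keys():
        return False
    return all(current[name] == snapshot[name] for name in current)
```

When `source_unchanged` returns True for every pulled file, the loader can skip the truncate-and-load step entirely.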
File Freshness Validation (Validation 1)
Ensures source files are recent enough for processing:
- Checks the last_modified timestamp for each file
- Compares against max_file_age_hours (default: 2 hours)
- Logs warnings for stale files but continues processing
- Configurable per connection in ConfigManager
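A freshness check along these lines can be sketched in a few lines; the function name and signature are assumptions for illustration, not the module's API.

```python
from datetime import datetime, timedelta, timezone

def check_freshness(files: dict, max_file_age_hours: int = 2) -> list:
    """Return the names of files older than the configured threshold.

    `files` maps file name -> last_modified (timezone-aware datetime).
    Stale files are reported but, as in the loader, not rejected."""
    cutoff = datetime.now(timezone.utc) - timedelta(hours=max_file_age_hours)
    return [name for name, ts in files.items() if ts < cutoff]
```

The caller would log a warning for each returned name and continue processing.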
History Validation (Validation 2)
When history is enabled (keephistory: true), validates data integrity:
- Tracks MD5 hash and file size for each source file
- Compares file changes against the number of new records added to history table
- Warns if files changed but no new records were added (potential data issue)
- Logs detailed change information for debugging
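The correlation check above amounts to comparing the set of changed files against the count of rows appended to the history table. A minimal sketch, with a hypothetical helper name:

```python
def validate_history(changed_files: list, new_history_rows: int) -> list:
    """Cross-check file changes against rows appended to the history table.

    Returns warning strings rather than raising, mirroring the
    warn-and-continue behavior described above (hypothetical helper)."""
    warnings = []
    if changed_files and new_history_rows == 0:
        warnings.append(
            f"{len(changed_files)} file(s) changed but no new history rows "
            f"were added: {', '.join(changed_files)}"
        )
    return warnings
```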
File Tracker
The file tracker maintains snapshots of source files in the Bronze layer:
- Stores metadata (MD5, size, timestamp) in NDJSON format
- Located at the stable Bronze path for the table
- Used for change detection without re-downloading files
- Automatically updated after successful loads
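NDJSON stores one JSON object per line, so a tracker round-trip is straightforward. A sketch, assuming one record per file keyed by name (the actual field layout is not specified in this document):

```python
import json
from pathlib import Path

def save_snapshot(path: str, files: dict) -> None:
    """Write one JSON object per file to an NDJSON tracker file."""
    lines = [json.dumps({"name": n, **meta}) for n, meta in files.items()]
    Path(path).write_text("\n".join(lines) + "\n")

def load_snapshot(path: str) -> dict:
    """Read the tracker back into a {name: metadata} mapping.

    Returns an empty dict when no snapshot exists yet (first run)."""
    tracker = Path(path)
    if not tracker.exists():
        return {}
    result = {}
    for line in tracker.read_text().splitlines():
        if line.strip():
            rec = json.loads(line)
            result[rec.pop("name")] = rec
    return result
```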
Function Reference
run
def run(tablefile: str, config_manager: ConfigManager = None) -> str
Runs the bronze loader process for a specified table configuration: pulls files from the source, processes them, and loads them into the Bronze layer.
Workflow:
- Validates table configuration and layer settings
- Computes stable Bronze folder path for file tracker
- Loads previous file snapshot from tracker (if exists)
- Optionally performs Azure Blob pre-check for unchanged sources
- Executes pre-bronze notebook (if configured)
- Pulls files from source system
- Validates file freshness (Validation 1)
- Checks for file changes using the tracker snapshot (if skipifsourceunchanged is enabled)
- Truncates the Bronze table and loads file data by type
- Executes mid-bronze notebook and reloads data (if configured)
- Loads history records and validates correlation (Validation 2)
- Saves file snapshot for next run
- Executes post-bronze notebook (if configured)
Arguments:
- tablefile (str) - Path to the YAML file representing a table's configuration.
- config_manager (ConfigManager) - An instance of ConfigManager used for accessing the application's configuration settings. Defaults to the global config if not provided.
Returns:
str - A message indicating the outcome, such as file count, skip reason, or error details. Returns None if the table is inactive or skipped.
Raises:
Exception - If ConfigManager is not initialized, the table config is invalid, or the filetype is unsupported.
Configuration Options:
- skipifsourceunchanged (bool) - Enable skip-if-unchanged detection
- bronzeloadskip (bool) - Skip the entire bronze load for this table
- keephistory (bool) - Maintain history records and validation
- prebronzenotebook (str) - Path to a notebook to run before loading
- midbronzenotebook (str) - Path to a notebook to run between load and history
- postbronzenotebook (str) - Path to a notebook to run after loading
- sourceorder (bool) - Sort files by sourceorder instead of name
- bronzefolder (str) - Override the Bronze folder from the connection
- max_file_age_hours (int) - Maximum file age for freshness validation (per connection)
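A table configuration using these options might look like the following fragment. The key names come from the list above, but the nesting and file layout are assumptions, not confirmed by this document:

```yaml
# Hypothetical table_config.yaml fragment -- key placement assumed.
skipifsourceunchanged: true
bronzeloadskip: false
keephistory: true
prebronzenotebook: /notebooks/pre_bronze_cleanup
postbronzenotebook: /notebooks/post_bronze_audit
sourceorder: true
bronzefolder: /bronze/my_table
# max_file_age_hours is configured per connection, not per table
```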
dataframeloader
def dataframeloader(data_frame: DataFrame, load_config: LoadConfig,
                    table_config: TableConfig, config_manager: ConfigManager = None)
Loads a DataFrame into a specified data platform table using the provided configuration and manager.
This function handles the loading operation by using detailed configurations for the DataFrame, table, and the application configuration manager. It sets up logging, ensures required parameters are initialized, and supports specific settings for different layers (e.g., bronze layer). The function handles exception logging and provides mechanisms to stop processing upon encountering errors based on configuration settings.
Arguments:
- data_frame (DataFrame) - The data to be loaded into the specified table.
- load_config (LoadConfig) - Configuration for the loading process, including the destination table.
- table_config (TableConfig) - Table-specific settings, e.g., table name identifiers and layers.
- config_manager (ConfigManager) - Manages and validates application-level configurations.
Returns:
str - A message indicating the result of the DataFrame loading process, including the target table name and error details if applicable.
Raises:
- Exception - If the destination table name is missing from LoadConfig.
- Exception - If the ConfigManager is not properly initialized.
Supported File Types
The bronze loader supports the following file formats:
- CSV - Comma-separated values with automatic schema detection
- JSON - JSON objects and arrays
- XML - Extensible Markup Language files
- Parquet - Apache Parquet columnar format
- XLSX - Excel spreadsheets
- Notebook - Databricks notebooks for custom processing
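Dispatching by file type typically reduces to a lookup table. The reader functions below are stubs for illustration only; the module's real readers are not documented in this section.

```python
def load_file(path: str, filetype: str):
    """Dispatch a pulled file to a reader based on its configured filetype.

    Stub readers stand in for the real per-format loaders. An unknown
    filetype raises, matching the unsupported-filetype behavior of run()."""
    readers = {
        "csv": lambda p: f"csv:{p}",
        "json": lambda p: f"json:{p}",
        "xml": lambda p: f"xml:{p}",
        "parquet": lambda p: f"parquet:{p}",
        "xlsx": lambda p: f"xlsx:{p}",
    }
    try:
        reader = readers[filetype.lower()]
    except KeyError:
        raise Exception(f"Unsupported filetype: {filetype}")
    return reader(path)
```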
Error Handling
The module provides comprehensive error handling:
- Pre-check failures: Logs warnings and continues with regular load if pre-check fails
- File change detection failures: Proceeds with load if change detection fails
- Stale files: Logs warnings for old files but continues processing
- Configuration errors: Raises exceptions for missing or invalid configurations
- Stop at error: Respects the stop_at_error config to halt on the first error or continue with warnings
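The stop-at-error behavior can be sketched as a loop that either re-raises or logs and continues; the function below is a hypothetical illustration of the pattern, not the module's code.

```python
import logging

def process_files(files: list, loader, stop_at_error: bool = False) -> list:
    """Apply `loader` to each file, honoring a stop_at_error-style flag.

    With stop_at_error=True the first failure is re-raised; otherwise it
    is logged as a warning and processing continues."""
    errors = []
    for f in files:
        try:
            loader(f)
        except Exception as exc:
            if stop_at_error:
                raise
            logging.warning("Failed to load %s: %s", f, exc)
            errors.append((f, str(exc)))
    return errors
```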
Example Usage
from easyfabric.load_data_bronze import run
from easyfabric.data import ConfigManager
# Initialize config manager
config_manager = ConfigManager.initialize()
# Run bronze loader for a specific table
result = run(
    tablefile="/path/to/table_config.yaml",
    config_manager=config_manager,
)
print(result)
# Output: "5 files loaded for table: my_table" or "Bronze: my_table skipped (source unchanged)"
Best Practices
- Enable skip-if-unchanged for tables with infrequent source changes to save processing time and bandwidth
- Monitor validation warnings in logs to identify data quality issues early
- Configure max_file_age_hours appropriately for your data refresh schedule
- Use keephistory for critical tables to maintain an audit trail and detect data anomalies
- Set sourceorder if file processing order matters for your use case
- Use pre- and post-notebooks for custom transformations and cleanup before/after loading