
easyfabric.loaders.file_tracker

Imports: contextlib, json, logging, os, datetime (timezone), notebookutils, ObjectInfo

get_tracker_file_path

def get_tracker_file_path(bronze_abfs_base_folder: str) -> str

Returns the ABFS path for the tracker file inside the table's Bronze folder.

The tracker always lives at::

<bronze_abfs_base_folder>/_tracking/tracker.json

For azblob (date-partitioned) pass the folder above the date partition, e.g. abfss://…/Files/afas/Projecten. For fabricfiles pass the folder that directly contains the files, e.g. abfss://…/Files/afas/Projecten.

Arguments:

  • bronze_abfs_base_folder - Non-date-partitioned Bronze folder for this table.

Returns:

  • str - Full ABFS path to _tracking/tracker.json.
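Since the tracker location is fixed relative to the Bronze folder, the function reduces to a single path join. A minimal sketch (the real implementation may differ in details such as validation):

```python
def get_tracker_file_path(bronze_abfs_base_folder: str) -> str:
    """Return the ABFS path of the tracker file inside the Bronze folder."""
    # Strip a trailing slash so the join never produces a double separator.
    return f"{bronze_abfs_base_folder.rstrip('/')}/_tracking/tracker.json"
```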

load_previous_snapshot

def load_previous_snapshot(tracker_path: str) -> list[dict]

Loads all historical entries from the tracker (NDJSON — one entry per line).

Returns an empty list on first run (file does not exist yet).

Arguments:

  • tracker_path - ABFS path returned by :func:get_tracker_file_path.

Returns:

  • list[dict] - Entry dicts ordered oldest-first.
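The NDJSON parsing itself is straightforward: one JSON object per line, oldest-first. A sketch of just the parsing step (the real loader reads the file via notebookutils and returns an empty list when the file does not exist yet):

```python
import json

def parse_ndjson(text: str) -> list[dict]:
    """Parse tracker contents (NDJSON: one JSON object per line), oldest-first."""
    # Skip blank lines, e.g. a trailing newline after the last entry.
    return [json.loads(line) for line in text.splitlines() if line.strip()]
```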

save_snapshot

def save_snapshot(tracker_path: str,
                  file_list: list[ObjectInfo],
                  shortcode: str,
                  existing: list[dict] | None = None,
                  max_entries: int = 1000) -> None

Appends the current run's file metadata to the tracker (NDJSON format) and trims the log to max_entries total lines when the cap is exceeded.

Uses notebookutils.fs.append() for normal writes — the same approach as OneLakeFileHandler — so no read-before-write is needed on the happy path. put() is only used during a trim to rewrite the compacted file.

Arguments:

  • tracker_path - ABFS path from :func:get_tracker_file_path.
  • file_list - Current :class:ObjectInfo list from pull_files.
  • shortcode - Dataplatform object name (table_config.dataplatformobjectname).
  • existing - Previously loaded snapshot (from :func:load_previous_snapshot). Used only for the trim count check; when None and a trim is needed, the file is re-read.
  • max_entries - Maximum total entries to retain. Oldest entries are trimmed first.
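The trimming rule (keep only the newest max_entries lines) can be sketched in isolation. This shows only the cap logic; the real save_snapshot also serializes each entry to one NDJSON line and writes via notebookutils.fs.append() on the happy path, falling back to put() to rewrite the compacted file:

```python
def trim_entries(entries: list[dict], new_entries: list[dict],
                 max_entries: int = 1000) -> list[dict]:
    """Append this run's entries and retain only the newest max_entries.

    Entries are ordered oldest-first, so a tail slice drops the oldest first.
    """
    combined = entries + new_entries
    return combined[-max_entries:]
```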

has_files_changed

def has_files_changed(file_list: list[ObjectInfo],
                      previous: list[dict]) -> bool

Compares the current file list against the latest tracker entry per partial_filename.

Because the tracker is an append-log, previous may contain multiple entries for the same file (one per run). The dict comprehension below iterates oldest-first so the last value wins — giving us the most recent known state for each file.

Returns True on first run (empty previous).
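The last-wins collapse described above can be sketched as follows. The field name partial_filename comes from the tracker entries; the rest is a plain dict comprehension over the oldest-first log:

```python
def latest_state(previous: list[dict]) -> dict[str, dict]:
    """Collapse the append-log to the most recent entry per partial_filename.

    previous is ordered oldest-first, so later entries overwrite earlier
    ones in the comprehension and the last (newest) value wins.
    """
    return {entry["partial_filename"]: entry for entry in previous}
```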

Arguments:

  • file_list - Current list of :class:ObjectInfo from pull_files.
  • previous - All entries loaded by :func:load_previous_snapshot.

Returns:

  • bool - True if any file has changed or is new; False if all match.