
fabric.integrity_utils

build_join_condition

def build_join_condition(source_alias: str, target_alias: str,
join_columns: list[str]) -> str

Constructs a SQL join condition string by comparing specified columns from the source and target tables, using COALESCE to handle null values and CAST to enforce consistent data type comparison as strings.

Arguments:

  • source_alias str - The alias of the source table used in the SQL query.
  • target_alias str - The alias of the target table used in the SQL query.
  • join_columns list[str] - A list of column names to be used in the join condition between the source and target tables.

Returns:

  • str - A SQL string representing the join condition.
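
As an illustration of the COALESCE and CAST pattern described above, here is a minimal sketch. It is not the library's implementation: the function name `sketch_join_condition`, the `null_sentinel` parameter, the `STRING` type name, the nesting order of CAST inside COALESCE, and the use of `AND` to combine columns are all assumptions made for the example.

```python
def sketch_join_condition(source_alias: str, target_alias: str,
                          join_columns: list[str],
                          null_sentinel: str = "") -> str:
    """Hypothetical sketch; the real build_join_condition may differ."""
    parts = []
    for col in join_columns:
        # Cast both sides to strings so differing column types compare consistently,
        # then replace NULL with a sentinel so that NULL keys match each other.
        # Assumption: the sentinel never occurs as a real key value.
        src = f"COALESCE(CAST({source_alias}.{col} AS STRING), '{null_sentinel}')"
        tgt = f"COALESCE(CAST({target_alias}.{col} AS STRING), '{null_sentinel}')"
        parts.append(f"{tgt} = {src}")
    # Assumption: every join column must match, so the comparisons are ANDed.
    return " AND ".join(parts)


join_condition = sketch_join_condition("s", "t", ["id", "region"])
print(join_condition)
```

Without the COALESCE wrapper, a plain `t.id = s.id` comparison never evaluates to true when either side is NULL, so rows with NULL keys would not be matched.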

build_join_delcondition

def build_join_delcondition(source_alias: str, target_alias: str,
join_columns: list[str]) -> str

Builds a join delete condition string for SQL queries.

This function generates a string used in SQL queries to define the join delete condition by comparing specified columns in the source and target tables. The columns used for comparison are specified in the join_columns parameter. The resulting string will have the format: target_alias.column = source_alias.column for each column and will join all comparisons with the "AND" operator.

Arguments:

  • source_alias str - The alias representing the source table in the SQL query.
  • target_alias str - The alias representing the target table in the SQL query.
  • join_columns list[str] - A list of column names to be used for the comparison between the source and target tables.

Returns:

  • str - A SQL condition string joining the specified columns between the source and target tables.
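
The documented format can be reproduced with a short sketch. The function name `sketch_join_delcondition` and the aliases in the usage lines are hypothetical; only the `target_alias.column = source_alias.column` shape and the `AND` operator come from the description above.

```python
def sketch_join_delcondition(source_alias: str, target_alias: str,
                             join_columns: list[str]) -> str:
    """Hypothetical sketch of the documented output format."""
    # One equality per join column, target on the left, combined with AND.
    return " AND ".join(
        f"{target_alias}.{col} = {source_alias}.{col}" for col in join_columns
    )


del_condition = sketch_join_delcondition("src", "tgt", ["order_id", "line_no"])
print(del_condition)
```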

remove_items_from_list

def remove_items_from_list(items: list[str],
items_to_remove: list[str]) -> list[str]

Removes specific items from a list.

This function takes a list of items and removes all elements found in a second list of items to remove. It returns a new list containing only the elements not present in the items_to_remove list.

Arguments:

  • items list[str] - The original list of items.
  • items_to_remove list[str] - The list of items to be removed from the items list.

Returns:

  • list[str] - A new list containing only the elements from items that are not present in items_to_remove.
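
A minimal sketch of the described behavior follows. The name `sketch_remove_items` is hypothetical, and preserving the original order of the remaining items is an assumption; the documentation only states that a new list is returned.

```python
def sketch_remove_items(items: list[str], items_to_remove: list[str]) -> list[str]:
    """Hypothetical sketch; returns a new list and leaves the inputs untouched."""
    # A set gives constant-time membership checks for the removal list.
    to_remove = set(items_to_remove)
    return [item for item in items if item not in to_remove]


all_columns = ["id", "name", "price", "updated_at"]
non_key_columns = sketch_remove_items(all_columns, ["id"])
print(non_key_columns)
```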

build_update_condition

def build_update_condition(source_alias: str, target_alias: str, join_columns,
columns: list[str])

Builds a SQL condition that detects differences between source and target columns and so determines whether an update is needed. The condition excludes join columns, as those are not updated. The resulting condition accounts for:

  1. Null values in the target that are non-null in the source.
  2. Mismatches in values between target and source except when the source is null.

Arguments:

  • source_alias str - Alias for the source table in the SQL query.
  • target_alias str - Alias for the target table in the SQL query.
  • join_columns - Columns used as join keys between the source and target tables. These columns are not updated.
  • columns list[str] - All columns in the source and target tables that may be considered for updating.

Returns:

  • str - A SQL string that represents the condition to determine whether an update is needed for any column in the source-target comparison.
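
The two cases listed above can be expressed in SQL roughly as follows. This is a sketch under assumptions, not the library's code: the name `sketch_update_condition`, the exact parenthesization, and the use of `OR` to combine the per-column checks are all illustrative choices.

```python
def sketch_update_condition(source_alias: str, target_alias: str,
                            join_columns: list[str],
                            columns: list[str]) -> str:
    """Hypothetical sketch; the real build_update_condition may differ."""
    join_keys = set(join_columns)
    checks = []
    for col in columns:
        if col in join_keys:
            continue  # join columns are never updated, so they are not compared
        src = f"{source_alias}.{col}"
        tgt = f"{target_alias}.{col}"
        # Case 1: target is NULL while source has a value.
        # Case 2: values differ. Under SQL three-valued logic, `tgt <> src`
        # is not true when src is NULL, so a NULL source never triggers an update.
        checks.append(f"(({tgt} IS NULL AND {src} IS NOT NULL) OR {tgt} <> {src})")
    # Assumption: a difference in any one column is enough to require an update.
    return " OR ".join(checks)


update_condition = sketch_update_condition("s", "t", ["id"], ["id", "name", "price"])
print(update_condition)
```

The explicit `IS NULL` clause is needed because `tgt <> src` alone also fails to evaluate to true when the target is NULL, which would otherwise hide case 1.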

build_update_set

def build_update_set(source_alias: str, target_alias: str, columns: list[str])

Constructs a dictionary that maps target columns to the source columns used to update them.

This function creates a mapping for updating column values in a target from the corresponding columns in a source. Each key is a column name prefixed with the target alias, and each value is the same column name prefixed with the source alias.

Arguments:

  • source_alias - The alias to be used for the source columns in the mapping.
  • target_alias - The alias to be used for the target columns in the mapping.
  • columns - A list of column names for which the dictionary mapping is created. The same column names are used for both source and target.

Returns:

  • dict - A dictionary containing the mapping between target and source alias-prefixed columns. Keys are formatted as target_alias.column_name and values as source_alias.column_name.
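
The key and value format described above can be sketched with a dictionary comprehension. The name `sketch_update_set` and the aliases in the usage lines are hypothetical.

```python
def sketch_update_set(source_alias: str, target_alias: str,
                      columns: list[str]) -> dict[str, str]:
    """Hypothetical sketch of the documented mapping."""
    # Key: target_alias.column_name, value: source_alias.column_name.
    return {f"{target_alias}.{col}": f"{source_alias}.{col}" for col in columns}


update_set = sketch_update_set("s", "t", ["name", "price"])
print(update_set)
```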

check_for_duplicate_keys_table_bool

def check_for_duplicate_keys_table_bool(table_name: str,
key_columns: list) -> bool

Checks for duplicate keys in a given table and returns a boolean result.

This function checks whether the specified table contains duplicate keys, based on the key columns provided. It uses Spark to perform the check, so it can run on large tables.

Arguments:

  • table_name - The name of the table to check for duplicate keys.
  • key_columns - A list of column names that define the key to check for duplicates.

Returns:

  • bool - True if duplicate keys are found, otherwise False.

check_for_duplicate_keys_table

def check_for_duplicate_keys_table(table_name: str, key_columns: list) -> None

Checks for duplicate keys in a specified table and raises an exception if found.

This function reads the specified table, checks for rows that share the same values in the provided key columns, and raises an exception if any are found. The exception message includes a sample of the duplicate keys to assist in debugging.

Arguments:

  • table_name str - Name of the table to be checked for duplicate keys.
  • key_columns list - List of columns that form the composite key to determine uniqueness in the table.

Raises:

  • Exception - If duplicate keys are found in the specified table, an exception is raised with the first few duplicate key examples.

check_for_duplicate_keys_dataframe

def check_for_duplicate_keys_dataframe(
df: DataFrame,
key_columns: list,
key_violation_action: str = "raise") -> DataFrame

Identifies and handles duplicate keys in a DataFrame based on the specified key columns and action.

This function checks for duplicate keys within the provided DataFrame and performs one of the following actions: removes duplicates, keeps only the first occurrence of each duplicate, or raises an exception. The action is controlled by the key_violation_action parameter.

Arguments:

  • df DataFrame - The input DataFrame to check for duplicate keys.
  • key_columns list - The columns that should be considered as keys for duplicate identification.
  • key_violation_action str, optional - The action to take when duplicates are found. Defaults to "raise". Possible values are:
    • "remove": Removes all duplicated rows from the DataFrame.
    • "keepone": Keeps the first occurrence of each duplicate row based on the key columns.
    • "raise": Raises an exception when duplicates are found.

Returns:

  • DataFrame - The resulting DataFrame after applying the specified action for duplicate keys.
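
To make the three actions concrete, the sketch below applies the same semantics to a plain Python list of dictionaries instead of a Spark DataFrame. This is a deliberate substitution for illustration only: the name `sketch_handle_duplicates`, the use of `ValueError`, and the message text are assumptions, and "first occurrence" here simply means list order, which a distributed DataFrame does not guarantee without an explicit ordering.

```python
from collections import Counter


def sketch_handle_duplicates(rows: list[dict], key_columns: list[str],
                             key_violation_action: str = "raise") -> list[dict]:
    """Hypothetical pure-Python analogue of the documented behavior (no Spark)."""

    def key_of(row: dict) -> tuple:
        return tuple(row[col] for col in key_columns)

    counts = Counter(key_of(row) for row in rows)
    duplicated = {key for key, n in counts.items() if n > 1}
    if not duplicated:
        return list(rows)

    if key_violation_action == "raise":
        # Include a few offending keys in the message to help debugging.
        raise ValueError(f"Duplicate keys found, e.g. {list(duplicated)[:5]}")
    if key_violation_action == "remove":
        # Drop every row whose key is duplicated, keeping none of them.
        return [row for row in rows if key_of(row) not in duplicated]
    if key_violation_action == "keepone":
        # Keep only the first row seen for each key.
        seen = set()
        kept = []
        for row in rows:
            key = key_of(row)
            if key not in seen:
                seen.add(key)
                kept.append(row)
        return kept
    raise ValueError(f"Unknown key_violation_action: {key_violation_action!r}")


sample_rows = [
    {"id": 1, "value": "a"},
    {"id": 1, "value": "b"},
    {"id": 2, "value": "c"},
]
removed = sketch_handle_duplicates(sample_rows, ["id"], "remove")
kept_one = sketch_handle_duplicates(sample_rows, ["id"], "keepone")
```

With `"remove"`, both rows with `id` 1 are dropped; with `"keepone"`, only the second of them is dropped.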

check_for_duplicate_df_keys

def check_for_duplicate_df_keys(
df: DataFrame,
key_columns: list,
key_violation_action: str = "raise") -> DataFrame

Checks for duplicate keys in a DataFrame and handles duplicates based on the specified action.

This function inspects the input DataFrame for duplicate values across specified key columns. It can handle detected duplicates by removing them, keeping the first occurrence, or raising an exception based on the value provided for the key_violation_action argument.

Arguments:

  • df DataFrame - The input DataFrame to check for duplicate keys.
  • key_columns list - A list of column names to consider as composite keys for detecting duplicates.
  • key_violation_action str, optional - Specifies the action to take in case of duplicate keys. Defaults to "raise". It can have one of the following values:
    • "remove": Removes rows with duplicate keys, keeping none.
    • "keepone": Removes duplicates but keeps the first occurrence.
    • "raise": Raises an exception if duplicates are detected.

Returns:

  • DataFrame - The input DataFrame after handling duplicates according to the specified action.

Raises:

  • Exception - If key_violation_action is set to "raise" and duplicate keys are present.