fabric.integrity_utils
build_join_condition
def build_join_condition(source_alias: str, target_alias: str,
join_columns: list[str]) -> str
Constructs a SQL join condition string that compares the specified columns of the source and target tables. Each column is CAST to a string so that values of different data types compare consistently, and wrapped in COALESCE to handle null values.
Arguments:
source_alias
str - The alias of the source table used in the SQL query.
target_alias
str - The alias of the target table used in the SQL query.
join_columns
list[str] - A list of column names to be used in the join condition between the source and target tables.
Returns:
str
- A SQL string representing the join condition.
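A minimal sketch of how such a condition might be assembled. It assumes the per-column comparisons are combined with AND and that NULL is replaced by an empty string after the cast. The STRING cast target and the '' fallback are both illustrative assumptions, because the description above does not specify them.

```python
def build_join_condition(source_alias: str, target_alias: str,
                         join_columns: list[str]) -> str:
    # ASSUMPTION: STRING as the cast type and '' as the COALESCE fallback
    # are illustrative; the documented function does not specify them.
    fallback = "''"
    comparisons = []
    for col in join_columns:
        # Cast first so every type compares as text, then replace NULL.
        src = f"COALESCE(CAST({source_alias}.{col} AS STRING), {fallback})"
        tgt = f"COALESCE(CAST({target_alias}.{col} AS STRING), {fallback})"
        comparisons.append(f"{src} = {tgt}")
    # ASSUMPTION: all join columns must match, so comparisons are ANDed.
    return " AND ".join(comparisons)


print(build_join_condition("src", "tgt", ["id"]))
# COALESCE(CAST(src.id AS STRING), '') = COALESCE(CAST(tgt.id AS STRING), '')
```

Any fixed fallback has a side effect. A NULL on one side matches the fallback value on the other, so with '' a NULL key would join to an empty-string key. A sentinel that cannot occur in the data avoids this.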
build_join_delcondition
def build_join_delcondition(source_alias: str, target_alias: str,
join_columns: list[str]) -> str
Builds a join delete condition string for SQL queries.
This function generates a string that defines the join delete condition by comparing the columns listed in the join_columns parameter between the source and target tables. Each column produces a comparison of the form target_alias.column = source_alias.column, and all comparisons are joined with the AND operator.
Arguments:
source_alias
str - The alias representing the source table in the SQL query.
target_alias
str - The alias representing the target table in the SQL query.
join_columns
list[str] - A list of column names to be used for the comparison between the source and target tables.
Returns:
str
- A SQL condition string joining the specified columns between the source and target tables.
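A minimal sketch of the documented format. It assumes that no casting or null handling is applied beyond what the description states. In SQL, = evaluates to NULL rather than true when either operand is NULL, so rows with NULL key values will not satisfy this condition. That is the case the COALESCE wrapping in build_join_condition is meant to handle.

```python
def build_join_delcondition(source_alias: str, target_alias: str,
                            join_columns: list[str]) -> str:
    # One "target.col = source.col" comparison per join column...
    comparisons = [f"{target_alias}.{col} = {source_alias}.{col}"
                   for col in join_columns]
    # ...combined with AND so that every column must match.
    return " AND ".join(comparisons)


print(build_join_delcondition("src", "tgt", ["id", "region"]))
# tgt.id = src.id AND tgt.region = src.region
```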
remove_items_from_list
def remove_items_from_list(items: list[str],
items_to_remove: list[str]) -> list[str]
Removes specific items from a list.
This function takes a list of items and removes every element that also appears in a second list of items to remove. It returns a new list containing only the elements that are not present in items_to_remove.
Arguments:
items
list[str] - The original list of items.
items_to_remove
list[str] - The list of items to be removed from items.
Returns:
list[str]
- A new list containing only the elements of items that are not present in items_to_remove.
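A minimal sketch. It assumes the original order of items is preserved and the input list is left unmodified. A set is used for the membership test so that each lookup stays constant-time even when the removal list is long.

```python
def remove_items_from_list(items: list[str],
                           items_to_remove: list[str]) -> list[str]:
    # Set lookup is O(1); the comprehension builds a new list in the
    # original order of `items` and leaves the input untouched.
    to_remove = set(items_to_remove)
    return [item for item in items if item not in to_remove]


columns = ["id", "region", "amount", "updated_at"]
print(remove_items_from_list(columns, ["id", "region"]))
# ['amount', 'updated_at']
```

One plausible use is stripping the join keys out of a full column list before building update expressions, since join columns are not updated.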
build_update_condition
def build_update_condition(source_alias: str, target_alias: str, join_columns,
columns: list[str])
Builds a SQL condition that detects differences between source and target columns and thereby determines whether an update is needed. Join columns are excluded from the comparison because they are not updated. The resulting condition accounts for:
- Null values in the target where the source value is non-null.
- Value mismatches between target and source, except when the source value is null.
Arguments:
source_alias
str - Alias for the source table in the SQL query.
target_alias
str - Alias for the target table in the SQL query.
join_columns
- Columns used as join keys between the source and target tables. These columns are not updated.
columns
list[str] - All columns in the source and target tables that may be considered for updating.
Returns:
str
- A SQL string that represents the condition to determine whether an update is needed for any column in the source-target comparison.
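A minimal sketch of the condition described above. It assumes the per-column checks are combined with OR, because an update is needed if any column differs, and that values are compared with <>. The first check is required because t.col <> s.col evaluates to NULL, not true, when the target value is NULL. A plain inequality would therefore miss that case. The exact SQL produced by the documented function may differ.

```python
def build_update_condition(source_alias: str, target_alias: str,
                           join_columns: list[str], columns: list[str]) -> str:
    # Join keys are never updated, so they are left out of the comparison.
    join_set = set(join_columns)
    value_columns = [c for c in columns if c not in join_set]
    clauses = []
    for col in value_columns:
        s = f"{source_alias}.{col}"
        t = f"{target_alias}.{col}"
        # Target is NULL but the source has a value (<> alone would yield NULL).
        fill_null = f"({t} IS NULL AND {s} IS NOT NULL)"
        # Values differ and the source is not NULL (a NULL source never overwrites).
        changed = f"({s} IS NOT NULL AND {t} <> {s})"
        clauses.append(f"{fill_null} OR {changed}")
    # ASSUMPTION: an update is needed if any single column differs, hence OR.
    return " OR ".join(f"({c})" for c in clauses)


print(build_update_condition("src", "tgt", ["id"], ["id", "amount"]))
# ((tgt.amount IS NULL AND src.amount IS NOT NULL) OR (src.amount IS NOT NULL AND tgt.amount <> src.amount))
```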
build_update_set
def build_update_set(source_alias: str, target_alias: str, columns: list[str])
Constructs a dictionary that maps target-alias-prefixed column names to the corresponding source-alias-prefixed column names.
This function creates a mapping for updating column values in a target from the corresponding columns in a source. The keys of the returned dictionary are column names prefixed with the target alias, and the values are the same column names prefixed with the source alias.
Arguments:
source_alias
- The alias to be used for the source columns in the mapping.
target_alias
- The alias to be used for the target columns in the mapping.
columns
- A list of column names for which the dictionary mapping is created. The same column names are used for both source and target.
Returns:
dict
- A dictionary containing the mapping between target and source alias-prefixed columns. Keys are formatted as target_alias.column_name and values as source_alias.column_name.
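A minimal sketch of the mapping using a dictionary comprehension. The set_clause lines are an illustrative addition, not part of the documented function. They show how such a mapping could be rendered as the text of a SQL SET clause.

```python
def build_update_set(source_alias: str, target_alias: str,
                     columns: list[str]) -> dict[str, str]:
    # Key: target-side column; value: source-side column it is updated from.
    return {f"{target_alias}.{col}": f"{source_alias}.{col}" for col in columns}


mapping = build_update_set("src", "tgt", ["amount", "updated_at"])
print(mapping)
# {'tgt.amount': 'src.amount', 'tgt.updated_at': 'src.updated_at'}

# Illustration only: render the mapping as the body of a SQL SET clause.
set_clause = ", ".join(f"{target} = {source}" for target, source in mapping.items())
print(set_clause)
# tgt.amount = src.amount, tgt.updated_at = src.updated_at
```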
check_for_duplicate_keys_table_bool
def check_for_duplicate_keys_table_bool(table_name: str,
key_columns: list) -> bool
Checks for duplicate keys in a given table and returns a boolean result.
This function determines whether the specified table contains any duplicate keys, based on the key columns provided. The check is performed with Spark so that it scales to large datasets.
Arguments:
table_name
- The name of the table to check for duplicate keys.
key_columns
- A list of column names that together define the key to check for duplicates.
Returns:
bool
- True if duplicate keys are found, otherwise False.
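The Spark implementation is not shown in this reference. The sketch below illustrates only the underlying check, in plain Python, with a list of row dictionaries standing in for the table. has_duplicate_keys is a hypothetical name. In Spark, the same idea is commonly expressed by grouping on the key columns with DataFrame.groupBy(*key_columns).count() and looking for groups whose count exceeds 1.

```python
from collections import Counter
from typing import Any


def has_duplicate_keys(rows: list[dict[str, Any]], key_columns: list) -> bool:
    # HYPOTHETICAL stand-in: rows replace the table that table_name would load.
    # Count each composite key; any count above 1 means a duplicate.
    counts = Counter(tuple(row[c] for c in key_columns) for row in rows)
    return any(n > 1 for n in counts.values())


rows = [{"id": 1, "region": "EU"}, {"id": 1, "region": "US"}]
print(has_duplicate_keys(rows, ["id"]))            # True
print(has_duplicate_keys(rows, ["id", "region"]))  # False
```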
check_for_duplicate_keys_table
def check_for_duplicate_keys_table(table_name: str, key_columns: list) -> None
Checks for duplicate keys in a specified table and raises an exception if found.
This function reads the specified table, checks for rows that share the same values in the provided key columns, and raises an exception if any duplicate keys are found. A sample of the duplicate keys is included in the exception message to aid debugging.
Arguments:
table_name
str - Name of the table to be checked for duplicate keys.
key_columns
list - List of columns that form the composite key to determine uniqueness in the table.
Raises:
Exception
- If duplicate keys are found in the specified table, an exception is raised with the first few duplicate key examples.
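A plain-Python sketch of the raise-with-examples behavior, with a list of row dictionaries standing in for the table. The name assert_unique_keys, the max_examples parameter, and the message format are all hypothetical. Because Counter preserves insertion order, the sample lists duplicates in the order their keys first appear.

```python
from collections import Counter
from typing import Any


def assert_unique_keys(rows: list[dict[str, Any]], key_columns: list,
                       max_examples: int = 5) -> None:
    # HYPOTHETICAL stand-in: rows replace the table that table_name would load.
    counts = Counter(tuple(row[c] for c in key_columns) for row in rows)
    duplicates = [(key, n) for key, n in counts.items() if n > 1]
    if duplicates:
        # Show only the first few offending keys so the message stays readable.
        sample = ", ".join(f"{dict(zip(key_columns, key))} x{n}"
                           for key, n in duplicates[:max_examples])
        raise Exception(
            f"Found {len(duplicates)} duplicate key(s) on {key_columns}: {sample}"
        )


try:
    assert_unique_keys([{"id": 1}, {"id": 2}, {"id": 1}], ["id"])
except Exception as exc:
    print(exc)  # Found 1 duplicate key(s) on ['id']: {'id': 1} x2
```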
check_for_duplicate_keys_dataframe
def check_for_duplicate_keys_dataframe(
df: DataFrame,
key_columns: list,
key_violation_action: str = "raise") -> DataFrame
Identifies and handles duplicate keys in a DataFrame based on the specified key columns and action. This function checks for duplicate keys within the provided DataFrame and performs one of the following actions: removes the duplicated rows, keeps only the first occurrence of each duplicated key, or raises an exception. The action is controlled by the key_violation_action parameter.
Arguments:
df
DataFrame - The input DataFrame to check for duplicate keys.
key_columns
list - The columns that should be considered as keys for duplicate identification.
key_violation_action
str, optional - The action to take when duplicates are found. Possible values are:
- "remove": Removes all duplicated rows from the DataFrame.
- "keepone": Keeps the first occurrence of each duplicate row based on the key columns.
- "raise": Raises an exception when duplicates are found.
Defaults to "raise".
Returns:
DataFrame
- The resulting DataFrame after applying the specified action for duplicate keys.
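A plain-Python illustration of the three actions, applied to a list of row dictionaries instead of a Spark DataFrame. handle_duplicate_keys is a hypothetical name. This sketch makes two further assumptions: "remove" drops every row whose key occurs more than once, and an unknown action value is rejected with ValueError.

```python
from collections import Counter
from typing import Any

Row = dict[str, Any]


def handle_duplicate_keys(rows: list[Row], key_columns: list,
                          key_violation_action: str = "raise") -> list[Row]:
    # ASSUMPTION: unknown actions are rejected up front with ValueError.
    if key_violation_action not in ("remove", "keepone", "raise"):
        raise ValueError(f"Unknown key_violation_action: {key_violation_action!r}")

    def key_of(row: Row) -> tuple:
        return tuple(row[c] for c in key_columns)

    counts = Counter(key_of(r) for r in rows)
    if all(n == 1 for n in counts.values()):
        return list(rows)  # no duplicates: every action leaves the rows as-is

    if key_violation_action == "remove":
        # ASSUMPTION: drop every row whose key occurs more than once.
        return [r for r in rows if counts[key_of(r)] == 1]
    if key_violation_action == "keepone":
        # Keep the first row seen for each key; input order defines "first".
        seen: set = set()
        kept = []
        for r in rows:
            k = key_of(r)
            if k not in seen:
                seen.add(k)
                kept.append(r)
        return kept
    # key_violation_action == "raise"
    dupes = [k for k, n in counts.items() if n > 1]
    raise Exception(f"Duplicate keys on {key_columns}: {dupes[:5]}")


rows = [{"id": 1, "v": "a"}, {"id": 1, "v": "b"}, {"id": 2, "v": "c"}]
print(handle_duplicate_keys(rows, ["id"], "remove"))   # [{'id': 2, 'v': 'c'}]
print(handle_duplicate_keys(rows, ["id"], "keepone"))  # [{'id': 1, 'v': 'a'}, {'id': 2, 'v': 'c'}]
```

In Spark, row order is not guaranteed without an explicit ordering. Which row counts as the "first occurrence" for "keepone" therefore depends on how the real implementation orders or selects rows.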
check_for_duplicate_df_keys
def check_for_duplicate_df_keys(
df: DataFrame,
key_columns: list,
key_violation_action: str = "raise") -> DataFrame
Checks for duplicate keys in a DataFrame and handles duplicates based on the specified action.
This function inspects the input DataFrame for duplicate values across the specified key columns. It can handle detected duplicates by removing them, keeping the first occurrence, or raising an exception, depending on the value of the key_violation_action argument.
Arguments:
df
DataFrame - The input DataFrame to check for duplicate keys.
key_columns
list - A list of column names to consider as composite keys for detecting duplicates.
key_violation_action
str, optional - Specifies the action to take in case of duplicate keys. It can have one of the following values:
- "remove": Removes rows with duplicate keys, keeping none.
- "keepone": Removes duplicates but keeps the first occurrence.
- "raise": Raises an exception if duplicates are detected.
Defaults to "raise".
Returns:
DataFrame
- The input DataFrame after handling duplicates according to the specified action.
Raises:
Exception
- If key_violation_action is set to "raise" and duplicate keys are present.