Photo by Joshua Sortino on Unsplash
Handling Data Redaction in Delta Live Tables for Reliable Data Pipelines
In today's rapidly evolving digital landscape, data security and compliance have become paramount. Delta Live Tables (DLT) is a powerful framework primarily used for data ingestion and transformation within the Databricks ecosystem. This article explores a critical aspect of data pipeline management: how to handle data redaction in DLT while maintaining the integrity and reliability of the pipeline.
Why Choose DLT? 🤔
Delta Live Tables (DLT) is a framework for building reliable, real-time data pipelines in Databricks. It simplifies ETL (Extract, Transform, Load) processes by automating data quality, monitoring, and incremental processing. This automation streamlines the development and maintenance of robust data pipelines.
To illustrate data ingestion and transformation using DLT, we will examine a toy example based on the well-established medallion architecture. This architecture organizes data into three distinct layers:
Bronze - cdc_users table: This table captures all change data capture (CDC) events from the source users table. For each modification in the source system, there will be one record indicating the type of change: insert, update, or delete.
Silver - users_silver: This layer represents the most recent state derived from the Bronze layer. In our example, since a single user can have multiple CDC records, this table combines those records to generate the latest user information. System-generated metadata fields are also removed in this layer.
Gold - users_gold: This layer captures only premium users based on specific business logic. Its purpose is to provide a targeted dataset of premium users for campaigns and other analytical purposes.
import dlt
import pyspark.sql.functions as F

# Bronze: stream the raw CDC events as-is
@dlt.view(name="users_bronze", comment="Raw CDC events from users source")
def users_bronze():
    return spark.readStream.table("cdc_users")  # your raw CDC source table

# Silver: latest state per user, built from the CDC stream
dlt.create_streaming_table("users_silver")

dlt.apply_changes(
    target = "users_silver",
    source = "users_bronze",
    keys = ["user_id"],
    except_column_list = ["_change_type", "_commit_timestamp"],  # drop CDC metadata from the target
    sequence_by = F.col("_commit_timestamp"),  # already a timestamp column in the source
    apply_as_deletes = "(_change_type = 'delete')",
    ignore_null_updates = True,
    stored_as_scd_type = "1"  # Optional: SCD Type 1 semantics (keep only the latest state)
)

# Gold: premium users only, guarded by a data quality expectation
@dlt.table(name="users_gold", comment="Premium users only")
@dlt.expect_or_drop("id_not_null", "user_id IS NOT NULL")
def users_gold():
    return (
        dlt.read("users_silver")
        .filter("premium_user = true")
    )
To demonstrate the pipeline's functionality, we first create a sample users table and populate it with initial data:
create or replace table demo.cdc_users (
  user_id int,
  name string,
  phone string,
  prefix string,
  premium_user boolean,
  _change_type string,
  _commit_timestamp timestamp
);
INSERT INTO cdc_users (
user_id, name, phone, prefix, premium_user,
_change_type, _commit_timestamp
) VALUES
(1, 'Alice Johnson', '9876543210', '+91', TRUE, 'insert', current_timestamp()),
(2, 'Bob Smith', '8765432109', '+1', FALSE, 'insert', current_timestamp()),
(3, 'Charlie Brown', '7654321098', '+44', TRUE, 'insert', current_timestamp()),
(4, 'David Williams', '6543210987', '+61', FALSE, 'insert', current_timestamp()),
(5, 'Emma Davis', '5432109876', '+33', TRUE, 'insert', current_timestamp()),
(6, 'Frank Miller', '4321098765', '+49', TRUE, 'insert', current_timestamp()),
(7, 'Grace Lee', '3210987654', '+81', FALSE, 'insert', current_timestamp()),
(8, 'Henry Wilson', '2109876543', '+7', FALSE, 'insert', current_timestamp()),
(9, 'Isla Moore', '1098765432', '+86', TRUE, 'insert', current_timestamp()),
(10, 'Jack Taylor', '0987654321', '+34', FALSE, 'insert', current_timestamp());
A sample run of this pipeline loads all ten users into the Silver table, five of whom flow into the Gold table as premium users.
For quick deployment and testing, you can use the following DLT JSON configuration:
{
  "id": "",
  "pipeline_type": "WORKSPACE",
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "photon": true,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/xxxx/DLT Toy"
      }
    }
  ],
  "name": "test-pipeline",
  "serverless": true,
  "catalog": "dev_sandbox",
  "schema": "demo",
  "data_sampling": false
}
To simulate subsequent data changes, such as updates and deletes, you can execute the following SQL scripts:
-- Simulate an update to user 1
INSERT INTO cdc_users VALUES
(1, 'Alice J.', '9876543210', '+91', TRUE, 'update', current_timestamp());
-- Simulate a delete for user 2
INSERT INTO cdc_users VALUES
(2, NULL, NULL, NULL, NULL, 'delete', current_timestamp());
After running the DLT pipeline again with these changes, we observe 9 active users in the Silver table and the same 5 premium users in the Gold table, as the update only affected a premium user's name.
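To sanity-check these counts, a minimal sketch like the following can be run in a notebook, assuming the pipeline writes to the dev_sandbox catalog and demo schema from the configuration above (adjust the names to your environment):
# Sanity check: row counts in the Silver and Gold tables
# (assumes the pipeline targets dev_sandbox.demo, as in the sample config)
silver_count = spark.table("dev_sandbox.demo.users_silver").count()
gold_count = spark.table("dev_sandbox.demo.users_gold").count()
print(f"Silver (active users): {silver_count}")  # expected: 9 after the update and delete
print(f"Gold (premium users): {gold_count}")     # expected: 5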
We can identify several key advantages of using DLT for managing CDC pipelines:
Simplified CDC Management with apply_changes: DLT eliminates the manual effort typically required to identify and process the latest records in a CDC stream.
Data Quality Enforcement with Expectations: Expectations ensure that only valid records (e.g., non-null user_id) propagate to downstream tables like Gold; a short sketch of the available expectation decorators follows this list.
Automatic Performance Optimization: DLT automatically optimizes the underlying Delta tables for improved query performance. Specifying clustering or optimization keys is advisable for further performance tuning.
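The expectation on users_gold is one of three flavors DLT provides; they differ only in how violating rows are handled. A minimal, illustrative sketch using a hypothetical users_gold_strict table (not part of the pipeline above):
import dlt

# @dlt.expect         - record the violation in metrics, keep the row
# @dlt.expect_or_drop - drop the violating row (the behavior used for users_gold)
# @dlt.expect_or_fail - fail the whole update on any violation
@dlt.table(name="users_gold_strict", comment="Hypothetical strict variant of users_gold")
@dlt.expect_or_fail("id_not_null", "user_id IS NOT NULL")  # abort the update if any user_id is null
@dlt.expect("has_phone", "phone IS NOT NULL")              # only track rows missing a phone number
def users_gold_strict():
    return dlt.read("users_silver").filter("premium_user = true")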
Data Redaction 🛡️
Consider a scenario where this pipeline is running in production, processing premium users for targeted discounts. Suddenly, a user requests to have their data redacted from our platform, opting out of receiving further communications.
INSERT INTO cdc_users VALUES
(9, NULL, NULL, NULL, NULL, 'delete', current_timestamp());
-- Simulates user leaving the platform
While Isla Moore is correctly removed from the Silver and Gold tables, the original entries persist in the cdc_users table and remain visible through the users_bronze view.
A common approach to data redaction is to update Personally Identifiable Information (PII) fields to null. This allows us to retain a record of a former premium user without exposing their sensitive details. Following this approach, the privacy team requests the data engineer to execute the following update:
update cdc_users set name=null where user_id=9;
However, upon running the DLT pipeline again, an error occurs.
The error message clearly indicates an issue with processing updates on a streaming source when change tracking is enabled. While a full refresh of the DLT table could resolve this for our toy dataset, it is often impractical for real-world datasets containing millions of records due to the processing overhead. Fortunately, DLT provides a more efficient solution.
skipChangeCommits
As highlighted in the error message, the skipChangeCommits option is the appropriate solution for this scenario. The name says it all: commits that are changes in nature (updates or deletes) are skipped.
@dlt.view(name="users_bronze", comment="Raw CDC events from users source")
def users_bronze():
    return (
        spark.readStream
        .option("skipChangeCommits", "true")  # ignore update/delete commits on the source
        .table("cdc_users")  # your raw CDC source table
    )
This tells the pipeline that any commits containing updates or deletes on the source table can be discarded, while append-only commits are accepted and processed as normal.
Astute readers may ask: why does the same error not occur in the Silver-to-Gold step? After all, the Silver table is constantly mutated by apply_changes. The answer is in the syntax: dlt.read_stream vs. dlt.read. Only streaming sources are required to be append-only; a non-streaming (batch) source is re-read in its entirety, so updates and deletes are fine. In the real world, the Silver-to-Gold step can tolerate some latency and might run only once a day, so processing it in batch mode is perfectly reasonable.
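To make the distinction concrete, here is a minimal sketch of the two ways the Gold table could consume the Silver table; users_gold_streaming is a hypothetical variant, not part of the pipeline above:
import dlt

# Batch read (what this article uses): users_silver is re-read in full on each
# update, so upstream updates and deletes are perfectly fine.
# (Expectation omitted for brevity.)
@dlt.table(name="users_gold", comment="Premium users only")
def users_gold():
    return dlt.read("users_silver").filter("premium_user = true")

# Streaming read (hypothetical alternative): the source must be append-only,
# so the row-level changes apply_changes makes to users_silver would break
# this version unless change commits are skipped.
@dlt.table(name="users_gold_streaming", comment="Hypothetical streaming variant")
def users_gold_streaming():
    return dlt.read_stream("users_silver").filter("premium_user = true")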
In conclusion, we looked at the process of building DLT pipelines for data ingestion and transformation and addressed the critical scenario of data redaction. By leveraging options like skipChangeCommits
, we can effectively manage data privacy requirements without resorting to costly full refreshes, ensuring the robustness and efficiency of our data pipelines.