> databricks-data-handling
Implement Delta Lake data management patterns including GDPR, PII handling, and data lifecycle. Use when implementing data retention, handling GDPR requests, or managing data lifecycle in Delta Lake. Trigger with phrases like "databricks GDPR", "databricks PII", "databricks data retention", "databricks data lifecycle", "delete user data".
curl "https://skillshub.wtf/jeremylongshore/claude-code-plugins-plus-skills/databricks-data-handling?format=md"
Databricks Data Handling
Overview
Implement GDPR compliance, PII masking, data retention, and row-level security in Delta Lake with Unity Catalog. Covers data classification tagging, right-to-deletion workflows, automated retention enforcement, column-level masking functions, and subject access request (SAR) reporting.
Prerequisites
- Unity Catalog enabled
- Understanding of data classification requirements (GDPR, CCPA, HIPAA)
- Admin access for tags and masking functions
Instructions
Step 1: Classify and Tag Data
Use Unity Catalog tags to classify tables and columns for automated compliance enforcement.
-- Tag tables with classification and retention
-- 'data_classification' = 'PII' is the tag the GDPR handler queries to find tables;
-- 'retention_days' is the tag the retention enforcer reads (parsed as an integer).
ALTER TABLE prod_catalog.silver.customers
SET TAGS ('data_classification' = 'PII', 'retention_days' = '730');
ALTER TABLE prod_catalog.silver.orders
SET TAGS ('data_classification' = 'CONFIDENTIAL', 'retention_days' = '365');
ALTER TABLE prod_catalog.gold.metrics
SET TAGS ('data_classification' = 'INTERNAL', 'retention_days' = '1825');
-- Tag PII columns
-- 'pii_type' records what kind of personal data each column holds
-- (these are the same three columns that receive masks in Step 4).
ALTER TABLE prod_catalog.silver.customers
ALTER COLUMN email SET TAGS ('pii_type' = 'email');
ALTER TABLE prod_catalog.silver.customers
ALTER COLUMN phone SET TAGS ('pii_type' = 'phone');
ALTER TABLE prod_catalog.silver.customers
ALTER COLUMN full_name SET TAGS ('pii_type' = 'name');
Step 2: GDPR Right-to-Deletion
Delete all user data across PII-tagged tables with audit logging.
from pyspark.sql import SparkSession
from datetime import datetime
spark = SparkSession.builder.getOrCreate()
class GDPRHandler:
    """Handle GDPR deletion requests across all PII-tagged tables.

    Tables are discovered through the ``data_classification = 'PII'`` tag in
    the catalog's ``information_schema.table_tags`` view. Only tables that
    expose one of ``USER_ID_COLUMNS`` are processed; others are skipped.

    NOTE(review): ``DELETE`` on a Delta table does not by itself purge the
    underlying data files or table history -- confirm that a VACUUM schedule
    covers these tables so erased rows are physically removed.
    """

    # Column names treated as the per-user key; the first one found in a
    # table's schema (in schema order) is used for matching.
    USER_ID_COLUMNS = ("user_id", "customer_id", "account_id")

    # Explicit schema for the audit table. Relying on inference breaks when
    # ``tables_processed`` is empty (element type cannot be inferred) and
    # guesses a map value type from mixed str/int values otherwise.
    _AUDIT_SCHEMA = (
        "request_id STRING, user_id STRING, timestamp STRING, "
        "dry_run BOOLEAN, tables_processed ARRAY<MAP<STRING, STRING>>"
    )

    def __init__(self, catalog: str):
        """Bind the handler to ``catalog`` (used for discovery and audit log)."""
        self.catalog = catalog

    def find_pii_tables(self) -> list[str]:
        """Return fully-qualified names of all tables tagged as PII."""
        # The tag views in information_schema expose catalog_name/schema_name
        # (not table_catalog/table_schema as information_schema.tables does).
        result = spark.sql(f"""
            SELECT catalog_name, schema_name, table_name
            FROM {self.catalog}.information_schema.table_tags
            WHERE tag_name = 'data_classification' AND tag_value = 'PII'
        """).collect()
        return [f"{r.catalog_name}.{r.schema_name}.{r.table_name}" for r in result]

    def process_deletion(self, user_id: str, request_id: str, dry_run: bool = True) -> dict:
        """Delete a user's rows from every PII table and return an audit record.

        Args:
            user_id: Identifier matched against the table's user-key column.
            request_id: Identifier of the deletion request, stored in the audit.
            dry_run: When True (the default) nothing is deleted or logged; the
                returned record reports what would be deleted.

        Returns:
            A dict with ``request_id``, ``user_id``, ``timestamp``, ``dry_run``
            and ``tables_processed`` (one entry per table that has a user-key
            column, with ``table``, ``column``, ``rows_affected``, ``action``).
        """
        pii_tables = self.find_pii_tables()
        audit = {
            "request_id": request_id,
            "user_id": user_id,
            "timestamp": datetime.utcnow().isoformat(),
            "dry_run": dry_run,
            "tables_processed": [],
        }
        # user_id is caller-supplied, so it is bound as a named parameter
        # rather than spliced into the SQL text.
        params = {"user_id": user_id}
        action = "WOULD_DELETE" if dry_run else "DELETED"

        for table in pii_tables:
            # Check if table has a user_id-like column
            cols = [c.name for c in spark.table(table).schema]
            user_col = next((c for c in cols if c in self.USER_ID_COLUMNS), None)
            if not user_col:
                continue

            count = spark.sql(
                f"SELECT COUNT(*) AS cnt FROM {table} WHERE {user_col} = :user_id",
                args=params,
            ).first().cnt

            if count > 0 and not dry_run:
                spark.sql(
                    f"DELETE FROM {table} WHERE {user_col} = :user_id",
                    args=params,
                )

            audit["tables_processed"].append({
                "table": table,
                "column": user_col,
                "rows_affected": count,
                "action": action,
            })

        # Log audit record (real deletions only).
        if not dry_run:
            # Map values are stringified so every entry fits MAP<STRING, STRING>.
            log_row = {
                **audit,
                "tables_processed": [
                    {key: str(value) for key, value in entry.items()}
                    for entry in audit["tables_processed"]
                ],
            }
            spark.createDataFrame([log_row], schema=self._AUDIT_SCHEMA).write.mode(
                "append"
            ).saveAsTable(f"{self.catalog}.compliance.gdpr_audit_log")
        return audit
# Usage: preview the impact with a dry run before deleting anything
gdpr = GDPRHandler("prod_catalog")
report = gdpr.process_deletion(
    user_id="user-12345",
    request_id="GDPR-2024-001",
    dry_run=True,
)
# One summary line per table that holds rows for this user key
for entry in report["tables_processed"]:
    print(f" {entry['table']}: {entry['rows_affected']} rows {entry['action']}")
Step 3: Automated Data Retention
class RetentionEnforcer:
    """Delete data older than the retention policy set via table tags.

    Every table carrying a ``retention_days`` tag is inspected; rows whose
    date column is older than that many days are deleted (or only counted
    when running as a dry run).
    """

    # Candidate date columns in priority order: the first name in this tuple
    # that exists in the table is used, regardless of schema order.
    DATE_COLUMNS = ("created_at", "event_date", "order_date", "timestamp")

    def __init__(self, catalog: str):
        """Bind the enforcer to ``catalog``, whose tags drive enforcement."""
        self.catalog = catalog

    @classmethod
    def _pick_date_column(cls, cols):
        """Return the highest-priority date column present in ``cols``, or None."""
        available = set(cols)
        return next((name for name in cls.DATE_COLUMNS if name in available), None)

    @staticmethod
    def _parse_retention_days(raw):
        """Return ``raw`` as a positive int, or None when it is not one."""
        try:
            days = int(raw)
        except (TypeError, ValueError):
            return None
        return days if days > 0 else None

    def enforce(self, dry_run: bool = True) -> list[dict]:
        """Process all tables with a ``retention_days`` tag.

        Args:
            dry_run: When True (the default) expired rows are only counted.

        Returns:
            One dict per processed table with ``table``, ``retention_days``,
            ``expired_rows`` and ``action`` ("DELETED" or "WOULD_DELETE").
            Tables without a usable date column or with an invalid
            ``retention_days`` tag are skipped.
        """
        import warnings

        # The tag views in information_schema expose catalog_name/schema_name
        # (not table_catalog/table_schema as information_schema.tables does).
        tagged = spark.sql(f"""
            SELECT catalog_name, schema_name, table_name, tag_value AS retention_days
            FROM {self.catalog}.information_schema.table_tags
            WHERE tag_name = 'retention_days'
        """).collect()

        action = "WOULD_DELETE" if dry_run else "DELETED"
        results = []
        for row in tagged:
            table = f"{row.catalog_name}.{row.schema_name}.{row.table_name}"

            # Tag values are free-form strings: a typo must not abort the whole
            # run, and a zero/negative value must not wipe the table.
            retention_days = self._parse_retention_days(row.retention_days)
            if retention_days is None:
                warnings.warn(
                    f"Skipping {table}: invalid retention_days tag {row.retention_days!r}"
                )
                continue

            # Find date column (prefer created_at, event_date, order_date)
            cols = [c.name for c in spark.table(table).schema]
            date_col = self._pick_date_column(cols)
            if not date_col:
                continue

            # retention_days is a validated int, so interpolating it is safe.
            cutoff = f"{date_col} < current_timestamp() - INTERVAL {retention_days} DAYS"
            expired = spark.sql(
                f"SELECT COUNT(*) AS cnt FROM {table} WHERE {cutoff}"
            ).first().cnt

            if expired > 0 and not dry_run:
                spark.sql(f"DELETE FROM {table} WHERE {cutoff}")
                # Clean up deleted files
                spark.sql(f"VACUUM {table} RETAIN 168 HOURS")

            results.append({
                "table": table,
                "retention_days": retention_days,
                "expired_rows": expired,
                "action": action,
            })
        return results
# Intended to run as a daily Databricks job; shown here as a dry-run preview
enforcer = RetentionEnforcer("prod_catalog")
preview = enforcer.enforce(dry_run=True)
for item in preview:
    print(f" {item['table']}: {item['expired_rows']} rows > {item['retention_days']} days {item['action']}")
Step 4: Column-Level PII Masking
-- Create masking functions for different PII types.
-- Members of the 'pii-readers' account group see the raw value; everyone else
-- sees a masked form. NULL input stays NULL.

-- Email: keep the first character and the domain.
-- A value without '@' is fully masked: SUBSTRING_INDEX(val, '@', -1) would
-- otherwise return the entire string and leak it.
CREATE OR REPLACE FUNCTION prod_catalog.compliance.mask_email(val STRING)
RETURN CASE
  WHEN IS_ACCOUNT_GROUP_MEMBER('pii-readers') THEN val
  WHEN val IS NULL THEN NULL
  WHEN INSTR(val, '@') > 0
    THEN CONCAT(LEFT(val, 1), '***@', SUBSTRING_INDEX(val, '@', -1))
  ELSE '***'
END;

-- Phone: keep only the last four characters.
-- Values of four characters or fewer are fully masked, since RIGHT(val, 4)
-- would return the whole value.
CREATE OR REPLACE FUNCTION prod_catalog.compliance.mask_phone(val STRING)
RETURN CASE
  WHEN IS_ACCOUNT_GROUP_MEMBER('pii-readers') THEN val
  WHEN val IS NULL THEN NULL
  WHEN LENGTH(val) > 4 THEN CONCAT('***-***-', RIGHT(val, 4))
  ELSE '***-***-****'
END;

-- Name: keep the first character, replace the rest with '*'.
CREATE OR REPLACE FUNCTION prod_catalog.compliance.mask_name(val STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-readers'), val,
  CONCAT(LEFT(val, 1), REPEAT('*', LENGTH(val) - 1)));
-- Apply masks to columns
-- Each statement binds one masking function from prod_catalog.compliance to
-- one column of the customers table (the same columns tagged with 'pii_type').
ALTER TABLE prod_catalog.silver.customers
ALTER COLUMN email SET MASK prod_catalog.compliance.mask_email;
ALTER TABLE prod_catalog.silver.customers
ALTER COLUMN phone SET MASK prod_catalog.compliance.mask_phone;
ALTER TABLE prod_catalog.silver.customers
ALTER COLUMN full_name SET MASK prod_catalog.compliance.mask_name;
-- Test: non-privileged users see masked data
-- ("non-privileged" = not a member of the 'pii-readers' account group)
-- email: j***@company.com
-- phone: ***-***-1234
-- name: J****
Step 5: Row-Level Security
-- Restrict data access by department/region
-- region_filter returns true for every row when the caller is a member of the
-- 'global-admins' account group; otherwise a row passes only if its region
-- appears as an allowed_region in user_region_access for current_user().
CREATE OR REPLACE FUNCTION prod_catalog.compliance.region_filter(region STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('global-admins'), true,
region IN (SELECT allowed_region
FROM prod_catalog.compliance.user_region_access
WHERE user_email = current_user()));
-- Attach the filter to the sales table, passing its region column as the argument.
ALTER TABLE prod_catalog.gold.sales
SET ROW FILTER prod_catalog.compliance.region_filter ON (region);
-- Analysts only see data for their assigned regions
Step 6: Subject Access Request (SAR)
def generate_sar_report(catalog: str, user_id: str) -> dict:
    """Generate a GDPR Subject Access Request report.

    Collects every row belonging to ``user_id`` from all PII-tagged tables in
    ``catalog`` that expose a user-key column.

    Args:
        catalog: Catalog whose PII-tagged tables are searched.
        user_id: Identifier matched against each table's user-key column.

    Returns:
        A dict with ``user_id``, ``generated_at`` (ISO timestamp) and ``data``,
        which maps each table name to a list of row dicts. Tables with no
        matching rows are omitted.

    NOTE(review): if column masks are applied to these tables, the report
    presumably contains masked values unless the caller is allowed to see
    unmasked data -- confirm before sending to the data subject.
    """
    # Same key columns the deletion workflow matches on, so the report covers
    # every table a deletion request would touch (including account_id).
    user_columns = ("user_id", "customer_id", "account_id")

    pii_tables = GDPRHandler(catalog).find_pii_tables()
    report = {"user_id": user_id, "generated_at": datetime.utcnow().isoformat(), "data": {}}

    for table in pii_tables:
        cols = [c.name for c in spark.table(table).schema]
        user_col = next((c for c in cols if c in user_columns), None)
        if not user_col:
            continue

        # user_id is caller-supplied: bind it as a named parameter instead of
        # splicing it into the SQL text.
        rows = spark.sql(
            f"SELECT * FROM {table} WHERE {user_col} = :user_id",
            args={"user_id": user_id},
        ).toPandas()
        if not rows.empty:
            report["data"][table] = rows.to_dict(orient="records")
    return report
# Build the report for one user and summarize how many tables hold their data
sar = generate_sar_report(catalog="prod_catalog", user_id="user-12345")
print(f"Found data in {len(sar['data'])} tables")
Output
- Data classification tags on tables and PII columns
- GDPR deletion workflow with dry-run and audit logging
- Automated retention enforcement via tagged policies
- Column masking functions for email, phone, name
- Row-level security restricting access by region/department
- SAR report generation for compliance requests
Error Handling
| Error | Cause | Solution |
|---|---|---|
| VACUUM fails | Retention below 7 days | Set minimum RETAIN 168 HOURS |
| DELETE times out | Very large table | Partition deletes across multiple runs |
| Mask function error | Column type mismatch | Ensure mask function signature matches column type |
| Missing user_id column | Non-standard schema | Maintain a table-to-user-column mapping |
| Row filter performance | Complex subquery | Materialize user permissions as a small lookup table |
Examples
Quick Compliance Check
-- Find all PII-tagged tables and their masking status.
-- pii_columns    = columns tagged with 'pii_type'
-- masked_columns = how many of those tagged columns have a column mask applied
-- Joins match on catalog, schema and table so that same-named tables in
-- different schemas are not mixed together.
SELECT t.schema_name, t.table_name, t.tag_value AS classification,
       COUNT(c.column_name) AS pii_columns,
       COUNT(m.column_name) AS masked_columns
FROM prod_catalog.information_schema.table_tags t
LEFT JOIN prod_catalog.information_schema.column_tags c
  ON t.catalog_name = c.catalog_name
 AND t.schema_name = c.schema_name
 AND t.table_name = c.table_name
 AND c.tag_name = 'pii_type'
LEFT JOIN prod_catalog.information_schema.column_masks m
  ON c.catalog_name = m.catalog_name
 AND c.schema_name = m.schema_name
 AND c.table_name = m.table_name
 AND c.column_name = m.column_name
WHERE t.tag_name = 'data_classification' AND t.tag_value = 'PII'
GROUP BY t.schema_name, t.table_name, t.tag_value;
Resources
Next Steps
For enterprise RBAC, see databricks-enterprise-rbac.
> related_skills --same-repo
> fathom-cost-tuning
Optimize Fathom API usage and plan selection. Trigger with phrases like "fathom cost", "fathom pricing", "fathom plan".
> fathom-core-workflow-b
Sync Fathom meeting data to CRM and build automated follow-up workflows. Use when integrating Fathom with Salesforce, HubSpot, or custom CRMs, or creating automated post-meeting email summaries. Trigger with phrases like "fathom crm sync", "fathom salesforce", "fathom follow-up", "fathom post-meeting workflow".
> fathom-core-workflow-a
Build a meeting analytics pipeline with Fathom transcripts and summaries. Use when extracting insights from meetings, building CRM sync, or creating automated meeting follow-up workflows. Trigger with phrases like "fathom analytics", "fathom meeting pipeline", "fathom transcript analysis", "fathom action items sync".
> fathom-common-errors
Diagnose and fix Fathom API errors including auth failures and missing data. Use when API calls fail, transcripts are empty, or webhooks are not firing. Trigger with phrases like "fathom error", "fathom not working", "fathom api failure", "fix fathom".