> databricks-data-handling

Implement Delta Lake data management patterns including GDPR, PII handling, and data lifecycle. Use when implementing data retention, handling GDPR requests, or managing data lifecycle in Delta Lake. Trigger with phrases like "databricks GDPR", "databricks PII", "databricks data retention", "databricks data lifecycle", "delete user data".

fetch
$curl "https://skillshub.wtf/jeremylongshore/claude-code-plugins-plus-skills/databricks-data-handling?format=md"
SKILL.mddatabricks-data-handling

Databricks Data Handling

Overview

Implement GDPR compliance, PII masking, data retention, and row-level security in Delta Lake with Unity Catalog. Covers data classification tagging, right-to-deletion workflows, automated retention enforcement, column-level masking functions, and subject access request (SAR) reporting.

Prerequisites

  • Unity Catalog enabled
  • Understanding of data classification requirements (GDPR, CCPA, HIPAA)
  • Admin access for tags and masking functions

Instructions

Step 1: Classify and Tag Data

Use Unity Catalog tags to classify tables and columns for automated compliance enforcement.

-- Tag tables with classification and retention
ALTER TABLE prod_catalog.silver.customers
SET TAGS ('data_classification' = 'PII', 'retention_days' = '730');

ALTER TABLE prod_catalog.silver.orders
SET TAGS ('data_classification' = 'CONFIDENTIAL', 'retention_days' = '365');

ALTER TABLE prod_catalog.gold.metrics
SET TAGS ('data_classification' = 'INTERNAL', 'retention_days' = '1825');

-- Tag PII columns
ALTER TABLE prod_catalog.silver.customers
ALTER COLUMN email SET TAGS ('pii_type' = 'email');

ALTER TABLE prod_catalog.silver.customers
ALTER COLUMN phone SET TAGS ('pii_type' = 'phone');

ALTER TABLE prod_catalog.silver.customers
ALTER COLUMN full_name SET TAGS ('pii_type' = 'name');

Step 2: GDPR Right-to-Deletion

Delete all user data across PII-tagged tables with audit logging.

from pyspark.sql import SparkSession
from datetime import datetime

spark = SparkSession.builder.getOrCreate()

class GDPRHandler:
    """Handle GDPR deletion requests across all PII-tagged tables."""

    def __init__(self, catalog: str):
        self.catalog = catalog

    def find_pii_tables(self) -> list[str]:
        """Find all tables tagged as PII."""
        result = spark.sql(f"""
            SELECT table_catalog, table_schema, table_name
            FROM {self.catalog}.information_schema.table_tags
            WHERE tag_name = 'data_classification' AND tag_value = 'PII'
        """).collect()
        return [f"{r.table_catalog}.{r.table_schema}.{r.table_name}" for r in result]

    def process_deletion(self, user_id: str, request_id: str, dry_run: bool = True) -> dict:
        """Delete user data from all PII tables. Returns audit record."""
        pii_tables = self.find_pii_tables()
        audit = {
            "request_id": request_id,
            "user_id": user_id,
            "timestamp": datetime.utcnow().isoformat(),
            "dry_run": dry_run,
            "tables_processed": [],
        }

        for table in pii_tables:
            # Check if table has a user_id-like column
            cols = [c.name for c in spark.table(table).schema]
            user_col = next((c for c in cols if c in ("user_id", "customer_id", "account_id")), None)

            if not user_col:
                continue

            count = spark.sql(
                f"SELECT COUNT(*) AS cnt FROM {table} WHERE {user_col} = '{user_id}'"
            ).first().cnt

            if count > 0 and not dry_run:
                spark.sql(f"DELETE FROM {table} WHERE {user_col} = '{user_id}'")

            audit["tables_processed"].append({
                "table": table,
                "column": user_col,
                "rows_affected": count,
                "action": "DELETED" if not dry_run else "WOULD_DELETE",
            })

        # Log audit record
        if not dry_run:
            spark.createDataFrame([audit]).write.mode("append").saveAsTable(
                f"{self.catalog}.compliance.gdpr_audit_log"
            )

        return audit

# Usage
gdpr = GDPRHandler("prod_catalog")
# Always dry-run first
report = gdpr.process_deletion("user-12345", "GDPR-2024-001", dry_run=True)
for t in report["tables_processed"]:
    print(f"  {t['table']}: {t['rows_affected']} rows {t['action']}")

Step 3: Automated Data Retention

class RetentionEnforcer:
    """Delete data older than retention policy set via table tags."""

    def __init__(self, catalog: str):
        self.catalog = catalog

    def enforce(self, dry_run: bool = True) -> list[dict]:
        """Process all tables with retention_days tag."""
        tagged = spark.sql(f"""
            SELECT table_catalog, table_schema, table_name, tag_value AS retention_days
            FROM {self.catalog}.information_schema.table_tags
            WHERE tag_name = 'retention_days'
        """).collect()

        results = []
        for row in tagged:
            table = f"{row.table_catalog}.{row.table_schema}.{row.table_name}"
            retention_days = int(row.retention_days)

            # Find date column (prefer created_at, event_date, order_date)
            cols = [c.name for c in spark.table(table).schema]
            date_col = next(
                (c for c in cols if c in ("created_at", "event_date", "order_date", "timestamp")),
                None,
            )
            if not date_col:
                continue

            expired = spark.sql(f"""
                SELECT COUNT(*) AS cnt FROM {table}
                WHERE {date_col} < current_timestamp() - INTERVAL {retention_days} DAYS
            """).first().cnt

            if expired > 0 and not dry_run:
                spark.sql(f"""
                    DELETE FROM {table}
                    WHERE {date_col} < current_timestamp() - INTERVAL {retention_days} DAYS
                """)
                # Clean up deleted files
                spark.sql(f"VACUUM {table} RETAIN 168 HOURS")

            results.append({
                "table": table, "retention_days": retention_days,
                "expired_rows": expired, "action": "DELETED" if not dry_run else "WOULD_DELETE",
            })

        return results

# Schedule as a daily Databricks job
enforcer = RetentionEnforcer("prod_catalog")
for r in enforcer.enforce(dry_run=True):
    print(f"  {r['table']}: {r['expired_rows']} rows > {r['retention_days']} days {r['action']}")

Step 4: Column-Level PII Masking

-- Create masking functions for different PII types
CREATE OR REPLACE FUNCTION prod_catalog.compliance.mask_email(val STRING)
  RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-readers'), val,
            CONCAT(LEFT(val, 1), '***@', SUBSTRING_INDEX(val, '@', -1)));

CREATE OR REPLACE FUNCTION prod_catalog.compliance.mask_phone(val STRING)
  RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-readers'), val,
            CONCAT('***-***-', RIGHT(val, 4)));

CREATE OR REPLACE FUNCTION prod_catalog.compliance.mask_name(val STRING)
  RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-readers'), val,
            CONCAT(LEFT(val, 1), REPEAT('*', LENGTH(val) - 1)));

-- Apply masks to columns
ALTER TABLE prod_catalog.silver.customers
  ALTER COLUMN email SET MASK prod_catalog.compliance.mask_email;

ALTER TABLE prod_catalog.silver.customers
  ALTER COLUMN phone SET MASK prod_catalog.compliance.mask_phone;

ALTER TABLE prod_catalog.silver.customers
  ALTER COLUMN full_name SET MASK prod_catalog.compliance.mask_name;

-- Test: non-privileged users see masked data
-- email: j***@company.com
-- phone: ***-***-1234
-- name: J****

Step 5: Row-Level Security

-- Restrict data access by department/region
CREATE OR REPLACE FUNCTION prod_catalog.compliance.region_filter(region STRING)
  RETURN IF(IS_ACCOUNT_GROUP_MEMBER('global-admins'), true,
            region IN (SELECT allowed_region
                       FROM prod_catalog.compliance.user_region_access
                       WHERE user_email = current_user()));

ALTER TABLE prod_catalog.gold.sales
  SET ROW FILTER prod_catalog.compliance.region_filter ON (region);

-- Analysts only see data for their assigned regions

Step 6: Subject Access Request (SAR)

def generate_sar_report(catalog: str, user_id: str) -> dict:
    """Generate a GDPR Subject Access Request report."""
    gdpr = GDPRHandler(catalog)
    pii_tables = gdpr.find_pii_tables()

    report = {"user_id": user_id, "generated_at": datetime.utcnow().isoformat(), "data": {}}

    for table in pii_tables:
        cols = [c.name for c in spark.table(table).schema]
        user_col = next((c for c in cols if c in ("user_id", "customer_id")), None)
        if not user_col:
            continue

        rows = spark.sql(f"SELECT * FROM {table} WHERE {user_col} = '{user_id}'").toPandas()
        if not rows.empty:
            report["data"][table] = rows.to_dict(orient="records")

    return report

# Generate and export
sar = generate_sar_report("prod_catalog", "user-12345")
print(f"Found data in {len(sar['data'])} tables")

Output

  • Data classification tags on tables and PII columns
  • GDPR deletion workflow with dry-run and audit logging
  • Automated retention enforcement via tagged policies
  • Column masking functions for email, phone, name
  • Row-level security restricting access by region/department
  • SAR report generation for compliance requests

Error Handling

ErrorCauseSolution
VACUUM failsRetention below 7 daysSet minimum RETAIN 168 HOURS
DELETE times outVery large tablePartition deletes across multiple runs
Mask function errorColumn type mismatchEnsure mask function signature matches column type
Missing user_id columnNon-standard schemaMaintain a table-to-user-column mapping
Row filter performanceComplex subqueryMaterialize user permissions as a small lookup table

Examples

Quick Compliance Check

-- Find all PII-tagged tables and their masking status
SELECT t.table_name, t.tag_value AS classification,
       COUNT(c.column_name) AS masked_columns
FROM prod_catalog.information_schema.table_tags t
LEFT JOIN prod_catalog.information_schema.column_tags c
    ON t.table_name = c.table_name AND c.tag_name = 'pii_type'
WHERE t.tag_name = 'data_classification' AND t.tag_value = 'PII'
GROUP BY t.table_name, t.tag_value;

Resources

Next Steps

For enterprise RBAC, see databricks-enterprise-rbac.

┌ stats

installs/wk0
░░░░░░░░░░
github stars1.7K
██████████
first seenMar 23, 2026
└────────────

┌ repo

jeremylongshore/claude-code-plugins-plus-skills
by jeremylongshore
└────────────