> audit-logging
Implement tamper-evident audit logs for compliance (SOC 2, HIPAA, PCI DSS). Use when building compliance audit trails, tracking who did what and when, or implementing immutable event logs that satisfy regulatory retention requirements.
curl "https://skillshub.wtf/TerminalSkills/skills/audit-logging?format=md"Audit Logging
Overview
Compliance audit logs must answer: who did what to which resource, when, from where, and with what result. They must also be tamper-evident — an auditor must be able to verify logs haven't been modified.
Regulatory retention requirements:
| Standard | Retention |
|---|---|
| HIPAA | 6 years |
| PCI DSS | 12 months online + 12 months archive |
| SOC 2 (typical) | 1 year |
| GDPR | Defined by purpose (often 1-3 years) |
## Required Log Fields
```typescript
interface AuditLogEntry {
  // Core required fields
  id: string;            // UUID — unique event identifier
  timestamp: string;     // ISO 8601 UTC — when the event occurred
  actor_id: string;      // User/service that performed the action
  // 'anonymous' included so unauthenticated requests (see middleware below) fit the schema
  actor_type: 'user' | 'service' | 'admin' | 'system' | 'anonymous';
  action: string;        // What happened: "read" | "write" | "delete" | "login" | "export"
  resource_type: string; // "patient_record" | "invoice" | "user_account"
  resource_id: string;   // ID of the affected resource
  result: 'success' | 'failure' | 'denied';

  // Context fields
  ip_address: string;    // Source IP
  user_agent?: string;   // Browser/client identifier
  session_id?: string;   // Session identifier

  // Optional fields
  changed_fields?: string[];            // For write operations: which fields changed
  old_values?: Record<string, unknown>; // Previous values (be careful with PII)
  reason?: string;                      // Clinical justification (HIPAA), business reason

  // Tamper-evidence
  prev_hash?: string;    // Hash of previous log entry (hash chaining)
  hash: string;          // SHA-256 of this entry
}
```
## Hash Chain Implementation
Hash chaining makes log tampering detectable: each entry includes a hash of the previous entry. Modifying any entry breaks the chain.
```python
import json
import hashlib
import uuid
from datetime import datetime, timezone
from typing import Optional


class AuditLogger:
    def __init__(self, db_connection):
        self.db = db_connection
        self._last_hash: Optional[str] = None

    def _compute_hash(self, entry: dict, prev_hash: Optional[str]) -> str:
        """Compute SHA-256 of the log entry for tamper-evidence."""
        hashable = {
            "id": entry["id"],
            "timestamp": entry["timestamp"],
            "actor_id": entry["actor_id"],
            "action": entry["action"],
            "resource_type": entry["resource_type"],
            "resource_id": entry["resource_id"],
            "result": entry["result"],
            "prev_hash": prev_hash or "GENESIS",
        }
        content = json.dumps(hashable, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    async def log(
        self,
        actor_id: str,
        actor_type: str,
        action: str,
        resource_type: str,
        resource_id: str,
        result: str,
        ip_address: str,
        **kwargs,
    ) -> dict:
        """Create and persist a tamper-evident audit log entry."""
        # Get last hash for chaining
        prev_hash = self._last_hash or await self._get_last_hash()
        entry = {
            "id": str(uuid.uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "actor_id": actor_id,
            "actor_type": actor_type,
            "action": action,
            "resource_type": resource_type,
            "resource_id": resource_id,
            "result": result,
            "ip_address": ip_address,
            "prev_hash": prev_hash or "GENESIS",
            **kwargs,
        }
        entry["hash"] = self._compute_hash(entry, prev_hash)
        self._last_hash = entry["hash"]
        # Persist to append-only storage
        await self.db.audit_logs.insert_one(entry)
        return entry

    async def verify_chain(self, limit: int = 1000) -> bool:
        """Verify the hash chain integrity from the oldest entry forward."""
        # Motor-style cursor: materialize it so we can iterate and count
        entries = await self.db.audit_logs.find(
            sort=[("timestamp", 1)], limit=limit
        ).to_list(length=limit)
        prev_hash = "GENESIS"
        for i, entry in enumerate(entries):
            expected_hash = self._compute_hash(entry, prev_hash)
            if entry["hash"] != expected_hash:
                print(f"❌ Chain broken at entry {i}: id={entry['id']}")
                return False
            if entry["prev_hash"] != prev_hash:
                print(f"❌ prev_hash mismatch at entry {i}")
                return False
            prev_hash = entry["hash"]
        print(f"✅ Chain verified: {len(entries)} entries intact")
        return True

    async def _get_last_hash(self) -> Optional[str]:
        last = await self.db.audit_logs.find_one(sort=[("timestamp", -1)])
        return last["hash"] if last else None
```
## Express.js Audit Middleware
```javascript
const { v4: uuidv4 } = require('uuid');
const crypto = require('crypto');

class AuditLogger {
  constructor(db) {
    this.db = db;
  }

  // `...extra` collects optional fields (e.g. duration_ms) into the entry
  async log({ actorId, actorType = 'user', action, resourceType, resourceId,
              result, ipAddress, sessionId, reason, changedFields, ...extra }) {
    const prevHash = await this.getLastHash();
    const entry = {
      id: uuidv4(),
      timestamp: new Date().toISOString(),
      actor_id: actorId,
      actor_type: actorType,
      action,
      resource_type: resourceType,
      resource_id: resourceId,
      result,
      ip_address: ipAddress,
      session_id: sessionId,
      reason,
      changed_fields: changedFields,
      ...extra,
      prev_hash: prevHash || 'GENESIS',
    };
    entry.hash = this.computeHash(entry);
    await this.db.auditLogs.create(entry);
    return entry;
  }

  computeHash(entry) {
    // Fixed key order keeps the serialization deterministic
    const hashable = JSON.stringify({
      id: entry.id,
      timestamp: entry.timestamp,
      actor_id: entry.actor_id,
      action: entry.action,
      resource_type: entry.resource_type,
      resource_id: entry.resource_id,
      result: entry.result,
      prev_hash: entry.prev_hash,
    });
    return crypto.createHash('sha256').update(hashable).digest('hex');
  }

  async getLastHash() {
    const last = await this.db.auditLogs.findOne({ order: [['timestamp', 'DESC']] });
    return last?.hash || null;
  }
}

// Express middleware — auto-log all requests
const createAuditMiddleware = (auditLogger) => async (req, res, next) => {
  const start = Date.now();
  // Wrap res.send so we can log after the response is sent
  const originalSend = res.send;
  res.send = function (body) {
    res.send = originalSend;
    setImmediate(async () => {
      try {
        await auditLogger.log({
          actorId: req.user?.id || 'anonymous',
          actorType: req.user ? 'user' : 'anonymous',
          action: `${req.method.toLowerCase()}_${req.route?.path || req.path}`,
          resourceType: req.params.resourceType || 'api_request',
          resourceId: req.params.id || req.path,
          result: res.statusCode < 400 ? 'success' :
                  res.statusCode === 403 ? 'denied' : 'failure',
          ipAddress: req.ip,
          sessionId: req.session?.id,
          duration_ms: Date.now() - start,
        });
      } catch (err) {
        // Never let audit logging errors break the main flow
        console.error('Audit log failed:', err);
      }
    });
    return originalSend.apply(this, arguments);
  };
  next();
};

// Usage: app.use(createAuditMiddleware(new AuditLogger(db)));
```
## PostgreSQL Append-Only Table
```sql
-- Append-only audit log table
-- A BEFORE UPDATE/DELETE trigger (below) makes rows immutable
CREATE TABLE audit_logs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    actor_id TEXT NOT NULL,
    actor_type TEXT NOT NULL CHECK (actor_type IN ('user', 'service', 'admin', 'system', 'anonymous')),
    action TEXT NOT NULL,
    resource_type TEXT NOT NULL,
    resource_id TEXT NOT NULL,
    result TEXT NOT NULL CHECK (result IN ('success', 'failure', 'denied')),
    ip_address INET,
    session_id TEXT,
    reason TEXT,
    changed_fields TEXT[],
    prev_hash TEXT,
    hash TEXT NOT NULL UNIQUE
);

-- Append-only trigger — prevent modification
CREATE OR REPLACE FUNCTION prevent_audit_modification()
RETURNS TRIGGER AS $$
BEGIN
    RAISE EXCEPTION 'Audit logs are immutable — modification not allowed';
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER audit_logs_immutable
BEFORE UPDATE OR DELETE ON audit_logs
FOR EACH ROW EXECUTE FUNCTION prevent_audit_modification();

-- Indexes for common queries
CREATE INDEX idx_audit_actor_id ON audit_logs(actor_id);
CREATE INDEX idx_audit_resource ON audit_logs(resource_type, resource_id);
CREATE INDEX idx_audit_timestamp ON audit_logs(timestamp DESC);
CREATE INDEX idx_audit_action ON audit_logs(action);

-- Query: All actions by a user in the last 30 days
SELECT timestamp, action, resource_type, resource_id, result, ip_address
FROM audit_logs
WHERE actor_id = 'user-123'
  AND timestamp > NOW() - INTERVAL '30 days'
ORDER BY timestamp DESC;

-- Query: All access to a specific patient record
SELECT timestamp, actor_id, action, result, ip_address, reason
FROM audit_logs
WHERE resource_type = 'patient_record'
  AND resource_id = 'patient-456'
ORDER BY timestamp DESC;

-- Query: Failed access attempts (security monitoring)
SELECT actor_id, COUNT(*) AS failed_attempts, MAX(timestamp) AS last_attempt
FROM audit_logs
WHERE result IN ('failure', 'denied')
  AND timestamp > NOW() - INTERVAL '24 hours'
GROUP BY actor_id
HAVING COUNT(*) > 5
ORDER BY failed_attempts DESC;
```
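The hash chain can also be re-verified straight from this table. A minimal sketch using psycopg2 (the DSN is a placeholder, and the hashed field set plus timestamp serialization must match exactly what the writer hashed):

```python
import hashlib
import json

import psycopg2  # assumed driver

def verify_postgres_chain(dsn: str) -> bool:
    """Recompute each row's hash in timestamp order and compare."""
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute("""
            SELECT id::text, timestamp, actor_id, action, resource_type,
                   resource_id, result, prev_hash, hash
            FROM audit_logs
            ORDER BY timestamp
        """)
        prev_hash = "GENESIS"
        for i, (id_, ts, actor, action, rtype, rid, result,
                stored_prev, stored_hash) in enumerate(cur):
            hashable = {
                "id": id_,
                "timestamp": ts.isoformat(),  # must match the writer's format
                "actor_id": actor,
                "action": action,
                "resource_type": rtype,
                "resource_id": rid,
                "result": result,
                "prev_hash": prev_hash,
            }
            expected = hashlib.sha256(
                json.dumps(hashable, sort_keys=True).encode()
            ).hexdigest()
            if stored_hash != expected or stored_prev != prev_hash:
                print(f"Chain broken at row {i} (id={id_})")
                return False
            prev_hash = stored_hash
    return True
```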
## AWS CloudTrail + Immutable S3
```bash
# Create S3 bucket with Object Lock (WORM - Write Once Read Many)
aws s3api create-bucket \
  --bucket my-audit-logs-bucket \
  --object-lock-enabled-for-bucket

# Set default retention: compliance mode (cannot delete, even by an admin)
aws s3api put-object-lock-configuration \
  --bucket my-audit-logs-bucket \
  --object-lock-configuration '{
    "ObjectLockEnabled": "Enabled",
    "Rule": {
      "DefaultRetention": {
        "Mode": "COMPLIANCE",
        "Years": 6
      }
    }
  }'

# Create CloudTrail that logs to the immutable bucket
# (the bucket also needs a bucket policy granting CloudTrail write access)
aws cloudtrail create-trail \
  --name compliance-audit-trail \
  --s3-bucket-name my-audit-logs-bucket \
  --include-global-service-events \
  --is-multi-region-trail \
  --enable-log-file-validation  # cryptographic log integrity

# Verify log file integrity (CloudTrail digest chain)
aws cloudtrail validate-logs \
  --trail-arn arn:aws:cloudtrail:us-east-1:123456789:trail/compliance-audit-trail \
  --start-time 2024-01-01T00:00:00Z
```
## FastAPI Audit Middleware (Python)
```python
import asyncio
import time

from fastapi import FastAPI, Request

app = FastAPI()
# audit_logger: the AuditLogger instance from the hash-chain example above

@app.middleware("http")
async def audit_middleware(request: Request, call_next):
    start_time = time.time()

    # Extract context before processing
    actor_id = None
    if hasattr(request.state, "user"):
        actor_id = request.state.user.id

    response = await call_next(request)
    duration_ms = int((time.time() - start_time) * 1000)

    # Determine result from status code
    if response.status_code < 400:
        result = "success"
    elif response.status_code == 403:
        result = "denied"
    else:
        result = "failure"

    # Log asynchronously to avoid blocking the response
    asyncio.create_task(audit_logger.log(
        actor_id=actor_id or "anonymous",
        actor_type="user" if actor_id else "anonymous",
        action=f"{request.method.lower()}:{request.url.path}",
        resource_type="api_endpoint",
        resource_id=str(request.url.path),
        result=result,
        ip_address=request.client.host if request.client else "unknown",
        duration_ms=duration_ms,
        http_status=response.status_code,
    ))
    return response
```
## Compliance Checklist
- All required fields logged (actor, action, resource, timestamp, IP, result)
- Tamper-evidence implemented (hash chaining or digital signatures; a minimal signature sketch follows this list)
- Append-only storage (trigger prevents UPDATE/DELETE, or S3 Object Lock)
- Log aggregation centralized (avoid local file logs that can be deleted)
- Retention policy matches regulation (HIPAA: 6yr, PCI: 1yr+1yr archive)
- Alerts for anomalies (failed logins, access spikes, unusual hours)
- Logs reviewed periodically (monthly for SOC 2, daily for high-security)
- Log access itself is audited (who viewed audit logs)
- Backup of audit logs separate from primary storage
- WORM storage for regulatory compliance archives
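For the digital-signature alternative flagged in the checklist, a minimal per-entry HMAC sketch (in production the key would come from a KMS or secrets manager, never from the same store as the logs; `AUDIT_HMAC_KEY` is a placeholder name):

```python
import hashlib
import hmac
import json
import os

# Placeholder: load from a KMS/secrets manager, not from the log database
SIGNING_KEY = os.environ.get("AUDIT_HMAC_KEY", "dev-only-key").encode()

def sign_entry(entry: dict) -> str:
    """Sign the canonical JSON form of an entry with HMAC-SHA256."""
    canonical = json.dumps(entry, sort_keys=True).encode()
    return hmac.new(SIGNING_KEY, canonical, hashlib.sha256).hexdigest()

def verify_entry(entry: dict, signature: str) -> bool:
    """Constant-time comparison to avoid timing side channels."""
    return hmac.compare_digest(sign_entry(entry), signature)
```

Unlike hash chaining, each entry verifies on its own: an attacker with database access but without the key cannot re-sign modified rows. The trade-off is key rotation and custody.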