> redis
Build applications with Redis — caching, session storage, pub/sub, streams, rate limiting, leaderboards, and queues. Use when tasks involve in-memory data storage, real-time messaging, distributed locking, or performance optimization with caching layers.
curl "https://skillshub.wtf/TerminalSkills/skills/redis?format=md"
Redis
Build fast, scalable applications with Redis as a cache, message broker, session store, or real-time data engine.
Setup
Docker (quickstart)
# Redis 7 with persistence: --appendonly yes enables the append-only file,
# and the redis-data volume mounted at /data keeps it across container restarts.
# "your-password" is a placeholder — substitute a real secret.
docker run -d --name redis -p 6379:6379 \
-v redis-data:/data \
redis:7-alpine redis-server --appendonly yes --requirepass "your-password"
Connection
"""redis_client.py — Redis connection with connection pooling."""
import redis
# Single connection: a client built directly from host/port/password.
# NOTE: "your-password" is a placeholder — load real credentials from configuration.
r = redis.Redis(host="localhost", port=6379, password="your-password", db=0,
decode_responses=True) # Auto-decode bytes to strings
# Connection pool (recommended for production)
pool = redis.ConnectionPool(
host="localhost", port=6379, password="your-password",
max_connections=20, # Match your app's concurrency
decode_responses=True,
)
# Rebinds r: from this point the module-level client uses the pool above,
# replacing the single-connection client created earlier.
r = redis.Redis(connection_pool=pool)
# Verify connection
r.ping() # Returns True
// redis-client.js — Node.js connection with ioredis
import Redis from 'ioredis';
// Client for a local Redis instance; 'your-password' is a placeholder.
const redis = new Redis({
host: 'localhost', port: 6379, password: 'your-password',
maxRetriesPerRequest: 3,
// Back-off grows by 50 per attempt and is capped at 2000
// (presumably milliseconds between reconnect attempts — confirm against the ioredis docs).
retryStrategy: (times) => Math.min(times * 50, 2000),
});
Caching Patterns
Cache-Aside (most common)
"""cache_aside.py — Cache-aside pattern with automatic expiration."""
import json
def get_user(user_id: int) -> dict:
    """Return a user record, consulting Redis before the database.

    Args:
        user_id: The user's ID.

    Returns:
        The user dict: decoded from the cached JSON on a hit, otherwise
        read from the database and stored in the cache for one hour.
    """
    key = f"user:{user_id}"
    hit = r.get(key)
    if not hit:
        # Cache miss — load from the database and populate the cache.
        record = db.query("SELECT * FROM users WHERE id = %s", user_id)
        r.setex(key, 3600, json.dumps(record))  # 3600 s = 1 hour TTL
        return record
    return json.loads(hit)
def update_user(user_id: int, data: dict):
    """Write a user change to the database, then evict the cached copy.

    Args:
        user_id: The user's ID.
        data: Fields to update. The UPDATE statement shown is an elided
            placeholder and does not yet consume this argument.
    """
    stale_key = f"user:{user_id}"
    db.execute("UPDATE users SET ... WHERE id = %s", user_id)
    # Evict instead of rewriting: the next read repopulates the entry.
    r.delete(stale_key)
Write-Through Cache
"""write_through.py — Write-through: update cache and DB together."""
def save_product(product_id: str, data: dict):
    """Save a product to the database, then refresh its cache entry.

    The two writes are issued one after the other; they are not a single
    atomic operation, so a failure between them leaves the cache entry
    unrefreshed until it is rewritten or its TTL expires.

    Args:
        product_id: Product identifier.
        data: Product data dict.
    """
    cache_key = f"product:{product_id}"
    serialized = json.dumps(data)
    # The database is the source of truth, so it is written first.
    db.execute("INSERT INTO products ... ON CONFLICT UPDATE ...", data)
    # Cache entry lives for 7200 s (2 hours).
    r.setex(cache_key, 7200, serialized)
Session Storage
"""session_store.py — HTTP session storage in Redis."""
import secrets, json
SESSION_TTL = 86400 # 24 hours
def create_session(user_id: int, metadata: dict | None = None) -> str:
    """Create a new session and return the session token.

    The session is stored as JSON under ``session:<token>`` with a TTL of
    SESSION_TTL seconds, and the token is added to the per-user set
    ``user_sessions:<user_id>``.

    Args:
        user_id: Authenticated user's ID.
        metadata: Optional session metadata (IP, user-agent, etc.). Keys
            named "user_id" or "created_at" are ignored: the values set by
            this function always take precedence.

    Returns:
        The newly generated URL-safe session token.
    """
    # Local import: this snippet's own import line only brings in secrets and json.
    import time

    token = secrets.token_urlsafe(32)
    # Metadata is spread first so caller-supplied keys can never overwrite
    # the authenticated user_id or the creation timestamp.
    session_data = {
        **(metadata or {}),
        "user_id": user_id,
        "created_at": time.time(),
    }
    r.setex(f"session:{token}", SESSION_TTL, json.dumps(session_data))
    # Track active sessions per user for "log out everywhere"
    r.sadd(f"user_sessions:{user_id}", token)
    return token
def get_session(token: str) -> dict | None:
    """Look up a session and refresh its TTL when it exists.

    Args:
        token: Session token from cookie/header.

    Returns:
        The decoded session dict, or None when no session data is stored
        for the token.
    """
    key = f"session:{token}"
    raw = r.get(key)
    if raw:
        # Sliding expiration: every successful read restarts the TTL.
        r.expire(key, SESSION_TTL)
        return json.loads(raw)
    return None
def destroy_all_sessions(user_id: int):
    """Delete every tracked session of a user (password change, security breach).

    Args:
        user_id: The user whose sessions to destroy.
    """
    index_key = f"user_sessions:{user_id}"
    session_keys = [f"session:{t}" for t in r.smembers(index_key)]
    if session_keys:
        # One DEL call removes all session keys found in the tracking set.
        r.delete(*session_keys)
    # Finally drop the tracking set itself.
    r.delete(index_key)
Rate Limiting
Sliding Window
"""rate_limiter.py — Sliding window rate limiter using sorted sets."""
import time
def is_rate_limited(key: str, limit: int, window_seconds: int) -> bool:
    """Check if a key has exceeded its rate limit.

    Each call records one request in the sorted set ``ratelimit:<key>``
    (scored by its timestamp), drops entries older than the window, and
    counts what remains. The request is recorded whether or not it turns
    out to be over the limit, so rejected calls also count toward the
    window.

    Args:
        key: Identifier (e.g., IP address, API key, user ID).
        limit: Maximum requests allowed in the window.
        window_seconds: Window size in seconds.

    Returns:
        True if rate limited, False if request is allowed.
    """
    import uuid  # local import keeps this block self-contained

    now = time.time()
    window_start = now - window_seconds
    rk = f"ratelimit:{key}"
    # Sorted-set members are unique, so a member built from the timestamp
    # alone would merge two requests sharing the same time.time() value into
    # one entry and undercount. A random suffix keeps every request distinct.
    member = f"{now}:{uuid.uuid4().hex}"

    pipe = r.pipeline()
    pipe.zremrangebyscore(rk, 0, window_start)  # Remove expired entries
    pipe.zadd(rk, {member: now})  # Add current request
    pipe.zcard(rk)  # Count requests in window
    pipe.expire(rk, window_seconds)  # Auto-cleanup of idle keys
    _, _, count, _ = pipe.execute()
    return count > limit
For smoother rate limiting, consider a token bucket implementation using a Lua script that tracks tokens and refill timestamps in a Redis hash.
Pub/Sub
"""pubsub.py — Real-time messaging with Redis pub/sub."""
import threading
def publish_event(channel: str, event: dict):
    """Serialize an event to JSON and publish it on a channel.

    Args:
        channel: Channel name (e.g., "notifications:user:123").
        event: Event data dict — serialized to JSON.
    """
    payload = json.dumps(event)
    r.publish(channel, payload)
def subscribe_to_events(pattern: str, callback):
    """Pattern-subscribe and deliver messages to a callback on a daemon thread.

    Args:
        pattern: Glob pattern (e.g., "notifications:*").
        callback: Function called with (channel, data) for each message,
            where data is the JSON-decoded message payload.

    Returns:
        The pubsub object, so the caller can clean up with ps.punsubscribe().
    """
    subscription = r.pubsub()
    subscription.psubscribe(pattern)

    def listener():
        for message in subscription.listen():
            # Only pattern-matched messages are forwarded; everything else is skipped.
            if message["type"] != "pmessage":
                continue
            callback(message["channel"], json.loads(message["data"]))

    threading.Thread(target=listener, daemon=True).start()
    return subscription  # Return for cleanup: ps.punsubscribe()
Streams (persistent messaging)
Unlike pub/sub, streams persist messages and support consumer groups. Use XADD to add events, XGROUP CREATE to create consumer groups, and XREADGROUP/XACK to consume and acknowledge messages. Set maxlen on XADD to cap stream memory.
Distributed Locking
"""distributed_lock.py — Distributed lock using Redis (Redlock pattern)."""
def acquire_lock(name: str, timeout: int = 10) -> str | None:
    """Try to take a lock by setting its key only if it does not exist.

    This is a lock on the single Redis client ``r`` (one SET with NX and
    EX), not the multi-node Redlock algorithm.

    Args:
        name: Lock name (e.g., "process:invoice:12345").
        timeout: Lock expiration in seconds (prevents deadlocks).

    Returns:
        Lock token if acquired, None if already held.
    """
    owner_token = secrets.token_urlsafe(16)
    # nx=True: only set when the key is absent; ex=timeout: expire automatically.
    if r.set(f"lock:{name}", owner_token, nx=True, ex=timeout):
        return owner_token
    return None
# Lua script ensures atomic check-and-delete (only owner can release):
# KEYS[1] is the lock key and ARGV[1] the caller's token. The key is deleted
# only when its stored value equals that token; otherwise the script returns 0.
RELEASE_SCRIPT = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
end
return 0
"""
def release_lock(name: str, token: str) -> bool:
    """Release a lock, but only when the given token still owns it.

    Args:
        name: Lock name.
        token: Token returned by acquire_lock.

    Returns:
        True when the release script reports a result of 1, False otherwise.
    """
    lock_key = f"lock:{name}"
    # One key is passed to the script (the "1"), followed by the owner token.
    deleted = r.eval(RELEASE_SCRIPT, 1, lock_key, token)
    return deleted == 1
Leaderboards
Use sorted sets (ZADD, ZREVRANGE, ZREVRANK) for real-time leaderboards. ZADD sets scores, ZREVRANGE returns top N entries, and ZREVRANK gets a member's rank.
Production Configuration
Key settings: maxmemory 2gb, maxmemory-policy allkeys-lru, appendonly yes, appendfsync everysec. Always set a maxmemory limit to prevent out-of-memory crashes.
Guidelines
- Use `SETEX` / `SET ... EX` with TTLs for all cache keys -- keys without expiration leak memory
- Pipeline multiple commands when doing batch operations -- reduces round trips
- Use Lua scripts for atomic multi-step operations (check-and-set, compare-and-delete)
- Prefer Streams over pub/sub when message persistence matters -- pub/sub drops messages if no subscriber is listening
- Key naming convention: use colons as separators (`user:123:profile`, `cache:product:456`)
- Monitor memory usage with `INFO memory` -- Redis is in-memory, running out kills the process
- Use `SCAN` instead of `KEYS *` in production -- `KEYS` blocks the server on large datasets
- Connection pooling is essential -- creating a new connection per request adds 1-2ms latency
- Redis is single-threaded for commands -- one slow Lua script blocks everything
> related_skills --same-repo
> zustand
You are an expert in Zustand, the small, fast, and scalable state management library for React. You help developers manage global state without boilerplate using Zustand's hook-based stores, selectors for performance, middleware (persist, devtools, immer), computed values, and async actions — replacing Redux complexity with a simple, un-opinionated API in under 1KB.
> zoho
Integrate and automate Zoho products. Use when a user asks to work with Zoho CRM, Zoho Books, Zoho Desk, Zoho Projects, Zoho Mail, or Zoho Creator, build custom integrations via Zoho APIs, automate workflows with Deluge scripting, sync data between Zoho apps and external systems, manage leads and deals, automate invoicing, build custom Zoho Creator apps, set up webhooks, or manage Zoho organization settings. Covers Zoho CRM, Books, Desk, Projects, Creator, and cross-product integrations.
> zod
You are an expert in Zod, the TypeScript-first schema declaration and validation library. You help developers define schemas that validate data at runtime AND infer TypeScript types at compile time — eliminating the need to write types and validators separately. Used for API input validation, form validation, environment variables, config files, and any data boundary.
> zipkin
Deploy and configure Zipkin for distributed tracing and request flow visualization. Use when a user needs to set up trace collection, instrument Java/Spring or other services with Zipkin, analyze service dependencies, or configure storage backends for trace data.