
Implement load testing, auto-scaling, and capacity planning for Claude API. Use when running performance benchmarks, planning for traffic spikes, or configuring horizontal scaling for Claude-powered services. Trigger with phrases like "anthropic load test", "claude scaling", "anthropic capacity planning", "scale claude api".

Fetch:

```shell
curl "https://skillshub.wtf/jeremylongshore/claude-code-plugins-plus-skills/anth-load-scale?format=md"
```

# Anthropic Load & Scale

## Overview

Capacity planning and load testing for Claude API integrations. Key constraint: your rate limits (RPM/ITPM/OTPM) are the ceiling, not your infrastructure.
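That ceiling can be made concrete: sustainable throughput is the tightest of the three limits once each is divided through by your average token counts. A minimal sketch (the tier limits in the example are hypothetical placeholders, not any specific Anthropic tier):

```python
def effective_rpm(rpm_limit: int, itpm_limit: int, otpm_limit: int,
                  avg_input_tokens: int, avg_output_tokens: int) -> int:
    """Sustainable requests/minute is the tightest of the three rate limits."""
    return min(
        rpm_limit,
        itpm_limit // avg_input_tokens,   # input-token limit expressed as RPM
        otpm_limit // avg_output_tokens,  # output-token limit expressed as RPM
    )

# Hypothetical tier: 1000 RPM, 80K ITPM, 16K OTPM; 500-in/200-out requests
print(effective_rpm(1000, 80_000, 16_000, 500, 200))  # 80 — OTPM binds first
```

Output tokens are usually the binding constraint, which is why trimming `max_tokens` raises effective throughput.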

## Capacity Planning

```python
# Estimate the rate-limit tier and monthly cost implied by a traffic profile
def plan_capacity(
    requests_per_minute: int,
    avg_input_tokens: int,
    avg_output_tokens: int,
    model: str = "claude-sonnet-4-20250514",
) -> dict:
    itpm = requests_per_minute * avg_input_tokens
    otpm = requests_per_minute * avg_output_tokens

    # (input, output) price per million tokens
    pricing = {
        "claude-haiku-4-20250514": (0.80, 4.00),
        "claude-sonnet-4-20250514": (3.00, 15.00),
        "claude-opus-4-20250514": (15.00, 75.00),
    }
    rates = pricing[model]
    cost_per_request = (avg_input_tokens * rates[0] + avg_output_tokens * rates[1]) / 1_000_000
    monthly_cost = cost_per_request * requests_per_minute * 60 * 24 * 30

    return {
        "rpm_needed": requests_per_minute,
        "itpm_needed": itpm,
        "otpm_needed": otpm,
        "cost_per_request": f"${cost_per_request:.4f}",
        "monthly_estimate": f"${monthly_cost:,.0f}",
        "recommendation": "Contact Anthropic sales for Scale tier"
        if requests_per_minute > 500
        else "Self-serve tiers sufficient",
    }

print(plan_capacity(100, 500, 200))
```

## Load Testing Script

```python
import anthropic
import asyncio
import time
from dataclasses import dataclass

@dataclass
class LoadTestResult:
    total_requests: int = 0
    successful: int = 0
    failed: int = 0
    rate_limited: int = 0
    avg_latency_ms: float = 0
    p99_latency_ms: float = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0

async def load_test(
    concurrency: int = 10,
    total_requests: int = 100,
    model: str = "claude-haiku-4-20250514"
) -> LoadTestResult:
    # Use the async client so requests actually run concurrently;
    # the sync client would serialize them and invalidate the test.
    client = anthropic.AsyncAnthropic()
    result = LoadTestResult()
    latencies = []
    semaphore = asyncio.Semaphore(concurrency)

    async def single_request():
        async with semaphore:
            start = time.monotonic()
            try:
                msg = await client.messages.create(
                    model=model,
                    max_tokens=64,
                    messages=[{"role": "user", "content": "Respond with exactly: OK"}]
                )
                duration = (time.monotonic() - start) * 1000
                latencies.append(duration)
                result.successful += 1
                result.total_input_tokens += msg.usage.input_tokens
                result.total_output_tokens += msg.usage.output_tokens
            except anthropic.RateLimitError:
                result.rate_limited += 1
            except Exception:
                result.failed += 1
            result.total_requests += 1

    tasks = [single_request() for _ in range(total_requests)]
    await asyncio.gather(*tasks)

    if latencies:
        latencies.sort()
        result.avg_latency_ms = sum(latencies) / len(latencies)
        result.p99_latency_ms = latencies[int(len(latencies) * 0.99)]

    return result

# Run: asyncio.run(load_test(concurrency=10, total_requests=50))
```
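Raw counts are easier to act on once converted back into per-minute rates that you can compare against your tier limits. A small helper (a sketch; it assumes you record wall-clock duration around the `asyncio.run` call, and it works on any object exposing the `LoadTestResult` fields):

```python
def summarize(result, elapsed_s: float) -> dict:
    """Convert LoadTestResult counters into per-minute rates.

    `result` is any object with successful, failed, rate_limited,
    total_requests, total_input_tokens, total_output_tokens fields;
    `elapsed_s` is the measured wall-clock duration of the run.
    """
    minutes = elapsed_s / 60
    return {
        "achieved_rpm": round(result.successful / minutes, 1),
        "achieved_itpm": round(result.total_input_tokens / minutes),
        "achieved_otpm": round(result.total_output_tokens / minutes),
        "error_rate": round(
            (result.failed + result.rate_limited) / max(result.total_requests, 1), 3
        ),
    }
```

Compare `achieved_itpm`/`achieved_otpm` against your tier limits to see how much headroom the test actually consumed.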

## Scaling Strategies

| Strategy | When | Implementation |
|---|---|---|
| Queue-based processing | > 50 RPM sustained | Redis/SQS queue + worker pool |
| Model routing | Mixed workloads | Haiku for simple, Sonnet for complex |
| Message Batches | Offline processing | 100K requests, 50% cheaper, no RPM impact |
| Prompt caching | Repeated system prompts | 90% input token savings |
| Request coalescing | Duplicate prompts | Cache identical request hashes |
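The request-coalescing row can be sketched by hashing the full request payload as the cache key. A minimal illustration (`coalesced_call` and the module-level dict are hypothetical names; a production version would add a TTL and deduplicate concurrent in-flight requests):

```python
import hashlib
import json

_response_cache: dict = {}

def request_key(model: str, system: str, messages: list) -> str:
    """Stable hash of the full request payload (sort_keys makes it canonical)."""
    payload = json.dumps(
        {"model": model, "system": system, "messages": messages}, sort_keys=True
    )
    return hashlib.sha256(payload.encode()).hexdigest()

def coalesced_call(model: str, system: str, messages: list, send):
    """Return a cached response for an identical request instead of re-sending.

    `send` is whatever function actually performs the API call.
    """
    key = request_key(model, system, messages)
    if key not in _response_cache:
        _response_cache[key] = send(model, system, messages)
    return _response_cache[key]
```

Identical prompts then cost one API call instead of N, which reduces RPM pressure without touching rate limits.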

## Horizontal Scaling Pattern

```python
# Multiple application instances sharing the same API key:
# rate limits are per-organization, NOT per-instance, so use a
# shared rate limiter (Redis) to coordinate across instances.

import redis

r = redis.Redis()

def check_rate_limit(key: str = "claude:rpm", limit: int = 100, window: int = 60) -> bool:
    # Fixed-window counter: INCR creates the key, EXPIRE starts the window
    current = r.incr(key)
    if current == 1:
        r.expire(key, window)
    return current <= limit
```
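For a single instance, the same fixed-window logic works in-process without Redis. A sketch (`FixedWindowLimiter` is an illustrative name, not part of any SDK; like the Redis version, it allows bursts at window boundaries):

```python
import time

class FixedWindowLimiter:
    """Single-process equivalent of the Redis fixed-window counter above."""

    def __init__(self, limit: int = 100, window: float = 60.0):
        self.limit, self.window = limit, window
        self.window_start = 0.0
        self.count = 0

    def allow(self, now=None) -> bool:
        now = time.monotonic() if now is None else now
        if now - self.window_start >= self.window:
            # Start a fresh window and reset the counter
            self.window_start, self.count = now, 0
        self.count += 1
        return self.count <= self.limit

limiter = FixedWindowLimiter(limit=2, window=60)
print([limiter.allow(now=0), limiter.allow(now=1), limiter.allow(now=2)])
# [True, True, False]
```

The `now` parameter exists only to make the behavior testable; callers normally omit it.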

## Error Handling

| Issue | Cause | Fix |
|---|---|---|
| 429 during load test | Exceeded tier limits | Reduce concurrency or upgrade tier |
| Increasing latency under load | Output queue saturation | Reduce max_tokens |
| Uneven request distribution | No load balancing | Use queue for fair distribution |
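For the 429 row, the standard client-side mitigation is exponential backoff with jitter. A hedged sketch (`call_with_backoff` is an illustrative name; note the official Python SDK already retries rate-limit errors automatically, so a wrapper like this mainly applies to custom HTTP clients):

```python
import random
import time

def call_with_backoff(send, max_attempts: int = 5, base: float = 1.0, cap: float = 60.0):
    """Retry send() with exponential backoff and full jitter.

    In production, catch anthropic.RateLimitError rather than bare Exception
    so non-retryable errors propagate immediately.
    """
    for attempt in range(max_attempts):
        try:
            return send()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Sleep a random duration in [0, min(cap, base * 2^attempt)]
            time.sleep(random.uniform(0, min(cap, base * 2 ** attempt)))
```

Full jitter spreads retries out so that concurrent clients hit by the same 429 do not retry in lockstep.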

## Next Steps

For reliability patterns, see anth-reliability-patterns.


Repo: jeremylongshore/claude-code-plugins-plus-skills (by jeremylongshore)