
Implement event-driven patterns with Claude API: streaming SSE events, Message Batches callbacks, and async processing architectures. Use when building real-time Claude integrations or processing batch results. Trigger with phrases like "anthropic events", "claude streaming events", "anthropic async processing", "claude batch callbacks".


Anthropic Events & Async Processing

Overview

The Claude API does not use traditional webhooks. Instead it provides two event-driven patterns: Server-Sent Events (SSE) for real-time streaming and the Message Batches API for async bulk processing. This skill covers both.

SSE Streaming Events

```python
import anthropic

client = anthropic.Anthropic()

# Process each SSE event type as it arrives
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain microservices."}]
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                print(f"Started: {event.message.id}")
            case "content_block_start":
                if event.content_block.type == "tool_use":
                    print(f"Tool call: {event.content_block.name}")
            case "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    print(event.delta.partial_json, end="")
            case "message_delta":
                print(f"\nStop: {event.delta.stop_reason}")
                print(f"Output tokens: {event.usage.output_tokens}")
            case "message_stop":
                print("[Complete]")
```

SSE Event Reference

| Event | When | Key Data |
|---|---|---|
| `message_start` | Stream begins | `message.id`, `message.model`, `message.usage.input_tokens` |
| `content_block_start` | New block begins | `content_block.type` (`text` or `tool_use`), `index` |
| `content_block_delta` | Incremental content | `delta.text` or `delta.partial_json` |
| `content_block_stop` | Block finishes | `index` |
| `message_delta` | Message-level update | `delta.stop_reason`, `usage.output_tokens` |
| `message_stop` | Stream complete | (empty) |
| `ping` | Keepalive | (empty) |
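The dispatch logic for these events can be factored into a pure helper that folds text deltas into the final message, which makes it unit-testable without a live stream. A minimal sketch: the `SimpleNamespace` events below are simulated stand-ins with the field shapes from the table above, not real SDK objects.

```python
from types import SimpleNamespace

def accumulate_text(events):
    """Fold content_block_delta text events into the final assistant text.

    Pure helper: works on any iterable of objects with .type/.delta,
    so it can be tested without hitting the API."""
    parts = []
    for event in events:
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            parts.append(event.delta.text)
    return "".join(parts)

# Simulated event sequence mirroring the table above
fake = [
    SimpleNamespace(type="message_start"),
    SimpleNamespace(type="content_block_delta",
                    delta=SimpleNamespace(type="text_delta", text="Hello, ")),
    SimpleNamespace(type="content_block_delta",
                    delta=SimpleNamespace(type="text_delta", text="world.")),
    SimpleNamespace(type="message_stop"),
]
print(accumulate_text(fake))  # → Hello, world.
```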

Async Batch Processing

```python
# Submit a batch (up to 100K requests, 50% cheaper than standard pricing)
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": f"Summarize: {doc}"}]
            }
        }
        for i, doc in enumerate(documents)
    ]
)
```
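Because a single batch caps out at 100K requests, larger workloads have to be split across batches. A minimal sketch; `chunk_requests` is a hypothetical helper, not part of the SDK:

```python
def chunk_requests(requests, max_per_batch=100_000):
    """Yield request lists no larger than the per-batch cap.

    Each yielded chunk can be passed to client.messages.batches.create(requests=chunk).
    """
    for i in range(0, len(requests), max_per_batch):
        yield requests[i:i + max_per_batch]

# Small cap for demonstration: 5 requests split into chunks of 2
print(list(chunk_requests(list(range(5)), max_per_batch=2)))  # → [[0, 1], [2, 3], [4]]
```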

```python
# Poll for completion
import time

while True:
    status = client.messages.batches.retrieve(batch.id)
    if status.processing_status == "ended":
        break
    counts = status.request_counts
    print(f"Processing: {counts.processing} | Done: {counts.succeeded} | Errors: {counts.errored}")
    time.sleep(30)
```
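Instead of a fixed 30-second sleep, polling with capped exponential backoff reduces API calls on long-running batches. A sketch; `backoff_schedule` is a hypothetical helper:

```python
import itertools

def backoff_schedule(base=5, cap=60):
    """Yield poll delays that double until hitting the cap: 5, 10, 20, 40, 60, 60, ..."""
    delay = base
    while True:
        yield delay
        delay = min(delay * 2, cap)

# Feed each delay to time.sleep() between batch status checks
print(list(itertools.islice(backoff_schedule(), 6)))  # → [5, 10, 20, 40, 60, 60]
```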

```python
# Stream results
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        print(f"[{result.custom_id}]: {result.result.message.content[0].text[:100]}")
    else:
        print(f"[{result.custom_id}] ERROR: {result.result.error}")
```
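It is often useful to split results by outcome so failed `custom_id`s can be resubmitted in a follow-up batch. `partition_results` below is a hypothetical helper operating on objects shaped like the SDK's result entries (simulated here with `SimpleNamespace`):

```python
from types import SimpleNamespace

def partition_results(results):
    """Split batch results into {custom_id: message} successes and a list of failed ids."""
    succeeded, failed = {}, []
    for r in results:
        if r.result.type == "succeeded":
            succeeded[r.custom_id] = r.result.message
        else:
            failed.append(r.custom_id)
    return succeeded, failed

# Simulated result entries (stand-ins for the SDK objects)
fake = [
    SimpleNamespace(custom_id="doc-0",
                    result=SimpleNamespace(type="succeeded", message="summary")),
    SimpleNamespace(custom_id="doc-1",
                    result=SimpleNamespace(type="errored", message=None)),
]
ok, retry = partition_results(fake)
print(sorted(ok), retry)  # → ['doc-0'] ['doc-1']
```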

Event-Driven Architecture Pattern

```python
# Use queues to decouple Claude requests from user-facing endpoints
import anthropic
import requests
from redis import Redis
from rq import Queue

redis = Redis()
queue = Queue(connection=redis)

def process_with_claude(prompt: str, callback_url: str):
    """Background job for async Claude processing."""
    client = anthropic.Anthropic()
    msg = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    # Notify your system via an internal callback
    requests.post(callback_url, json={
        "text": msg.content[0].text,
        "usage": {"input": msg.usage.input_tokens, "output": msg.usage.output_tokens}
    })

# Enqueue from your API handler
job = queue.enqueue(process_with_claude, prompt="...", callback_url="https://internal/callback")
```
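Because the internal callback is itself a webhook, it is worth signing the payload so the receiver can authenticate the sender. A minimal sketch using a stdlib HMAC over the request body; the shared secret (and any header name you transmit the signature in) are assumptions, not part of any Claude API:

```python
import hashlib
import hmac
import json

SECRET = b"replace-with-a-real-secret"  # hypothetical shared secret

def sign(body: bytes) -> str:
    """HMAC-SHA256 signature of the raw callback body."""
    return hmac.new(SECRET, body, hashlib.sha256).hexdigest()

def verify(body: bytes, signature: str) -> bool:
    """Constant-time comparison on the receiver side."""
    return hmac.compare_digest(sign(body), signature)

body = json.dumps({"text": "hello", "usage": {"input": 3, "output": 5}}).encode()
sig = sign(body)
print(verify(body, sig))          # → True
print(verify(body + b"x", sig))   # → False
```

The sender attaches `sig` as a header on the `requests.post` call; the receiver recomputes it over the raw body before trusting the payload.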

Error Handling

| Issue | Cause | Fix |
|---|---|---|
| Stream disconnects | Network timeout | Reconnect and re-request (responses are not resumable) |
| Batch expired | Not processed in 24h | Resubmit the batch |
| `errored` results | Individual request was invalid | Check `result.error.message` per request |
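Since streaming responses are not resumable, the fix for a dropped stream is to retry the whole request. A sketch of a generic retry wrapper with exponential backoff; `with_retries` is a hypothetical helper:

```python
import time

def with_retries(fn, max_attempts=3, base_delay=1.0):
    """Re-run fn on failure with exponential backoff.

    Because SSE responses are not resumable, the whole request is
    retried, not the partial stream."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))

# Usage: wrap the whole streaming call, e.g.
# with_retries(lambda: run_stream(client, messages))
```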

Next Steps

For performance optimization, see anth-performance-tuning.


Repo: jeremylongshore/claude-code-plugins-plus-skills (by jeremylongshore)