> prefect
Prefect is a modern workflow orchestration framework for Python data pipelines. Learn to define flows and tasks with decorators, handle retries and scheduling, create deployments, and monitor via the Prefect UI.
Prefect
Prefect turns Python functions into observable, schedulable workflows with minimal boilerplate. Add @flow and @task decorators to get retries, logging, caching, and a monitoring UI.
Installation
```bash
# Install Prefect
pip install prefect

# Start the local Prefect server (UI + API)
prefect server start
# UI at http://localhost:4200

# Or use Prefect Cloud (managed) instead of a local server
prefect cloud login
```
Basic Flow
```python
# flows/hello.py: Simple ETL flow with tasks
from datetime import timedelta

import httpx
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash


@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
    logger = get_run_logger()
    logger.info(f"Fetching {url}")
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()


# Cache results keyed on the task's inputs, reusing them for up to one hour
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def transform(data: dict) -> list:
    return [
        {"id": item["id"], "value": item["amount"] * 100}
        for item in data["results"]
    ]


@task
def load(records: list) -> int:
    logger = get_run_logger()
    logger.info(f"Loading {len(records)} records")
    # Insert into database...
    return len(records)


@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(api_url: str = "https://api.example.com/data"):
    raw = fetch_data(api_url)
    cleaned = transform(raw)
    count = load(cleaned)
    print(f"Processed {count} records")  # log_prints=True sends this to the run logs
    return count


if __name__ == "__main__":
    etl_pipeline()
```
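The `transform` task caches on its inputs. `task_input_hash` derives a key from the call's arguments, and `cache_expiration` bounds how long a stored result is reused. In Prefect 2.x, caching is only active when a `cache_key_fn` is supplied.

The stdlib-only sketch below models that behavior so it can run without a Prefect server. `input_hash`, `InputCache`, and the injectable `clock` are hypothetical names. This is a toy model, not Prefect's implementation, which persists results and serializes inputs its own way.

```python
import hashlib
import json
import time
from datetime import timedelta
from typing import Any, Callable


def input_hash(fn_name: str, kwargs: dict) -> str:
    # Hypothetical stand-in for task_input_hash: same function + same inputs -> same key
    payload = json.dumps({"fn": fn_name, "inputs": kwargs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


class InputCache:
    """Toy result cache keyed on call inputs, with a fixed expiration per entry."""

    def __init__(self, expiration: timedelta, clock: Callable[[], float] = time.monotonic):
        self.expiration = expiration.total_seconds()
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self.entries: dict[str, tuple[float, Any]] = {}
        self.misses = 0  # number of times the wrapped function actually ran

    def run(self, fn: Callable[..., Any], **kwargs: Any) -> Any:
        key = input_hash(fn.__name__, kwargs)
        now = self.clock()
        cached = self.entries.get(key)
        if cached is not None and now < cached[0]:
            return cached[1]  # fresh entry: reuse the stored result
        self.misses += 1
        result = fn(**kwargs)
        self.entries[key] = (now + self.expiration, result)
        return result


def transform(data: dict) -> list:
    # Same logic as the transform task above, as a plain function
    return [{"id": item["id"], "value": item["amount"] * 100} for item in data["results"]]
```

Calling `cache.run(transform, data=...)` twice with equal inputs runs `transform` only once. After the clock passes the expiration, the next call recomputes. In Prefect itself, a task's undecorated function is also reachable through its `.fn` attribute, which is convenient for unit-testing task logic without a flow run.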
Scheduling and Deployments
```python
# flows/deploy.py: Serve a flow on a schedule
from prefect import flow


@flow(log_prints=True)
def daily_report(param1: str = "default"):
    print(f"Generating daily report (param1={param1})...")


if __name__ == "__main__":
    # Creates a deployment and keeps this process running to execute its scheduled runs
    daily_report.serve(
        name="daily-report-deployment",
        cron="0 8 * * *",  # Every day at 8 AM
        tags=["reporting"],
        parameters={"param1": "value1"},  # must match the flow's parameters
    )
```
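```bash
# deploy.sh: Deploy and manage via CLI
# Create the work pool once, if it does not exist yet
prefect work-pool create default-agent-pool --type process

# Create a deployment from the flow's entrypoint (file path:function name)
prefect deploy flows/hello.py:etl_pipeline \
  --name etl-prod \
  --pool default-agent-pool \
  --cron "*/30 * * * *"  # every 30 minutes

# Start a worker that polls the pool and executes deployment runs
prefect worker start --pool default-agent-pool

# Trigger a run manually; deployments are addressed as "<flow name>/<deployment name>"
prefect deployment run "etl-pipeline/etl-prod" --param api_url=https://api.example.com
```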
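Both cron strings in this section use the standard five-field format: minute, hour, day of month, month, day of week. `0 8 * * *` fires once a day at 08:00, and `*/30 * * * *` fires at minutes 0 and 30 of every hour.

To see which times a schedule produces, here is a deliberately simplified, stdlib-only matcher. It assumes the last three fields are `*` and supports only `*`, `*/n`, single numbers, and comma lists in the minute and hour fields. Prefect itself handles full cron syntax. `parse_field` and `next_runs` are hypothetical helpers, not Prefect API.

```python
from datetime import datetime, timedelta


def parse_field(field: str, lo: int, hi: int) -> set[int]:
    # Expand one cron field into the set of allowed values in [lo, hi]
    values: set[int] = set()
    for part in field.split(","):
        if part == "*":
            values.update(range(lo, hi + 1))
        elif part.startswith("*/"):
            values.update(range(lo, hi + 1, int(part[2:])))
        else:
            n = int(part)
            if not lo <= n <= hi:
                raise ValueError(f"{n} out of range {lo}-{hi}")
            values.add(n)
    return values


def next_runs(expr: str, start: datetime, count: int) -> list[datetime]:
    # Simplified: only minute and hour fields; day/month/weekday must be "*"
    minute, hour, dom, month, dow = expr.split()
    if (dom, month, dow) != ("*", "*", "*"):
        raise ValueError("this sketch only supports '*' for the last three fields")
    minutes = parse_field(minute, 0, 59)
    hours = parse_field(hour, 0, 23)
    # Runs fire on whole minutes strictly after `start`
    t = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    runs: list[datetime] = []
    while len(runs) < count:
        if t.minute in minutes and t.hour in hours:
            runs.append(t)
        t += timedelta(minutes=1)
    return runs
```

For example, starting from 07:45, `next_runs("*/30 * * * *", ...)` yields 08:00, 08:30, 09:00, while `"0 8 * * *"` yields 08:00 on consecutive days.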
Error Handling and Concurrency
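```python
# flows/advanced.py: Concurrent tasks, error handling, and async flows
import asyncio

from prefect import flow, task
from prefect.tasks import task_input_hash


@task(
    retries=2,
    retry_delay_seconds=[10, 60],  # wait 10 s before the 1st retry, 60 s before the 2nd
    cache_key_fn=task_input_hash,
    timeout_seconds=300,
)
def process_item(item_id: int) -> dict:
    # Process a single item
    return {"id": item_id, "status": "done"}


@flow
def batch_process(item_ids: list[int]):
    # Submit tasks concurrently; each call returns a future immediately
    futures = [process_item.submit(item_id) for item_id in item_ids]
    # raise_on_failure=False returns the exception instead of raising it,
    # so one failed item does not abort the whole batch
    results = [f.result(raise_on_failure=False) for f in futures]
    succeeded = [r for r in results if isinstance(r, dict) and r["status"] == "done"]
    print(f"Processed {len(succeeded)}/{len(item_ids)} items")


@task
async def fetch_from_api(source: str) -> dict:
    # Placeholder for a real async HTTP call
    await asyncio.sleep(0.1)
    return {"source": source}


@flow
async def async_pipeline():
    # Async flow for I/O-bound work: both fetches run concurrently
    results = await asyncio.gather(
        fetch_from_api("source_a"),
        fetch_from_api("source_b"),
    )
    return results
```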
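`retry_delay_seconds` accepts a list, which is how `[10, 60]` sets a different wait before each retry. That list is a fixed schedule, not exponential backoff. For a geometric schedule, Prefect ships `exponential_backoff` in `prefect.tasks`, used as `retry_delay_seconds=exponential_backoff(backoff_factor=10)`.

The stdlib sketch below assumes each delay is `backoff_factor * 2**n` for retry `n` (counting from 0). It also models the retry loop: `retries=2` means one initial attempt plus up to two retries. `backoff_delays` and `run_with_retries` are hypothetical names, not Prefect's implementation.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delays(backoff_factor: float, retries: int) -> list[float]:
    # Assumed formula: delay before retry n (0-based) is factor * 2**n
    return [backoff_factor * 2**n for n in range(retries)]


def run_with_retries(
    fn: Callable[[], T],
    delays: list[float],
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    # Toy model: one initial attempt plus one retry per entry in `delays`
    for attempt, delay in enumerate([None, *delays]):
        if delay is not None:
            sleep(delay)  # wait before retrying
        try:
            return fn()
        except Exception:
            if attempt == len(delays):
                raise  # retries exhausted: surface the last error
    raise AssertionError("unreachable")
```

With `backoff_factor=10` and three retries the waits are 10, 20, and 40 seconds, so a task that always fails spends 70 seconds sleeping before its final error.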
Blocks and Infrastructure
```python
# flows/blocks.py: Use blocks for reusable configuration
# Requires: pip install prefect-sqlalchemy
from prefect import flow
from prefect.blocks.system import JSON, Secret
from prefect_sqlalchemy import SqlAlchemyConnector

# Blocks are created once (via UI or CLI) and loaded by name at run time.
# Register the integration's block types first:
#   prefect block register -m prefect_sqlalchemy
# Then configure them in the UI at http://localhost:4200/blocks


@flow
def db_flow():
    api_key = Secret.load("my-api-key").get()  # decrypted secret value
    config = JSON.load("pipeline-config").value  # stored JSON document
    with SqlAlchemyConnector.load("prod-db") as conn:
        result = conn.fetch_all("SELECT count(*) FROM users")
    print(result)
```
Notifications
```python
# flows/notifications.py: Send alerts on failure
from prefect import flow
from prefect.blocks.notifications import SlackWebhook


@flow
def monitored_flow():
    try:
        # ... do work
        pass
    except Exception as e:
        slack = SlackWebhook.load("alerts-channel")
        slack.notify(f"❌ Pipeline failed: {e}")
        raise  # re-raise so the flow run is still marked as failed


# Or use automations in the Prefect UI:
# Automations → Create → Trigger: Flow run failed → Action: Send Slack notification
```
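Wrapping the body in `try`/`except` works, but the alerting code must be repeated in every flow. Prefect also supports state change hooks, for example `@flow(on_failure=[notify_slack])`, where each hook is called with the flow, the flow run, and its final state.

The stdlib-only sketch below models the same pattern as a reusable decorator so the control flow can be tested without a Slack webhook. `with_failure_hooks` and `send_alert` are hypothetical names, and this is not how Prefect implements hooks.

```python
import functools
from typing import Any, Callable

Hook = Callable[[str, BaseException], None]


def with_failure_hooks(*hooks: Hook) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Toy decorator: call each hook on failure, then re-raise the original error."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return fn(*args, **kwargs)
            except Exception as exc:
                for hook in hooks:
                    try:
                        hook(fn.__name__, exc)
                    except Exception:
                        pass  # a broken alert channel must not mask the original error
                raise  # keep the failure visible to the caller
        return wrapper

    return decorator


sent: list[str] = []


def send_alert(name: str, exc: BaseException) -> None:
    # Hypothetical stand-in for SlackWebhook.load("alerts-channel").notify(...)
    sent.append(f"❌ {name} failed: {exc}")


@with_failure_hooks(send_alert)
def monitored_flow(fail: bool) -> str:
    if fail:
        raise RuntimeError("disk full")
    return "ok"
```

Because the hook runs before the re-raise, the failure still propagates. With Prefect's `on_failure`, the flow run is likewise still recorded as failed.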
CLI Reference
```bash
# cli.sh: Common Prefect CLI commands
# Check the installed version and the active configuration (including the API URL)
prefect version
prefect config view

# List recent flow runs and deployments
prefect flow-run ls
prefect deployment ls

# View logs for a flow run
prefect flow-run logs <flow-run-id>

# Manage work pools
prefect work-pool create my-pool --type process
prefect work-pool ls
```