> rust-async-patterns
Master Rust async programming with Tokio, async traits, error handling, and concurrent patterns. Use when building async Rust applications, implementing concurrent systems, or debugging async code.
curl "https://skillshub.wtf/Harmeet10000/skills/rust-async-patterns?format=md"Rust Async Patterns
Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling.
When to Use This Skill
- Building async Rust applications
- Implementing concurrent network services
- Using Tokio for async I/O
- Handling async errors properly
- Debugging async code issues
- Optimizing async performance
Core Concepts
1. Async Execution Model
Future (lazy) → poll() → Ready(value) | Pending
↑ ↓
Waker ← Runtime schedules
2. Key Abstractions
| Concept | Purpose |
|---|---|
Future | Lazy computation that may complete later |
async fn | Function returning impl Future |
await | Suspend until future completes |
Task | Spawned future running concurrently |
Runtime | Executor that polls futures |
Quick Start
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
use tokio::time::{sleep, Duration};
use anyhow::Result;
#[tokio::main]
async fn main() -> Result<()> {
// Initialize tracing
tracing_subscriber::fmt::init();
// Async operations
let result = fetch_data("https://api.example.com").await?;
println!("Got: {}", result);
Ok(())
}
async fn fetch_data(url: &str) -> Result<String> {
// Simulated async operation
sleep(Duration::from_millis(100)).await;
Ok(format!("Data from {}", url))
}
Patterns
Pattern 1: Concurrent Task Execution
use tokio::task::JoinSet;
use anyhow::Result;
// Spawn multiple concurrent tasks
async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
let mut set = JoinSet::new();
for url in urls {
set.spawn(async move {
fetch_data(&url).await
});
}
let mut results = Vec::new();
while let Some(res) = set.join_next().await {
match res {
Ok(Ok(data)) => results.push(data),
Ok(Err(e)) => tracing::error!("Task failed: {}", e),
Err(e) => tracing::error!("Join error: {}", e),
}
}
Ok(results)
}
// With concurrency limit
use futures::stream::{self, StreamExt};
async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
stream::iter(urls)
.map(|url| async move { fetch_data(&url).await })
.buffer_unordered(limit) // Max concurrent tasks
.collect()
.await
}
// Select first to complete
use tokio::select;
async fn race_requests(url1: &str, url2: &str) -> Result<String> {
select! {
result = fetch_data(url1) => result,
result = fetch_data(url2) => result,
}
}
Pattern 2: Channels for Communication
use tokio::sync::{mpsc, broadcast, oneshot, watch};
// Multi-producer, single-consumer
async fn mpsc_example() {
let (tx, mut rx) = mpsc::channel::<String>(100);
// Spawn producer
let tx2 = tx.clone();
tokio::spawn(async move {
tx2.send("Hello".to_string()).await.unwrap();
});
// Consume
while let Some(msg) = rx.recv().await {
println!("Got: {}", msg);
}
}
// Broadcast: multi-producer, multi-consumer
async fn broadcast_example() {
let (tx, _) = broadcast::channel::<String>(100);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tx.send("Event".to_string()).unwrap();
// Both receivers get the message
let _ = rx1.recv().await;
let _ = rx2.recv().await;
}
// Oneshot: single value, single use
async fn oneshot_example() -> String {
let (tx, rx) = oneshot::channel::<String>();
tokio::spawn(async move {
tx.send("Result".to_string()).unwrap();
});
rx.await.unwrap()
}
// Watch: single producer, multi-consumer, latest value
async fn watch_example() {
let (tx, mut rx) = watch::channel("initial".to_string());
tokio::spawn(async move {
loop {
// Wait for changes
rx.changed().await.unwrap();
println!("New value: {}", *rx.borrow());
}
});
tx.send("updated".to_string()).unwrap();
}
Pattern 3: Async Error Handling
use anyhow::{Context, Result, bail};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum ServiceError {
#[error("Network error: {0}")]
Network(#[from] reqwest::Error),
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Not found: {0}")]
NotFound(String),
#[error("Timeout after {0:?}")]
Timeout(std::time::Duration),
}
// Using anyhow for application errors
async fn process_request(id: &str) -> Result<Response> {
let data = fetch_data(id)
.await
.context("Failed to fetch data")?;
let parsed = parse_response(&data)
.context("Failed to parse response")?;
Ok(parsed)
}
// Using custom errors for library code
async fn get_user(id: &str) -> Result<User, ServiceError> {
let result = db.query(id).await?;
match result {
Some(user) => Ok(user),
None => Err(ServiceError::NotFound(id.to_string())),
}
}
// Timeout wrapper
use tokio::time::timeout;
async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
where
F: std::future::Future<Output = Result<T, ServiceError>>,
{
timeout(duration, future)
.await
.map_err(|_| ServiceError::Timeout(duration))?
}
Pattern 4: Graceful Shutdown
use tokio::signal;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
async fn run_server() -> Result<()> {
// Method 1: CancellationToken
let token = CancellationToken::new();
let token_clone = token.clone();
// Spawn task that respects cancellation
tokio::spawn(async move {
loop {
tokio::select! {
_ = token_clone.cancelled() => {
tracing::info!("Task shutting down");
break;
}
_ = do_work() => {}
}
}
});
// Wait for shutdown signal
signal::ctrl_c().await?;
tracing::info!("Shutdown signal received");
// Cancel all tasks
token.cancel();
// Give tasks time to cleanup
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
}
// Method 2: Broadcast channel for shutdown
async fn run_with_broadcast() -> Result<()> {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let mut rx = shutdown_tx.subscribe();
tokio::spawn(async move {
tokio::select! {
_ = rx.recv() => {
tracing::info!("Received shutdown");
}
_ = async { loop { do_work().await } } => {}
}
});
signal::ctrl_c().await?;
let _ = shutdown_tx.send(());
Ok(())
}
Pattern 5: Async Traits
use async_trait::async_trait;
#[async_trait]
pub trait Repository {
async fn get(&self, id: &str) -> Result<Entity>;
async fn save(&self, entity: &Entity) -> Result<()>;
async fn delete(&self, id: &str) -> Result<()>;
}
pub struct PostgresRepository {
pool: sqlx::PgPool,
}
#[async_trait]
impl Repository for PostgresRepository {
async fn get(&self, id: &str) -> Result<Entity> {
sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
.fetch_one(&self.pool)
.await
.map_err(Into::into)
}
async fn save(&self, entity: &Entity) -> Result<()> {
sqlx::query!(
"INSERT INTO entities (id, data) VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET data = $2",
entity.id,
entity.data
)
.execute(&self.pool)
.await?;
Ok(())
}
async fn delete(&self, id: &str) -> Result<()> {
sqlx::query!("DELETE FROM entities WHERE id = $1", id)
.execute(&self.pool)
.await?;
Ok(())
}
}
// Trait object usage
async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
let entity = repo.get(id).await?;
// Process...
repo.save(&entity).await
}
Pattern 6: Streams and Async Iteration
use futures::stream::{self, Stream, StreamExt};
use async_stream::stream;
// Create stream from async iterator
fn numbers_stream() -> impl Stream<Item = i32> {
stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
yield i;
}
}
}
// Process stream
async fn process_stream() {
let stream = numbers_stream();
// Map and filter
let processed: Vec<_> = stream
.filter(|n| futures::future::ready(*n % 2 == 0))
.map(|n| n * 2)
.collect()
.await;
println!("{:?}", processed);
}
// Chunked processing
async fn process_in_chunks() {
let stream = numbers_stream();
let mut chunks = stream.chunks(3);
while let Some(chunk) = chunks.next().await {
println!("Processing chunk: {:?}", chunk);
}
}
// Merge multiple streams
async fn merge_streams() {
let stream1 = numbers_stream();
let stream2 = numbers_stream();
let merged = stream::select(stream1, stream2);
merged
.for_each(|n| async move {
println!("Got: {}", n);
})
.await;
}
Pattern 7: Resource Management
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore};
// Shared state with RwLock (prefer for read-heavy)
struct Cache {
data: RwLock<HashMap<String, String>>,
}
impl Cache {
async fn get(&self, key: &str) -> Option<String> {
self.data.read().await.get(key).cloned()
}
async fn set(&self, key: String, value: String) {
self.data.write().await.insert(key, value);
}
}
// Connection pool with semaphore
struct Pool {
semaphore: Semaphore,
connections: Mutex<Vec<Connection>>,
}
impl Pool {
fn new(size: usize) -> Self {
Self {
semaphore: Semaphore::new(size),
connections: Mutex::new((0..size).map(|_| Connection::new()).collect()),
}
}
async fn acquire(&self) -> PooledConnection<'_> {
let permit = self.semaphore.acquire().await.unwrap();
let conn = self.connections.lock().await.pop().unwrap();
PooledConnection { pool: self, conn: Some(conn), _permit: permit }
}
}
struct PooledConnection<'a> {
pool: &'a Pool,
conn: Option<Connection>,
_permit: tokio::sync::SemaphorePermit<'a>,
}
impl Drop for PooledConnection<'_> {
fn drop(&mut self) {
if let Some(conn) = self.conn.take() {
let pool = self.pool;
tokio::spawn(async move {
pool.connections.lock().await.push(conn);
});
}
}
}
Debugging Tips
// Enable tokio-console for runtime debugging
// Cargo.toml: tokio = { features = ["tracing"] }
// Run: RUSTFLAGS="--cfg tokio_unstable" cargo run
// Then: tokio-console
// Instrument async functions
use tracing::instrument;
#[instrument(skip(pool))]
async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
tracing::debug!("Fetching user");
// ...
}
// Track task spawning
let span = tracing::info_span!("worker", id = %worker_id);
tokio::spawn(async move {
// Enters span when polled
}.instrument(span));
Best Practices
Do's
- Use
tokio::select!- For racing futures - Prefer channels - Over shared state when possible
- Use
JoinSet- For managing multiple tasks - Instrument with tracing - For debugging async code
- Handle cancellation - Check
CancellationToken
Don'ts
- Don't block - Never use
std::thread::sleepin async - Don't hold locks across awaits - Causes deadlocks
- Don't spawn unboundedly - Use semaphores for limits
- Don't ignore errors - Propagate with
?or log - Don't forget Send bounds - For spawned futures
Resources
> related_skills --same-repo
> vibe-ppt
Convert this into a web based slide deck using reveal.js. Use the following brand colour and logo. Primary colour: #EE4822 Theme: Light Logo: https://media.licdn.com/dms/image/v2/D560BAQFeaNrDEATcKQ/company-logo_200_200/company-logo_200_200/0/1709465010800/100xengineers_logo?e=2147483647&v=beta&t=qKncqAfB_j9ckDOxOx1eN9EEPocLTbNqliLnAU3sP6c Slide Content: Vibe Coding with Gemini Canvas Slide 1: Vibe Coding with Gemini Canvas Slide 2: What is Vibe Coding? Vibe Coding: Use natural language pro
> upwork-scrape-apply
# Upwork Job Scrape & Apply Pipeline Scrape Upwork jobs matching AI/automation keywords, generate personalized cover letters and proposals, and output to a Google Sheet with one-click apply links. ## Inputs - **Keywords**: List of search terms (default: automation, ai agent, n8n, gpt, workflow, api integration, scraping, ai consultant) - **Limit**: Max jobs to fetch (default: 50) - **Days**: Only jobs from last N days (default: 1 = last 24 hours) - **Filters**: - `--verified-payment`: Only
> ui-ux-pro-max
UI/UX design intelligence. 50 styles, 21 palettes, 50 font pairings, 20 charts, 9 stacks (React, Next.js, Vue, Svelte, SwiftUI, React Native, Flutter, Tailwind, shadcn/ui). Actions: plan, build, create, design, implement, review, fix, improve, optimize, enhance, refactor, check UI/UX code. Projects: website, landing page, dashboard, admin panel, e-commerce, SaaS, portfolio, blog, mobile app, .html, .tsx, .vue, .svelte. Elements: button, modal, navbar, sidebar, card, table, form, chart. Styles: g
> typescript-magician
Designs complex generic types, refactors `any` types to strict alternatives, creates type guards and utility types, and resolves TypeScript compiler errors. Use when the user asks about TypeScript (TS) types, generics, type inference, type guards, removing `any` types, strict typing, type errors, `infer`, `extends`, conditional types, mapped types, template literal types, branded/opaque types, or utility types like `Partial`, `Record`, `ReturnType`, and `Awaited`.