> customerio-reference-architecture
Implement Customer.io enterprise reference architecture. Use when designing integration layers, event-driven architectures, or enterprise-grade Customer.io setups. Trigger: "customer.io architecture", "customer.io design", "customer.io enterprise", "customer.io integration pattern".
Customer.io Reference Architecture
Overview
Enterprise-grade reference architecture for Customer.io: a service layer separating Track and App API concerns, event-driven processing with message queues, repository pattern for user-to-CIO sync, webhook event bus, and infrastructure as code.
Architecture Principles
- Two Clients, Two Concerns: TrackClient for behavioral data in, APIClient for messages out
- Event-Driven: Message queues decouple your app from Customer.io API availability
- Idempotent Operations: All writes safely retryable via content hashing
- Service Layer: Business logic never calls the Customer.io SDK directly
- Observability: Every operation emits timing and error metrics
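The content hashing behind the Idempotent Operations principle is not spelled out in the steps that follow. One plausible reading, sketched here under that assumption, is to derive a deterministic key from the operation name, user ID, and payload, so that a retried write yields the same key and can be deduplicated (for instance by using it as a queue job ID). The names `canonicalize` and `idempotencyKey` are hypothetical and do not come from the Customer.io SDK.

```typescript
import { createHash } from "node:crypto";

// Recursively sort object keys so logically equal payloads serialize identically.
function canonicalize(value: unknown): unknown {
  if (value instanceof Date) return value.toISOString();
  if (Array.isArray(value)) return value.map(canonicalize);
  if (value !== null && typeof value === "object") {
    const obj = value as Record<string, unknown>;
    return Object.keys(obj)
      .sort()
      .reduce<Record<string, unknown>>((acc, key) => {
        acc[key] = canonicalize(obj[key]);
        return acc;
      }, {});
  }
  return value;
}

// Hypothetical helper: SHA-256 over the canonical form of (operation, userId, payload).
export function idempotencyKey(
  operation: string,
  userId: string,
  payload: unknown
): string {
  const canonical = JSON.stringify([operation, userId, canonicalize(payload)]);
  return createHash("sha256").update(canonical).digest("hex");
}
```

Compute the key from the caller's payload before any per-call timestamp is added, otherwise every retry would hash differently.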
Architecture Diagram
┌─────────────┐    ┌───────────────────┐    ┌──────────────┐
│ Application │───>│ MessagingService  │───>│ Track API    │
│ Routes      │    │ (service layer)   │    │ identify()   │
└─────────────┘    │                   │    │ track()      │
                   │ - identify users  │    └──────────────┘
                   │ - track events    │
                   │ - send txn emails │    ┌──────────────┐
                   │                   │───>│ App API      │
                   └───────────────────┘    │ sendEmail()  │
                             │              │ broadcast()  │
                             │              └──────────────┘
                             v
                   ┌───────────────────┐
                   │ Event Queue       │    ┌──────────────┐
                   │ (Redis/Kafka)     │───>│ DLQ          │
                   │ for reliability   │    │ (failures)   │
                   └───────────────────┘    └──────────────┘

┌─────────────┐    ┌───────────────────┐    ┌──────────────┐
│ Customer.io │───>│ Webhook Handler   │───>│ BigQuery     │
│ Webhooks    │    │ HMAC verification │    │ (analytics)  │
└─────────────┘    │ Event routing     │    └──────────────┘
                   └───────────────────┘
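The webhook handler in the diagram performs HMAC verification, but no handler code appears in the steps. The sketch below shows one possible shape using Node's `crypto` module. It assumes a signing scheme of HMAC-SHA256 over `v0:<timestamp>:<raw body>`, hex-encoded, with the timestamp and signature delivered as request headers; confirm the exact header names and format against the current Customer.io webhook documentation before relying on it. The function names and the five-minute replay tolerance are hypothetical choices.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Assumed scheme: HMAC-SHA256 over "v0:<timestamp>:<raw body>", hex-encoded.
export function computeSignature(
  secret: string,
  timestamp: string,
  rawBody: string
): string {
  return createHmac("sha256", secret)
    .update(`v0:${timestamp}:${rawBody}`)
    .digest("hex");
}

export function verifyWebhook(
  secret: string,
  timestamp: string,
  rawBody: string,
  receivedSignature: string,
  nowSeconds: number = Math.floor(Date.now() / 1000),
  toleranceSeconds = 300 // assumption: reject requests older than 5 minutes
): boolean {
  const ts = Number(timestamp);
  if (!Number.isFinite(ts) || Math.abs(nowSeconds - ts) > toleranceSeconds) {
    return false;
  }
  const expected = Buffer.from(computeSignature(secret, timestamp, rawBody), "hex");
  const received = Buffer.from(receivedSignature, "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}
```

Verification has to run against the raw request body, before any JSON parsing, because re-serializing the payload can change its bytes.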
Instructions
Step 1: Core Service Layer
// services/messaging-service.ts
import { EventEmitter } from "events";
import { TrackClient, APIClient, SendEmailRequest, RegionUS, RegionEU } from "customerio-node";

interface MessagingConfig {
  siteId: string;
  trackApiKey: string;
  appApiKey: string;
  region: "us" | "eu";
}

export class MessagingService extends EventEmitter {
  private track: TrackClient; // behavioral data in
  private app: APIClient; // messages out

  constructor(config: MessagingConfig) {
    super();
    const region = config.region === "eu" ? RegionEU : RegionUS;
    this.track = new TrackClient(config.siteId, config.trackApiKey, { region });
    this.app = new APIClient(config.appApiKey, { region });
  }

  // Create or update a profile; stamps last_seen_at with the current Unix time.
  async identifyUser(userId: string, attrs: Record<string, any>): Promise<void> {
    const start = Date.now();
    try {
      await this.track.identify(userId, {
        ...attrs,
        last_seen_at: Math.floor(Date.now() / 1000),
      });
      this.emit("identify", { userId, latencyMs: Date.now() - start });
    } catch (err) {
      this.emit("error", { operation: "identify", userId, err });
      throw err;
    }
  }

  async trackEvent(
    userId: string,
    name: string,
    data?: Record<string, any>
  ): Promise<void> {
    const start = Date.now();
    try {
      await this.track.track(userId, { name, data });
      this.emit("track", { userId, name, latencyMs: Date.now() - start });
    } catch (err) {
      this.emit("error", { operation: "track", userId, name, err });
      throw err;
    }
  }

  async sendTransactional(
    to: string,
    templateId: string,
    data: Record<string, any>,
    identifiers?: { id: string } | { email: string }
  ): Promise<{ delivery_id: string }> {
    const start = Date.now();
    try {
      const request = new SendEmailRequest({
        to,
        transactional_message_id: templateId,
        message_data: data,
        // The transactional API requires identifiers; fall back to the recipient address.
        identifiers: identifiers ?? { email: to },
      });
      const result = await this.app.sendEmail(request);
      this.emit("transactional", { to, templateId, latencyMs: Date.now() - start });
      return result;
    } catch (err) {
      this.emit("error", { operation: "transactional", to, templateId, err });
      throw err;
    }
  }

  async triggerBroadcast(
    broadcastId: number,
    data: Record<string, any>,
    options: { segment?: { id: number }; emails?: string[]; ids?: string[] }
  ): Promise<void> {
    await this.app.triggerBroadcast(broadcastId, data, options);
    this.emit("broadcast", { broadcastId });
  }

  // Suppress: stop messaging this person and block future profile updates.
  async suppressUser(userId: string): Promise<void> {
    await this.track.suppress(userId);
  }

  async deleteUser(userId: string): Promise<void> {
    await this.track.destroy(userId);
  }
}
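The service emits one event per operation plus an "error" event, which is where the Observability principle gets wired up. One caveat worth knowing: Node's `EventEmitter` throws when "error" is emitted with no listener attached, so a listener should be registered before the service handles traffic. The sketch below attaches in-memory counters to any emitter that follows the event shapes used above; `attachMetrics` and `OperationStats` are hypothetical names, and a real setup would forward to a metrics backend instead of a `Map`.

```typescript
import { EventEmitter } from "node:events";

export interface OperationStats {
  count: number;
  errors: number;
  totalLatencyMs: number;
}

// Hypothetical helper: subscribe to the service's events and keep per-operation counters.
export function attachMetrics(
  service: EventEmitter,
  operations: string[] = ["identify", "track", "transactional"]
): Map<string, OperationStats> {
  const stats = new Map<string, OperationStats>();

  const statsFor = (op: string): OperationStats => {
    let entry = stats.get(op);
    if (!entry) {
      entry = { count: 0, errors: 0, totalLatencyMs: 0 };
      stats.set(op, entry);
    }
    return entry;
  };

  for (const op of operations) {
    service.on(op, (payload: { latencyMs?: number }) => {
      const entry = statsFor(op);
      entry.count += 1;
      entry.totalLatencyMs += payload.latencyMs ?? 0;
    });
  }

  // Without this listener, emit("error", ...) would itself throw.
  service.on("error", (payload: { operation?: string }) => {
    statsFor(payload.operation ?? "unknown").errors += 1;
  });

  return stats;
}
```

Calling `attachMetrics(messagingService)` right after constructing the service would cover the identify, track, and transactional events it emits.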
Step 2: Queue-Backed Reliability Layer
// services/messaging-queue.ts
// Wraps MessagingService with queue-based reliability
import { Queue, Worker, Job } from "bullmq";
import { MessagingService } from "./messaging-service";

const REDIS_URL = process.env.REDIS_URL ?? "redis://localhost:6379";
const connection = { url: REDIS_URL };

// Queue names use "-" instead of ":" because recent BullMQ versions
// reject queue names that contain ":".
const identifyQueue = new Queue("cio-identify", { connection });
const trackQueue = new Queue("cio-track", { connection });
const transactionalQueue = new Queue("cio-transactional", { connection });

// Shared retry policy: 3 attempts, exponential backoff starting at 2 s
const retryOptions = {
  attempts: 3,
  backoff: { type: "exponential", delay: 2000 },
};

export class QueuedMessagingService {
  constructor(private messaging: MessagingService) {}

  async enqueueIdentify(
    userId: string,
    attrs: Record<string, any>
  ): Promise<void> {
    await identifyQueue.add("identify", { userId, attrs }, retryOptions);
  }

  async enqueueTrack(
    userId: string,
    name: string,
    data?: Record<string, any>
  ): Promise<void> {
    await trackQueue.add("track", { userId, name, data }, retryOptions);
  }

  // Producer for the transactional queue, matching the worker below
  async enqueueTransactional(
    to: string,
    templateId: string,
    data: Record<string, any>,
    identifiers?: { id: string } | { email: string }
  ): Promise<void> {
    await transactionalQueue.add(
      "transactional",
      { to, templateId, data, identifiers },
      retryOptions
    );
  }

  startWorkers(): void {
    new Worker("cio-identify", async (job: Job) => {
      await this.messaging.identifyUser(job.data.userId, job.data.attrs);
    }, { connection, concurrency: 10 });

    new Worker("cio-track", async (job: Job) => {
      await this.messaging.trackEvent(
        job.data.userId,
        job.data.name,
        job.data.data
      );
    }, { connection, concurrency: 10 });

    new Worker("cio-transactional", async (job: Job) => {
      await this.messaging.sendTransactional(
        job.data.to,
        job.data.templateId,
        job.data.data,
        job.data.identifiers
      );
    }, { connection, concurrency: 5 });
  }
}
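The architecture diagram shows failures flowing to a DLQ, which the queue code above does not set up. A minimal sketch of that routing follows, written against small structural types rather than BullMQ itself so it stays self-contained. It assumes the job object exposes `attemptsMade` and `opts.attempts` the way BullMQ's `Job` does, and that a job whose `attemptsMade` has reached its configured attempts will not be retried again. `routeToDeadLetter`, `FailedJobLike`, and `DeadLetterSink` are hypothetical names.

```typescript
// Minimal structural types so the sketch does not depend on BullMQ.
export interface FailedJobLike {
  name: string;
  data: unknown;
  attemptsMade: number;
  opts: { attempts?: number };
}

export interface DeadLetterSink {
  add(name: string, data: unknown): Promise<unknown>;
}

// Returns true when the job was copied to the dead-letter sink.
export async function routeToDeadLetter(
  job: FailedJobLike | undefined,
  err: Error,
  sink: DeadLetterSink
): Promise<boolean> {
  if (!job) return false;
  const maxAttempts = job.opts.attempts ?? 1;
  // Retries remain, so the queue will run the job again.
  if (job.attemptsMade < maxAttempts) return false;
  await sink.add(job.name, {
    original: job.data,
    error: err.message,
    failedAt: new Date().toISOString(),
  });
  return true;
}
```

With BullMQ this could be wired from a worker's "failed" event, for example `worker.on("failed", (job, err) => routeToDeadLetter(job, err, dlqQueue))`, where `dlqQueue` is an additional, hypothetical `Queue` instance.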
Step 3: Repository Pattern
// repositories/user-messaging-repo.ts
// Syncs your user database with Customer.io profiles
import { MessagingService } from "../services/messaging-service";

interface User {
  id: string;
  email: string;
  firstName: string;
  lastName: string;
  plan: string;
  createdAt: Date;
  preferences: { marketing: boolean; transactional: boolean };
}

export class UserMessagingRepository {
  constructor(private messaging: MessagingService) {}

  async syncUser(user: User): Promise<void> {
    if (!user.preferences.transactional && !user.preferences.marketing) {
      // User has opted out of all messaging: suppress
      await this.messaging.suppressUser(user.id);
      return;
    }

    await this.messaging.identifyUser(user.id, {
      email: user.email,
      first_name: user.firstName,
      last_name: user.lastName,
      plan: user.plan,
      created_at: Math.floor(user.createdAt.getTime() / 1000),
      marketing_opt_in: user.preferences.marketing,
      transactional_opt_in: user.preferences.transactional,
    });
  }

  async onUserDeleted(userId: string): Promise<void> {
    await this.messaging.suppressUser(userId);
    await this.messaging.deleteUser(userId);
  }
}
Step 4: Infrastructure as Code (Terraform)
# terraform/customerio.tf
# Note: var.project_id must be declared elsewhere in the module.

# Secrets
# (Nested blocks are written on separate lines; HCL single-line blocks
# may contain at most one argument and no nested blocks.)
resource "google_secret_manager_secret" "cio_site_id" {
  secret_id = "customerio-site-id"
  replication {
    auto {}
  }
}

resource "google_secret_manager_secret" "cio_track_key" {
  secret_id = "customerio-track-api-key"
  replication {
    auto {}
  }
}

resource "google_secret_manager_secret" "cio_app_key" {
  secret_id = "customerio-app-api-key"
  replication {
    auto {}
  }
}

# Cloud Run service
resource "google_cloud_run_v2_service" "cio_service" {
  name     = "customerio-service"
  location = "us-central1"

  template {
    scaling {
      min_instance_count = 1
      max_instance_count = 10
    }

    containers {
      image = "gcr.io/${var.project_id}/customerio-service:latest"

      env {
        name  = "CUSTOMERIO_REGION"
        value = "us"
      }

      env {
        name = "CUSTOMERIO_SITE_ID"
        value_source {
          secret_key_ref {
            secret  = google_secret_manager_secret.cio_site_id.secret_id
            version = "latest"
          }
        }
      }

      # The track and app API key secrets can be exposed with further env blocks
      # in the same way.

      resources {
        limits = { cpu = "1", memory = "512Mi" }
      }
    }
  }
}
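On the application side, the environment variables injected by Cloud Run have to be turned into the `MessagingConfig` that `MessagingService` expects. The following is a minimal sketch of that step. `CUSTOMERIO_REGION` and `CUSTOMERIO_SITE_ID` come from the Terraform above; `CUSTOMERIO_TRACK_API_KEY` and `CUSTOMERIO_APP_API_KEY` are assumed names, since the Terraform does not show how those two secrets are exposed. `loadMessagingConfig` is likewise a hypothetical helper.

```typescript
export interface MessagingConfig {
  siteId: string;
  trackApiKey: string;
  appApiKey: string;
  region: "us" | "eu";
}

// Hypothetical helper: build the service config from environment variables,
// failing fast when a required value is missing.
export function loadMessagingConfig(
  env: Record<string, string | undefined> = process.env
): MessagingConfig {
  const required = (name: string): string => {
    const value = env[name];
    if (!value) {
      throw new Error(`Missing required environment variable ${name}`);
    }
    return value;
  };

  const rawRegion = (env["CUSTOMERIO_REGION"] ?? "us").toLowerCase();
  if (rawRegion !== "us" && rawRegion !== "eu") {
    throw new Error(`Unsupported CUSTOMERIO_REGION: ${rawRegion}`);
  }
  const region: "us" | "eu" = rawRegion === "eu" ? "eu" : "us";

  return {
    siteId: required("CUSTOMERIO_SITE_ID"),
    // Assumed variable names; align them with however the secrets are injected.
    trackApiKey: required("CUSTOMERIO_TRACK_API_KEY"),
    appApiKey: required("CUSTOMERIO_APP_API_KEY"),
    region,
  };
}
```

Failing at startup on a missing key keeps a misconfigured deployment from silently dropping messages later.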
Error Handling
| Issue | Solution |
|---|---|
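| Queue worker failure | BullMQ retries up to 3 attempts with exponential backoff; jobs that exhaust their retries remain in BullMQ's failed set, so inspect that set (or your DLQ, if you route failures to one) |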
| Service layer error | EventEmitter "error" event logged + alerted |
| Secret rotation | Update Secret Manager version, redeploy |
| Cross-service consistency | Use idempotent operations (identify is idempotent) |
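To make the retry row concrete, the sketch below computes the waits implied by the queue settings used in Step 2 (`attempts: 3`, base `delay: 2000`). It assumes BullMQ's built-in exponential strategy waits the base delay multiplied by 2^(retry − 1) before each retry; check the BullMQ retry documentation for the version in use. `exponentialBackoffSchedule` is a hypothetical helper, not a BullMQ function.

```typescript
// Delay in milliseconds before each retry, assuming delay * 2^(retry - 1).
// `attempts` counts the first try, so there are attempts - 1 retries.
export function exponentialBackoffSchedule(
  attempts: number,
  baseDelayMs: number
): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelayMs * 2 ** (retry - 1));
  }
  return delays;
}

// With the Step 2 settings: one wait of 2000 ms, then one of 4000 ms.
console.log(exponentialBackoffSchedule(3, 2000)); // [ 2000, 4000 ]
```

Under that assumption a job fails permanently roughly six seconds of backoff after its first failure, plus processing time, which is the window an alert on the failed set should account for.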
Next Steps
After implementing the architecture, proceed to customerio-multi-env-setup for multi-environment configuration.