> clari-performance-tuning

Optimize Clari API performance with caching, batch exports, and data pipeline efficiency. Use when exports take too long, optimizing data warehouse load times, or reducing API calls in multi-forecast environments. Trigger with phrases like "clari performance", "clari slow export", "optimize clari pipeline", "clari caching".


Clari Performance Tuning

Overview

Optimize Clari export pipelines: reduce export times, cache forecast data, and parallelize multi-period exports.

Instructions

Parallel Multi-Period Export

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_export(
    client,
    forecast_name: str,
    periods: list[str],
    max_workers: int = 3,
) -> dict[str, list[dict]]:
    """Export multiple time periods concurrently instead of sequentially."""
    results = {}

    def export_period(period: str) -> tuple[str, list[dict]]:
        data = client.export_and_download(forecast_name, period)
        return period, data.get("entries", [])

    # Keep max_workers low to stay within Clari API rate limits.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(export_period, p): p for p in periods
        }
        for future in as_completed(futures):
            period, entries = future.result()
            results[period] = entries
            print(f"  {period}: {len(entries)} entries")

    return results
```
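A minimal, self-contained run of this pattern, with a stub standing in for the real Clari client (the stub class and its canned entries are illustrative assumptions, not part of any Clari SDK):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

class StubClient:
    # Hypothetical stand-in for a real Clari client; returns canned entries.
    def export_and_download(self, forecast_name: str, period: str) -> dict:
        return {"entries": [{"period": period, "amount": 100}]}

def parallel_export(client, forecast_name, periods, max_workers=3):
    results = {}

    def export_period(period):
        data = client.export_and_download(forecast_name, period)
        return period, data.get("entries", [])

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(export_period, p): p for p in periods}
        for future in as_completed(futures):
            # Results arrive in completion order, not submission order.
            period, entries = future.result()
            results[period] = entries
    return results

periods = ["2025_Q1", "2025_Q2", "2025_Q3", "2025_Q4"]
results = parallel_export(StubClient(), "Pipeline", periods)
print(sorted(results))  # ['2025_Q1', '2025_Q2', '2025_Q3', '2025_Q4']
```

Because `as_completed` yields futures as they finish, the `results` dict fills in whatever order periods complete; keying by period keeps the output deterministic.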

Cache Export Results

```python
import json
import hashlib
from pathlib import Path
from datetime import datetime, timedelta, timezone

class ExportCache:
    """File-based cache for export results with a time-to-live (TTL)."""

    def __init__(self, cache_dir: str = ".cache/clari", ttl_hours: int = 4):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = timedelta(hours=ttl_hours)

    def _key(self, forecast: str, period: str) -> str:
        # MD5 is fine here: the key only needs to be a stable filename.
        return hashlib.md5(f"{forecast}:{period}".encode()).hexdigest()

    def get(self, forecast: str, period: str) -> list[dict] | None:
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        if not path.exists():
            return None
        meta = json.loads(path.read_text())
        cached_at = datetime.fromisoformat(meta["cached_at"])
        if datetime.now(timezone.utc) - cached_at > self.ttl:
            return None  # expired entry; caller should re-export
        return meta["entries"]

    def set(self, forecast: str, period: str, entries: list[dict]):
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        path.write_text(json.dumps({
            "cached_at": datetime.now(timezone.utc).isoformat(),
            "entries": entries,
        }))
```

Note: `datetime.utcnow()` is deprecated since Python 3.12; timezone-aware `datetime.now(timezone.utc)` is used instead.
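The cache is typically wired up read-through: check the cache first, and only hit the API on a miss or expired entry. A sketch of that pattern, with a condensed copy of `ExportCache` and a hypothetical `fetch_from_api` stub so the example is self-contained:

```python
import json, hashlib, tempfile
from pathlib import Path
from datetime import datetime, timedelta, timezone

class ExportCache:
    # Condensed copy of the class above, for a runnable demo.
    def __init__(self, cache_dir, ttl_hours=4):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = timedelta(hours=ttl_hours)
    def _key(self, forecast, period):
        return hashlib.md5(f"{forecast}:{period}".encode()).hexdigest()
    def get(self, forecast, period):
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        if not path.exists():
            return None
        meta = json.loads(path.read_text())
        age = datetime.now(timezone.utc) - datetime.fromisoformat(meta["cached_at"])
        return None if age > self.ttl else meta["entries"]
    def set(self, forecast, period, entries):
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        path.write_text(json.dumps({
            "cached_at": datetime.now(timezone.utc).isoformat(),
            "entries": entries,
        }))

calls = {"n": 0}
def fetch_from_api(forecast, period):
    # Hypothetical stand-in for a real export call; counts invocations.
    calls["n"] += 1
    return [{"period": period}]

def cached_export(cache, forecast, period):
    entries = cache.get(forecast, period)
    if entries is None:  # miss or expired: hit the API and store the result
        entries = fetch_from_api(forecast, period)
        cache.set(forecast, period, entries)
    return entries

cache = ExportCache(tempfile.mkdtemp())
first = cached_export(cache, "Pipeline", "2025_Q1")
second = cached_export(cache, "Pipeline", "2025_Q1")  # served from cache
print(calls["n"])  # 1 — only the first call reached the API
```

With a 4-hour TTL, repeated pipeline runs within the window skip the 5-10s export round-trip entirely.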

Incremental Load to Warehouse

```sql
-- Use MERGE for incremental updates instead of full reload
MERGE INTO clari_forecasts AS target
USING staging_clari AS source
ON target.owner_email = source.owner_email
   AND target.time_period = source.time_period
   AND target.forecast_name = source.forecast_name
WHEN MATCHED THEN UPDATE SET
    forecast_amount = source.forecast_amount,
    quota_amount = source.quota_amount,
    crm_total = source.crm_total,
    crm_closed = source.crm_closed,
    exported_at = source.exported_at
WHEN NOT MATCHED THEN INSERT VALUES (
    source.owner_name, source.owner_email, source.forecast_amount,
    source.quota_amount, source.crm_total, source.crm_closed,
    source.adjustment_amount, source.time_period,
    source.exported_at, source.forecast_name
);
```
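The same matched/not-matched behavior can be exercised locally with SQLite's `INSERT ... ON CONFLICT ... DO UPDATE` (SQLite has no `MERGE`). This sketch trims the schema above to the natural key plus one measure; the table and column names follow the MERGE statement, everything else is illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE clari_forecasts (
        owner_email TEXT,
        time_period TEXT,
        forecast_name TEXT,
        forecast_amount REAL,
        PRIMARY KEY (owner_email, time_period, forecast_name)
    )
""")

def upsert(rows):
    # ON CONFLICT on the natural key mirrors MERGE's matched/not-matched branches.
    conn.executemany("""
        INSERT INTO clari_forecasts VALUES (?, ?, ?, ?)
        ON CONFLICT(owner_email, time_period, forecast_name)
        DO UPDATE SET forecast_amount = excluded.forecast_amount
    """, rows)

upsert([("a@x.com", "2025_Q1", "Pipeline", 100.0)])
upsert([("a@x.com", "2025_Q1", "Pipeline", 150.0),   # updates existing row
        ("b@x.com", "2025_Q1", "Pipeline", 200.0)])  # inserts new row

rows = conn.execute(
    "SELECT owner_email, forecast_amount FROM clari_forecasts ORDER BY owner_email"
).fetchall()
print(rows)  # [('a@x.com', 150.0), ('b@x.com', 200.0)]
```

Only changed rows are written, which is where the full-reload-versus-MERGE savings in the benchmarks below come from.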

Performance Benchmarks

| Optimization | Before | After |
|---|---|---|
| Sequential 4-period export | 2 min | 40s (parallel) |
| Cache hit | 5-10s API call | <1ms |
| Full table reload | 30s | 5s (MERGE) |

Next Steps

For cost optimization, see clari-cost-tuning.


Repository: jeremylongshore/claude-code-plugins-plus-skills