> clari-core-workflow-a

Build a Clari forecast export pipeline to your data warehouse. Use when exporting forecast calls, quota data, and CRM totals from Clari to Snowflake, BigQuery, or a local database. Trigger with phrases like "clari forecast export", "clari data pipeline", "clari to snowflake", "clari to bigquery", "export clari data".

fetch
$curl "https://skillshub.wtf/jeremylongshore/claude-code-plugins-plus-skills/clari-core-workflow-a?format=md"
SKILL.mdclari-core-workflow-a

Clari Core Workflow: Forecast Export Pipeline

Overview

Primary workflow: build an automated pipeline that exports forecast submissions, quota, adjustments, and CRM data from Clari to your data warehouse. Supports Snowflake, BigQuery, and PostgreSQL as targets.

Prerequisites

  • Completed clari-install-auth and clari-sdk-patterns setup
  • Target database or data warehouse with write access
  • Python 3.10+ with requests and your DB driver

Instructions

Step 1: Define Export Configuration

# config.py
from dataclasses import dataclass

@dataclass
class ExportConfig:
    forecast_name: str          # From Clari forecast list
    time_periods: list[str]     # e.g., ["2026_Q1", "2025_Q4"]
    export_types: list[str] = None
    currency: str = "USD"
    include_historical: bool = True

    def __post_init__(self):
        if self.export_types is None:
            self.export_types = [
                "forecast",           # Submitted forecast call
                "forecast_updated",   # Updated forecast history
                "quota",              # Quota values
                "adjustment",         # Manager adjustments
                "crm_total",          # Total CRM pipeline
                "crm_closed",         # Closed-won CRM amounts
            ]

Step 2: Build the Export Pipeline

# export_pipeline.py
from clari_client import ClariClient
from config import ExportConfig
import json
from datetime import datetime

def run_export(config: ExportConfig) -> list[dict]:
    client = ClariClient()
    all_entries = []

    for period in config.time_periods:
        print(f"Exporting {config.forecast_name} for {period}...")

        data = client.export_and_download(
            forecast_name=config.forecast_name,
            time_period=period,
        )

        entries = data.get("entries", [])
        for entry in entries:
            entry["_exported_at"] = datetime.utcnow().isoformat()
            entry["_forecast_name"] = config.forecast_name

        all_entries.extend(entries)
        print(f"  {len(entries)} records exported")

    return all_entries

def transform_forecast_data(entries: list[dict]) -> dict:
    total_forecast = sum(e.get("forecastAmount", 0) for e in entries)
    total_quota = sum(e.get("quotaAmount", 0) for e in entries)
    total_closed = sum(e.get("crmClosed", 0) for e in entries)

    return {
        "total_forecast": total_forecast,
        "total_quota": total_quota,
        "total_closed": total_closed,
        "attainment_percent": (total_closed / total_quota * 100) if total_quota else 0,
        "coverage_ratio": (total_forecast / total_quota) if total_quota else 0,
        "rep_count": len(entries),
        "reps": entries,
    }

Step 3: Load to Snowflake

# load_snowflake.py
import snowflake.connector

def load_to_snowflake(entries: list[dict], table: str = "CLARI_FORECASTS"):
    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        database="REVENUE_DATA",
        schema="CLARI",
    )

    cursor = conn.cursor()
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {table} (
            owner_name VARCHAR,
            owner_email VARCHAR,
            forecast_amount FLOAT,
            quota_amount FLOAT,
            crm_total FLOAT,
            crm_closed FLOAT,
            adjustment_amount FLOAT,
            time_period VARCHAR,
            exported_at TIMESTAMP,
            forecast_name VARCHAR
        )
    """)

    for entry in entries:
        cursor.execute(f"""
            INSERT INTO {table} VALUES (
                %(ownerName)s, %(ownerEmail)s, %(forecastAmount)s,
                %(quotaAmount)s, %(crmTotal)s, %(crmClosed)s,
                %(adjustmentAmount)s, %(timePeriod)s,
                %(_exported_at)s, %(_forecast_name)s
            )
        """, entry)

    conn.commit()
    print(f"Loaded {len(entries)} records to {table}")

Step 4: Schedule with Cron or Airflow

# Run daily export
if __name__ == "__main__":
    config = ExportConfig(
        forecast_name="company_forecast",
        time_periods=["2026_Q1"],
    )
    entries = run_export(config)
    summary = transform_forecast_data(entries)
    print(f"Pipeline complete: {summary['rep_count']} reps, "
          f"${summary['total_forecast']:,.0f} forecast, "
          f"{summary['attainment_percent']:.1f}% attainment")
    load_to_snowflake(entries)

Error Handling

ErrorCauseSolution
Empty entriesNo submitted forecasts for periodVerify period has data in Clari UI
Job timeoutLarge exportIncrease max_poll_attempts
Snowflake auth errorWrong credentialsCheck env vars
Duplicate recordsRe-run without dedupAdd upsert logic with MERGE

Resources

Next Steps

For pipeline analytics and deal inspection, see clari-core-workflow-b.

┌ stats

installs/wk0
░░░░░░░░░░
github stars1.7K
██████████
first seenMar 23, 2026
└────────────

┌ repo

jeremylongshore/claude-code-plugins-plus-skills
by jeremylongshore
└────────────