Insert, query, and aggregate data in ClickHouse with real SQL patterns. Use when writing analytical queries, inserting data at scale, building dashboards, or implementing materialized views. Trigger: "clickhouse query", "clickhouse insert", "clickhouse aggregate", "clickhouse materialized view", "clickhouse SQL".


ClickHouse Insert & Query (Core Workflow B)

Overview

Insert data efficiently and write analytical queries with aggregations, window functions, and materialized views.

Prerequisites

  • Tables created (see clickhouse-core-workflow-a)
  • @clickhouse/client connected

Instructions

Step 1: Bulk Insert Patterns

import { createReadStream } from 'fs';
import { createClient } from '@clickhouse/client';

const client = createClient({
  url: process.env.CLICKHOUSE_HOST!,
  username: process.env.CLICKHOUSE_USER ?? 'default',
  password: process.env.CLICKHOUSE_PASSWORD ?? '',
});

// Insert many rows in one call. Each insert() is sent as a single INSERT,
// so pass the whole batch as one array rather than calling insert() per row.
await client.insert({
  table: 'analytics.events',
  values: events,   // Array of objects matching the table's columns
  format: 'JSONEachRow',
});

// Insert from a file stream (CSV, Parquet, etc.)
await client.insert({
  table: 'analytics.events',
  values: createReadStream('./data/events.csv'),
  format: 'CSVWithNames',
});
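
If the data already lives in object storage, ClickHouse can also load it server-side. A minimal sketch using the s3 table function via client.command(); the bucket URL and format are placeholders:

// Server-side load (sketch): ClickHouse pulls the files directly, nothing streams through Node
await client.command({
  query: `
    INSERT INTO analytics.events
    SELECT * FROM s3(
      'https://my-bucket.s3.amazonaws.com/events/*.parquet',
      'Parquet'
    )
  `,
});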

Insert best practices:

  • Batch rows: aim for 10K-100K rows per INSERT (not one at a time)
  • ClickHouse creates a new "part" per INSERT — too many small inserts cause "too many parts"
  • For real-time streams, buffer for 1-5 seconds and then flush, as sketched below
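
A minimal flush-on-interval buffer for the streaming case (a sketch, assuming the client from Step 1 and event objects shaped like the table's columns):

// Collect events in memory and flush every few seconds so each flush creates one part
const buffer: Record<string, unknown>[] = [];

export function enqueue(event: Record<string, unknown>) {
  buffer.push(event);
}

setInterval(async () => {
  if (buffer.length === 0) return;
  const batch = buffer.splice(0, buffer.length); // take everything buffered so far
  await client.insert({
    table: 'analytics.events',
    values: batch,
    format: 'JSONEachRow',
  });
}, 5_000); // 5-second flush interval: one part per flush instead of one per event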

Step 2: Analytical Queries

-- Top events by tenant in the last 7 days
SELECT
    tenant_id,
    event_type,
    count()                  AS event_count,
    uniqExact(user_id)       AS unique_users,
    min(created_at)          AS first_seen,
    max(created_at)          AS last_seen
FROM analytics.events
WHERE created_at >= now() - INTERVAL 7 DAY
GROUP BY tenant_id, event_type
ORDER BY event_count DESC
LIMIT 100;

-- Funnel analysis: signup → activation → purchase
-- (order-agnostic: counts users who logged at least N of these events;
--  an order-aware windowFunnel variant follows after these queries)
SELECT
    level,
    count() AS users
FROM (
    SELECT
        user_id,
        groupArray(event_type) AS journey
    FROM analytics.events
    WHERE event_type IN ('signup', 'activation', 'purchase')
      AND created_at >= today() - 30
    GROUP BY user_id
)
ARRAY JOIN arrayEnumerate(journey) AS level
GROUP BY level
ORDER BY level;

-- Retention: users active this week who were also active last week
SELECT
    count(DISTINCT curr.user_id) AS retained_users
FROM analytics.events AS curr
INNER JOIN analytics.events AS prev
    ON curr.user_id = prev.user_id
WHERE curr.created_at >= toMonday(today())
  AND prev.created_at >= toMonday(today()) - 7
  AND prev.created_at < toMonday(today());
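
If event order matters, windowFunnel() reports how far each user progressed through the sequence. A sketch with a 30-day window (the window is given in seconds for DateTime columns):

-- Order-aware funnel: level = furthest consecutive step reached within 30 days
SELECT
    level,
    count() AS users
FROM (
    SELECT
        user_id,
        windowFunnel(2592000)(
            created_at,
            event_type = 'signup',
            event_type = 'activation',
            event_type = 'purchase'
        ) AS level
    FROM analytics.events
    WHERE created_at >= today() - 30
    GROUP BY user_id
)
WHERE level > 0
GROUP BY level
ORDER BY level;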

Step 3: Parameterized Queries in Node.js

// Use {param:Type} syntax for safe parameterized queries
const rs = await client.query({
  query: `
    SELECT event_type, count() AS cnt
    FROM analytics.events
    WHERE tenant_id = {tenant_id:UInt32}
      AND created_at >= {from_date:DateTime}
    GROUP BY event_type
    ORDER BY cnt DESC
  `,
  query_params: {
    tenant_id: 1,
    from_date: '2025-01-01 00:00:00',
  },
  format: 'JSONEachRow',
});
const rows = await rs.json();
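
To give the rows a concrete shape, cast the parsed result to an interface (a sketch; note that ClickHouse quotes UInt64 values such as count() as strings in its JSON output formats by default):

// Sketch: typed rows instead of the untyped rs.json() call above
interface EventTypeCount {
  event_type: string;
  cnt: string; // UInt64 arrives as a string by default
}

const typed = (await rs.json()) as EventTypeCount[];
for (const row of typed) {
  console.log(row.event_type, Number(row.cnt));
}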

Step 4: Materialized Views (Pre-Aggregation)

-- Target table uses AggregatingMergeTree and must exist before the view is created
CREATE TABLE analytics.hourly_stats (
    hour               DateTime,
    tenant_id          UInt32,
    event_type         LowCardinality(String),
    event_count        SimpleAggregateFunction(sum, UInt64),  -- summed when parts merge
    unique_users_state AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (tenant_id, event_type, hour);

-- Materialized view: the source table receives raw events,
-- the view aggregates them automatically on every INSERT
CREATE MATERIALIZED VIEW analytics.hourly_stats_mv
TO analytics.hourly_stats  -- target table
AS
SELECT
    toStartOfHour(created_at) AS hour,
    tenant_id,
    event_type,
    count()                   AS event_count,
    uniqState(user_id)        AS unique_users_state
FROM analytics.events
GROUP BY hour, tenant_id, event_type;

-- Query the target table (merge the aggregation states)
SELECT
    hour,
    sum(event_count)           AS events,
    uniqMerge(unique_users_state) AS unique_users
FROM analytics.hourly_stats
WHERE tenant_id = 1
GROUP BY hour
ORDER BY hour;
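
Note that a materialized view only sees rows inserted after it is created; to cover history, backfill the target table once with a direct INSERT ... SELECT:

-- One-off backfill of pre-existing events into the target table
INSERT INTO analytics.hourly_stats
SELECT
    toStartOfHour(created_at) AS hour,
    tenant_id,
    event_type,
    count()                   AS event_count,
    uniqState(user_id)        AS unique_users_state
FROM analytics.events
GROUP BY hour, tenant_id, event_type;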

Step 5: Window Functions

-- Running total and rank within each tenant
SELECT
    tenant_id,
    event_type,
    count()   AS cnt,
    sum(count()) OVER (PARTITION BY tenant_id ORDER BY count() DESC) AS running_total,
    row_number() OVER (PARTITION BY tenant_id ORDER BY count() DESC) AS rank
FROM analytics.events
WHERE created_at >= today() - 7
GROUP BY tenant_id, event_type
ORDER BY tenant_id, rank;
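
Window functions also make day-over-day comparisons straightforward. A sketch using lagInFrame() (on the first day of each tenant, the previous value defaults to 0):

-- Daily event volume per tenant with the change from the previous day
SELECT
    tenant_id,
    day,
    cnt,
    cnt - lagInFrame(cnt) OVER (
        PARTITION BY tenant_id ORDER BY day
        ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
    ) AS day_over_day_change
FROM (
    SELECT tenant_id, toDate(created_at) AS day, count() AS cnt
    FROM analytics.events
    WHERE created_at >= today() - 30
    GROUP BY tenant_id, day
)
ORDER BY tenant_id, day;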

Step 6: Common ClickHouse Functions

Function | Description | Example
count() | Row count | count()
uniq(col) | Approximate distinct count (HyperLogLog) | uniq(user_id)
uniqExact(col) | Exact distinct count | uniqExact(user_id)
quantile(0.95)(col) | Percentile | quantile(0.95)(latency_ms)
arrayJoin(arr) | Unnest array to rows | arrayJoin(tags)
JSONExtractString(col, key) | Extract from JSON string | JSONExtractString(properties, 'plan')
toStartOfHour(dt) | Truncate to hour | toStartOfHour(created_at)
formatReadableSize(n) | Human-readable bytes | formatReadableSize(bytes)
if(cond, then, else) | Conditional | if(cnt > 0, cnt, NULL)
multiIf(...) | Multi-branch conditional | multiIf(x > 10, 'high', x > 5, 'med', 'low')
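
A quick query combining several of these (a sketch; latency_ms and properties are hypothetical columns on analytics.events):

SELECT
    event_type,
    quantile(0.95)(latency_ms)                  AS p95_latency_ms,
    uniq(JSONExtractString(properties, 'plan')) AS plans_seen,
    multiIf(count() > 100000, 'high', count() > 10000, 'med', 'low') AS volume_bucket
FROM analytics.events
WHERE created_at >= today() - 7
GROUP BY event_type
ORDER BY p95_latency_ms DESC;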

Error Handling

Error | Cause | Solution
Too many parts (300) | Frequent small inserts | Batch inserts, increase parts_to_throw_insert
Memory limit exceeded | Large GROUP BY / JOIN | Add WHERE filters, increase max_memory_usage
UNKNOWN_FUNCTION | Wrong ClickHouse version | Check SELECT version()
Cannot parse datetime | Wrong format | Use YYYY-MM-DD HH:MM:SS format
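
For the memory limit error above, the Node.js client can raise the limit for a single heavy query via clickhouse_settings (a sketch; pick a value that fits your server):

// Per-query settings override: only this aggregation gets the higher limit
const heavy = await client.query({
  query: 'SELECT tenant_id, uniqExact(user_id) AS u FROM analytics.events GROUP BY tenant_id',
  format: 'JSONEachRow',
  clickhouse_settings: {
    max_memory_usage: '20000000000', // ~20 GB, passed as a string
  },
});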

Next Steps

For error troubleshooting, see clickhouse-common-errors.
