> databricks-core-workflow-b

Execute Databricks secondary workflow: MLflow model training and deployment. Use when building ML pipelines, training models, or deploying to production. Trigger with phrases like "databricks ML", "mlflow training", "databricks model", "feature store", "model registry".

fetch
$curl "https://skillshub.wtf/jeremylongshore/claude-code-plugins-plus-skills/databricks-core-workflow-b?format=md"
SKILL.mddatabricks-core-workflow-b

Databricks Core Workflow B: MLflow Training & Serving

Overview

Full ML lifecycle on Databricks: Feature Engineering Client for discoverable features, MLflow experiment tracking with auto-logging, Unity Catalog model registry with aliases (champion/challenger), and Mosaic AI Model Serving endpoints for real-time inference via REST API.

Prerequisites

  • Completed databricks-install-auth and databricks-core-workflow-a
  • databricks-sdk, mlflow, scikit-learn installed
  • Unity Catalog enabled (required for model registry)

Instructions

Step 1: Feature Engineering with Feature Store

Create a feature table in Unity Catalog so features are discoverable and reusable.

from databricks.feature_engineering import FeatureEngineeringClient
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
fe = FeatureEngineeringClient()

# Build features from gold layer tables
user_features = (
    spark.table("prod_catalog.gold.user_events")
    .groupBy("user_id")
    .agg(
        F.count("event_id").alias("total_events"),
        F.avg("session_duration_sec").alias("avg_session_sec"),
        F.max("event_timestamp").alias("last_active"),
        F.countDistinct("event_type").alias("unique_event_types"),
        F.datediff(F.current_date(), F.max("event_timestamp")).alias("days_since_last_active"),
    )
)

# Register as a feature table (creates or updates)
fe.create_table(
    name="prod_catalog.ml_features.user_behavior",
    primary_keys=["user_id"],
    df=user_features,
    description="User behavioral features for churn prediction",
)

Step 2: MLflow Experiment Tracking

import mlflow
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Point MLflow to Databricks tracking server
mlflow.set_tracking_uri("databricks")
mlflow.set_experiment("/Users/team@company.com/churn-prediction")

# Load features
features_df = spark.table("prod_catalog.ml_features.user_behavior").toPandas()
X = features_df.drop(columns=["user_id", "churned"])
y = features_df["churned"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train with experiment tracking
with mlflow.start_run(run_name="gbm-baseline") as run:
    params = {"n_estimators": 200, "max_depth": 5, "learning_rate": 0.1}
    mlflow.log_params(params)

    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred),
        "f1": f1_score(y_test, y_pred),
    }
    mlflow.log_metrics(metrics)

    # Log model with signature for serving validation
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        input_example=X_test.iloc[:5],
        registered_model_name="prod_catalog.ml_models.churn_predictor",
    )
    print(f"Run {run.info.run_id}: accuracy={metrics['accuracy']:.3f}")

Step 3: Model Registry with Aliases

Unity Catalog model registry replaces legacy stages with aliases (champion, challenger).

from mlflow import MlflowClient

client = MlflowClient()
model_name = "prod_catalog.ml_models.churn_predictor"

# List versions
for mv in client.search_model_versions(f"name='{model_name}'"):
    print(f"v{mv.version}: status={mv.status}, aliases={mv.aliases}")

# Promote best version to champion
client.set_registered_model_alias(model_name, alias="champion", version="3")

# Load model by alias in downstream code
champion = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")
predictions = champion.predict(X_test)

Step 4: Deploy Model Serving Endpoint

Mosaic AI Model Serving creates a REST API endpoint with auto-scaling.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
    EndpointCoreConfigInput, ServedEntityInput,
)

w = WorkspaceClient()

# Create or update a serving endpoint
endpoint = w.serving_endpoints.create_and_wait(
    name="churn-predictor-prod",
    config=EndpointCoreConfigInput(
        served_entities=[
            ServedEntityInput(
                entity_name="prod_catalog.ml_models.churn_predictor",
                entity_version="3",
                workload_size="Small",
                scale_to_zero_enabled=True,
            )
        ]
    ),
)
print(f"Endpoint ready: {endpoint.name} ({endpoint.state.ready})")

Step 5: Query the Serving Endpoint

import requests

# Score via REST API
url = f"{w.config.host}/serving-endpoints/churn-predictor-prod/invocations"
headers = {
    "Authorization": f"Bearer {w.config.token}",
    "Content-Type": "application/json",
}
payload = {
    "dataframe_records": [
        {"total_events": 42, "avg_session_sec": 120.5,
         "unique_event_types": 7, "days_since_last_active": 3},
    ]
}
response = requests.post(url, headers=headers, json=payload)
print(response.json())  # {"predictions": [0]}

# Or use the SDK
result = w.serving_endpoints.query(
    name="churn-predictor-prod",
    dataframe_records=[
        {"total_events": 42, "avg_session_sec": 120.5,
         "unique_event_types": 7, "days_since_last_active": 3},
    ],
)
print(result.predictions)

Step 6: Batch Inference Job

# Scheduled Databricks job for daily batch scoring
model_name = "prod_catalog.ml_models.churn_predictor"
champion = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")

# Score all active users
active_users = spark.table("prod_catalog.gold.active_users").toPandas()
feature_cols = ["total_events", "avg_session_sec", "unique_event_types", "days_since_last_active"]
active_users["churn_probability"] = champion.predict_proba(active_users[feature_cols])[:, 1]

# Write scores back to Delta
(spark.createDataFrame(active_users[["user_id", "churn_probability"]])
    .write.mode("overwrite")
    .saveAsTable("prod_catalog.gold.churn_scores"))

Output

  • Feature table in Unity Catalog (prod_catalog.ml_features.user_behavior)
  • MLflow experiment with logged runs, metrics, and artifacts
  • Model versions in registry with champion alias
  • Live serving endpoint at /serving-endpoints/churn-predictor-prod/invocations
  • Batch scoring pipeline writing to prod_catalog.gold.churn_scores

Error Handling

ErrorCauseSolution
RESOURCE_DOES_NOT_EXISTWrong experiment pathVerify with mlflow.search_experiments()
INVALID_PARAMETER_VALUE on log_modelMissing signaturePass input_example= to auto-infer signature
Model not found in registryWrong three-level nameUse catalog.schema.model_name format
Endpoint FAILEDModel loading errorCheck endpoint events: w.serving_endpoints.get("name").pending_config
429 on serving endpointRate limit exceededIncrease workload_size or add traffic splitting
FEATURE_TABLE_NOT_FOUNDTable not createdRun fe.create_table() first

Examples

Hyperparameter Sweep

from sklearn.model_selection import ParameterGrid

grid = {"n_estimators": [100, 200], "max_depth": [3, 5, 7], "learning_rate": [0.05, 0.1]}
for params in ParameterGrid(grid):
    with mlflow.start_run(run_name=f"gbm-d{params['max_depth']}-n{params['n_estimators']}"):
        mlflow.log_params(params)
        model = GradientBoostingClassifier(**params)
        model.fit(X_train, y_train)
        mlflow.log_metric("accuracy", accuracy_score(y_test, model.predict(X_test)))
        mlflow.sklearn.log_model(model, "model")

Resources

Next Steps

For common errors, see databricks-common-errors.

┌ stats

installs/wk0
░░░░░░░░░░
github stars1.7K
██████████
first seenMar 23, 2026
└────────────

┌ repo

jeremylongshore/claude-code-plugins-plus-skills
by jeremylongshore
└────────────