Kumo’s online serving runs predictions at request time, which suits live recommendations, real-time fraud scoring, and any use case that needs a score within milliseconds rather than from a scheduled job. Unlike batch prediction, which scores your entire dataset at once, online serving keeps a live endpoint running that you query one entity at a time.
The deployment steps below mirror the interactive SDK example notebook, which you can download and run end to end.

How it works

Online serving in Kumo combines two models:
  • A base model trained on your full graph that produces rich entity embeddings capturing long-term patterns.
  • A distilled model that runs at request time, combining those stored embeddings with the latest signals to produce a fast, accurate prediction.
The end-to-end flow is:
  1. Train a base model on your graph.
  2. Train a distilled model using the base model’s embeddings.
  3. Generate embeddings from the base model and export the serving bundle to S3.
  4. Deploy the bundle to a live inference endpoint.
  5. Query the endpoint from your application.
Steps 1–3 use the fine-tuning SDK. Steps 4–5 use the kumoai.online deployment SDK.

Train the base model

Train your base model as usual — this is the same workflow covered in Training & Predictions. Save the job ID; you’ll need it in the next step.
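import kumoai as kumo

# `graph` and `pq` (a PredictiveQuery) are assumed to exist already,
# as set up in Training & Predictions.
train_table = pq.generate_training_table()
model_plan  = pq.suggest_model_plan()

trainer    = kumo.Trainer(graph, pq, model_plan)
result     = trainer.fit(train_table, non_blocking=False)
base_job_id = result.job_id  # keep this ID for the distillation step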

Train the distilled model

The distilled model is a smaller, faster model trained to predict at request time. It takes the base model’s embeddings as inputs — pass base_model_id to link the two.
from kumoai.trainer import DistillationTrainer

train_table_serving = pq_serving.generate_training_table()

distilled_plan   = pq_serving.suggest_distilled_model_plan(base_model_id=base_job_id)
distiller        = DistillationTrainer(distilled_plan, base_job_id)
dist_result      = distiller.fit(graph, train_table_serving, non_blocking=False)
distilled_job_id = dist_result.job_id
pq_serving is a PredictiveQuery on the same graph as pq, targeting the entity you want to score at request time — for example, a transaction or a recommendation candidate.

Generate embeddings and export

Run batch prediction on the base model to produce entity embeddings, then export both the distilled model and the embeddings to S3 as a ready-to-deploy bundle.
from kumoai.trainer.config import OutputConfig
from kumoai.trainer import ModelOutputConfig, export_model

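# Reuses `kumo`, `graph`, and `pq` from the earlier steps. `connector` is the
# output connector that the prediction results and embeddings are written to.
base_trainer = kumo.Trainer.load(base_job_id)
pred_table   = pq.generate_prediction_table(non_blocking=True)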

bp_result = base_trainer.predict(
    graph=graph,
    prediction_table=pred_table,
    output_config=OutputConfig(
        output_types={"predictions", "embeddings"},
        output_connector=connector,
        output_table_name="embeddings_for_export",
    ),
    training_job_id=base_job_id,
    non_blocking=False,
)

export_config = ModelOutputConfig(
    training_job_id=distilled_job_id,
    batch_prediction_job_id=bp_result.job_id,
    output_path="s3://your-bucket/path/to/serving-bundle/",
)
export_model(export_config, non_blocking=False)
Export targets S3 URIs (s3://…). Contact your Kumo team if you need to export to a different storage provider.

Connect to the deployment control plane

With your bundle in S3, switch to the kumoai.online SDK to deploy and manage the live service. Install the SDK:
pip install kumoai
Your Kumo team will provide these values when they provision your tenant:
Variable        What it is
BASE_URL        Your control-plane API URL
CLIENT_ID       Cognito app client ID
CLIENT_SECRET   Cognito app client secret
TOKEN_URL       Cognito token endpoint
import os
import kumoai.online as kumo_online

BASE_URL      = os.environ.get("BASE_URL",      "<your-control-plane-url>")
CLIENT_ID     = os.environ.get("CLIENT_ID",     "<your-cognito-client-id>")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET", "<your-cognito-client-secret>")
TOKEN_URL     = os.environ.get("TOKEN_URL",     "<your-cognito-token-url>")
SCOPE         = os.environ.get("SCOPE",         "https://inference.kumo.ai/invoke")

client = kumo_online.init(
    url=BASE_URL,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    token_url=TOKEN_URL,
    scope=SCOPE,
)

print("healthy:", client.health())
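Because the snippet above falls back to placeholder strings such as <your-control-plane-url>, a missing environment variable only shows up later as a connection or authentication error. The following is a minimal pre-flight check using only the standard library. The helper name missing_settings is hypothetical and not part of the kumoai SDK, and the rule that a placeholder is any value wrapped in angle brackets is an assumption based on the defaults used above.

```python
import os

# Names taken from the table above. SCOPE has a usable default in the
# snippet above, so it is not treated as required here.
REQUIRED = ("BASE_URL", "CLIENT_ID", "CLIENT_SECRET", "TOKEN_URL")


def missing_settings(env=None, required=REQUIRED):
    """Return the required names that are unset, empty, or still a <placeholder>."""
    env = os.environ if env is None else env
    missing = []
    for name in required:
        value = (env.get(name) or "").strip()
        if not value or (value.startswith("<") and value.endswith(">")):
            missing.append(name)
    return missing


# Example with an explicit mapping instead of the real environment.
example_env = {
    "BASE_URL": "https://control-plane.example.com",  # made-up URL
    "CLIENT_ID": "<your-cognito-client-id>",          # placeholder left in
    "CLIENT_SECRET": "",                              # empty value
}
print(missing_settings(example_env))
# prints: ['CLIENT_ID', 'CLIENT_SECRET', 'TOKEN_URL']
```

Calling missing_settings() with no arguments checks the real environment, so it can run just before kumo_online.init and raise a clear error if the returned list is not empty.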

Register your model

Point the SDK at the S3 bundle from Step 3. The last path segment becomes the model name.
MODEL_ID     = "my-model-v1"
S3_MODEL_URI = "s3://your-bucket/path/to/serving-bundle/two_stage_gnn/"

client.register_model(model_id=MODEL_ID, s3_model_uri=S3_MODEL_URI)
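To make "last path segment" concrete, the sketch below extracts it from an S3 URI with the standard library. The helper last_path_segment is hypothetical (it is not a kumoai function) and only illustrates which part of the URI the sentence above refers to.

```python
from urllib.parse import urlparse


def last_path_segment(s3_uri: str) -> str:
    """Return the final non-empty path component of an s3:// URI."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"expected an s3://bucket/key URI, got {s3_uri!r}")
    # Drop empty parts so a trailing slash does not yield an empty segment.
    segments = [part for part in parsed.path.split("/") if part]
    if not segments:
        raise ValueError(f"{s3_uri!r} has no path below the bucket")
    return segments[-1]


print(last_path_segment("s3://your-bucket/path/to/serving-bundle/two_stage_gnn/"))
# prints: two_stage_gnn
```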

Deploy an inference service

Choose a GPU instance type and create the service:
import time

# List the instance types available to your tenant, then pick one.
instance_types = client.list_instance_types()
print(instance_types)
INSTANCE_TYPE  = "g6.4xlarge"
SERVICE_NAME   = "my-service"

svc = client.create_inference_service(
    name=SERVICE_NAME,
    model_id=MODEL_ID,
    instance_type=INSTANCE_TYPE,
)
g6.4xlarge is a good starting point for most models. The service takes 1–5 minutes to start — poll until it’s ready:
def wait_until_ready(name: str, timeout: int = 600, poll: int = 10) -> None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        statuses = client.get_status()
        s = statuses.get(name)
        ready = bool(s and s.ready)
        print(f"{name}: ready={ready}  {s.message if s else ''}")
        if ready:
            return
        time.sleep(poll)
    raise TimeoutError(f"{name} did not become ready within {timeout}s")

wait_until_ready(SERVICE_NAME)
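The polling loop above can also be written with the readiness check passed in as a function, which lets the timeout logic be exercised without a live service. This is a sketch rather than part of the SDK: poll_until, its parameters, and the stand-in check fake_ready are all hypothetical. It uses time.monotonic() so that the deadline is not affected by system clock adjustments.

```python
import time
from typing import Callable


def poll_until(is_ready: Callable[[], bool], timeout: float = 600.0,
               interval: float = 10.0) -> int:
    """Call is_ready() until it returns True and return the number of calls.

    Raises TimeoutError if the deadline passes before is_ready() succeeds.
    """
    deadline = time.monotonic() + timeout
    attempts = 0
    while True:
        attempts += 1
        if is_ready():
            return attempts
        if time.monotonic() >= deadline:
            raise TimeoutError(f"not ready after {timeout}s ({attempts} checks)")
        time.sleep(interval)


# Stand-in for a real status check: reports ready on the third call.
calls = {"n": 0}


def fake_ready() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


attempts = poll_until(fake_ready, timeout=5.0, interval=0.0)
print(attempts)
# prints: 3
```

With the client from this guide, the check passed in would wrap the same client.get_status() lookup that wait_until_ready performs.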

Run inference

Send a request to your live endpoint. Input names and shapes come from your exported model’s config.pbtxt.
inputs = [
    {
        "name": "anchor_time",
        "datatype": "INT64",
        "shape": [1, 1],
        "data": [1352371793912000000],
    },
]

response = svc.infer(inputs=inputs)
print(response)
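Each input entry carries a name, a datatype, a shape, and a flat data list. The sketch below builds such an entry and checks that the number of values matches the shape, which catches a common mistake before the request is sent. The helpers make_input and to_epoch_ns are hypothetical. Treating anchor_time as nanoseconds since the Unix epoch is an assumption based on the magnitude of the example value above, so confirm the unit against your own model's config.pbtxt.

```python
from datetime import datetime, timezone
from math import prod

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ns(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer nanoseconds since the epoch."""
    if dt.tzinfo is None:
        raise ValueError("dt must be timezone-aware")
    delta = dt - EPOCH
    # Integer arithmetic avoids the rounding that float timestamps introduce.
    seconds = delta.days * 86_400 + delta.seconds
    return seconds * 1_000_000_000 + delta.microseconds * 1_000


def make_input(name: str, datatype: str, shape: list, data: list) -> dict:
    """Build one input entry, checking that len(data) equals the product of shape."""
    expected = prod(shape)
    if len(data) != expected:
        raise ValueError(
            f"{name}: shape {shape} needs {expected} values, got {len(data)}"
        )
    return {"name": name, "datatype": datatype, "shape": list(shape), "data": list(data)}


anchor = datetime(2012, 11, 8, 10, 49, 53, 912000, tzinfo=timezone.utc)
inputs = [make_input("anchor_time", "INT64", [1, 1], [to_epoch_ns(anchor)])]
print(inputs[0]["data"])
# prints: [1352371793912000000]
```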
You can also load inputs from a JSON file:
import json
from pathlib import Path

body   = json.loads(Path("request.json").read_text())
inputs = body["inputs"] if isinstance(body, dict) else body
print(svc.infer(inputs=inputs))
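The loader above accepts two file layouts: an object with an inputs key, or a bare list of input entries. The sketch below writes both layouts to a temporary directory and reads them back with the same logic. The file names and the load_inputs helper are hypothetical.

```python
import json
import tempfile
from pathlib import Path

entry = {
    "name": "anchor_time",
    "datatype": "INT64",
    "shape": [1, 1],
    "data": [1352371793912000000],
}


def load_inputs(path: Path) -> list:
    """Read a request file and return the list of input entries."""
    body = json.loads(path.read_text())
    return body["inputs"] if isinstance(body, dict) else body


with tempfile.TemporaryDirectory() as tmp:
    wrapped = Path(tmp) / "request_wrapped.json"
    bare = Path(tmp) / "request_bare.json"
    wrapped.write_text(json.dumps({"inputs": [entry]}))  # layout 1: {"inputs": [...]}
    bare.write_text(json.dumps([entry]))                 # layout 2: [...]
    same = load_inputs(wrapped) == load_inputs(bare) == [entry]

print(same)
# prints: True
```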

Clean up

Delete the service and registered model when you’re done to avoid ongoing costs:
svc.delete()
client.delete_registered_model(MODEL_ID)

Next Steps

The example notebook covers two additional features once you’re comfortable with the basics.
Autoscaling scales replicas automatically based on CPU usage:
from kumoai.online import ScaleMetric

svc.create_autoscaling(
    min_capacity=1,
    max_capacity=5,
    scale_metric=ScaleMetric.cpu,
    scale_target=70,
)
Canary rollouts gradually shift traffic to a new model version while the existing one keeps serving the rest. Here S3_MODEL_URI_V2 stands for the S3 URI of the new version’s serving bundle:
client.register_model(model_id="my-model-v2", s3_model_uri=S3_MODEL_URI_V2)
svc.start_canary(canary_model_id="my-model-v2", canary_traffic_percent=10)
Once you’re satisfied, call svc.promote() to make the canary the new stable version. If something goes wrong, call svc.rollback() to route all traffic back to the original.

See also