Kumo’s online serving runs predictions at request time — suitable for live recommendations, real-time fraud scoring, and any use case where you need a score in milliseconds rather than a scheduled job. Unlike batch prediction, which scores your entire dataset at once, online serving keeps a live endpoint running that you query with a single entity at a time.
How it works
Online serving in Kumo combines two models:
- A base model trained on your full graph that produces rich entity embeddings capturing long-term patterns.
- A distilled model that runs at request time, combining those stored embeddings with the latest signals to produce a fast, accurate prediction.
The end-to-end flow is:
1. Train a base model on your graph.
2. Train a distilled model using the base model’s embeddings.
3. Generate embeddings from the base model and export the serving bundle to S3.
4. Deploy the bundle to a live inference endpoint.
5. Query the endpoint from your application.
Steps 1–3 use the fine-tuning SDK. Steps 4–5 use the kumoai.online deployment SDK.
Train the base model
Train your base model as usual — this is the same workflow covered in Training & Predictions. Save the job ID; you’ll need it in the next step.
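import kumoai as kumo

# Assumes `graph` and `pq` (a PredictiveQuery on that graph) are already defined.
train_table = pq.generate_training_table()
model_plan = pq.suggest_model_plan()
trainer = kumo.Trainer(graph, pq, model_plan)
result = trainer.fit(train_table, non_blocking=False)
base_job_id = result.job_id  # needed when training the distilled model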
Train the distilled model
The distilled model is a smaller, faster model trained to predict at request time. It takes the base model’s embeddings as inputs — pass base_model_id to link the two.
from kumoai.trainer import DistillationTrainer
train_table_serving = pq_serving.generate_training_table()
distilled_plan = pq_serving.suggest_distilled_model_plan(base_model_id=base_job_id)
distiller = DistillationTrainer(distilled_plan, base_job_id)
dist_result = distiller.fit(graph, train_table_serving, non_blocking=False)
distilled_job_id = dist_result.job_id
pq_serving is a PredictiveQuery on the same graph as pq, targeting the entity you want to score at request time — for example, a transaction or a recommendation candidate.
Generate embeddings and export
Run batch prediction on the base model to produce entity embeddings, then export both the distilled model and the embeddings to S3 as a ready-to-deploy bundle.
from kumoai.trainer.config import OutputConfig
from kumoai.trainer import ModelOutputConfig, export_model
base_trainer = kumo.Trainer.load(base_job_id)
pred_table = pq.generate_prediction_table(non_blocking=True)
bp_result = base_trainer.predict(
    graph=graph,
    prediction_table=pred_table,
    output_config=OutputConfig(
        output_types={"predictions", "embeddings"},
        output_connector=connector,  # assumes `connector` is an already-configured output connector
        output_table_name="embeddings_for_export",
    ),
    training_job_id=base_job_id,
    non_blocking=False,
)
export_config = ModelOutputConfig(
    training_job_id=distilled_job_id,
    batch_prediction_job_id=bp_result.job_id,
    output_path="s3://your-bucket/path/to/serving-bundle/",
)
export_model(export_config, non_blocking=False)
Export targets S3 URIs (s3://…). Contact your Kumo team if you need to export to a different storage provider.
Connect to the deployment control plane
With your bundle in S3, switch to the kumoai.online SDK to deploy and manage the live service.
Install the SDK:
Your Kumo team will provide these values when they provision your tenant:
| Variable | What it is |
|---|---|
| BASE_URL | Your control-plane API URL |
| CLIENT_ID | Cognito app client ID |
| CLIENT_SECRET | Cognito app client secret |
| TOKEN_URL | Cognito token endpoint |
import os
import kumoai.online as kumo_online
BASE_URL = os.environ.get("BASE_URL", "<your-control-plane-url>")
CLIENT_ID = os.environ.get("CLIENT_ID", "<your-cognito-client-id>")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET", "<your-cognito-client-secret>")
TOKEN_URL = os.environ.get("TOKEN_URL", "<your-cognito-token-url>")
SCOPE = os.environ.get("SCOPE", "https://inference.kumo.ai/invoke")
client = kumo_online.init(
    url=BASE_URL,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    token_url=TOKEN_URL,
    scope=SCOPE,
)
print("healthy:", client.health())
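The snippet above falls back to placeholder strings such as `<your-control-plane-url>` when a variable is unset, so a misconfigured environment may only show up later as a connection or authentication error. A minimal sketch of a fail-fast check follows. The helper name `load_settings` is hypothetical and not part of the kumoai.online SDK; it uses only the standard library and the four variable names from the table above.

```python
import os
from typing import Mapping

# Variables the tenant provisioning step is expected to supply.
REQUIRED_VARS = ("BASE_URL", "CLIENT_ID", "CLIENT_SECRET", "TOKEN_URL")


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Return the required settings, or raise listing every missing one.

    Hypothetical helper for illustration; not part of the Kumo SDK.
    """
    missing = [name for name in REQUIRED_VARS if not env.get(name, "").strip()]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling `load_settings()` before initializing the client reports all missing variables in one error instead of failing on the first request.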
Register your model
Point the SDK at the S3 bundle from Step 3. The last path segment becomes the model name.
MODEL_ID = "my-model-v1"
S3_MODEL_URI = "s3://your-bucket/path/to/serving-bundle/two_stage_gnn/"
client.register_model(model_id=MODEL_ID, s3_model_uri=S3_MODEL_URI)
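To make "the last path segment" concrete, here is a small standard-library sketch that extracts it from an S3 URI. How the SDK derives the name internally is not shown in this guide, so treat this as an illustration of the statement only; `last_path_segment` is a hypothetical helper.

```python
from urllib.parse import urlparse


def last_path_segment(s3_uri: str) -> str:
    """Return the final non-empty path segment of an s3:// URI."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {s3_uri!r}")
    # Drop empty parts so a trailing slash does not produce an empty name.
    segments = [part for part in parsed.path.split("/") if part]
    if not segments:
        raise ValueError(f"URI has no path segment: {s3_uri!r}")
    return segments[-1]


print(last_path_segment("s3://your-bucket/path/to/serving-bundle/two_stage_gnn/"))  # two_stage_gnn
```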
Deploy an inference service
Choose a GPU instance type and create the service:
import time
instance_types = client.list_instance_types()
INSTANCE_TYPE = "g6.4xlarge"
SERVICE_NAME = "my-service"
svc = client.create_inference_service(
    name=SERVICE_NAME,
    model_id=MODEL_ID,
    instance_type=INSTANCE_TYPE,
)
g6.4xlarge is a good starting point for most models. The service takes 1–5 minutes to start — poll until it’s ready:
def wait_until_ready(name: str, timeout: int = 600, poll: int = 10) -> None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        statuses = client.get_status()
        s = statuses.get(name)
        ready = bool(s and s.ready)
        print(f"{name}: ready={ready} {s.message if s else ''}")
        if ready:
            return
        time.sleep(poll)
    raise TimeoutError(f"{name} did not become ready within {timeout}s")
wait_until_ready(SERVICE_NAME)
Run inference
Send a request to your live endpoint. Input names and shapes come from your exported model’s config.pbtxt.
inputs = [
    {
        "name": "anchor_time",
        "datatype": "INT64",
        "shape": [1, 1],
        "data": [1352371793912000000],
    },
]
response = svc.infer(inputs=inputs)
print(response)
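The sample `anchor_time` value has the magnitude of nanoseconds since the Unix epoch. Assuming that is the intended unit (confirm against your model's config.pbtxt), the sketch below builds the same input entry from a timezone-aware `datetime`. The helpers `to_epoch_ns` and `make_input` are hypothetical, and the `[1, n]` shape simply mirrors the example above.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ns(moment: datetime) -> int:
    """Convert a timezone-aware datetime to integer nanoseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    # Integer division keeps the result exact; datetime resolves to microseconds.
    micros = (moment - EPOCH) // timedelta(microseconds=1)
    return micros * 1000


def make_input(name: str, datatype: str, values: list) -> dict:
    """Build one input entry with shape [1, len(values)] (assumed single-row batch)."""
    return {"name": name, "datatype": datatype, "shape": [1, len(values)], "data": list(values)}


moment = datetime(2012, 11, 8, 10, 49, 53, 912000, tzinfo=timezone.utc)
inputs = [make_input("anchor_time", "INT64", [to_epoch_ns(moment)])]
print(inputs[0]["data"])  # [1352371793912000000]
```

Using integer arithmetic rather than `moment.timestamp() * 1e9` avoids floating-point rounding at this magnitude.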
You can also load inputs from a JSON file:
import json
from pathlib import Path
body = json.loads(Path("request.json").read_text())
inputs = body["inputs"] if isinstance(body, dict) else body
print(svc.infer(inputs=inputs))
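When inputs come from a file, a quick local check can catch malformed entries before the request is sent. The sketch below assumes each entry carries the four keys used in the example above and that `data` is a flat list whose length equals the product of `shape`. The server's own validation rules are not described here, and `validate_inputs` is a hypothetical helper.

```python
from math import prod

REQUIRED_KEYS = ("name", "datatype", "shape", "data")


def validate_inputs(inputs: list) -> None:
    """Raise ValueError if an entry lacks a key or its data length does not match its shape."""
    for index, entry in enumerate(inputs):
        missing = [key for key in REQUIRED_KEYS if key not in entry]
        if missing:
            raise ValueError(f"input {index} is missing keys: {missing}")
        expected = prod(entry["shape"])
        if len(entry["data"]) != expected:
            raise ValueError(
                f"input {entry['name']!r}: shape {entry['shape']} implies "
                f"{expected} values, got {len(entry['data'])}"
            )
```

Calling `validate_inputs(inputs)` just before `svc.infer(inputs=inputs)` turns a shape mismatch into a local error with the offending input name.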
Clean up
Delete the service and registered model when you’re done to avoid ongoing costs:
svc.delete()
client.delete_registered_model(MODEL_ID)
Next Steps
The example notebook covers two additional features once you’re comfortable with the basics.
Autoscaling — automatically scale replicas based on CPU usage:
from kumoai.online import ScaleMetric
svc.create_autoscaling(
    min_capacity=1,
    max_capacity=5,
    scale_metric=ScaleMetric.cpu,
    scale_target=70,
)
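This guide does not describe the scaling policy behind `scale_target`. As a rough illustration only, the sketch below applies the proportional rule used by the Kubernetes Horizontal Pod Autoscaler, clamped to the configured capacity bounds. Whether Kumo's autoscaler follows this rule is an assumption, and `desired_replicas` is a hypothetical name.

```python
import math


def desired_replicas(current: int, cpu_percent: float, target_percent: float,
                     min_capacity: int, max_capacity: int) -> int:
    """HPA-style proportional scaling: ceil(current * observed / target), clamped."""
    if target_percent <= 0:
        raise ValueError("target_percent must be positive")
    raw = math.ceil(current * cpu_percent / target_percent)
    return max(min_capacity, min(max_capacity, raw))


print(desired_replicas(2, 105, 70, 1, 5))  # 3
print(desired_replicas(5, 140, 70, 1, 5))  # 5 (capped at max_capacity)
print(desired_replicas(3, 10, 70, 1, 5))   # 1
```

Under this rule, a target of 70 with two replicas at 105% average CPU would ask for three replicas, and the bounds keep the result between `min_capacity` and `max_capacity`.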
Canary rollouts — gradually shift traffic to a new model version while the existing one keeps serving the rest:
client.register_model(model_id="my-model-v2", s3_model_uri=S3_MODEL_URI_V2)
svc.start_canary(canary_model_id="my-model-v2", canary_traffic_percent=10)
Once you’re satisfied, call svc.promote() to make the canary the new stable version. If something goes wrong, call svc.rollback() to route all traffic back to the original.
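To get a feel for what `canary_traffic_percent=10` means in practice, the local simulation below routes each request to the canary independently with probability 10%. This is an assumption about the routing behavior made for illustration; the service does the real routing, and `simulate_split` is a hypothetical helper that never touches the endpoint.

```python
import random
from collections import Counter


def simulate_split(requests: int, canary_traffic_percent: float, seed: int = 0) -> Counter:
    """Count how many simulated requests land on each version."""
    rng = random.Random(seed)  # seeded so the simulation is repeatable
    counts = Counter({"stable": 0, "canary": 0})
    for _ in range(requests):
        to_canary = rng.random() < canary_traffic_percent / 100
        counts["canary" if to_canary else "stable"] += 1
    return counts


counts = simulate_split(10_000, 10)
print(counts["canary"] / 10_000)  # close to 0.10
```

At low request volumes the observed share can deviate noticeably from the configured percentage, which is worth keeping in mind when comparing canary and stable metrics early in a rollout.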