Documentation Index
Fetch the complete documentation index at: https://kumo.ai/docs/llms.txt
Use this file to discover all available pages before exploring further.
Starting with raw data (as SourceTable objects), you created a Graph of Kumo Table objects and defined a PredictiveQuery. Now you can train a model and generate predictions.
Creating a Trainer
Training requires a ModelPlan, which defines the search space of model configurations. Generate one from your predictive query:
model_plan = pquery.suggest_model_plan()
print(model_plan)
The model plan can be edited with full granularity — see the Model Plan reference for customizable attributes. Once ready, create a Trainer:
trainer = kumo.Trainer(model_plan)
Training a Model
Call fit() with your graph and training table:
training_job = trainer.fit(
    graph=graph,
    train_table=pquery.generate_training_table(non_blocking=True),
    non_blocking=False,
)
print(f"Training metrics: {training_job.metrics()}")
Many long-running operations in the Kumo SDK accept an optional non_blocking parameter. With non_blocking=True, the call returns immediately with a Future object that tracks the operation as it runs in the background; with non_blocking=False, the call blocks until the operation completes before returning.
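This is the standard future/promise idiom. As a plain-Python illustration of the two modes (standard library only, not the Kumo SDK):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def long_running_job() -> str:
    """Stand-in for a long-running operation such as model training."""
    time.sleep(0.1)
    return "done"

with ThreadPoolExecutor(max_workers=1) as executor:
    # non_blocking=True style: the call returns a future immediately,
    # and the job runs in the background.
    future = executor.submit(long_running_job)
    # ... other work can happen here while the job runs ...
    result_later = future.result()  # block only when the value is needed

# non_blocking=False style: call directly and wait for completion.
result_now = long_running_job()
```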
Viewing Metrics and Artifacts
A completed training job exposes metrics and artifacts:
# View evaluation metrics:
print(training_job.metrics())
# Download holdout dataset as a DataFrame:
holdout_df = training_job.holdout_df()
# Open the job in the Kumo UI:
print(training_job.tracking_url)
Generating Batch Predictions
Once a model has been trained, generate batch predictions and write the results to an external data source:
# For v1.4 and above:
from kumoai.artifact_export.config import OutputConfig
# For v1.3 and below (backward compatible):
# from kumoai.trainer.config import OutputConfig
prediction_job = trainer.predict(
    graph=graph,
    prediction_table=pquery.generate_prediction_table(non_blocking=True),
    output_config=OutputConfig(
        output_types={'predictions', 'embeddings'},
        output_connector=connector,
        output_table_name='kumo_predictions',
    ),
    training_job_id=training_job.job_id,
    non_blocking=False,
)
print(f'Batch prediction job summary: {prediction_job.summary()}')
Full Code Example
A complete end-to-end example on the CustomerLTV dataset:
import kumoai as kumo
# Initialize the SDK:
kumo.init(url="https://<customer_id>.kumoai.cloud/api", api_key=API_KEY)
# Create a Connector:
connector = kumo.S3Connector("s3://kumo-public-datasets/customerltv_mini_integ_test/")
# Create Tables from SourceTables:
customer = kumo.Table.from_source_table(
    source_table=connector.table('customer'),
    primary_key='CustomerID',
).infer_metadata()
stock = kumo.Table.from_source_table(
    source_table=connector.table('stock'),
    primary_key='StockCode',
).infer_metadata()
transaction = kumo.Table.from_source_table(
    source_table=connector.table('transaction'),
    time_column='InvoiceDate',
).infer_metadata()
# Create a Graph:
graph = kumo.Graph(
    tables={
        'customer': customer,
        'stock': stock,
        'transaction': transaction,
    },
    edges=[
        dict(src_table='transaction', fkey='StockCode', dst_table='stock'),
        dict(src_table='transaction', fkey='CustomerID', dst_table='customer'),
    ],
)
graph.validate(verbose=True)
# Create a Predictive Query:
pquery = kumo.PredictiveQuery(
    graph=graph,
    query=(
        "PREDICT MAX(transaction.Quantity, 0, 30)\n"
        "FOR EACH customer.CustomerID\n"
        "ASSUMING SUM(transaction.UnitPrice, 0, 7, days) > 15"
    ),
)
pquery.validate(verbose=True)
# Train a model:
model_plan = pquery.suggest_model_plan()
trainer = kumo.Trainer(model_plan)
training_job = trainer.fit(
    graph=graph,
    train_table=pquery.generate_training_table(non_blocking=True),
    non_blocking=False,
)
print(f"Training metrics: {training_job.metrics()}")
# Generate predictions:
from kumoai.artifact_export.config import OutputConfig
prediction_job = trainer.predict(
    graph=graph,
    prediction_table=pquery.generate_prediction_table(non_blocking=True),
    output_config=OutputConfig(
        output_types={'predictions', 'embeddings'},
        output_connector=connector,
        output_table_name='kumo_predictions',
    ),
    training_job_id=training_job.job_id,
    non_blocking=False,
)
print(f'Batch prediction job summary: {prediction_job.summary()}')
Polling Job Status
Any operation scheduled with non_blocking=True returns a Future object with methods for tracking progress:
# Schedule a training job and get a future:
training_future = trainer.fit(graph=graph, train_table=training_table, non_blocking=True)
# Print the job ID:
print(f"Training job ID: {training_future.id}")
# Attach to watch logs live (detach anytime without cancelling the job):
training_future.attach()
# Or poll status in a loop:
print(training_future.status())
# Or block until complete:
training_job = training_future.result()
The same pattern applies to prediction jobs, training table jobs, and prediction table jobs.
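A common way to consume such a future is a poll loop with backoff around status() and result(). The sketch below is an illustration only: FakeFuture and the 'RUNNING'/'DONE' status strings are hypothetical stand-ins, not the Kumo SDK's actual classes or job states. With the real SDK, you would pass the future returned by fit() or predict() instead:

```python
import time

class FakeFuture:
    """Hypothetical stand-in for an SDK job future (illustration only)."""
    def __init__(self, ticks_until_done: int):
        self._ticks = ticks_until_done

    def status(self) -> str:
        # Report 'RUNNING' a few times, then 'DONE' (status names are assumed).
        self._ticks -= 1
        return "DONE" if self._ticks <= 0 else "RUNNING"

    def result(self) -> str:
        return "job-result"

def wait_with_backoff(future, initial_delay=0.01, max_delay=1.0):
    """Poll status() with exponential backoff until the job finishes."""
    delay = initial_delay
    while future.status() != "DONE":
        time.sleep(delay)
        delay = min(delay * 2, max_delay)  # back off to avoid hammering the API
    return future.result()

print(wait_with_backoff(FakeFuture(ticks_until_done=3)))
```

For interactive work, future.attach() (shown above) is usually simpler; an explicit loop like this is more useful inside pipeline orchestration code.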
Next Steps
While this example covered the core concepts, the SDK provides much more advanced functionality to help improve model iteration speed, evaluate champion/challenger models in production use cases, integrate cleanly with upstream and downstream data pipelines, and more.