Starting with raw data (as SourceTable objects), you created a Graph of Kumo Table objects and defined a PredictiveQuery. Now you can train a model and generate predictions.

Training a Model

model_plan = pquery.suggest_model_plan()
trainer = kumo.Trainer(model_plan)
training_job = trainer.fit(
    graph=graph,
    train_table=pquery.generate_training_table(non_blocking=True),
    non_blocking=False,
)
print(f"Training metrics: {training_job.metrics()}")
  • suggest_model_plan() returns the modeling plan that the predictive query suggests for training. You can use the default plan directly, or adjust any parameters to your liking.
  • kumo.Trainer(model_plan) creates a Trainer object initialized with the model plan.
  • trainer.fit() trains the model on the given graph and training table.
  • training_job.metrics() returns evaluation metrics for the completed training job.
The Kumo SDK makes extensive use of the optional non_blocking parameter for long-running operations. Setting this flag to True makes the call return immediately with a Future object that tracks the operation as it runs in the background. Setting it to False makes the call block until the operation completes before returning.
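This blocking/non-blocking pattern mirrors Python's standard concurrent.futures API. The following Kumo-independent sketch illustrates the same idea; slow_operation is a hypothetical stand-in for a long-running call such as trainer.fit(), not part of the Kumo SDK:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_operation() -> str:
    """Stand-in for a long-running job such as model training."""
    time.sleep(0.1)
    return "done"

executor = ThreadPoolExecutor(max_workers=1)

# Analogue of non_blocking=True: submit() returns a Future immediately,
# while the work continues in the background.
future = executor.submit(slow_operation)

# Analogue of non_blocking=False: result() blocks until completion.
result = future.result()
print(result)
```

In the Kumo SDK, the same trade-off applies: pass non_blocking=True when you want to kick off several long-running operations (such as training-table generation) concurrently, and non_blocking=False when subsequent code depends on the result.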

Generating Batch Predictions

Once a model has been trained, generate batch predictions and write them to an external data source:
# For v1.4 and above:
from kumoai.artifact_export.config import OutputConfig
# For v1.3 and below (backward compatible):
# from kumoai.trainer.config import OutputConfig

prediction_job = trainer.predict(
    graph=graph,
    prediction_table=pquery.generate_prediction_table(non_blocking=True),
    output_config=OutputConfig(
        output_types={'predictions', 'embeddings'},
        output_connector=connector,
        output_table_name='kumo_predictions',
    ),
    training_job_id=training_job.job_id,
    non_blocking=False,
)
print(f'Batch prediction job summary: {prediction_job.summary()}')

Full Code Example

A complete end-to-end example on the CustomerLTV dataset:
import kumoai as kumo

# Initialize the SDK:
kumo.init(url="https://<customer_id>.kumoai.cloud/api", api_key=API_KEY)

# Create a Connector:
connector = kumo.S3Connector("s3://kumo-public-datasets/customerltv_mini_integ_test/")

# Create Tables from SourceTables:
customer = kumo.Table.from_source_table(
    source_table=connector.table('customer'),
    primary_key='CustomerID',
).infer_metadata()

stock = kumo.Table.from_source_table(
    source_table=connector.table('stock'),
    primary_key='StockCode',
).infer_metadata()

transaction = kumo.Table.from_source_table(
    source_table=connector.table('transaction'),
    time_column='InvoiceDate',
).infer_metadata()

# Create a Graph:
graph = kumo.Graph(
    tables={
        'customer': customer,
        'stock': stock,
        'transaction': transaction,
    },
    edges=[
        dict(src_table='transaction', fkey='StockCode', dst_table='stock'),
        dict(src_table='transaction', fkey='CustomerID', dst_table='customer'),
    ],
)
graph.validate(verbose=True)

# Create a Predictive Query:
pquery = kumo.PredictiveQuery(
    graph=graph,
    query=(
        "PREDICT MAX(transaction.Quantity, 0, 30)\n"
        "FOR EACH customer.CustomerID\n"
        "ASSUMING SUM(transaction.UnitPrice, 0, 7, days) > 15"
    ),
)
pquery.validate(verbose=True)

# Train a model:
model_plan = pquery.suggest_model_plan()
trainer = kumo.Trainer(model_plan)
training_job = trainer.fit(
    graph=graph,
    train_table=pquery.generate_training_table(non_blocking=True),
    non_blocking=False,
)
print(f"Training metrics: {training_job.metrics()}")

# Generate predictions:
from kumoai.artifact_export.config import OutputConfig

prediction_job = trainer.predict(
    graph=graph,
    prediction_table=pquery.generate_prediction_table(non_blocking=True),
    output_config=OutputConfig(
        output_types={'predictions', 'embeddings'},
        output_connector=connector,
        output_table_name='kumo_predictions',
    ),
    training_job_id=training_job.job_id,
    non_blocking=False,
)
print(f'Batch prediction job summary: {prediction_job.summary()}')

Next Steps

While this example covered the core concepts, the SDK provides much more advanced functionality to help improve model iteration speed, evaluate champion/challenger models in production use cases, integrate cleanly with upstream and downstream data pipelines, and more.