Overview

RFM forecasting extends single-step regression to produce N sequential predictions for a single entity. The key trigger is the FORECAST N TIMEFRAMES clause in a PQL query, which causes the pipeline to produce one prediction row per forecast step rather than a single prediction.

1. Query Syntax

PREDICT SUM(table.column, 0, 1, days)
FORECAST 7 TIMEFRAMES
FOR EACH entity_table.entity_id
  • FORECAST N TIMEFRAMES sets num_forecasts = N
  • The aggregation window (e.g. 0, 1, days) defines step_size — one forecast step = one window width
  • FORECAST is optional. Without it, the query is a standard single-step regression (num_forecasts = 1)
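
As a small illustration of the syntax above, the helper below assembles the query string from its parts. `build_forecast_query` and its parameters are hypothetical names for this sketch, not part of kumo-pql or the SDK. It only formats text, and it omits the FORECAST clause when N is 1, since that case is equivalent to single-step regression.

```python
def build_forecast_query(
    target: str,
    entity: str,
    num_forecasts: int = 1,
    window: tuple = (0, 1, "days"),
    aggregation: str = "SUM",
) -> str:
    """Assemble a PQL string (hypothetical helper; only checks N >= 1)."""
    if num_forecasts < 1:
        raise ValueError("num_forecasts must be >= 1")
    start, end, unit = window
    parts = [f"PREDICT {aggregation}({target}, {start}, {end}, {unit})"]
    # FORECAST is optional: without it the query is single-step regression.
    if num_forecasts > 1:
        parts.append(f"FORECAST {num_forecasts} TIMEFRAMES")
    parts.append(f"FOR EACH {entity}")
    return " ".join(parts)


query = build_forecast_query("usage.count", "accounts.account_id", num_forecasts=28)
print(query)
```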

2. Parse Stage (kumo-pql)

File: kumopql/parser/parser.py

The PQL parser extracts the forecast AST node and produces a ParsedPredictiveQuery with:
  • num_forecasts — the N from FORECAST N TIMEFRAMES
  • problem_type = ProblemType.FORECAST — automatically set when the FORECAST clause is present
Validation rules enforced by ProblemTypeValidator:
| Rule | Constraint |
|---|---|
| Minimum | N >= 1 (N=1 is a no-op, same as regression) |
| Maximum | N <= 10,000 |
| Target type | Must be a temporal aggregation with a date_offset_range (e.g. SUM(..., 0, 7, days)) |
| Stype | Target column must be numerical |
| Incompatible | Cannot combine with RANK / CLASSIFY / LIST_DISTINCT |
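
To make the rules concrete, here is a minimal sketch of how such checks could be expressed. It is not the ProblemTypeValidator implementation: `ForecastSpec`, `check_forecast_rules`, and the stype strings are hypothetical stand-ins chosen for this example, and only the limits themselves come from the rules above.

```python
from dataclasses import dataclass
from typing import List

MAX_FORECASTS = 10_000
INCOMPATIBLE_CLAUSES = frozenset({"RANK", "CLASSIFY", "LIST_DISTINCT"})


@dataclass(frozen=True)
class ForecastSpec:
    """Hypothetical, simplified view of a parsed query (not ParsedPredictiveQuery)."""
    num_forecasts: int
    has_date_offset_range: bool  # target is a windowed temporal aggregation
    target_stype: str            # assumed labels, e.g. "numerical" or "categorical"
    other_clauses: frozenset = frozenset()


def check_forecast_rules(spec: ForecastSpec) -> List[str]:
    """Return a list of rule violations; an empty list means the spec passes."""
    problems = []
    if not 1 <= spec.num_forecasts <= MAX_FORECASTS:
        problems.append(f"N must be between 1 and {MAX_FORECASTS}")
    if not spec.has_date_offset_range:
        problems.append("target must be a temporal aggregation with a date_offset_range")
    if spec.target_stype != "numerical":
        problems.append("target column must be numerical")
    clashes = sorted(spec.other_clauses & INCOMPATIBLE_CLAUSES)
    if clashes:
        problems.append("cannot combine FORECAST with " + ", ".join(clashes))
    return problems


ok = ForecastSpec(num_forecasts=7, has_date_offset_range=True, target_stype="numerical")
bad = ForecastSpec(20_000, False, "categorical", frozenset({"RANK"}))
print(check_forecast_rules(ok))   # no violations
print(check_forecast_rules(bad))  # one message per broken rule
```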

3. SDK Layer (kumoai)

File: kumoai/experimental/rfm/rfm.py

3a. Problem type → task type conversion

if query.problem_type == ProblemType.FORECAST:
    task_type = TaskType.FORECASTING

3b. Step size derivation

step_size is derived from the aggregation’s end_date_offset — the right-hand boundary of the time window:
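import pandas as pd

def _date_offset_to_ns(offset: pd.DateOffset) -> int:
    # Resolve the offset against a fixed reference date to get a concrete duration
    ref = pd.Timestamp('2020-01-01')
    delta = (ref + offset) - ref
    # total_seconds() is truncated to whole seconds before scaling to nanoseconds
    return int(delta.total_seconds()) * 1_000_000_000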
For SUM(..., 0, 1, days), step_size = 86_400_000_000_000 ns (1 day).
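
The same arithmetic can be checked without pandas for fixed-width units. The sketch below is an illustration only: `window_to_ns` is a hypothetical helper, and it uses `datetime.timedelta` rather than `pd.DateOffset`, so it cannot express calendar units such as months, whose length depends on the reference date.

```python
from datetime import timedelta


def window_to_ns(width: int, unit: str) -> int:
    """Nanoseconds in a fixed-width window.

    `unit` must be a timedelta keyword such as "days", "hours", or "minutes".
    """
    delta = timedelta(**{unit: width})
    # Integer division by one microsecond avoids float rounding; then scale us -> ns.
    return (delta // timedelta(microseconds=1)) * 1_000


step_1d = window_to_ns(1, "days")
step_7d = window_to_ns(7, "days")
step_1h = window_to_ns(1, "hours")
print(step_1d, step_7d, step_1h)
```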

3c. Anchor time semantics

For prediction (model.predict()):
  • anchor_time is the end of the historical window
  • Forecast windows are [anchor + step, anchor + 2*step, ..., anchor + N*step]
  • Defaults to the latest available timestamp in the data if not specified
For evaluation (model.evaluate()):
  • The anchor must be shifted back to leave room for N ground-truth windows:
    eval_anchor = max_data_time - (step_size × num_forecasts)
    
  • This ensures windows [anchor + step ... anchor + N*step] all have real observed values
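
The anchor arithmetic for both cases can be sketched as follows. This is an illustration of the formulas above, not SDK code: the function names are hypothetical, and `datetime`/`timedelta` are used for readability where the SDK is described as working with nanosecond step sizes.

```python
from datetime import datetime, timedelta
from typing import List


def forecast_anchors(anchor: datetime, step: timedelta, num_forecasts: int) -> List[datetime]:
    """Timestamps anchor + k*step for k = 1..N."""
    return [anchor + k * step for k in range(1, num_forecasts + 1)]


def evaluation_anchor(max_data_time: datetime, step: timedelta, num_forecasts: int) -> datetime:
    """Shift the anchor back so all N forecast steps fall within observed data."""
    return max_data_time - num_forecasts * step


step = timedelta(days=1)
latest = datetime(2025, 3, 1)  # assumed latest timestamp in the data

# Prediction: steps extend forward from the latest timestamp.
pred_steps = forecast_anchors(latest, step, 3)

# Evaluation: move back 28 steps, so the last step lands on the latest timestamp.
eval_anchor = evaluation_anchor(latest, step, 28)
eval_steps = forecast_anchors(eval_anchor, step, 28)
print(eval_anchor, eval_steps[-1])
```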

3d. Single-entity constraint

if task_type == TaskType.FORECASTING and len(set(indices)) > 1:
    raise ValueError("Forecasting requires a single entity ID")
Forecasting runs for one entity at a time. To forecast for multiple entities, loop over them and call predict() once per entity.
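
A minimal sketch of that loop is shown below. `forecast_each` is a hypothetical wrapper, not an SDK function; it assumes only that `model.predict(query, indices=[...], ...)` accepts the arguments used in the usage example in this document, and it returns whatever `predict()` returns, keyed by entity ID.

```python
from typing import Any, Dict, Iterable


def forecast_each(
    model: Any,
    query: str,
    entity_ids: Iterable[str],
    **predict_kwargs: Any,
) -> Dict[str, Any]:
    """Call model.predict() once per distinct entity and collect the results."""
    results: Dict[str, Any] = {}
    # dict.fromkeys removes duplicates while preserving the caller's order.
    for entity_id in dict.fromkeys(entity_ids):
        results[entity_id] = model.predict(query, indices=[entity_id], **predict_kwargs)
    return results
```

Keyword arguments such as `anchor_time` are passed through unchanged, so every entity is forecast from the same anchor.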

3e. use_prediction_time forced on

if task.task_type == TaskType.FORECASTING:
    use_prediction_time = True  # always
The anchor timestamp for each step is always passed as a feature input to the model.

3f. Context cap

if task.num_forecasts > task.num_context_examples:
    raise ValueError("num_forecasts exceeds available historical examples")
The number of forecast steps cannot exceed the number of historical context examples.

4. TaskTable

File: kumoai/experimental/rfm/task_table.py

TaskTable carries all the forecasting metadata sent to the server:
TaskTable(
    task_type=TaskType.FORECASTING,
    context_df=...,           # historical rows (features + target)
    pred_df=...,              # entity rows to forecast
    num_forecasts=N,          # from FORECAST clause
    step_size=...,            # nanoseconds per step, from aggregation window
)

5. Server-Side Handling

File: rfm_rest/api_app/rfm.py

The REST server parses the PQL query string with kumopql.parser.PQLParser. Because the SDK sends the raw query string and parsing happens server-side, the kumo-pql version installed on the server must include FORECAST parser support. The parsed query flows into the same workflow pipeline as regression. In the Context object, the TaskType.FORECASTING flag marks the task as a forecast, and num_forecasts sets how many steps to generate.

6. Model Inference (kumo-ml)

File: kumoml/rfm/nn/icl.py

The model output head is the same for FORECASTING and REGRESSION; both use quantile regression:
if task_type in {TaskType.REGRESSION, TaskType.FORECASTING}:
    # Extract median (50th percentile) or mean from quantile outputs
    return x.sort(dim=-1).values[..., index]
The difference is that the server calls the model N times (once per forecast step), each time with:
  • The same historical context
  • A different anchor timestamp (anchor + k * step_size for k = 1..N)
Each call produces one scalar prediction. Together they form the N-row output.
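
The call pattern described above can be sketched in plain Python. Everything here is a stand-in: `toy_model` is a hypothetical predictor that returns three quantile values, `median_from_quantiles` assumes an odd number of quantile levels centered on the median, and the real model operates on tensors rather than lists. Sorting before indexing mirrors the snippet above and presumably guards against quantile outputs that come back out of order.

```python
from typing import Callable, List, Sequence, Tuple


def median_from_quantiles(quantile_outputs: Sequence[float]) -> float:
    """Sort the quantile outputs and take the middle one (assumes an odd count)."""
    ordered = sorted(quantile_outputs)
    return ordered[len(ordered) // 2]


def run_forecast(
    predict_quantiles: Callable[[Sequence[float], int], Sequence[float]],
    context: Sequence[float],
    anchor_ns: int,
    step_ns: int,
    num_forecasts: int,
) -> List[Tuple[int, float]]:
    """Call the predictor once per step with the same context and a shifted anchor."""
    rows = []
    for k in range(1, num_forecasts + 1):
        step_anchor = anchor_ns + k * step_ns
        quantiles = predict_quantiles(context, step_anchor)
        rows.append((step_anchor, median_from_quantiles(quantiles)))
    return rows


def toy_model(context: Sequence[float], anchor_ns: int) -> List[float]:
    """Hypothetical predictor: quantiles around the context mean, deliberately unsorted."""
    base = sum(context) / len(context)
    return [base + 1.0, base - 1.0, base]


ONE_DAY_NS = 86_400_000_000_000
rows = run_forecast(toy_model, [10.0, 20.0, 30.0], anchor_ns=0, step_ns=ONE_DAY_NS, num_forecasts=3)
for anchor, pred in rows:
    print(anchor, pred)
```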

7. Output Structure

model.predict() returns a DataFrame with N rows per entity:
| ENTITY | ANCHOR_TIMESTAMP | TARGET_PRED |
|---|---|---|
| acct_123 | 2025-01-16 | 1850.25 |
| acct_123 | 2025-01-17 | 1920.50 |
| acct_123 | 2025-01-18 | 2100.75 |
  • ANCHOR_TIMESTAMP — the target window start for that forecast step
  • TARGET_PRED — the model’s median prediction for that window
  • One row per forecast step, in chronological order

8. Evaluation

model.evaluate() computes holdout metrics (MAE, MSE, RMSE, etc.) by:
  1. Shifting anchor_time back by N * step_size to create a held-out window
  2. Building a Context that includes y_test — the actual observed target values at each of the N forecast windows
  3. Sending both predictions and actuals to the server, which computes metrics
The ground truth for each step is the real aggregated value from the data at anchor + k * step_size.
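
For reference, the metrics named above have simple definitions. The sketch below computes them locally over N paired values; it illustrates the formulas only and is not the server's implementation, and the sample numbers are made up for the example.

```python
import math
from typing import Sequence


def _check(actual: Sequence[float], predicted: Sequence[float]) -> None:
    if len(actual) != len(predicted) or not actual:
        raise ValueError("actual and predicted must be non-empty and the same length")


def mae(actual: Sequence[float], predicted: Sequence[float]) -> float:
    """Mean absolute error across forecast steps."""
    _check(actual, predicted)
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


def mse(actual: Sequence[float], predicted: Sequence[float]) -> float:
    """Mean squared error across forecast steps."""
    _check(actual, predicted)
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)


def rmse(actual: Sequence[float], predicted: Sequence[float]) -> float:
    """Root mean squared error: the square root of MSE."""
    return math.sqrt(mse(actual, predicted))


# Illustrative values for a 3-step forecast (not real data).
y_test = [100.0, 110.0, 120.0]
y_pred = [98.0, 113.0, 120.0]
print(mae(y_test, y_pred), mse(y_test, y_pred), rmse(y_test, y_pred))
```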

9. Full Flow Diagram

User writes PQL query
  ↓
kumo-pql parser (server-side in rfm_rest)
  → extracts num_forecasts, problem_type=FORECAST
  ↓
SDK builds TaskTable
  → step_size derived from aggregation window
  → anchor_time set / validated
  → context_df sampled from historical data
  ↓
Context sent to RFM server
  → task_type=FORECASTING, num_forecasts=N, step_size=Δt
  ↓
Model called N times (k = 1..N)
  → anchor = original_anchor + k * step_size
  → same historical context each time
  → one scalar output per call
  ↓
N-row DataFrame returned to user
  → one row per forecast step
  → ANCHOR_TIMESTAMP, TARGET_PRED columns

10. Minimal Usage Example

import kumoai.experimental.rfm as rfm
import pandas as pd

rfm.init(api_key="...", url="https://kumorfm.ai/api")

# Build graph (entity table + fact table with timestamps)
graph = rfm.Graph(...)
model = rfm.KumoRFM(graph)

query = (
    "PREDICT SUM(usage.count, 0, 1, days) "
    "FORECAST 28 TIMEFRAMES "
    "FOR EACH accounts.account_id"
)

# Predict next 28 days for a single account
result = model.predict(
    query,
    indices=["acct_123"],
    anchor_time=pd.Timestamp("2025-03-01"),
)
# result has 28 rows, one per day

# Evaluate on held-out data
metrics = model.evaluate(query, metrics=["mae", "rmse"])

11. Common Errors

| Error | Cause | Fix |
|---|---|---|
| Task type 'forecasting' is not yet supported | Server running old kumo-ml (pre rfm-v0.4.1) | Upgrade server to rfm-v0.4.1+ |
| FORECAST clause not parsed | Server running old kumo-pql (pre #26) | Upgrade server to rfm-v0.4.2+ |
| num_forecasts exceeds context examples | Not enough historical rows | Provide more history or reduce N |
| Forecasting requires a single entity ID | Passed multiple indices | Loop per entity |
| Anchor timestamp for evaluation is after latest supported | anchor_time too recent for eval | Shift anchor back by N * step_size |
| Target not temporal aggregation | Using COUNT(*) or non-windowed agg | Use SUM(..., 0, K, days) style aggregation |