## Overview

RFM forecasting extends single-step regression to produce N sequential predictions for a single entity. The key trigger is the `FORECAST N TIMEFRAMES` clause in a PQL query, which causes the pipeline to produce one prediction row per forecast step rather than a single prediction.
## 1. Query Syntax

```
PREDICT SUM(table.column, 0, 1, days)
FORECAST 7 TIMEFRAMES
FOR EACH entity_table.entity_id
```

- `FORECAST N TIMEFRAMES` sets `num_forecasts = N`
- The aggregation window (e.g. `0, 1, days`) defines `step_size` — one forecast step = one window width
- `FORECAST` is optional. Without it, the query is a standard single-step regression (`num_forecasts = 1`)
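The step and horizon arithmetic implied by the clause above can be sketched in a few lines of plain pandas (the variable names here are illustrative, not SDK API):

```python
import pandas as pd

# FORECAST 7 TIMEFRAMES over a (0, 1, days) aggregation window:
num_forecasts = 7
step = pd.Timedelta(days=1)     # one step = one window width
horizon = num_forecasts * step  # total time span covered by the forecast (7 days)
```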
## 2. Parse Stage (kumo-pql)

File: `kumopql/parser/parser.py`

The PQL parser extracts the forecast AST node and produces a `ParsedPredictiveQuery` with:

- `num_forecasts` — the N from `FORECAST N TIMEFRAMES`
- `problem_type = ProblemType.FORECAST` — automatically set when the `FORECAST` clause is present
Validation rules enforced by `ProblemTypeValidator`:

| Rule | Constraint |
|---|---|
| Minimum | N >= 1 (N=1 is a no-op, same as regression) |
| Maximum | N <= 10,000 |
| Target type | Must be a temporal aggregation with a date_offset_range (e.g. `SUM(..., 0, 7, days)`) |
| Stype | Target column must be numerical |
| Incompatible | Cannot combine with RANK / CLASSIFY / LIST_DISTINCT |
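As an illustration only (a hypothetical sketch, not the real `ProblemTypeValidator` code), the rules above amount to checks of roughly this shape:

```python
def validate_forecast(n: int, has_date_offset_range: bool, stype: str,
                      other_clauses: set) -> None:
    # Hypothetical mirror of the validation table; the real checks live
    # in kumo-pql's ProblemTypeValidator.
    if n < 1:
        raise ValueError("FORECAST N requires N >= 1")
    if n > 10_000:
        raise ValueError("FORECAST N requires N <= 10,000")
    if not has_date_offset_range:
        raise ValueError("target must be a temporal aggregation with a window")
    if stype != "numerical":
        raise ValueError("target column must be numerical")
    if other_clauses & {"RANK", "CLASSIFY", "LIST_DISTINCT"}:
        raise ValueError("FORECAST cannot combine with RANK/CLASSIFY/LIST_DISTINCT")

validate_forecast(7, True, "numerical", set())  # a valid forecasting query
```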
## 3. SDK Layer (kumoai)

File: `kumoai/experimental/rfm/rfm.py`

### 3a. Problem type → task type conversion

```python
if query.problem_type == ProblemType.FORECAST:
    task_type = TaskType.FORECASTING
```
### 3b. Step size derivation

`step_size` is derived from the aggregation’s `end_date_offset` — the right-hand boundary of the time window:

```python
def _date_offset_to_ns(offset: pd.DateOffset) -> int:
    ref = pd.Timestamp('2020-01-01')
    delta = (ref + offset) - ref
    return int(delta.total_seconds()) * 1_000_000_000
```

For `SUM(..., 0, 1, days)` → `step_size = 86_400_000_000_000` ns (1 day).
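A quick standalone check of this conversion (pandas assumed; the helper is reproduced here so the snippet runs on its own):

```python
import pandas as pd

def date_offset_to_ns(offset: pd.DateOffset) -> int:
    # Same trick as the SDK helper: apply the offset to a reference
    # timestamp and measure the resulting delta in nanoseconds.
    ref = pd.Timestamp('2020-01-01')
    delta = (ref + offset) - ref
    return int(delta.total_seconds()) * 1_000_000_000

assert date_offset_to_ns(pd.DateOffset(days=1)) == 86_400_000_000_000
assert date_offset_to_ns(pd.DateOffset(weeks=1)) == 7 * 86_400_000_000_000
```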
### 3c. Anchor time semantics

For prediction (`model.predict()`):

- `anchor_time` is the end of the historical window
- Forecast windows are `[anchor + step, anchor + 2*step, ..., anchor + N*step]`
- Defaults to the latest available timestamp in the data if not specified

For evaluation (`model.evaluate()`):

- The anchor must be shifted back to leave room for N ground-truth windows: `eval_anchor = max_data_time - (step_size × num_forecasts)`
- This ensures windows `[anchor + step ... anchor + N*step]` all have real observed values
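The anchor arithmetic for both modes can be reproduced with plain pandas (the dates here are made up for illustration):

```python
import pandas as pd

step = pd.Timedelta(days=1)
n = 7
max_data_time = pd.Timestamp("2025-01-15")

# Prediction: the anchor defaults to the latest timestamp in the data;
# forecast windows start one step after it.
anchor = max_data_time
forecast_windows = [anchor + k * step for k in range(1, n + 1)]
# 2025-01-16 through 2025-01-22

# Evaluation: shift the anchor back so all N windows have observed values.
eval_anchor = max_data_time - n * step  # 2025-01-08
```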
### 3d. Single-entity constraint

```python
if task_type == TaskType.FORECASTING and len(set(indices)) > 1:
    raise ValueError("Forecasting requires a single entity ID")
```

Forecasting is one entity at a time. To forecast for multiple entities, loop and call `predict()` per entity.
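A minimal sketch of that per-entity loop, using a stand-in for `model.predict()` so the snippet runs standalone (the real call is shown in section 10):

```python
import pandas as pd

def predict_one(entity_id: str, n: int = 3) -> pd.DataFrame:
    # Stand-in for model.predict(query, indices=[entity_id]);
    # returns one row per forecast step.
    return pd.DataFrame({"ENTITY": [entity_id] * n,
                         "STEP": list(range(1, n + 1))})

# Forecasting is one entity per call, so loop and concatenate:
frames = [predict_one(e) for e in ["acct_1", "acct_2"]]
all_forecasts = pd.concat(frames, ignore_index=True)
# all_forecasts has 6 rows: 3 steps x 2 entities
```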
### 3e. use_prediction_time forced on

```python
if task.task_type == TaskType.FORECASTING:
    use_prediction_time = True  # always
```

The anchor timestamp for each step is always passed as a feature input to the model.
### 3f. Context cap

```python
if task.num_forecasts > task.num_context_examples:
    raise ValueError("num_forecasts exceeds available historical examples")
```

You cannot request more forecast steps than there are historical context examples to learn from.
## 4. TaskTable

File: `kumoai/experimental/rfm/task_table.py`

`TaskTable` carries all the forecasting metadata sent to the server:

```python
TaskTable(
    task_type=TaskType.FORECASTING,
    context_df=...,   # historical rows (features + target)
    pred_df=...,      # entity rows to forecast
    num_forecasts=N,  # from FORECAST clause
    step_size=...,    # nanoseconds per step, from aggregation window
)
```
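To make the shape concrete, here is a simplified stand-in (a hypothetical dataclass, not the real `TaskTable`) populated with plausible values:

```python
from dataclasses import dataclass

import pandas as pd

@dataclass
class TaskTableSketch:
    task_type: str
    context_df: pd.DataFrame  # historical rows (features + target)
    pred_df: pd.DataFrame     # entity rows to forecast
    num_forecasts: int        # N from the FORECAST clause
    step_size: int            # nanoseconds per forecast step

tt = TaskTableSketch(
    task_type="FORECASTING",
    context_df=pd.DataFrame({"account_id": ["acct_123"] * 2, "y": [1.0, 2.0]}),
    pred_df=pd.DataFrame({"account_id": ["acct_123"]}),
    num_forecasts=7,
    step_size=86_400_000_000_000,  # 1 day
)
```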
## 5. Server-Side Handling

File: `rfm_rest/api_app/rfm.py`

The REST server parses the PQL query string using `kumopql.parser.PQLParser` (server-side). This is why the kumo-pql version on the server must include FORECAST parser support — the SDK sends the raw query string, and the server does the parsing.

The parsed query flows into the same workflow pipeline as regression. The `TaskType.FORECASTING` flag in the `Context` object tells the model how many steps to generate.
## 6. Model Inference (kumo-ml)

File: `kumoml/rfm/nn/icl.py`

The model output head is the same for FORECASTING and REGRESSION — both use quantile regression:

```python
if task_type in {TaskType.REGRESSION, TaskType.FORECASTING}:
    # Extract median (50th percentile) or mean from quantile outputs
    return x.sort(dim=-1).values[..., index]
```

The difference is that the server calls the model N times (once per forecast step), each time with:

- The same historical context
- A different anchor timestamp (`anchor + k * step_size` for k = 1..N)

Each call produces one scalar prediction. Together they form the N-row output.
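The per-step call pattern described above can be sketched as follows, with `run_model` standing in for the actual model invocation:

```python
import pandas as pd

STEP = pd.Timedelta(days=1)
N = 3
anchor = pd.Timestamp("2025-01-15")

def run_model(context, step_anchor: pd.Timestamp) -> float:
    # Stand-in for the real model call: same context every time,
    # only the anchor timestamp changes.
    return 1800.0 + 50.0 * step_anchor.day

rows = []
for k in range(1, N + 1):
    step_anchor = anchor + k * STEP
    rows.append({"ANCHOR_TIMESTAMP": step_anchor,
                 "TARGET_PRED": run_model("ctx", step_anchor)})

forecast_df = pd.DataFrame(rows)  # N rows, one per forecast step
```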
## 7. Output Structure

`model.predict()` returns a DataFrame with N rows per entity:

| ENTITY | ANCHOR_TIMESTAMP | TARGET_PRED |
|---|---|---|
| acct_123 | 2025-01-16 | 1850.25 |
| acct_123 | 2025-01-17 | 1920.50 |
| acct_123 | 2025-01-18 | 2100.75 |
| … | … | … |

- `ANCHOR_TIMESTAMP` — the target window start for that forecast step
- `TARGET_PRED` — the model’s median prediction for that window
- One row per forecast step, in chronological order
## 8. Evaluation

`model.evaluate()` computes holdout metrics (MAE, MSE, RMSE, etc.) by:

- Shifting `anchor_time` back by `N * step_size` to create a held-out window
- Building a `Context` that includes `y_test` — the actual observed target values at each of the N forecast windows
- Sending both predictions and actuals to the server, which computes metrics

The ground truth for each step is the real aggregated value from the data at `anchor + k * step_size`.
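For reference, the holdout metrics reduce to standard formulas over the N (prediction, actual) pairs. A self-contained sketch with made-up numbers:

```python
import math

preds   = [1850.25, 1920.50, 2100.75]  # model output per forecast step
actuals = [1800.00, 1950.00, 2050.00]  # ground truth per window

errors = [p - a for p, a in zip(preds, actuals)]
mae  = sum(abs(e) for e in errors) / len(errors)   # mean absolute error
mse  = sum(e * e for e in errors) / len(errors)    # mean squared error
rmse = math.sqrt(mse)                              # root mean squared error
```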
## 9. Full Flow Diagram

```
User writes PQL query
        │
        ▼
kumo-pql parser (server-side in rfm_rest)
  → extracts num_forecasts, problem_type=FORECAST
        │
        ▼
SDK builds TaskTable
  → step_size derived from aggregation window
  → anchor_time set / validated
  → context_df sampled from historical data
        │
        ▼
Context sent to RFM server
  → task_type=FORECASTING, num_forecasts=N, step_size=Δt
        │
        ▼
Model called N times (k = 1..N)
  → anchor = original_anchor + k * step_size
  → same historical context each time
  → one scalar output per call
        │
        ▼
N-row DataFrame returned to user
  → one row per forecast step
  → ANCHOR_TIMESTAMP, TARGET_PRED columns
```
## 10. Minimal Usage Example

```python
import kumoai.experimental.rfm as rfm
import pandas as pd

rfm.init(api_key="...", url="https://kumorfm.ai/api")

# Build graph (entity table + fact table with timestamps)
graph = rfm.Graph(...)
model = rfm.KumoRFM(graph)

query = (
    "PREDICT SUM(usage.count, 0, 1, days) "
    "FORECAST 28 TIMEFRAMES "
    "FOR EACH accounts.account_id"
)

# Predict the next 28 days for a single account
result = model.predict(
    query,
    indices=["acct_123"],
    anchor_time=pd.Timestamp("2025-03-01"),
)
# result has 28 rows, one per day

# Evaluate on held-out data
metrics = model.evaluate(query, metrics=["mae", "rmse"])
```
## 11. Common Errors

| Error | Cause | Fix |
|---|---|---|
| `Task type 'forecasting' is not yet supported` | Server running old kumo-ml (pre rfm-v0.4.1) | Upgrade server to rfm-v0.4.1+ |
| `FORECAST` clause not parsed | Server running old kumo-pql (pre #26) | Upgrade server to rfm-v0.4.2+ |
| `num_forecasts exceeds context examples` | Not enough historical rows | Provide more history or reduce N |
| `Forecasting requires a single entity ID` | Passed multiple indices | Loop per entity |
| Anchor timestamp for evaluation is after latest supported | `anchor_time` too recent for eval | Shift anchor back by `N * step_size` |
| Target not temporal aggregation | Using COUNT(*) or non-windowed agg | Use `SUM(..., 0, K, days)`-style aggregation |