Overview
RFM forecasting extends single-step regression to produce N sequential predictions for a single entity. The key trigger is the `FORECAST N TIMEFRAMES` clause in a PQL query, which causes the pipeline to produce one prediction row per forecast step rather than a single prediction.
1. Query Syntax
- `FORECAST N TIMEFRAMES` sets `num_forecasts = N`
- The aggregation window (e.g. `0, 1, days`) defines `step_size`: one forecast step equals one window width
- `FORECAST` is optional. Without it, the query is a standard single-step regression (`num_forecasts = 1`)
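A small regex sketch can illustrate how the clause maps onto `num_forecasts`. `num_forecasts_from_query` is a hypothetical helper and not the kumo-pql parser, which builds a full AST. The sketch assumes only that the clause appears literally as `FORECAST N TIMEFRAMES`.

```python
import re

# Hypothetical helper, NOT part of kumo-pql: it only illustrates how the
# FORECAST clause maps onto num_forecasts (default 1 = plain regression).
_FORECAST_RE = re.compile(r"\bFORECAST\s+(\d+)\s+TIMEFRAMES\b")


def num_forecasts_from_query(query: str) -> int:
    """Return N from 'FORECAST N TIMEFRAMES', or 1 when the clause is absent."""
    match = _FORECAST_RE.search(query)
    if match is None:
        # No FORECAST clause: standard single-step regression.
        return 1
    return int(match.group(1))
```

For example, a query ending in `FORECAST 7 TIMEFRAMES` yields 7, and a query without the clause yields 1.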
2. Parse Stage (kumo-pql)
File: `kumopql/parser/parser.py`
The PQL parser extracts the forecast AST node and produces a ParsedPredictiveQuery with:
- `num_forecasts`: the `N` from `FORECAST N TIMEFRAMES`
- `problem_type = ProblemType.FORECAST`: set automatically when the `FORECAST` clause is present
`ProblemTypeValidator` enforces the following rules:
| Rule | Constraint |
|---|---|
| Minimum | N >= 1 (N=1 is a no-op, same as regression) |
| Maximum | N <= 10,000 |
| Target type | Must be a temporal aggregation with a date_offset_range (e.g. SUM(..., 0, 7, days)) |
| Stype | Target column must be numerical |
| Incompatible | Cannot combine with RANK / CLASSIFY / LIST_DISTINCT |
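The rules in the table above can be restated as a plain Python check. `validate_forecast` and its arguments are hypothetical stand-ins. The real `ProblemTypeValidator` inspects the parsed AST rather than taking booleans, and its error messages may differ.

```python
# Hypothetical stand-in for the validator rules; not the kumo-pql code.
MAX_FORECASTS = 10_000
INCOMPATIBLE = {"RANK", "CLASSIFY", "LIST_DISTINCT"}


def validate_forecast(
    n: int,
    has_date_offset_range: bool,
    target_is_numerical: bool,
    other_clauses: set[str],
) -> None:
    """Raise ValueError if a FORECAST query violates any rule in the table."""
    if n < 1:
        raise ValueError(f"FORECAST requires N >= 1, got {n}")
    if n > MAX_FORECASTS:
        raise ValueError(f"FORECAST allows at most {MAX_FORECASTS} steps, got {n}")
    if not has_date_offset_range:
        raise ValueError("Target must be a temporal aggregation with a date_offset_range")
    if not target_is_numerical:
        raise ValueError("Target column must be numerical")
    clash = INCOMPATIBLE & other_clauses
    if clash:
        raise ValueError(f"FORECAST cannot be combined with {sorted(clash)}")
```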
3. SDK Layer (kumoai)
File: `kumoai/experimental/rfm/rfm.py`
3a. Problem type → task type conversion
3b. Step size derivation
step_size is derived from the aggregation’s end_date_offset — the right-hand boundary of the time window:
SUM(..., 0, 1, days) → step_size = 86_400_000_000_000 ns (1 day).
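The conversion from an `end_date_offset` to nanoseconds can be checked with the standard library. `step_size_ns` is a hypothetical helper that handles only the units `timedelta` accepts as keywords. Whether the SDK supports other units (such as months) is not covered here.

```python
from datetime import timedelta

# Hypothetical helper: converts an aggregation's end_date_offset into a step
# size in integer nanoseconds. Only timedelta-compatible units are handled.
_UNITS = {"seconds", "minutes", "hours", "days", "weeks"}


def step_size_ns(end_date_offset: int, unit: str) -> int:
    if unit not in _UNITS:
        raise ValueError(f"unsupported unit: {unit}")
    delta = timedelta(**{unit: end_date_offset})
    # timedelta // timedelta is an exact integer count of microseconds;
    # multiply by 1_000 to get nanoseconds without float rounding.
    return (delta // timedelta(microseconds=1)) * 1_000
```

Under this sketch, `step_size_ns(1, "days")` gives `86_400_000_000_000`, which matches the value stated for `SUM(..., 0, 1, days)`.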
3c. Anchor time semantics
For prediction (model.predict()):
- `anchor_time` is the end of the historical window
- Forecast windows are `[anchor + step, anchor + 2*step, ..., anchor + N*step]`
- If not specified, `anchor_time` defaults to the latest available timestamp in the data
For evaluation (`model.evaluate()`):
- The anchor must be shifted back by `N * step_size` to leave room for N ground-truth windows
- This ensures that the windows `[anchor + step, ..., anchor + N*step]` all have real observed values
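The anchor arithmetic for prediction and evaluation can be sketched with `datetime` values instead of raw nanoseconds, for readability. Both function names are hypothetical and do not come from the SDK.

```python
from datetime import datetime, timedelta


def forecast_anchors(anchor: datetime, step: timedelta, n: int) -> list[datetime]:
    """Per-step timestamps anchor + k*step for k = 1..n."""
    return [anchor + k * step for k in range(1, n + 1)]


def evaluation_anchor(latest: datetime, step: timedelta, n: int) -> datetime:
    """Shift the anchor back by n*step so every forecast window has observed data."""
    return latest - n * step
```

When the anchor comes from `evaluation_anchor`, the last of the N windows lands exactly on the latest timestamp. This is the condition the evaluation bullets describe.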
3d. Single-entity constraint
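Forecasting requires a single entity ID, and passing multiple indices raises an error. To forecast several entities, call `predict()` once per entity.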
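The per-entity loop looks like the sketch below. `forecast_each` is hypothetical, and `predict_one` stands for whatever single-entity wrapper you build around `model.predict()`. Its exact arguments are not shown here and are left as an assumption.

```python
from typing import Callable, Iterable


def forecast_each(
    entity_ids: Iterable[str],
    predict_one: Callable[[str], list[dict]],
) -> list[dict]:
    """Call a single-entity forecaster once per entity and concatenate the rows."""
    rows: list[dict] = []
    for entity_id in entity_ids:
        # Each call passes exactly one entity, which satisfies the constraint.
        rows.extend(predict_one(entity_id))
    return rows
```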
3e. use_prediction_time forced on
3f. Context cap
4. TaskTable
File: `kumoai/experimental/rfm/task_table.py`
`TaskTable` carries all the forecasting metadata sent to the server.
5. Server-Side Handling
File: `rfm_rest/api_app/rfm.py`
The REST server parses the PQL query string itself, using `kumopql.parser.PQLParser`. Because the SDK sends the raw query string and leaves parsing to the server, the kumo-pql version installed on the server must include `FORECAST` parser support.
The parsed query flows into the same workflow pipeline as regression. The TaskType.FORECASTING flag in the Context object tells the model how many steps to generate.
6. Model Inference (kumo-ml)
File: `kumoml/rfm/nn/icl.py`
The model output head is the same for `FORECASTING` and `REGRESSION`, and both use quantile regression. Each of the N forecast steps is predicted with:
- The same historical context
- A different anchor timestamp (`anchor + k * step_size` for k = 1..N)
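The logical structure of the per-step inference can be sketched as a loop. `predict_quantiles` is a hypothetical callable that returns a mapping from quantile level to value. The document does not say whether the real model batches the N steps or runs them one at a time, so this shows only the data flow. Taking the 0.5 quantile matches the median `TARGET_PRED` described in the output section.

```python
from datetime import datetime, timedelta
from typing import Callable, Mapping

# Hypothetical signature: (shared context, one anchor) -> {quantile: value}.
QuantileFn = Callable[[object, datetime], Mapping[float, float]]


def forecast_medians(
    predict_quantiles: QuantileFn,
    context: object,
    anchor: datetime,
    step: timedelta,
    n: int,
) -> list[tuple[datetime, float]]:
    out = []
    for k in range(1, n + 1):
        step_anchor = anchor + k * step
        # Same context every time; only the anchor timestamp changes.
        quantiles = predict_quantiles(context, step_anchor)
        out.append((step_anchor, quantiles[0.5]))  # median = 0.5 quantile
    return out
```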
7. Output Structure
model.predict() returns a DataFrame with N rows per entity:
| ENTITY | ANCHOR_TIMESTAMP | TARGET_PRED |
|---|---|---|
| acct_123 | 2025-01-16 | 1850.25 |
| acct_123 | 2025-01-17 | 1920.50 |
| acct_123 | 2025-01-18 | 2100.75 |
| … | … | … |
- `ANCHOR_TIMESTAMP`: the target window start for that forecast step
- `TARGET_PRED`: the model's median prediction for that window
- One row per forecast step, in chronological order
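A quick sanity check of this shape can be written against plain row dicts. For a pandas DataFrame, `df.to_dict("records")` produces that form. `check_forecast_output` is a hypothetical helper and not part of the SDK.

```python
from collections import defaultdict
from datetime import datetime


def check_forecast_output(rows: list[dict], n: int) -> None:
    """Raise ValueError unless there are n rows per entity, in time order."""
    by_entity: dict[str, list[datetime]] = defaultdict(list)
    for row in rows:
        by_entity[row["ENTITY"]].append(row["ANCHOR_TIMESTAMP"])
    for entity, stamps in by_entity.items():
        if len(stamps) != n:
            raise ValueError(f"{entity}: expected {n} rows, got {len(stamps)}")
        if stamps != sorted(stamps):
            raise ValueError(f"{entity}: forecast steps are not in chronological order")
```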
8. Evaluation
model.evaluate() computes holdout metrics (MAE, MSE, RMSE, etc.) by:
- Shifting `anchor_time` back by `N * step_size` to create a held-out window
- Building a `Context` that includes `y_test`, the actual observed target values at each of the N forecast windows
- Sending both predictions and actuals to the server, which computes the metrics
Each step-k prediction is scored against the observed value for the window at `anchor + k * step_size`.
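The metrics themselves are computed server-side. For a local sanity check, the standard definitions of MAE, MSE and RMSE over the N steps are sketched below. `holdout_metrics` is hypothetical and is not the server's implementation, and the other metrics covered by "etc." are omitted.

```python
import math


def holdout_metrics(preds: list[float], actuals: list[float]) -> dict[str, float]:
    """MAE, MSE and RMSE over the N forecast steps (step k vs. actual k)."""
    if len(preds) != len(actuals) or not preds:
        raise ValueError("need one actual per prediction, and at least one step")
    errors = [p - a for p, a in zip(preds, actuals)]
    mae = sum(abs(e) for e in errors) / len(errors)
    mse = sum(e * e for e in errors) / len(errors)
    return {"mae": mae, "mse": mse, "rmse": math.sqrt(mse)}
```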
9. Full Flow Diagram
10. Minimal Usage Example
11. Common Errors
| Error | Cause | Fix |
|---|---|---|
| Task type 'forecasting' is not yet supported | Server running old kumo-ml (pre rfm-v0.4.1) | Upgrade server to rfm-v0.4.1+ |
| FORECAST clause not parsed | Server running old kumo-pql (pre #26) | Upgrade server to rfm-v0.4.2+ |
| num_forecasts exceeds context examples | Not enough historical rows | Provide more history or reduce N |
| Forecasting requires a single entity ID | Passed multiple indices | Loop per entity |
| Anchor timestamp for evaluation is after latest supported | anchor_time too recent for eval | Shift anchor back by N * step_size |
| Target not temporal aggregation | Using COUNT(*) or non-windowed agg | Use SUM(..., 0, K, days) style aggregation |