Building the Future of Agents for Ecommerce

November 3, 2025
Sally Liu
James Briggs

E-commerce has long been a wellspring for technical innovations. The industry offers (1) bucket-loads of data, and (2) big financial rewards for those who can predict their customers' wants and needs. These two factors have made e-commerce a popular playground for many generations of machine learning (ML) and AI engineers.

The success of companies like Amazon and Netflix is a direct result of their obsession with scraping huge swathes of data and moulding that data into predictive insights.

Structured data has been key to e-commerce's early adoption of many AI technologies. However, that has led to a lack of adoption in the latest boom - generative AI. The reason for this is simple: past AI excelled with structured data - generative AI excels with unstructured data.

In this article, we will explore the cutting-edge of AI in e-commerce and how merging both generative and traditional AI technologies will enable the next wave of AI innovation in e-commerce.

Chatbots in E-commerce

Chatbots are the most visible technology that e-commerce has adopted from the recent wave of gen AI. When used well, they have improved UX and customer satisfaction, but the industry has tapped only a tiny fraction of their real potential.

Today, chatbots are typically thrown on e-commerce sites as a glorified FAQ autocomplete. Engineers will plug in some docs via a Retrieval Augmented Generation (RAG) pipeline to make the chatbot more "on-brand" and helpful when users have brand-specific queries - but this is as far as most gen-AI projects get in ecommerce.

In the past, e-commerce paved the way for huge technologies like approximate nearest neighbors search (ANNS) that unlocked billion-scale search and modern recommendation systems, giving us personalized product recommendations, ad targeting, and even the movies that Netflix surfaces to us on a Saturday night.

To look forward at the future of AI in ecommerce, we need to look back and understand how the latest advances in AI, such as graph neural networks (GNNs) and large language models (LLMs), can be used alongside the predictive analytics use-cases that made ecommerce the giant that it is today.

Predictive Agents

LLMs are not well-suited for parsing the huge-scale structured data that powers predictive analytics in e-commerce. An LLM is a brilliant synthesizer of information but cannot read, memorize, and then synthesize information at the scale we need in e-commerce. But when LLMs are used as tool-wielding agents, things get interesting.

With agents, our LLMs don't need to directly interact with our data. Instead, we can give our LLMs a set of tools that allow them to run predictive analytics with other AI-technologies that are suited for the scale and relation-building logic that we need.


This is where Kumo's Relational Foundation Model (KumoRFM) comes in. KumoRFM is a merger between LLMs and GNNs. The LLM part allows KumoRFM to quickly discover and map relationships and data structures. The GNN part allows KumoRFM to make sense of those relationships and data structures and enable on-the-fly predictions to be made with no heavy pretraining.

By integrating KumoRFM into a broader agent, we are able to benefit from both general-purpose LLMs and Kumo's fast predictive analytics capabilities.

Initialize KumoRFM

We'll be jumping into the data preparation in a moment, but before we do, we'll need to initialize our KumoRFM instance. A free API key can be obtained by running the code below:

python
import os
from kumoai.experimental import rfm


if not os.environ.get("KUMO_API_KEY"):
   rfm.authenticate()


Then we initialize our local KumoRFM client like so:

python
rfm.init(api_key=os.environ.get("KUMO_API_KEY"))

H&M Ecommerce Dataset

We're going to use a sample of the H&M ecommerce dataset. The sample is available on Hugging Face Datasets at jamescalam/hm-sample. It includes three tables: the customers table with 1.1K rows, articles with 5K rows, and transactions with 15.8K rows.

Let's begin with the customers table. We download and view its schema like so:

python
from datasets import load_dataset


customers = load_dataset(
   "jamescalam/hm-sample", data_files="customers.jsonl", split="train"
)
customers
python
Dataset({
   features: [
       'customer_id', 'FN', 'Active', 'club_member_status',
       'fashion_news_frequency', 'age', 'postal_code'
   ],
   num_rows: 1100
})

Now moving onto articles:

python
articles = load_dataset(
   "jamescalam/hm-sample", data_files="articles.jsonl", split="train"
)
articles
python
Dataset({
   features: [
       'article_id', 'product_code', 'prod_name', 'product_type_no',
       'product_type_name', 'product_group_name', 'graphical_appearance_no',
       'graphical_appearance_name', 'colour_group_code',
       'colour_group_name', 'perceived_colour_value_id',
       'perceived_colour_value_name', 'perceived_colour_master_id',
       'perceived_colour_master_name', 'department_no', 'department_name',
       'index_code', 'index_name', 'index_group_no', 'index_group_name',
       'section_no', 'section_name', 'garment_group_no',
       'garment_group_name', 'detail_desc'
   ],
   num_rows: 5000
})

Finally, our transactions:

python
transactions = load_dataset(
   "jamescalam/hm-sample", data_files="transactions.jsonl", split="train"
)
transactions
python
Dataset({
   features: [
       't_dat', 'customer_id', 'article_id', 'price', 'sales_channel_id'
   ],
   num_rows: 15773
})

We read these into Kumo by first transforming our Hugging Face datasets into Pandas dataframes:

python
customers_df = customers.to_pandas()
articles_df = articles.to_pandas()
transactions_df = transactions.to_pandas()

Once we have our dataframes, we will transform them into rfm.LocalTable objects. These are lightweight abstractions of pandas dataframes that allow us to interface our data with KumoRFM. We use the .infer_metadata() method to automatically infer what types of data we have in our tables:

python
customers = rfm.LocalTable(customers_df, name="customers").infer_metadata()
transactions = rfm.LocalTable(transactions_df, name="transactions").infer_metadata()
articles = rfm.LocalTable(articles_df, name="articles").infer_metadata()
text
Detected primary key 'customer_id' in table 'customers'
Detected time column 't_dat' in table 'transactions'
Detected primary key 'article_id' in table 'articles'

We can update the column types as needed, like so:

python
# update semantic type of columns
customers["customer_id"].stype = "ID"
customers["age"].stype = "numerical"


# primary keys
customers.primary_key = "customer_id"
articles.primary_key = "article_id"


# time column
transactions.time_column = "t_dat"

Then we create the graph:

python
# select the tables
graph = rfm.LocalGraph(tables=[
   customers, transactions, articles
])
# link the tables
graph.link(src_table="transactions", fkey="customer_id", dst_table="customers")
graph.link(src_table="transactions", fkey="article_id", dst_table="articles")
python
LocalGraph(
 tables=[
   customers,
   transactions,
   articles,
 ],
 edges=[
   transactions.customer_id ⇔ customers.customer_id,
   transactions.article_id ⇔ articles.article_id,
 ],
)
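
Each edge in this graph pairs a foreign key in transactions with a primary key in another table, so it can be worth confirming that the keys line up before relying on the links. The helper below is a minimal sketch of such a check. `orphaned_keys` is a hypothetical name, not part of KumoRFM, and the rows are toy values rather than the H&M data.

```python
def orphaned_keys(child_keys, parent_keys):
    """Return foreign-key values in the child table with no matching primary key."""
    return set(child_keys) - set(parent_keys)


# toy rows standing in for the real tables (illustrative values only)
customer_ids = ["c1", "c2", "c3"]
transaction_customer_ids = ["c1", "c1", "c3", "c9"]

# "c9" appears in transactions but has no row in customers
missing = orphaned_keys(transaction_customer_ids, customer_ids)
print(missing)
```

Because the helper accepts any iterable, the same call should also work on dataframe columns, for example `orphaned_keys(transactions_df["customer_id"], customers_df["customer_id"])`.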

We initialize the KumoRFM model within the Kumo local graph.

python
model = rfm.KumoRFM(graph=graph)

Now we can make predictions. Let's forecast the total sales value of one of our products over the next 30 days. Let's choose an article ID:

python
article_id = articles_df.iloc[0].article_id.item()
article_id
text
675662003

Let's see how much we can expect this article to sell:

python
# forecast 30-day product demand for specific item/article
df = model.predict(
   f"PREDICT SUM(transactions.price, 0, 30, days) FOR articles.article_id={article_id}"
)
display(df)
markdown
|     | ENTITY | ANCHOR_TIMESTAMP | TARGET_PRED |
| --- | ------ | ---------------- | ----------- |
| 0   | 675662003 | 1600732800000 | 0.000009    |

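The prediction is output in the TARGET_PRED column. Because the query is a raw SUM aggregation, this is a regression output: the predicted total of transactions.price for this article over the next 30 days, not a probability. At 0.000009, KumoRFM expects essentially no sales of this article in that window.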
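
The ANCHOR_TIMESTAMP value in the output looks like a Unix timestamp in milliseconds. That reading is an assumption based on its magnitude, not something the output states. Under that assumption, a short sketch for turning it into a readable date:

```python
from datetime import datetime, timezone

anchor_ms = 1600732800000  # ANCHOR_TIMESTAMP from the output above

# assumption: the value is milliseconds since the Unix epoch, in UTC
anchor = datetime.fromtimestamp(anchor_ms / 1000, tz=timezone.utc)
print(anchor.date().isoformat())  # 2020-09-22
```

If the assumption holds, the 30-day window in the query is counted forward from this date.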

That's okay, all we want right now is to see how to make predictions. Let's see how likely two specific customers are to purchase over the next 90 days.

python
csample = customers_df.iloc[:2].customer_id.tolist()
csample
text
['1935b6baf9d28d1f19b7ffad18a9da418954a9bf38f59336f2f86d7a5615d1d2',
'75ebdc56559b1f2739ce5832bd85a921ba827c72383135bdcc08a616d320e948']

We'll use these two customer IDs.

python
# predict likelihood of two specific users not ordering in the next 90 days
df = model.predict(
   "PREDICT COUNT(transactions.*, 0, 90, days)=0 "
   f"FOR customers.customer_id IN ('{csample[0]}', '{csample[1]}')"
)
display(df)
markdown
|     | ENTITY | ANCHOR_TIMESTAMP | TARGET_PRED | False_PROB | True_PROB |
| --- | ------ | ---------------- | ----------- | ---------- | --------- |
| 0   | 1935b6baf9d28d1f19b7ffad18a9da418954a9bf38f593... | 1600732800000 | False | 0.672468 | 0.327532 |
| 1   | 75ebdc56559b1f2739ce5832bd85a921ba827c72383135... | 1600732800000 | False | 0.746204 | 0.253796 |

This time, the TARGET_PRED column is a boolean indicating whether our prediction condition holds. Here we're asking KumoRFM to predict whether each customer will not order in the next 90 days (note the =0 in our PREDICT query), so a TARGET_PRED of False means the customer is expected to order.

We also see two more columns here, False_PROB and True_PROB. These columns are the probability of the prediction being False and True, respectively, from 0.0 to 1.0. These are most useful when treated as confidence scores in our prediction.
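
Since the query asks whether a customer will place zero orders, False_PROB can be read as the probability of at least one order. The sketch below uses the two rows from the output, with customer IDs shortened for readability, and ranks customers by that probability. The `order_prob` field is a name introduced here for illustration.

```python
# rows copied from the prediction output above (customer IDs shortened)
rows = [
    {"entity": "1935b6ba", "False_PROB": 0.672468, "True_PROB": 0.327532},
    {"entity": "75ebdc56", "False_PROB": 0.746204, "True_PROB": 0.253796},
]

# the query predicts COUNT(...) = 0, so False means "at least one order"
for row in rows:
    row["order_prob"] = row["False_PROB"]

# most likely buyers first
ranked = sorted(rows, key=lambda r: r["order_prob"], reverse=True)
for row in ranked:
    print(f"{row['entity']}: {row['order_prob']:.1%}")
```

Note that the two probabilities in each row sum to one, so either column carries the full information.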

We've seen how to make predictions with KumoRFM; the next step is integrating this into a fully-fledged e-commerce agent.

Building an Agent

For the core LLM in our agent, we'll use OpenAI and gpt-4.1-mini. This provider and model are completely interchangeable. You can use Anthropic, Mistral, or host locally; it's all possible and easy to swap out.

python
import os
from getpass import getpass

os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") or \
   getpass("Enter your OpenAI API key: ")

We then generate completions using both async and streaming like so:

python
from openai import AsyncOpenAI

client = AsyncOpenAI()

response = await client.chat.completions.create(
   model="gpt-4.1-mini",
   messages=[
       {"role": "user", "content": "Tell me something interesting about GNNs"}
   ],
   stream=True,
)

async for chunk in response:
   if (token := chunk.choices[0].delta.content) is not None:
       print(token, end="", flush=True)
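
To see the token-collection loop work without an API key, the sketch below swaps the real stream for a fake async generator. `fake_chunk` and `fake_stream` are hypothetical stand-ins that only mimic the `choices[0].delta.content` shape used above. They are not part of the OpenAI SDK.

```python
import asyncio
from types import SimpleNamespace


def fake_chunk(content):
    """Build an object shaped like a streamed chunk (hypothetical stand-in)."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


async def fake_stream():
    # some chunks carry no text, which is why the loop checks for None
    for piece in ["GNNs ", "pass ", "messages.", None]:
        yield fake_chunk(piece)


async def collect(stream) -> str:
    parts = []
    async for chunk in stream:
        # keep only chunks whose delta has text, as in the loop above
        if (token := chunk.choices[0].delta.content) is not None:
            parts.append(token)
    return "".join(parts)


# in a notebook, use `await collect(fake_stream())` instead of asyncio.run
text = asyncio.run(collect(fake_stream()))
print(text)  # GNNs pass messages.
```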

Now we set up our agent with tools. We use the "no-framework" framework GraphAI.

Using this library, we are expected to create our own tool functions, LLM API calls, etc. The library primarily acts as a graph execution framework without any AI abstractions. With that in mind, we will first define two tools for our agent.

Tool One: Query Dataframes

The first tool runs a namespace exec instance, allowing our LLM to run Python code against our pandas dataframes.

With graphai, we typically define tools with two components: a pydantic BaseModel to outline the tool schema for our agent, and the Python function that will be executed when the tool is called.

python
import json
import pandas as pd
from pydantic import BaseModel, Field
from graphai import node
from graphai.callback import EventCallback

class QueryDataframes(BaseModel):
   """Execute simple filtered queries on the ecommerce dataframes. Will execute code in
   a namespace with the following dataframes:
  
   - transactions_df
   - articles_df
   - customers_df
  
   You can also access the pandas library via `pd` for dataframe operations. Ensure you
   assign the results you need to the `out` variable, otherwise nothing will be returned
   as this will be run with `exec()`. After execution we access the `out` variable and
   return it to you.

   If outputting a dataframe, you must use the .to_markdown() method to output an easily
   readable markdown table.
   """
   query: str = Field(..., description="The python code to execute")


@node(stream=True)
async def query_dataframes(input: dict, state: dict, callback: EventCallback) -> dict:
   # default so the callback below still has arguments if parsing fails
   tool_call_args = {}
   try:
       tool_call_args = json.loads(state["events"][-1]["tool_calls"][0]["function"]["arguments"])
       # get dataframes, pandas, and set `out` to None
       namespace = {
           "transactions_df": state["transactions_df"],
           "articles_df": state["articles_df"],
           "customers_df": state["customers_df"],
           "pd": pd,
           "out": None,
       }
       # grab query from LLM to be executed
       query = tool_call_args.get("query")
       if not query:
           raise ValueError("No query provided")
       # remove escaped newlines as it frequently breaks the query
       query = query.replace("\\n", "\n")
       # execute query within predefined namespace
       exec(query, namespace)
       # pull out the `out` value
       out = namespace.get("out")
       if out is None:
           out = "No result returned via the `out` variable"
       content = [{"type": "text", "text": json.dumps(out, default=str)}]
   except Exception as e:
       content = [{
           "type": "text",
           "text": (
               f"Error executing query: {str(e)}. "
               "Please fix your query and try again."
           )
       }]
   # stream tool output
   await callback.acall(
       type="tool_output",
       params={
           "id": state["events"][-1]["tool_calls"][0]["id"],
           "name": "query_dataframes",
           "arguments": tool_call_args,
           "output": content[0]["text"]
       }
   )
   # Add tool call event to state
   event = {
       "role": "tool",
       "content": content,
       "tool_call_id": state["events"][-1]["tool_calls"][0]["id"]
   }
   state["events"].append(event)
   return {"input": {}}
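
The namespace-exec pattern at the heart of this tool can be tried in isolation. The sketch below is a stripped-down version that uses a plain list instead of dataframes. `run_query` is a hypothetical helper written for this illustration.

```python
def run_query(query: str, data: dict):
    """Execute `query` in a fresh namespace and return whatever it assigns to `out`."""
    namespace = {**data, "out": None}
    # turn literal backslash-n sequences into real newlines, as the tool does
    query = query.replace("\\n", "\n")
    exec(query, namespace)
    return namespace.get("out")


data = {"prices": [0.05, 0.02, 0.03]}

total = run_query("out = round(sum(prices), 2)", data)
nothing = run_query("sum(prices)", data)  # result is never assigned to `out`
multi = run_query("top = max(prices)\\nout = top * 2", data)
print(total, nothing, multi)
```

The second call returns None, which is the case the tool reports back to the LLM as a missing `out` value. Keep in mind that `exec()` runs arbitrary code, so a setup like this belongs in a sandboxed or otherwise trusted environment.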

Tool Two: Query KumoRFM

The second tool will provide access to KumoRFM's PQL queries. For this tool to work, we need to give our agent some guidelines on how to use it. We'll first grab those. The full prompt is in the file we download below.

python
import requests

pql_file = requests.get(
   "https://raw.githubusercontent.com/jamescalam/ecommerce-agent/refs/heads/main/api/pluto/prompts/developer.py"
).text
# strip first and last two lines as they contain python boilerplate
pql_reference = "\n".join(pql_file.split("\n")[1:-2])
print(pql_reference)
text
# KumoRFM Predictive Query Language (PQL) Reference

## Overview

Predictive Query Language (PQL) is KumoRFM's declarative SQL-like syntax for defining predictive modeling tasks using the foundation model.

## Core Structure

### Basic Syntax
\`\`\`sql
PREDICT <target>
FOR <entity> IN (<id1>, <id2>, ...)
[WHERE <filter>]
[ASSUMING <conditions>]
\`\`\`

### Minimum Requirements
- **Entity**: Explicit list of entity IDs (required)
- **Target**: What to predict (required)  
- **Filter**: Optional conditions to scale down results

## Main Commands

### PREDICT
Defines the target variable to predict. Must start every PQL query.

\`\`\`sql
PREDICT SUM(transactions.price, 0, 30, days)
PREDICT COUNT(transactions.*, 0, 30, days) = 0
PREDICT FIRST(purchases.type, 0, 7)
\`\`\`

### FOR ... IN
Specifies explicit entity IDs for predictions. KumoRFM requires specific entity lists, not general entity types.

\`\`\`sql
FOR customers.customer_id IN ('cust_123', 'cust_456', 'cust_789')
FOR users.user_id IN ('user_abc', 'user_def')
FOR products.product_id IN ('prod_1', 'prod_2', 'prod_3')
\`\`\`

### WHERE
Applies filtering conditions. Can be used multiple times:
- Entity filters
- Target filters within aggregations  
- Conditional aggregations

\`\`\`sql
WHERE COUNT(transactions.*, -30, 0) > 0
WHERE users.status = 'active'
WHERE transactions.value > 50
\`\`\`

### ASSUMING
Specifies conditions assumed true during prediction time. Used for hypothetical scenarios.

\`\`\`sql
ASSUMING COUNT(notifications.*, 0, 7) > 2
ASSUMING LIST_DISTINCT(coupons.type, 0, 3) CONTAINS '50% off'
\`\`\`

## Aggregation Functions

### Syntax
\`\`\`sql
AGGREGATION_FUNCTION(table.column, start, end, [time_unit])
\`\`\`

**Parameters:**
- **start/end**: Time period boundaries (must be non-negative integers)
- **end > start**: End value must be greater than start value
- **time_unit**: days (default), hours, months

### Available Functions

| Function | Description | Example |
|----------|-------------|---------|
| `AVG()` | Average value | `AVG(sales.amount, 0, 30, days)` |
| `COUNT()` | Count occurrences | `COUNT(transactions.*, -90, 0, days)` |
| `COUNT_DISTINCT()` | Count unique values | `COUNT_DISTINCT(products.id, 0, 7)` |
| `FIRST()` | First value in range | `FIRST(purchases.type, 0, 7)` |
| `LAST()` | Last value in range | `LAST(sessions.status, -30, 0)` |
| `LIST_DISTINCT()` | List of unique values | `LIST_DISTINCT(articles.id, 0, 7)` |
| `MAX()` | Maximum value | `MAX(transactions.amount, 0, 30)` |
| `MIN()` | Minimum value | `MIN(orders.value, -7, 0)` |
| `SUM()` | Sum of values | `SUM(purchases.price, 0, 30, days)` |

## Boolean Operators

### Comparison Operators
- `=` Equal
- `!=` Not equal
- `<` Less than
- `<=` Less than or equal
- `>` Greater than
- `>=` Greater than or equal

### Logical Operators
- `AND` Logical AND
- `OR` Logical OR
- `NOT` Logical NOT

### String Operations
- `CONTAINS` String contains
- `STARTS_WITH` String starts with
- `ENDS_WITH` String ends with
- `IN` Value in list
- `LIKE` Pattern matching

## Time Windows

### Time Reference System
- **Positive values**: Future time (0 = now, 30 = 30 days future)
- **Negative values**: Past time (-90 = 90 days ago)
- **Zero point**: Prediction time

### Supported Time Units
- `days` (default)
- `hours`
- `months`

### Examples
\`\`\`sql
-- Last 90 days
COUNT(transactions.*, -90, 0, days)

-- Next 30 days  
COUNT(transactions.*, 0, 30, days)

-- Last 3 months
SUM(sales.amount, -3, 0, months)

-- Next week in hours
COUNT(sessions.*, 0, 168, hours)
\`\`\`

## Task Types

Kumo automatically determines task type based on query structure:

### 1. Regression
Predicts continuous real number values.

\`\`\`sql
PREDICT SUM(transactions.price, 0, 30, days)
FOR customers.customer_id IN ('cust_123', 'cust_456')
\`\`\`

### 2. Binary Classification
Predicts true/false outcomes using comparison operators.

\`\`\`sql
PREDICT COUNT(transactions.*, 0, 30, days) = 0
FOR customers.customer_id IN ('cust_123', 'cust_456')
WHERE COUNT(transactions.*, -90, 0, days) > 0
\`\`\`

### 3. Multi-class/Multi-label Classification
Predicts class labels.

\`\`\`sql
PREDICT FIRST(purchases.type, 0, 7) 
FOR users.user_id IN ('user_abc', 'user_def')
\`\`\`

### 4. Link Prediction
Predicts lists of items using ranking.

\`\`\`sql
PREDICT LIST_DISTINCT(transactions.article_id, 0, 7) 
RANK TOP 10 FOR customers.customer_id IN ('cust_123', 'cust_456')
\`\`\`

## Temporal vs Static Queries

### Temporal Queries
- Predict aggregations over specific time windows
- Require time columns
- Handle complex temporal data splitting
- Prevent data leakage through proper time-based splits

\`\`\`sql
PREDICT SUM(transactions.price, 0, 30, days)
FOR customers.customer_id IN ('cust_123', 'cust_456')
\`\`\`

### Static Queries
- Do not require time columns
- Use random 80/10/10 data split
- Simpler data handling

\`\`\`sql
PREDICT user.category
FOR users.user_id IN ('user_abc', 'user_def')
WHERE users.active = true
\`\`\`

## Advanced Features

### Nested Filters
\`\`\`sql
COUNT(orders.* WHERE orders.status = 'completed' AND orders.value > 50, -30, 0)
\`\`\`

### Multiple Conditions
\`\`\`sql
PREDICT COUNT(sessions.*, 0, 7) > 10 OR SUM(transactions.value, 0, 5) > 100
FOR users.user_id IN ('user_123', 'user_456')
WHERE users.status = 'active' AND COUNT(sessions.*, -30, 0) > 5
\`\`\`

### Inline Filters
\`\`\`sql
SUM(transactions.price WHERE transactions.category = 'electronics', 0, 30)
\`\`\`

### Column References
- Format: `table.column` or `table.*`
- Supports dot notation for nested fields
- Use `*` to reference all columns/records

### Ranking (Link Prediction)
\`\`\`sql
PREDICT LIST_DISTINCT(products.id, 0, 30)
RANK TOP 5 FOR customers.customer_id IN ('cust_123', 'cust_456')
\`\`\`

## Complete Examples

### Customer Churn Prediction (Binary Classification)
\`\`\`sql
PREDICT COUNT(transactions.*, 0, 30, days) = 0
FOR customers.customer_id IN ('cust_123', 'cust_456', 'cust_789')
WHERE COUNT(transactions.*, -90, 0, days) > 0
\`\`\`
*Predicts if active customers will churn in next 30 days*

### Revenue Forecasting (Regression)
\`\`\`sql
PREDICT SUM(transactions.price, 0, 30, days)
FOR customers.customer_id IN ('gold_cust_1', 'platinum_cust_2')
\`\`\`

### High-Value Transaction Prediction
\`\`\`sql
PREDICT COUNT(transactions.* WHERE transactions.value > 100, 0, 7)
FOR users.user_id IN ('user_abc', 'user_def', 'user_ghi')
WHERE COUNT(transactions.*, -30, 0) > 5
\`\`\`

### Product Recommendation (Link Prediction)
\`\`\`sql
PREDICT LIST_DISTINCT(purchases.product_id, 0, 14)
RANK TOP 10 FOR customers.customer_id IN ('cust_123', 'cust_456')
WHERE COUNT(sessions.*, -7, 0) > 0
\`\`\`

### Multi-Category Classification
\`\`\`sql
PREDICT FIRST(transactions.category, 0, 30)
FOR customers.customer_id IN ('active_cust_1', 'active_cust_2')
\`\`\`

### Complex Conditional Prediction
\`\`\`sql
PREDICT (COUNT(premium_features.*, 0, 30) > 5) AND (SUM(usage.minutes, 0, 30) > 1000)
FOR users.user_id IN ('trial_user_1', 'trial_user_2')
WHERE users.subscription_type = 'trial'
ASSUMING COUNT(notifications.*, 0, 7) > 3
ASSUMING marketing_campaigns.type = 'premium_upgrade'
\`\`\`

### Product Recommendation with Ranking
\`\`\`sql
PREDICT LIST_DISTINCT(transactions.article_id, 0, 30)
RANK TOP 10 FOR customers.customer_id IN ('cust_123', 'cust_456')
\`\`\`

### Predict Most Valuable Customers Over Next 30 Days From Sample
\`\`\`sql
PREDICT SUM(transactions.price, 0, 30, days)
FOR customers.customer_id IN ('cust_123', 'cust_456')
\`\`\`

*IMPORTANT: when using RANK TOP K you CANNOT set K=1, so if finding the top customer just use RANK TOP 2*

### Predict Most Likely Purchases For Customer Over Next 30 Days
\`\`\`sql
PREDICT LIST_DISTINCT(transactions.article_id, 0, 30, days)
RANK TOP 5 FOR customers.customer_id = 'cust_123'
\`\`\`

## Syntax Rules & Constraints

### Time Window Rules
1. Both start and end must be non-negative integers
2. End value must be greater than start value
3. Time unit defaults to 'days' if not specified

### Entity Rules
1. Must specify explicit entity IDs using IN clause
2. Use `FOR table.column IN (id1, id2, ...)` format
3. Entity IDs must exist in your graph schema
4. Cannot use `FOR EACH` - must provide specific entity list

### Target Rules
1. Must use `PREDICT` as first command
2. Can combine multiple conditions with AND/OR/NOT
3. Comparison operators create classification tasks
4. Raw aggregations create regression tasks

### Filter Rules
1. WHERE clauses can be nested and combined
2. Support both static and temporal conditions
3. Can filter at entity level or within aggregations
4. Multiple WHERE clauses are combined with AND

## Best Practices

### Performance Optimization
1. **Filter Early**: Use WHERE clauses to reduce computation
2. **Choose Appropriate Time Windows**: Match business context
3. **Entity Selection**: Filter entities to relevant subset

### Query Design
1. **Start Simple**: Begin with basic queries, add complexity gradually
2. **Test Incrementally**: Validate each component before combining
3. **Clear Intent**: Make prediction goal explicit in target definition

### Temporal Considerations
1. **Avoid Data Leakage**: Use proper time boundaries
2. **Balance Splits**: Ensure sufficient data in each time period
3. **Business Logic**: Align time windows with business cycles

### Task Type Selection
1. **Regression**: Use for continuous predictions
2. **Classification**: Use comparison operators for categories
3. **Link Prediction**: Use LIST_DISTINCT with RANK TOP K
4. **Multi-class**: Use FIRST/LAST for category prediction

## Error Prevention

### Common Mistakes
1. **Invalid Time Windows**: Ensure end > start and both ≥ 0
2. **Missing Entities**: Verify entity exists in graph
3. **Type Mismatches**: Match aggregation functions to data types
4. **Data Leakage**: Don't reference future data in historical queries

### Validation Checklist
- [ ] Entity specified with FOR table.column IN (...) 
- [ ] Target defined with PREDICT
- [ ] Time windows follow start < end rule
- [ ] Filters reference valid columns
- [ ] Syntax follows SQL-like structure
- [ ] Time boundaries prevent data leakage
- [ ] No use of FOR EACH (KumoRFM requires explicit entity lists)

Given the size of these guidelines, we'll insert them directly into our system/developer message rather than our tool description.

Now we define the KumoRFM tool like so:

python
class KumoRFM(BaseModel):
   """This tool allows you to write any PQL query to the KumoRFM model.
   """
   query: str = Field(..., description="The PQL query to predict")


@node(stream=True)
async def kumorfm(input: dict, state: dict, callback: EventCallback) -> dict:
   # default so the callback below still has arguments if parsing fails
   tool_call_args = {}
   try:
       tool_call_args = json.loads(state["events"][-1]["tool_calls"][0]["function"]["arguments"])
       query = tool_call_args.get("query")
       if not query:
           raise ValueError("No query provided")
      
       df = state["kumorfm"].predict(query)
       out = df.to_dict(orient="records")
       content = [{"type": "text", "text": json.dumps(out)}]
   except Exception as e:
       content = [{"type": "text", "text": str(e)}]
   # stream tool output
   await callback.acall(
       type="tool_output",
       params={
           "id": state["events"][-1]["tool_calls"][0]["id"],
           "name": "kumorfm",
           "arguments": tool_call_args,
           "output": content[0]["text"]
       }
   )
   event = {
       "role": "tool",
       "content": content,
       "tool_call_id": state["events"][-1]["tool_calls"][0]["id"]
   }
   state["events"].append(event)
   return {"input": {}}

For our LLM to be able to read our tool schemas, we will use graphai's built-in FunctionSchema class. It consumes our pydantic base models and can later output them in an OpenAI-friendly schema format.

python
from graphai.utils import FunctionSchema


query_df_schema = FunctionSchema.from_pydantic(QueryDataframes)
query_df_schema.name = "query_dataframes"
kumorfm_schema = FunctionSchema.from_pydantic(KumoRFM)
kumorfm_schema.name = "kumorfm"


tools = [query_df_schema, kumorfm_schema]

The schemas can then be created using the to_openai method (when using OpenAI models).

python
tools[1].to_openai(api="completions")
json
{'type': 'function',
'function': {'name': 'kumorfm',
 'description': 'This tool allows you to write any PQL query to the KumoRFM model.\n    ',
 'parameters': {'type': 'object',
  'properties': {'query': {'description': 'The PQL query to predict',
    'type': 'string'}},
  'required': ['query']}}}

Defining our Agent Graph

Graphs are constructed from nodes and edges, with various special nodes and edges within that broader structure. For our use case, we don't need to dive into anything too exotic. All we need to do is define our nodes and construct our graph to join those together.

Nodes

The graph will consist of five total nodes, two of which we have already defined with our tools. The remaining three are:

  • The llm router node contains the logic for calling our LLM and handling its tool-calling decisions.
  • The start and end nodes are graphai-specific boilerplate; they act as the entry and exit points of our graph.

We will first define the llm router:

python
from graphai import router


@router(stream=True)
async def llm(input: dict, state: dict, callback: EventCallback) -> dict:
   # get client initialized in lifespan
   client = state["client"]
   # call openai (or another provider as preferred)
   stream = await client.chat.completions.create(
       model="gpt-4.1-mini",
       messages=state["events"],
       tools=[x.to_openai(api="completions") for x in tools],
       stream=True,
       seed=9000,  # keep consistent results
       parallel_tool_calls=False,
   )
   direct_answer: str = ""
   # filled in once the model starts a tool call
   tool_call: dict = {}
   tool_call_args = ""
   async for chunk in stream:
       if (token := chunk.choices[0].delta.content) is not None:
           # this handles direct text output
           direct_answer += token
           await callback.acall(token=token)
       # handle tool calls
       tool_calls_out = chunk.choices[0].delta.tool_calls
       if tool_calls_out and (tool_name := tool_calls_out[0].function.name) is not None:
           # this handles the initial tokens of a tool call
           tool_call["id"] = tool_calls_out[0].id
           tool_call["name"] = tool_name
           # we can return the tool name
           await callback.acall(
               type="tool_call",
               params=tool_call
           )
       elif tool_calls_out and (tool_args := tool_calls_out[0].function.arguments) is not None:
           # this handles the arguments of a tool call
           tool_call_args += tool_args
           # we can output these too
           await callback.acall(
               type="tool_args",
               params={
                   **tool_call,
                   "arguments": tool_args
               }
           )
   if direct_answer:
       # if we got a direct answer we create a standard assistant message
       state["events"].append(
           {
               "role": "assistant",
               "content": direct_answer,
           }
       )
       # choice controls the next node destination
       choice = "end"
   elif tool_call:
       # if we got a tool call we create an assistant tool call message
       state["events"].append(
           {
               "role": "assistant",
               "tool_calls": [{
                   "id": tool_call["id"],
                   "type": "function",
                   "function": {
                       "name": tool_call["name"],
                       "arguments": tool_call_args,
                   }
               }]
           }
       )
       choice = tool_call["name"]
   else:
       # neither text nor a tool call came back, so finish rather than fail
       choice = "end"
   return {"input": input, "choice": choice}
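
The router only concatenates the streamed tool-call arguments and leaves parsing to the tool node, because an individual fragment is not valid JSON on its own. A minimal sketch of that idea, using made-up fragments in place of a real stream:

```python
import json

# argument fragments as they might arrive across stream chunks (made-up example)
fragments = ['{"que', 'ry": "PREDICT COUNT(transactions.*, ', '0, 30, days) = 0"}']

tool_call_args = ""
for fragment in fragments:
    # each fragment alone is not valid JSON, so only concatenate here
    tool_call_args += fragment

# parse once, after the stream has finished
parsed = json.loads(tool_call_args)
print(parsed["query"])
```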

And now our two boilerplate start and end nodes:

python
@node(start=True)
async def start(input: dict) -> dict:
   return {"input": input}

@node(end=True)
async def end(input: dict, state: dict) -> dict:
   return {"output": state["events"]}

Connecting the Nodes

Our broader graph contains the logic that connects our various nodes and defines the initial state of the workflow. We will first define our state, which will consist of our initial system/developer message, our KumoRFM instance, and the three H&M dataframes.

We will begin by defining the developer message:

python
dev_message = {
   "role": "developer",
   "content": (
       "You are a helpful assistant that uses the various tools and "
       "KumoRFM integration to answer the user's analytics questions "
       "about our H&M ecommerce dataset."
       "\n"
       "When answering questions, you may use the various tools "
       "multiple times before answering the user. You should "
       "aim to have all of the information you need from the tools "
       "before answering the user."
       "\n"
       "There is a limit of 30 steps to each interaction, measured "
       "as the number of tool calls made between the user's most "
       "recent message and your response to the user. Keep that limit "
       "in mind but ensure you are still thorough in your analysis."
       # remember to include our KumoRFM reference guide
       "\n\n"
       "## PQL (Predictive Query Language) Reference\n"
       "Use this syntax when working with KumoRFM predictions:\n"
       "\n"
       f"{pql_reference}"
   )
}

And now our initial state:

python
initial_state = {
   "events": [dev_message],
   "kumorfm": model,
   "transactions_df": transactions_df,
   "articles_df": articles_df,
   "customers_df": customers_df,
   "client": client
}

This state can be added to our graph using the set_state method. Alongside this, we will also be adding the various nodes and routers to our graph with add_node and add_router. We then set all edges with add_edge. Finally, once our graph is fully defined, we compile it.

python
from graphai import Graph


# create graph
graph = (
   Graph(max_steps=30)
   .set_state(initial_state)
   .add_node(start)
   .add_node(llm)
   .add_node(kumorfm)
   .add_node(query_dataframes)
   .add_node(end)
   .add_router(
       sources=[start],
       router=llm,
       destinations=[
           kumorfm,
           query_dataframes,
           end
       ]
   )
   .add_edge(kumorfm, llm)
   .add_edge(query_dataframes, llm)
   .add_edge(llm, end)
   .compile()
)
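To make the control flow concrete, the sketch below mimics the loop this graph describes in plain Python: the router picks a node, the node's output is appended to the events, and control returns to the router until it chooses the end node or the step budget runs out. This is not how graphai is implemented. `run_loop`, `fake_router`, and the two tool stubs are hypothetical names, and counting one step per tool call is an assumption made for illustration.

```python
from typing import Callable

# Hypothetical stand-ins for the real tool nodes
def kumorfm_stub(state: dict) -> str:
    return "prediction result"

def query_dataframes_stub(state: dict) -> str:
    return "dataframe result"

TOOLS: dict[str, Callable[[dict], str]] = {
    "kumorfm": kumorfm_stub,
    "query_dataframes": query_dataframes_stub,
}

def fake_router(state: dict) -> str:
    """Stand-in for the llm router: pick the next node by name."""
    used = [e["name"] for e in state["events"] if e["role"] == "tool"]
    if "query_dataframes" not in used:
        return "query_dataframes"
    if "kumorfm" not in used:
        return "kumorfm"
    return "end"

def run_loop(state: dict, router: Callable[[dict], str], max_steps: int = 30) -> dict:
    """Route to a tool, append its output, and return to the router until 'end'."""
    for _ in range(max_steps):
        choice = router(state)
        if choice == "end":
            return state
        output = TOOLS[choice](state)
        state["events"].append({"role": "tool", "name": choice, "content": output})
    raise RuntimeError(f"step limit of {max_steps} reached before the router chose 'end'")

final_state = run_loop({"events": []}, fake_router)
print([e["name"] for e in final_state["events"]])
```

A router that never chooses the end node hits the step limit and raises, which is the kind of runaway loop the `max_steps=30` setting in our graph is there to guard against.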

Testing the E-commerce Agent

The agent is now fully defined, and we can start using it. We run graph.execute as a background task and stream its events like so:

python
import asyncio

cb = EventCallback()
# add our input message to the state
graph.update_state({
   "events": [
       *graph.state["events"],
       {
           "role": "user",
           "content": f"Can you predict the demand for article {article_id} over the next 30 days"
       }
   ]
})
# now execute
_ = asyncio.create_task(
   graph.execute({"input": {}}, callback=cb)
)

# and (optionally) stream the output
async for event in cb.aiter():
   if str(event.type) == "callback":
       # this indicates direct text output
       print(event.token, end="", flush=True)
   elif event.type == "tool_call":
       # this indicates the first event in a tool call
       # this contains tool name and ID
       print(event.params["name"], flush=True)
   elif event.type == "tool_args":
       # this indicates the arguments of a tool call
       print(event.params["arguments"], end="", flush=True)
   elif event.type == "tool_output":
       # this indicates the output of a tool call
       # these can be very long so we'll avoid printing them
       # but feel free to try
       #print(event.params["output"])
       print()
       pass

Here we see several print statements because our agent calls the kumorfm node/tool multiple times. The syntax of the first two queries is incorrect, so each returns an error to our agent, which corrects the query and tries again. On the third try, the query syntax is correct, and the agent's text response follows. Our agent predicts extremely low demand for this specific article.
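This self-correction only works if a failed tool call reaches the model as ordinary tool output instead of crashing the graph. The sketch below shows one way to do that. It is a sketch under assumptions, not the kumorfm node defined earlier: `run_tool_safely` and `fake_pql_runner` are hypothetical names, and the message shape assumes the chat-completions style `tool` role.

```python
import json
from typing import Callable

def run_tool_safely(tool: Callable[[str], str], tool_call_id: str, query: str) -> dict:
    """Run a tool and always return a tool message, even when the tool fails."""
    try:
        content = tool(query)
    except Exception as exc:  # surface the error text so the model can fix its query
        content = json.dumps({"error": f"{type(exc).__name__}: {exc}"})
    return {"role": "tool", "tool_call_id": tool_call_id, "content": content}

def fake_pql_runner(query: str) -> str:
    """Hypothetical stand-in for a PQL call; rejects queries that do not start with PREDICT."""
    if not query.strip().upper().startswith("PREDICT"):
        raise ValueError("query must start with PREDICT")
    return json.dumps({"ok": True})

bad = run_tool_safely(fake_pql_runner, "call_1", "SELECT * FROM transactions")
good = run_tool_safely(fake_pql_runner, "call_2", "PREDICT COUNT(transactions.*, 0, 30, days) = 0")
print(bad["content"])
print(good["content"])
```

Because the error text lands in the event history, the model sees exactly what went wrong on its next turn and can rewrite the query.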

Let's try some more queries, but first, we can write a helper function to keep the chat history tracking and agent execution calls a little simpler:

python
async def chat(content: str):
   cb = EventCallback()
   graph.update_state({
       "events": [
           *graph.state["events"],
           {"role": "user", "content": content}
       ]
   })

   _ = asyncio.create_task(
       graph.execute({"input": {}}, callback=cb)
   )

   async for event in cb.aiter():
       if str(event.type) == "callback":
           # this handles direct text output
           print(event.token, end="", flush=True)
       elif event.type == "tool_call":
           # this indicates the first event in a tool call
           # this contains tool name and ID
           print(event.params["name"], flush=True)
       elif event.type == "tool_args":
           # this indicates the arguments of a tool call
           print(event.params["arguments"], end="", flush=True)
       elif event.type == "tool_output":
           # this indicates the output of a tool call
           # these can be very long so we'll avoid printing them
           # but feel free to try
           #print(event.params["output"])
           print()
           pass

Now let's try:

python
await chat(
   "What other useful info can you give me? I'm preparing our monthly marketing "
   "emails"
)

Our agent queries the dataframes once to understand the available data:

text
query_dataframes
{"query":"# Let's look into some potentially useful insights for marketing emails\n# 1. Top selling articles in last 30 days\n# 2. Customer segments based on purchase behavior\n# 3. Popular categories\n\n# Top selling articles in last 30 days\ntop_selling_articles = transactions_df[transactions_df['t_dat'] > (transactions_df['t_dat'].max() - pd.Timedelta(days=30))]\ntop_selling_articles = top_selling_articles.groupby('article_id').agg({'price': 'sum'}).reset_index().sort_values(by='price', ascending=False).head(10)\n\n# Popular categories in last 30 days\npopular_categories = transactions_df[transactions_df['t_dat'] > (transactions_df['t_dat'].max() - pd.Timedelta(days=30))]\npopular_categories = popular_categories.merge(articles_df[['article_id', 'product_type_name']], on='article_id')\npopular_categories = popular_categories.groupby('product_type_name').agg({'price': 'sum'}).reset_index().sort_values(by='price', ascending=False).head(5)\n\n# Customer segments based on purchase frequency and recency\ncustomers_last_purchase = transactions_df.groupby('customer_id').agg({'t_dat': 'max', 'price': 'sum', 'article_id': 'count'}).reset_index()\ncustomers_last_purchase['days_since_last_purchase'] = (transactions_df['t_dat'].max() - customers_last_purchase['t_dat']).dt.days\n\n# Creating simple segments\nfrequent_customers = customers_last_purchase[customers_last_purchase['article_id'] > 5]\ninfrequent_customers = customers_last_purchase[(customers_last_purchase['article_id'] <= 5) & (customers_last_purchase['days_since_last_purchase'] <= 30)]\ndormant_customers = customers_last_purchase[customers_last_purchase['days_since_last_purchase'] > 30]\n\nout = {\n    'top_selling_articles': top_selling_articles,\n    'popular_categories': popular_categories,\n    'frequent_customers_count': frequent_customers.shape[0],\n    'infrequent_customers_count': infrequent_customers.shape[0],\n    'dormant_customers_count': dormant_customers.shape[0]\n}"}

Then the agent uses this data to produce the following answer:

text
For your monthly marketing emails, here are some useful insights:

1. Top Selling Articles in the Last 30 Days:
| article_id | price   |
|------------|---------|
| 894763002  | 1.09322 |
| 902419001  | 0.35525 |
| 891898001  | 0.33890 |
| 914805002  | 0.30498 |
| 904625001  | 0.29649 |
| 863583002  | 0.26807 |
| 906169002  | 0.26434 |
| 873279006  | 0.25414 |
| 926638001  | 0.23898 |
| 932383001  | 0.23722 |

2. Popular Product Categories in the Last 30 Days:
| product_type_name | price   |
|-------------------|---------|
| Trousers          | 2.21549 |
| Sweater           | 2.17353 |
| Dress             | 1.33981 |
| Coat              | 1.26102 |
| Jacket            | 0.78636 |

3. Customer Segments by Purchase Behavior:
- Frequent Customers (more than 5 purchases): 634
- Infrequent Customers (5 or fewer purchases but recent): 12
- Dormant Customers (no purchases in last 30 days): 750

You can tailor your marketing campaigns targeting frequent customers with loyalty offers, infrequent customers with engagement offers, and dormant customers with reactivation campaigns. Featuring the top-selling articles and popular categories in emails might increase sales effectively.

Let's continue the conversation:

python
await chat("Can you help me find customers likely to churn?")

Both query_dataframes and kumorfm tools are used this time:

text
kumorfm
{"query":"PREDICT COUNT(transactions.*, 0, 30, days) = 0\nFOR customers.customer_id IN (SELECT DISTINCT customer_id FROM transactions_df) \nWHERE COUNT(transactions.*, -90, 0, days) > 0"}
query_dataframes
{"query":"# Fetch all distinct customer_ids from transactions\ndistinct_customers = transactions_df['customer_id'].unique().tolist()\nout = distinct_customers[:50]  # Limiting to first 50 for prediction sample"}
kumorfm
{"query":"PREDICT COUNT(transactions.*, 0, 30, days) = 0\nFOR customers.customer_id IN ('8ef783d3815860cc145c2923f740f88728e373f2c3cb37aa638c15810ac531cc', '2d545e697d8cf36558c81eb56c1776cb30f893585ed21aa1531863c727a42fbb', '4330e0469755c75b92a58a5f5002c729479147d470e6cc42a3206572352a1e28', '26e237aa2bc47082d06d49af58bbd65785cb979daf3110313d1484b95adac609', '8df45859ccd71ef1e48e2ee9d1c65d5728c31c46ae957d659fa4e5c3af6cc076', '59470fe7e847d5c05976da6c41fd27fa221b1fb7f7e3b76d2509994011435375', 'd9d809b2a22dfe4afcbe5351c5c3ca2ac6f375ae0dba65156ec9ea422428053b', '01c19c0ba392de6d2bee657a616eca254d8fa6d06dde299b73d4276381b54554', '03d0011487606c37c1b1ed147fc72f285a50c05f00b9712e0fc3da400c864296', 'c4e748d5bf4f10c86410d8b0cf62535ace6b502a80ed253ab4328f3eb3ca32ca', 'fc4842d6365813761635d..."}
kumorfm
{"query":"PREDICT COUNT(transactions.*, 0, 30, days) = 0\nFOR customers.customer_id IN ('8ef783d3815860cc145c2923f740f88728e373f2c3cb37aa638c15810ac531cc', '2d545e697d8cf36558c81eb56c1776cb30f893585ed21aa1531863c727a42fbb', '4330e0469755c75b92a58a5f5002c729479147d470e6cc42a3206572352a1e28', '26e237aa2bc47082d06d49af58bbd65785cb979daf3110313d1484b95adac609', '8df45859ccd71ef1e48e2ee9d1c65d5728c31c46ae957d659fa4e5c3af6cc076', '59470fe7e847d5c05976da6c41fd27fa221b1fb7f7e3b76d2509994011435375')\nWHERE COUNT(transactions.*, -90, 0, days) > 0"}
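The agent ends up passing an explicit list of customer IDs rather than a subquery, and then a shorter list. If we wanted to drive the same prediction ourselves, a small helper could build that query in batches. This is a minimal sketch: `build_churn_pql` and `batched` are hypothetical helpers, the query template is copied from the agent's final call above, and the batch size of 6 simply mirrors that call rather than any documented KumoRFM limit.

```python
from typing import Iterator

def build_churn_pql(customer_ids: list[str]) -> str:
    """Format the churn query used above for an explicit list of customer IDs."""
    if not customer_ids:
        raise ValueError("customer_ids must not be empty")
    # IDs are assumed to be plain hex strings, so no quote escaping is done here
    id_list = ", ".join(f"'{cid}'" for cid in customer_ids)
    return (
        "PREDICT COUNT(transactions.*, 0, 30, days) = 0\n"
        f"FOR customers.customer_id IN ({id_list})\n"
        "WHERE COUNT(transactions.*, -90, 0, days) > 0"
    )

def batched(items: list[str], size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

ids = [f"customer_{n}" for n in range(14)]  # placeholder IDs
queries = [build_churn_pql(batch) for batch in batched(ids, size=6)]
print(len(queries))
print(queries[-1])
```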

Our agent's final response is:

text
I predicted churn likelihood for a sample of customers who have purchased in the last 90 days. Here are the results for six customers:


- Customer 8ef783d3815860cc145c2923f740f88728e373f2c3cb37aa638c15810ac531cc has about 70.9% probability to churn (not make a purchase) in next 30 days.
- Customer 2d545e697d8cf36558c81eb56c1776cb30f893585ed21aa1531863c727a42fbb has about 60.3% probability to churn.
- Customer 4330e0469755c75b92a58a5f5002c729479147d470e6cc42a3206572352a1e28 has about 74.9% probability to churn.
- Customer 26e237aa2bc47082d06d49af58bbd65785cb979daf3110313d1484b95adac609 has about 60.1% probability to churn.
- Customer 8df45859ccd71ef1e48e2ee9d1c65d5728c31c46ae957d659fa4e5c3af6cc076 has about 38.3% probability to churn.
- Customer 59470fe7e847d5c05976da6c41fd27fa221b1fb7f7e3b76d2509994011435375 has about 72.0% probability to churn.


I can continue and extract more customers likely to churn or analyze churn for specific segments if you want.
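Once we have these probabilities, turning them into a target list for a reactivation campaign is a simple filter and sort. The sketch below uses the six values from the response above, with customer IDs shortened to their first eight characters for readability. `at_risk` is a hypothetical helper and the 0.7 cut-off is an arbitrary choice for illustration.

```python
# Churn probabilities from the agent's response above (customer IDs shortened to 8 characters)
churn_probability = {
    "8ef783d3": 0.709,
    "2d545e69": 0.603,
    "4330e046": 0.749,
    "26e237aa": 0.601,
    "8df45859": 0.383,
    "59470fe7": 0.720,
}

def at_risk(probabilities: dict[str, float], threshold: float) -> list[tuple[str, float]]:
    """Return customers at or above the threshold, highest risk first."""
    flagged = [(cid, p) for cid, p in probabilities.items() if p >= threshold]
    return sorted(flagged, key=lambda pair: pair[1], reverse=True)

# 0.7 is an arbitrary cut-off chosen for illustration
for customer_id, probability in at_risk(churn_probability, threshold=0.7):
    print(f"{customer_id}: {probability:.1%}")
```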

We can continue the conversation with our agent for as long as we need, and in the example notebook we do just that. But the code and the approach are the same, so we won't repeat ourselves here.

Over the several steps in our notebook example, we managed to produce a high-quality and personalized marketing email for our customer, using a process that can be easily run again and again for many customers.

text
# final agent email output
# ------------------------

Subject: Thought You’d Like These Cozy New Arrivals

Hi there,

Since you’ve shown a love for cozy sweaters and hoodies, we thought you’d be interested in these picks we think you'd really like:

- PE - CLARA SCARF [703737001] — A soft and stylish scarf that'll keep you warm and add a touch of elegance to any outfit. (You'll see an image of this beautiful scarf in the email!)

- Love Lock Down Dress [633208001] — Perfect for both casual days and nights out, this dress pairs well with your favorite sweaters and jackets.

- Joel Light Down Jacket [659460002] — Stay comfortably warm while looking sharp with this lightweight yet effective jacket.

We hope one (or all!) of these catches your eye. We’d love to welcome you back soon — and who knows, there might even be a little surprise waiting for you when you do.

Take care,

The H&M Team

By integrating LLMs with KumoRFM's real-time predictive analytics, we've built a truly unique and powerful e-commerce agent. In our example, we demonstrated its use as an "internal marketing copilot", but with this exact dataset and some small prompt tweaks, we could easily deploy an agent like this as a business analytics copilot or internal FAQ agent, or even serve it directly to consumers as an online shopping assistant.

Merging dynamic LLMs with KumoRFM's predictive capability allows us to do all of this and much more, both beyond the e-commerce use cases described here and beyond e-commerce into many other industries.
