03/29/2024

Graph Neural Networks: How Kumo’s Architecture Resolves Challenges in Production

Moving Graph Neural Networks to Production

The following is a typical workflow for training a single pquery. Upon pquery creation, Kumo first identifies the data dependencies and fetches/caches relevant data from the data source, taking into account these dependencies. Additionally, statistical computations and customized logic are executed to enhance the understanding of the data. Concurrently, features and edges are transformed into a format optimized for training, with a particular focus on facilitating mini-batch retrieval. These initial steps can be shared among pqueries with identical dependencies. Subsequent workflows become more specific to each individual pquery.

Kumo’s pquery compiler interprets the PQL and deduces the type of ML task. Then, it formulates a query execution plan based on the task requirements and data attributes. Subsequent workflows utilize this query execution plan as an input for target table creation and model training using the designated AutoML configuration. Crucially, Kumo’s AutoML processes initiate numerous training jobs and additional workflows in the pursuit of identifying the optimal model. 

In short, multiple backend processes are involved in training a single pquery, and running many pqueries simultaneously requires orders of magnitude more backend processes. However, Kumo is designed to scale seamlessly and on demand to numerous data- and compute-intensive workflows, abstracting this complexity away from the end user.

Design principles of Kumo’s Platform Architecture

Scalability

To effectively manage a significant volume of intricate concurrent workflows, Kumo scales by using a microservice-based architecture and conventional cluster management tools like K8s.

Separation of storage and compute

Each workflow is well defined: it draws its inputs from and delivers its outputs to AWS S3. This practice upholds the separation of compute and storage, which allows compute resources to scale independently of storage, promoting efficient resource utilization and enhancing flexibility.

Flexibility

To accommodate a flexible microservice architecture, AWS instances are custom selected based on distinct workload demands. The potential for diverse data processing characteristics and ML workflows within Kumo calls for a mix of GPU, memory-optimized, and IO-optimized instances with large SSDs. This custom tailored approach to instance selection is imperative for maximizing ML performance and cost-effectiveness.

Incremental processing

Kumo is designed to handle data on the scale of tens of terabytes. For ongoing operations, cold starts following each data update are excessively costly and time-consuming. For this reason, Kumo processes newly incoming data and materializes graphs and features incrementally. This incremental approach is crucial not only for optimizing resource utilization but also for minimizing the overall turnaround time for training and predictions.

Kumo’s Platform Architecture

Kumo’s overall platform architecture can be broken down into four major components:

Control Plane

The control plane is the “always on” element that runs the REST API service, manages metadata, and orchestrates workflows. Additionally, the control plane houses the pquery planner/compiler that transforms the user’s pquery into an execution plan guided by the workflow orchestrator.

Data Engine

The data engine interfaces with the data sources and is responsible for data ingestion/caching, inferring column semantics, and computing statistics for tables, columns, and edges to enhance data comprehension. Moreover, the data engine materializes edges and features from raw data into artifacts utilized by the graph engine for neighbor sampling and feature serving, respectively.

Compute Engine

The compute engine is the ML workhorse, with PyG at its core. This component is responsible for executing model training jobs for all experiments and pqueries. Additionally, the compute engine carries out the inference jobs that generate predictions or embeddings. During the training process, the compute engine operates on mini-batches of data, where each example in a mini-batch comprises a sub-graph containing a node and a subset of its neighborhood. It’s worth noting that the compute engine is the only component that requires GPU instances.

Graph Engine

The graph engine functions as an independent component that provides two critical services to the compute engine. First, it facilitates graph neighbor sampling; second, it provides feature serving. These two services are essential for the compute engine to compile mini-batches for training purposes. The graph engine operates as a separately scalable shared service, accessible to all trainer instances within the compute engine.

Challenges that Kumo’s Architecture solves

Control Plane

Kumo is designed to handle multiple pqueries concurrently across potentially numerous graphs. Pquery execution involves the orchestration of a multitude of workflows, including secure data ingestion into caches, statistical computation, and the materialization of features and edges for the graph engine. Subsequently, additional workflows are initiated to automatically generate training and validation tables, train models, and conduct inference to generate model outputs.

To support these operations, the control plane uses a central orchestrator equipped with dynamic, highly scalable task queues capable of executing thousands of stateful workflows and activities concurrently. After thorough evaluation of various orchestrator options, Temporal’s distributed workflow orchestration platform was selected to power Kumo’s workflow orchestration processes. Additionally, the control plane incorporates a metadata manager and enterprise-grade security and access controls, with metadata stored in a highly available transactional database. Moreover, the control plane oversees all aspects of graph management, including scheduling incremental updates and versioning, and provides standard MLOps functionality such as model versioning and tools for monitoring data and model quality.

Data Engine

As previously mentioned, the data engine assumes responsibility for all processing and transformations of raw data within the system. Utilizing secure connectors, the data engine retrieves data from data sources and internally caches them in a standardized format. Throughout this process, the data engine deduces the semantic significance of columns and calculates comprehensive statistics at the table, column, and edge levels for enhancing data comprehension. This information is later used by the pquery planner to inform feature encoding decisions and define the AutoML search space. 

To facilitate efficient pquery execution, certain components require a columnar data format, while others require a row-oriented data format. For example, the data engine relies on columnar-format data for statistical computations, semantic inference, and edge materialization, whereas the feature store in the graph engine requires row-oriented data for rapid feature retrieval and mini-batch creation. The data engine efficiently generates data artifacts and manages their versioning and lifecycles in the cache.

The data engine’s primary function is feature materialization, or the conversion of raw data features from tables into a row-oriented format suitable for random feature serving. This process is not only data-intensive but also generates artifacts of considerable size; consequently, the data engine streamlines the loading of these artifacts into the feature store to minimize additional processing overhead. Moreover, to minimize the loading time for materialized features, the data engine directly materializes features into the RocksDB internal SST file format, as the feature store is implemented as a RocksDB key-value store. These SST files can then be loaded directly by the feature store, resulting in optimized processing efficiency and loading speeds. During feature materialization, nodes are assigned unique indices that are also used when materializing lists of edges between connected tables. This approach ensures that every edge has a fixed size, which improves storage efficiency. Lastly, feature materialization can be easily parallelized across tables and partitions within a table.

Another major data engine function is graph or edge materialization. To materialize a graph, the data engine generates a list of edges for every pair of connected tables within the graph. Graph creation in this step is automatic, so users do not have to provide their own graph. The edges are generated by performing joins between tables based on linking keys, and outputting in COO format enhances data parallelism during edge generation. Furthermore, as new data is introduced, the materialized edges are updated incrementally, resulting in faster and more efficient updates.
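The join-based edge generation described above can be illustrated with a minimal sketch. It is not Kumo's implementation (which runs on Spark); the table names, column names, and helper functions are hypothetical, and plain Python lists stand in for distributed tables. Each row is assigned a dense node index, and a join on the linking key yields two parallel index lists, which is the COO layout.

```python
def assign_node_indices(rows, key):
    """Map each primary-key value to a dense integer node index (0..n-1)."""
    return {row[key]: i for i, row in enumerate(rows)}


def materialize_edges(src_rows, src_key, linking_key, dst_index):
    """Join source rows to the destination table on a linking key.

    Returns two parallel lists (COO format): edge i connects
    node coo_src[i] in the source table to node coo_dst[i] in the
    destination table.
    """
    src_index = assign_node_indices(src_rows, src_key)
    coo_src, coo_dst = [], []
    for row in src_rows:
        dst = dst_index.get(row[linking_key])
        if dst is None:
            # Dangling linking key: no matching destination row, so no edge.
            continue
        coo_src.append(src_index[row[src_key]])
        coo_dst.append(dst)
    return coo_src, coo_dst


# Hypothetical tables: orders reference users through "user_id".
users = [{"user_id": "u1"}, {"user_id": "u2"}]
orders = [
    {"order_id": "o1", "user_id": "u2"},
    {"order_id": "o2", "user_id": "u1"},
    {"order_id": "o3", "user_id": "u2"},
    {"order_id": "o4", "user_id": "u9"},  # no such user
]

user_index = assign_node_indices(users, "user_id")
edge_src, edge_dst = materialize_edges(orders, "order_id", "user_id", user_index)
```

Because every edge is just a pair of integers, each partition of the source table can emit its own slice of the two lists independently, and newly arriving rows only append new pairs.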

The data engine is responsible for a significant amount of activity, especially when dealing with data on the scale of tens of terabytes. Fortunately, all of this activity can be distributed and parallelized; that said, measures are taken to ensure that data processing within the data engine remains completely out of core (i.e., the entirety of the input data is not present in memory at any given time). To achieve this, the data engine utilizes PySpark with EMR on EKS, which allows the data engine to autoscale based on its computational requirements. EMR on EKS also seamlessly integrates with the control plane, which operates on Kubernetes.

Additionally, the data engine employs Apache Livy to manage Spark jobs. Livy provides a straightforward REST interface for submitting jobs as code snippets, as well as an intuitive means of retrieving results synchronously or asynchronously. Livy simplifies Spark context management and offers robust support for concurrent, long-running Spark contexts that can be reused across multiple jobs and clients. Moreover, Livy simplifies the sharing of cached RDDs across jobs and clients, resulting in more efficient job execution. Lastly, the data engine features a lightweight Livy driver that dynamically launches Spark jobs on-demand in EMR on EKS. These job launches from the Livy driver are triggered by workflow activities scheduled by the orchestrator.
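As a rough illustration of submitting a code snippet through Livy's REST interface, the sketch below builds (but does not send) a request to Livy's `POST /sessions/{id}/statements` endpoint using only the standard library. The host name, session id, and snippet are hypothetical, and nothing here reflects how Kumo's own Livy driver is written.

```python
import json
from urllib.request import Request


def build_statement_request(livy_url, session_id, code):
    """Build a POST request that submits a code snippet to a Livy session."""
    body = json.dumps({"code": code}).encode("utf-8")
    return Request(
        url=f"{livy_url}/sessions/{session_id}/statements",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical Livy endpoint and session id, for illustration only.
LIVY_URL = "http://livy.example.internal:8998"
request = build_statement_request(LIVY_URL, 0, "spark.range(10).count()")
```

Passing the request to `urllib.request.urlopen` would submit it to a running Livy server; the response describes the statement, whose result can then be polled asynchronously.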

Compute Engine

The compute engine serves as the primary ML powerhouse within Kumo’s GNN platform and handles AutoML-driven model training and model output generation from inference processes. Consider a typical scenario that involves a data pipeline for a single AutoML training job: a GPU-based trainer instance, with PyG as its core, continuously retrieves mini-batches of training examples from the graph engine. The trainer instance trains on these mini-batches and ensures that feature serving and mini-batch production are in sync with the training throughput.

Mini-batch construction is a two-step process. Initially, the neighborhood is sampled for a specified set of nodes, and a subgraph is constructed for each of these nodes. Features are then retrieved for the nodes within these subgraphs, and a mini-batch of examples is constructed for training purposes. This process repeats for each step of model training, with Kumo supporting various sampling strategies. Although specific sampling methods may differ across trainer jobs, the sampler can ensure temporal consistency in all cases if event timestamps are available. 
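The two steps can be sketched in a few lines of plain Python. This is a simplified, one-hop illustration under stated assumptions, not PyG's or Kumo's sampler: the adjacency dictionary, the feature dictionary, and the function names are all hypothetical. Temporal consistency is modeled by only sampling neighbors whose timestamp is not later than the seed's timestamp.

```python
import random


def sample_subgraph(adj, seed, seed_time, fanout, rng):
    """Step 1: sample up to `fanout` neighbors that existed at `seed_time`."""
    valid = [nbr for nbr, ts in adj.get(seed, []) if ts <= seed_time]
    picked = rng.sample(valid, min(fanout, len(valid)))
    return [seed] + picked


def build_mini_batch(adj, features, seeds, fanout, rng):
    """Sample one subgraph per seed, then fetch features for its nodes."""
    batch = []
    for seed, seed_time in seeds:
        nodes = sample_subgraph(adj, seed, seed_time, fanout, rng)  # step 1
        x = [features[node] for node in nodes]                      # step 2
        batch.append({"nodes": nodes, "x": x})
    return batch


# Hypothetical graph: node -> list of (neighbor, event timestamp).
adj = {"u1": [("o1", 5), ("o2", 10), ("o3", 20)]}
features = {"u1": [0.1], "o1": [1.0], "o2": [2.0], "o3": [3.0]}

rng = random.Random(0)
batch = build_mini_batch(adj, features, seeds=[("u1", 12)], fanout=2, rng=rng)
```

With a seed time of 12, the neighbor `o3` (timestamp 20) can never be sampled, so the example does not leak information from the future into training.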

An AutoML search algorithm determines trainer execution configuration based on data, task, and past performance metrics. The search algorithm’s objective is to find the best-performing set of parameters for any given task. At any given time, multiple trainer jobs per pquery may be awaiting execution, with many pqueries in progress simultaneously. To maintain a reasonable turnaround time, the compute engine must launch and execute numerous trainers in parallel, scaling the number of trainer jobs up efficiently as needed.

To efficiently scale to a large number of trainers in parallel, Kumo relies on two fundamental concepts. First, a separation between the graph engine and compute engine allows for independent scaling of the two components. By sharing the feature and graph stores among multiple training jobs, Kumo can execute numerous training jobs concurrently with minimal resource demands on the GPU nodes themselves. These trainers communicate independently with the feature and graph stores via a shared mini-batch fetcher that not only retrieves but also caches the requested mini-batches. In practice, this mini-batch fetcher implementation is highly beneficial for efficient training.
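A fetcher that both retrieves and caches can be pictured as a small least-recently-used cache in front of the stores. The sketch below is an assumption about the general shape only: the class name, the cache key (a tuple of seed ids and a fanout), and the `build_batch` stand-in for the sampling and feature-fetch round trips are hypothetical and not taken from Kumo's code.

```python
from collections import OrderedDict


class MiniBatchFetcher:
    """Retrieve mini-batches through `build_fn`, caching recent results (LRU)."""

    def __init__(self, build_fn, capacity=128):
        self._build_fn = build_fn
        self._capacity = capacity
        self._cache = OrderedDict()
        self.misses = 0  # number of requests that had to go to the stores

    def fetch(self, key):
        if key in self._cache:
            self._cache.move_to_end(key)  # mark as most recently used
            return self._cache[key]
        self.misses += 1
        batch = self._build_fn(key)
        self._cache[key] = batch
        if len(self._cache) > self._capacity:
            self._cache.popitem(last=False)  # evict least recently used
        return batch


def build_batch(key):
    """Stand-in for the graph-store and feature-store round trips."""
    seed_ids, fanout = key
    return {"seeds": seed_ids, "fanout": fanout}


fetcher = MiniBatchFetcher(build_batch, capacity=2)
first = fetcher.fetch(((1, 2, 3), 10))
second = fetcher.fetch(((1, 2, 3), 10))  # served from the cache
```

The second request returns the cached object without touching `build_batch`, which is the effect that keeps load off the shared stores when requests repeat.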

Second, autoscaling trainers intelligently select the type of GPU instance used during training. The compute engine features a lightweight driver that manages a K8s cluster of trainer instances. Once prompted by the AutoML searcher, this driver can dynamically launch a trainer as needed, selecting the instance type that best suits the specific training job configuration. The resulting architecture is highly scalable, resource-efficient, and cost-effective. Moreover, this approach allows for exceptional flexibility and facilitates the seamless integration of new ML methodologies.

To enable the sharing of the feature store across multiple trainer jobs, Kumo stores only raw features in the feature store. Feature encoding occurs dynamically and within the compute engine. The specific encoding techniques, which form part of the AutoML search space, are determined by both the data and the task at hand. Advanced ML practitioners can choose to override feature encoders, along with other settings in the generated AutoML configuration. Kumo supports a wide range of encoding types out of the box and continues to add supported types on an ongoing basis.

Graph Engine

The following section explores the graph engine and explains how Kumo scales the feature and graph stores. 

Feature Store

The feature store functions as a horizontally scalable, persistent key-value store designed for optimal random read throughput. It operates as a service for feature retrieval via RPC. To make feature serving more efficient and to speed up the conversion of raw stored features into the tensors expected by the caller, Kumo includes the following three key optimizations.

The first optimization aims to reduce communication overhead. Kumo employs protobuf/gRPC for communication between the data server and the compute client. Individual node features are stored as protobufs. As evident in TensorFlow, these example features are typically defined as a list of individual feature messages containing the feature name and its associated value. However, this representation lacks storage efficiency due to name duplication and memory alignment issues. To address this shortcoming, Kumo uses a straightforward optimization process based on the following concept: a separate feature configuration is created to determine the order of columns in the protobuf containing the feature values. Additionally, columns in this feature configuration are grouped by data types, allowing features of the same type to be stored in a compacted array, further reducing the message size.
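The idea of moving feature names out of each message and into a shared configuration can be shown with plain Python structures. This is a sketch only: dictionaries and JSON stand in for protobuf messages, and the feature names, the `FEATURE_CONFIG` layout, and the helper functions are hypothetical.

```python
import json

# Shared once between server and client: column order, grouped by data type.
FEATURE_CONFIG = {
    "float": ["age", "balance"],
    "int": ["num_orders"],
    "str": ["country"],
}


def encode_named(row):
    """Naive layout: every message repeats every feature name."""
    return [{"name": name, "value": value} for name, value in row.items()]


def encode_compact(row, config):
    """Compact layout: one name-free array per data type, ordered by config."""
    return {dtype: [row[name] for name in names] for dtype, names in config.items()}


def decode_compact(message, config):
    """Recover named features by zipping the config order with the arrays."""
    return {
        name: value
        for dtype, names in config.items()
        for name, value in zip(names, message[dtype])
    }


row = {"age": 31.0, "balance": 1520.5, "num_orders": 7, "country": "DE"}
named_size = len(json.dumps(encode_named(row)))
compact_size = len(json.dumps(encode_compact(row, FEATURE_CONFIG)))
restored = decode_compact(encode_compact(row, FEATURE_CONFIG), FEATURE_CONFIG)
```

The compact message carries only values, so its size no longer grows with the length of the feature names, and same-typed values sit next to each other in one array.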

The second optimization streamlines the conversion of the row-wise feature representation to a column-wise feature representation on the client side. The client receives features as protobufs in the row-wise representation, as extracted from the feature store. These features require conversion into a column-wise feature matrix to facilitate easy application of feature transformations to columns.

Although the column-wise feature matrix is constructed within a client written in C++, it is later used in multiple Python code instances and processes. Arrow is used as Kumo’s column-wise data format to leverage its zero-copy design. Accomplishing this required addressing the most challenging aspect, namely handling NA values; to this end, Kumo uses custom lightweight mathematical techniques and careful design decisions to achieve the zero-copy design. 
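The source does not detail how NA values are handled, so the following is only a generic sketch of the row-to-column step. It uses the same idea Arrow is built on, a values array paired with a validity mask, but with plain Python lists; the function name and the placeholder value are assumptions for illustration.

```python
def rows_to_columns(rows, column_names, placeholder=0.0):
    """Transpose row-wise records into per-column value arrays.

    Missing values (None) are recorded in a separate validity mask and
    replaced by a placeholder in the value array, so each column stays a
    dense, uniformly typed array.
    """
    columns = {name: [] for name in column_names}
    valid = {name: [] for name in column_names}
    for row in rows:
        for name in column_names:
            value = row.get(name)
            valid[name].append(value is not None)
            columns[name].append(placeholder if value is None else value)
    return columns, valid


# Hypothetical row-wise features as they might arrive from the feature store.
rows = [
    {"age": 31.0, "balance": None},
    {"age": None, "balance": 10.0},
    {"age": 45.0, "balance": 7.5},
]
columns, valid = rows_to_columns(rows, ["age", "balance"])
```

Keeping missingness in a mask rather than in the values themselves is what lets a column be handed to downstream code as a contiguous buffer.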

The third optimization enhances feature access performance by improving data locality, maximizing the number of features fetched within each fetch operation. This is accomplished through a simple yet highly effective strategy: reordering nodes in the feature store based on their neighbors. For instance, triangle nodes accessed by the same circle neighbors are arranged as closely as possible. Subsequently, square nodes accessed by the same triangle neighbors are positioned as close to each other as possible. This optimization proves highly beneficial for numerous real-world applications, particularly in scenarios where graph traversal and feature retrieval for neighbors are required.
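A toy version of neighbor-based reordering is sketched below. The heuristic (sort each node by the smallest index among the neighbors that access it) and the block-counting helper are assumptions chosen for illustration, not Kumo's actual ordering algorithm; a "block" stands in for whatever unit the store reads in one operation.

```python
def reorder_by_neighbor(nodes, accessed_by):
    """Order nodes so that nodes reached from the same neighbor are adjacent."""
    def sort_key(node):
        neighbors = accessed_by.get(node) or [float("inf")]
        return (min(neighbors), node)
    return sorted(nodes, key=sort_key)


def blocks_touched(order, wanted, block_size):
    """Count the storage blocks read to fetch `wanted` under a given order."""
    position = {node: i for i, node in enumerate(order)}
    return len({position[node] // block_size for node in wanted})


# Hypothetical graph: circle 0 accesses t1 and t3, circle 1 accesses t0 and t2.
triangles = ["t0", "t1", "t2", "t3"]
accessed_by = {"t0": [1], "t1": [0], "t2": [1], "t3": [0]}

reordered = reorder_by_neighbor(triangles, accessed_by)
before = blocks_touched(triangles, {"t1", "t3"}, block_size=2)
after = blocks_touched(reordered, {"t1", "t3"}, block_size=2)
```

In this example the two triangles needed by circle 0 move from two different blocks into the same block, so one read serves both.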

With these optimizations in place, Kumo is capable of a threefold speedup in end-to-end feature fetching, including the time required for feature-to-tensor conversion. Consequently, Kumo fully utilizes GPUs during model training and executes a significantly higher number of trainer instances in parallel.

Graph Store

The graph store functions as an in-memory store specifically designed for sampling, operating as a separate service to allow independent scaling from the feature store and the compute engine. Sampling in GNNs entails generating uniquely sampled subgraphs for each seed node. To ensure maximum flexibility during training, the graph store must facilitate rapid random access to outgoing neighbors given an input node. Moreover, to accommodate large graphs with tens of billions of edges, the graph store must minimize memory usage by employing compressed graph formats.

To achieve these objectives, the graph engine utilizes core PyG sampling algorithms optimized for heterogeneous graphs in CSR format. When timestamps are available, the CSR edges are sorted by timestamp to speed up temporal sampling. Furthermore, given that enterprise graphs are typically sparse, the CSR representation itself can attain very high compression ratios.
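Why sorting CSR edges by timestamp helps can be seen in a small sketch. It is a simplified, single-edge-type illustration with hypothetical function names, not PyG's sampler: once each node's edges are time-sorted, the neighbors that are valid for a given seed time form a prefix of that node's row, which a binary search finds without scanning every edge.

```python
from bisect import bisect_right


def build_csr(num_nodes, edges):
    """Build CSR arrays from (src, dst, timestamp) triples.

    Within each source node's row, edges are sorted by timestamp.
    """
    edges = sorted(edges, key=lambda e: (e[0], e[2]))
    rowptr = [0] * (num_nodes + 1)
    for src, _, _ in edges:
        rowptr[src + 1] += 1
    for i in range(num_nodes):
        rowptr[i + 1] += rowptr[i]  # prefix sum: row i spans rowptr[i]:rowptr[i+1]
    col = [dst for _, dst, _ in edges]
    time = [ts for _, _, ts in edges]
    return rowptr, col, time


def neighbors_before(rowptr, col, time, node, seed_time):
    """Return neighbors of `node` whose edge timestamp is <= seed_time."""
    start, end = rowptr[node], rowptr[node + 1]
    cut = bisect_right(time, seed_time, start, end)  # binary search in the row
    return col[start:cut]


edges = [(0, 2, 30), (0, 1, 10), (1, 0, 5), (0, 3, 20)]
rowptr, col, time = build_csr(4, edges)
visible = neighbors_before(rowptr, col, time, node=0, seed_time=20)
```

The same layout gives constant-time access to any node's outgoing neighbors, and it stores one integer per edge plus one offset per node, which is compact for sparse graphs.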

Conclusion

GNNs represent a paradigm shift in generating predictions based on graph data prevalent in enterprise settings. By leveraging graph-based ML, ML practitioners and non-experts can easily generate predictive analytics using relational data, replacing ad hoc feature engineering with a principled approach that autonomously learns from the connections between entities in a graph. However, deploying GNNs poses significant challenges, particularly when operating at enterprise scale.

Kumo’s architecture is designed to scale GNNs for handling exceptionally large graphs. The platform excels in concurrently training numerous models and generating model outputs for numerous queries on the same graph simultaneously. While Kumo’s advanced capabilities would otherwise demand ML and data science expertise in both GNNs and high-performance distributed systems, these inherent complexities are abstracted away from users through a straightforward and user-friendly API.

To illustrate Kumo’s scalability in a specific customer deployment, the platform processed 45 trillion combinations of user-item pairs and generated 6.3 billion top-ranking link predictions from scratch within a couple of hours. By prioritizing flexibility in design, Kumo’s GNNs empower users to swiftly transition from business use cases to deployable GNNs at scale, rapidly accelerating the realization of business value.

Read more about how to deploy PyG and Kumo in production.

This blog is based on a presentation that was originally given by Subramanya Dulloor, a founding engineer at Kumo, at the QCon conference in March 2023.