Build Your Own Feature Store with Streaming Databases

Yingjun Wu
Data Engineer Things
10 min readSep 8, 2023

--

Landscape of the modern feature stores.

With the increasing popularity of machine learning, a growing number of businesses are implementing it to address their operational challenges. As the volume of data continues to expand, organizations are encountering more issues related to data management than those associated with machine learning algorithms. The feature store is a crucial tool that companies leverage to tackle data-related difficulties.

To incorporate a feature store, one can opt for a cloud-based solution. For instance, Amazon’s Sagemaker offers a feature store. Tecton is another renowned feature engineering platform. Additionally, there are several other alternatives, such as Claypot, Fennel, and Chalk, to name a few.

Nonetheless, what if someone wants to construct a feature store from the ground up utilizing open-source technologies? In this article, we discuss how to build a minimal feature store using a streaming database.

What is a feature store?

Feature stores serve as a crucial component of a data platform, designed to address several pivotal challenges, including but not limited to:

  1. Data Consistency: In a large organization, multiple teams may work on similar tasks but utilize different tools, data, and features. This approach can cause inconsistencies in features, complicating comparisons of results or the scaling of solutions. A feature store works as a centralized repository of features, thereby ensuring uniformity across the entire organization.
  2. Feature Reusability: Data scientists typically dedicate a substantial amount of time to engineering features for their machine learning models. Unfortunately, this effort is often replicated across various teams and projects. A feature store facilitates the reuse of features crafted by others, thereby minimizing redundant efforts and accelerating the model development process.
  3. Data Transformation: Raw data often necessitates transformation into a format that is conducive for machine learning models. Such transformations can be computationally demanding and time-intensive. A feature store permits the pre-computation of features and their storage in an optimized format, thereby diminishing the time and computational resources required during model training.
  4. Feature Versioning: Over time, the methodologies for computing features may evolve. It is imperative to document these modifications to ensure reproducibility and to comprehend the implications of changes on model performance. A feature store encompasses versioning capabilities to administrate alterations to features over time.

Feature Store in Action

Before diving deep into building a feature store with streaming databases, let’s first take a look at how feature store works.

Essentially, a feature store is a repository designed to store various features and fetch specific ones as needed, thereby facilitating data services. It is required to offer two distinct categories of data services to accommodate different users: offline features and online features. These categories have unique attributes, as detailed below.

  1. Offline Features: These are tailored for the machine learning training phase, which typically demands a substantial quantity of data. However, only a select few specific features are necessary for this process. Since the training is carried out offline, there is no pressing need for fast query execution.
  2. Online Features: These are designed for the machine learning prediction phase and usually involve returning either all or a subset of the features of a small number of records. While the data required for online features is considerably less compared to offline features, there is a higher emphasis on fast query responses and real-time data availability.
A feature store maintains both offline features and online features for different purposes.

Now let’s consider building an application predicting taxi fare based on locations. We use a public online dataset: https://learn.microsoft.com/en-us/azure/open-datasets/dataset-taxi-green?tabs=azureml-opendatasets.

We need to first preprocess the data. In this dataset, there are many columns, and they can be broadly divided into three categories:

  1. User input and output. In this example, the inputs are the taxi’s starting point (pu_location_id) and destination (do_location_id). We will use location_id to represent both in the following paragraphs. The output, or result, is the total fare (fare_amount). We need to keep these data intact for use during training;
  2. Data that can be used for feature extractions. The so-called features refer to the attributes that are relevant to the prediction. For instance, if we want to predict the total fare, the average tip for starting from a particular location would be an essential feature. We need to extract features from these data and save them;
  3. Some data irrelevant to prediction. We need to filter out this data to prevent feature contamination.

After processing the data according to the above steps, we can use the features and the corresponding total fares to train machine learning. We can also import features into a well-trained model for prediction.

Why Streaming Database?

A streaming database is a type of database designed to handle real-time data by processing streams of data in real-time or near-real-time. Traditional databases are designed to handle data at rest, which means the data is stored and then queried or analyzed. In contrast, a streaming database processes data on the fly as it arrives, without needing to store it first.

A streaming database consumes streaming data, performs incremental computations when new data comes in, and updates results dynamically.

In the context of a streaming database, a materialized view is a view of the data that is pre-computed and stored, rather than computed on demand. Materialized views in streaming databases are incrementally updated as new data arrives, which allows them to reflect the current state of the data without the need to recompute the entire view.

Streaming databases are a perfect match for feature stores for several reasons:

  1. Real-time feature computation: Machine learning models often require real-time features to make predictions. For example, a fraud detection model may need the most recent transactions of a user to detect any fraudulent activities. A streaming database can compute and update features in real time as data is ingested into the system, which ensures that the machine learning models always have access to the most up-to-date features.
  2. Consistency: Maintaining consistency between the features used for training and serving the machine learning models is crucial for model performance. A streaming database can ensure that the same set of features and transformations are used both for training and serving the models.
  3. Event time processing: Machine learning models often require features to be computed based on event time (the time when an event actually occurred) rather than processing time (the time when the event is processed by the system). Streaming databases often provide support for event time processing, which ensures that the features are computed accurately even if there are delays in data ingestion or processing.
  4. Data freshness: For many applications, the value of data decreases rapidly with time. For example, the most recent transactions of a user are often more relevant for fraud detection than transactions that occurred several days ago. A streaming database can ensure that the features in the feature store are always fresh and up-to-date.

By leveraging a streaming database, organizations can build a robust and scalable infrastructure for managing features and serving machine learning models in real time. This can lead to more accurate and timely predictions, which is crucial for many applications.

Build a feature store with a streaming database

We now use a streaming database to build a feature store, e.g. RisingWave, a popular open-source streaming database. Users can create materialized views (MV) in RisingWave using SQL statements to build streaming pipelines for feature transformation and computation.

In the taxi fare example, we ingest data into RisingWave through Apache Kafka. RisingWave will extract features in real time and perform transformations according to the predefined logic expressed in materialized views. The results are then saved in RisingWave’s storage. To perform ML serving, users can directly query materialized views to fetch features in real time.

Use RisingWave, an open-source streaming database as a feature store.

When it comes to offline features, we have multiple solutions to consider:

  1. RisingWave offers native batch processing capability. This allows users to use SQL queries in RisingWave for batch data processing.
  2. If we need enhanced batch processing capabilities, RisingWave can connect with data warehouses (like Redshift and Snowflake) or other OLAP systems (like ClickHouse) tailored for batch processing through a sink connection.

Now, let’s delve deeper into the feature engineering process within RisingWave. Let’s reconsider the taxi fare example. As data pours into RisingWave, it first undergoes a filtering process. This removes any irrelevant data and channels the refined data into a designated Filter MV (Materialized View). Following this, we establish two types of MVs:

  1. User MV: This captures both the user’s input and desired outputs, like the starting and ending points of a taxi ride (represented by location_id) and the associated fare (fare_amount).
  2. Feature MV: This is designed to save features derived from the validated data. For simplicity during both training and prediction, the taxi’s origin and destination serve as the primary keys.

Both MVs are constantly updated as new data comes in, and users can always see consistent and fresh online features.

For the training phase, we input the desired feature names. We then pull the required offline features from the Feature MV and couple it with the corresponding fare amount from the User MV to train our model. When predicting, by just inputting the location_id, we can fetch the related online features from the Feature MV and employ our trained model for accurate predictions.

Use RisingWave to build a feature store.

Demo: How to use RisingWave to build a feature store

We now get our hands dirty and demonstrate how to use RisingWave to build a feature store.

Step 1: Fetch source code

git clone <https://github.com/risingwavelabs/risingwave.git>
cd integration_tests/nyc-taxi-feature-store-demo

Step 2: Deploy using Docker

docker compose up --build

Here, we launch RisingWave, Kafka, and Feature Store within the Docker cluster. RisingWave primarily consists of front-end nodes, computation nodes, metadata nodes, and MinIO. For this demo cluster, the data from the materialized views will reside in the MinIO instance, with relevant SQL statements outlined below. The core components of the Feature Store are the Server and Simulator. Once a Kafka node is established, topics are generated automatically.

Here’s what the Feature Store system will sequentially undertake:

  1. It will channel simulated offline data to Kafka, followed by RisingWave, and from there, derive both behavior and feature tables;
  2. By joining these tables, it retrieves the appropriate offline training data to initiate model training;
  3. It introduces the simulated online feature data to Kafka and subsequently to RisingWave;
  4. Leveraging the do_location_id (end location) and pu_location_id (start location), the system fetches the freshest online features from RisingWave. Using these features, predictions are made with the trained model.
CREATE source
IF NOT EXISTS taxiallfeature (
vendor_id INT,
lpep_pickup_datetime timestamp,
lpep_dropoff_datetime timestamp,
store_and_fwd_flag boolean,
ratecode_id FLOAT,
pu_location_id INT,
do_location_id INT,
passenger_count FLOAT,
trip_distance FLOAT,
fare_amount FLOAT,
extra FLOAT,
mta_tax FLOAT,
tip_amount FLOAT,
tolls_amount FLOAT,
ehail_fee FLOAT,
improvement_surcharge FLOAT,
total_amount FLOAT,
payment_type FLOAT,
trip_type FLOAT,
congestion_surcharge FLOAT,
) WITH (
connector = 'kafka',
topic = 'taxi',
properties.bootstrap.server = 'kafka:9092',
)
format plain encode json;
CREATE materialized view useful_filter
AS
SELECT window_start,
lpep_pickup_datetime ,
lpep_dropoff_datetime ,
do_location_id,
pu_location_id,
passenger_count ,
trip_distance ,
fare_amount,
extra ,
mta_tax,
tip_amount,
tolls_amount ,
improvement_surcharge,
total_amount,
congestion_surcharge
FROM (
SELECT *
FROM tumble(taxiallfeature,lpep_pickup_datetime,INTERVAL '5' hour) )
WHERE payment_type IN (1,2,4);
CREATE materialized VIEW location_id_store
AS SELECT do_location_id,
pu_location_id,
window_start,
fare_amount
FROM useful_filter;
CREATE materialized VIEW converted_features_with_do_location
AS SELECT do_location_id,
window_start,
Avg(Extract(epoch FROM lpep_dropoff_datetime - lpep_pickup_datetime)
::
INT) /
10 AS latency,
Avg(passenger_count) AS passenger_count,
Avg(trip_distance) AS trip_distance,
Avg(extra) AS extra,
Avg(mta_tax) AS mta_tax,
Avg(tip_amount) AS tip_amount,
Avg(tolls_amount) AS tolls_amount,
Avg(improvement_surcharge) AS improvement_surcharge,
Avg(total_amount) AS total_amount,
Avg(congestion_surcharge) AS congestion_surcharge,
--determine if the distance is longer than 30--
Avg(trip_distance) > 30 AS long_distance
FROM useful_filter
GROUP BY do_location_id,
window_start;
CREATE materialized VIEW converted_features_with_pu_location
AS SELECT pu_location_id,
window_start,
Avg(Extract(epoch FROM lpep_dropoff_datetime - lpep_pickup_datetime)
::
INT) /
10 AS latency,
Avg(passenger_count) AS passenger_count,
Avg(trip_distance) AS trip_distance,
Avg(extra) AS extra,
Avg(mta_tax) AS mta_tax,
Avg(tip_amount) AS tip_amount,
Avg(tolls_amount) AS tolls_amount,
Avg(improvement_surcharge) AS improvement_surcharge,
Avg(total_amount) AS total_amount,
Avg(congestion_surcharge) AS congestion_surcharge,
Avg(trip_distance) > 30 AS long_distance
FROM useful_filter
GROUP BY pu_location_id,
window_start;

Step 3: Retrieve system logs

cat .log/simulator_log

The sample logs are as follows. Since this article is solely to showcase the Feature Store built on RisingWave, the model parameters used in the demonstration have not been optimized, and the data volume is relatively small. Therefore, the prediction accuracy might not meet production requirements. It can be expanded as needed.

This is the recwave actor!
Connected to server
Write training data len is 9000
Start training
Offline feature has been written to written in kafka
write online feature, DOLocationID is 138
write online feature, DOLocationID is 243
write online feature, DOLocationID is 63
write online feature, DOLocationID is 37
write online feature, DOLocationID is 160
DOLocationID is 138 fare amount: predicted results 24.845151901245117 , real results 25.0
DOLocationID is 243 fare amount: predicted results 7.779667854309082 , real results 8.0
DOLocationID is 63 fare amount: predicted results 57.45588684082031 , real results 57.5
DOLocationID is 37 fare amount: predicted results 18.309465408325195 , real results 12.0
DOLocationID is 160 fare amount: predicted results 10.065141677856445 , real results 10.0

At this moment, RisingWave stores the following online features. It can be observed that the online data has been extracted based on the relevant window for corresponding features and saved in RisingWave.

do_location_id | avg_amount |    window_start     | latency | passenger_count |   trip_distance    | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | long_distance
----------------+------------+---------------------+---------+-----------------+--------------------+-------+---------+------------+--------------+-----------------------+--------------+----------------------+---------------
243 | 8 | 2022-04-01 03:00:00 | 50.40 | 1 | 1.7399999999999998 | 0.5 | 0.5 | 0 | 0 | 0.3 | 9.3 | 0 | f
37 | 18.25 | 2022-03-31 22:00:00 | 221 | 1 | 3.825 | 0.25 | 0.25 | 4.455 | 0 | 0.29999999999999993 | 23.505 | 0 | f
160 | 10 | 2022-04-01 03:00:00 | 75.20 | 1 | 2.14 | 0.5 | 0.5 | 0 | 0 | 0.3 | 11.3 | 0 | f
138 | 25 | 2022-04-01 03:00:00 | 82 | 1 | 5.03 | 0 | 0 | 0 | 0 | 0.3 | 25.3 | 0 | f
63 | 57.5 | 2022-04-01 03:00:00 | 523.70 | 2 | 12.65 | 0.5 | 0.5 | 0 | 0 | 0.3 | 58.8 | 0 |

Conclusion

In the evolving landscape of machine learning, optimal data management is indispensable, spotlighting the significance of feature stores. While solutions such as Amazon’s Sagemaker and Tecton are readily available, utilizing open-source streaming databases to engineer a custom feature store offers granular control and adaptability. For teams aiming for deep architectural control and system integration, architecting their own feature store presents a technically robust approach.

--

--

Founder of RisingWave (risingwave.com), a distributed SQL streaming database. Previously AWS Redshift, IBM Research Almaden. NUS PhD, CMU-DB alumnus.