Saving Millions with 5-Minute Streaming SQL Techniques & Future-Proofing an Optimized Composable Data Ecosystem

Zhenzhong Xu
Data Engineer Things
23 min read · Oct 8, 2023

This blog post is an extended written version of my 2023 talk at the Current conference. Please find the link to the talk recording and the slide deck.

Abstract

Companies are looking for ways to reduce streaming infrastructure costs in the current macroeconomic environment. However, it is a difficult task for two reasons. First, cutting costs without sacrificing latency or correctness requires a deep knowledge of engine implementation details and a keen eye to identify opportunities. Second, optimization techniques are less accessible when working with a high-level language abstraction such as SQL. These techniques often involve engine query planning, requiring even deeper expertise. Many data engineers and data scientists lack depth in intermediate representations (IR) and optimization rules. They also may not care about the details of applying streaming watermarks to reduce the runtime complexity of Point-In-Time-Correct join queries.

In this post, I cover some simple optimization techniques you can apply with streaming FlinkSQL in just a few minutes that can cut costs by 10x or even 100x. Then, we’ll gradually dive deeper into some novel techniques that optimize across your distributed storage and compute stacks.

By the end of this blog post, I hope readers take away concrete optimization techniques they can apply immediately and, more importantly, understand the reasoning framework. With that foundation, I’ll lead us through some drivers and evidence on why the industry needs an intelligent optimization layer that connects the fabric of the composable data ecosystem.

Table of Contents

Each section can stand alone but is best read in sequence for connected logic.

  1. Why Real-Time Decisions are Driving Streaming Needs
  2. What Optimization is About
  3. Fraud Detection Feature Optimization with FlinkSQL
  4. Optimizations Across a Composable Data Fabric

Why Real-Time Decisions are Driving Streaming Needs

Let’s picture ourselves in three scenarios.

  • Recommendation Based on Session Intent: While browsing an online store or Netflix, the website quickly determines your intent, offering precise recommendations based on your search queries.
  • Fraud Prevention: After a tiring workday, you receive a text about an unusual credit card transaction. Realizing you’ve lost your card, you decline the transaction via text, preventing future disputes.
  • Pharmaceutical Interaction Alert: On your yearly checkup day, after getting a prescription in the morning, a different doctor prescribes another medication in the afternoon. The pharmacist alerts you about a potential adverse interaction between the two drugs, guiding you based on timely and accurate information, ensuring your health isn’t compromised.

There is a pattern here. All three scenarios involve making in-the-moment decisions, whether driven by an ML model (e.g., generating recommendations or flagging a fraudulent transaction) or a human mental model (e.g., determining drug-drug interactions). Contrary to the popular thinking that real-time ML is only about having good models, having timely, relevant, and correct information to feed into models is equally critical to producing good outcomes. It’s not hard to imagine worse results without such information: recommendations that aren’t relevant because they’re based on search queries from a day ago, a cumbersome dispute process because your credit card company didn’t have relevant behavior information to flag the transaction as it happened, or getting diarrhea because your pharmacist wasn’t aware of an earlier interfering prescription.

Streaming pipelines are increasingly critical in these real-time decision use cases. The industry tends to agree. We have observed this pattern repeatedly in the past few years. For example, in 2022, LinkedIn moved some batch features/labels that were hours stale to near real-time. That change alone boosted the accuracy of a bad actor/bot detection model by upwards of 30%. Another example shows a 4.45% lift in personalization model performance when shifting from a 24-hour delay to only 1 minute. Many other verticals, such as logistics, healthcare, gaming, and energy, are among the next round of innovators to leverage the technology.

Streaming use cases are growing. There are three main reasons behind the trend:

  1. Streaming is the central nervous system that brings data from disparate sources together.
  2. Streaming allows pre-processing data and, combined with (incremental) materialized views, enables low-latency retrieval of fresh results. As a result, compute requirements are shifting from entirely online or offline processing to data-in-motion processing.
  3. Maturing streaming SQL and Python language abstractions broaden the user base to data engineers, data scientists, and analysts.

Given increasing awareness about streaming, it’s not uncommon for tech companies to invest millions or even tens of millions of dollars in such infrastructures. For the rest of the article, we’ll discuss how you can help your organization with optimizations. We’ll break it down into three sections:

  • What optimization is about
  • A FlinkSQL optimization technique that can save you 100x
  • A case for advanced optimization layer in a distributed, composable data fabric

What Optimization Is About

The above diagram is from Tyler’s 2019 talk. Even though the initial discussion was in the context of batch and streaming tradeoffs, it is still relevant today in the broader scope of data ecosystems. The idea is simple: within any individual processing engine, you must choose two out of three dimensions (latency/freshness, cost, and correctness) and be flexible on the third. The generalized concept also applies in the distributed system world, where no single distributed algorithm, data structure, or technology would give users everything at once. It’s all about making the appropriate tradeoff at the right moment. Given that the tradeoff gap will never go away completely, we’ll explore how to minimize it in this blog post. We’ll examine the perspectives of optimizing a single engine and optimizing across a composable data infrastructure.

Let’s start by looking at the big picture.

The diagram above shows a modern architecture that represents a feature engineering platform. As data flows from the edge and microservices to a streaming transport such as Kafka, we’ll need a data warehouse/lakehouse to back up the raw events for reprocessing (or “backfill”) at a later time. The processing layer will perform transformations/aggregations using a compute engine such as Flink or Spark. The results are emitted to an online store for online inference and an offline store for training dataset generation.

The above diagram contains three main categories of latency (sans network latency because it’s outside of the topic of optimization). The following sections will dive into the definitions of the sources of latency and how we can measure them respectively. Then, we’ll dig into the various factors contributing to cost before finally reviewing the correctness factors.

Computation Freshness

Computation freshness is the time lag between when the computation engine first sees a raw event and when the computed results that account for this event become available for serving downstream. It’s an indication of how fresh the results are. Generally, streaming engines enable higher computation freshness than batch engines because computations are continuous. Another factor that impacts freshness is the trigger interval (or, in layperson’s terms, emission cadence) defined by the windowing semantics in stateful processing. For example, if a feature computes from all events in a user session, you’d expect that the results won’t emit until the session concludes.

You can measure computation latency in two ways:

  • Use latency markers. Note that latency markers are an approximation, as they do not count time spent on user-specified transformation logic; for more details, read the original documentation.
  • Measure event time lag:
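
A minimal sketch of the second option, assuming one common definition of event time lag: the wall-clock time minus the job's current watermark. The function name and the sample timestamps are hypothetical and are not taken from the benchmark setup.

```python
from datetime import datetime, timedelta, timezone


def event_time_lag(wall_clock: datetime, watermark: datetime) -> timedelta:
    """Return how far the job's event-time progress trails the wall clock."""
    return wall_clock - watermark


# Hypothetical sample: the watermark is two seconds behind the wall clock.
now = datetime(2023, 10, 8, 12, 0, 5, tzinfo=timezone.utc)
watermark = datetime(2023, 10, 8, 12, 0, 3, tzinfo=timezone.utc)
lag = event_time_lag(now, watermark)
print(lag.total_seconds())  # 2.0
```

A lag that stays small and flat suggests the job keeps up with ingestion; a lag that keeps growing suggests it is falling behind.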

Backfill Latency

Backfill latency, also known as backfill catch-up time or streaming job bootstrapping time, is the time from when the system starts processing historical raw data until the watermark gets close to the current wall-clock time and stabilizes, so that the event time lag stays within an acceptable range.

You can measure the catch-up velocity of the backfill job to estimate the remaining time:
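
As a sketch, assume catch-up velocity means event-time seconds processed per wall-clock second (watermark advance divided by elapsed wall-clock time). Because the wall clock itself keeps moving at 1 second per second, the lag shrinks at a rate of velocity minus 1. The function names and sample numbers below are hypothetical.

```python
def catch_up_velocity(watermark_start_s, watermark_end_s, wall_start_s, wall_end_s):
    """Event-time seconds processed per wall-clock second."""
    return (watermark_end_s - watermark_start_s) / (wall_end_s - wall_start_s)


def remaining_catch_up_seconds(current_lag_s, velocity):
    """Estimated wall-clock seconds until the lag closes; None if it never will."""
    if velocity <= 1.0:
        return None  # the job is not gaining on the wall clock
    return current_lag_s / (velocity - 1.0)


# Hypothetical sample: over 60 s of wall-clock time the watermark advanced 600 s.
velocity = catch_up_velocity(0, 600, 0, 60)
# With 9 hours of lag left, the job gains 9 event-time seconds per second.
eta_s = remaining_catch_up_seconds(current_lag_s=9 * 3600, velocity=velocity)
print(velocity, eta_s)  # 10.0 3600.0
```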

You need a catch-up velocity greater than 1 for your job to catch up eventually. The bigger the number, the faster the job will catch up.

Modern data processing engines are being unified at various abstraction levels (e.g., LinkedIn’s approach using Apache Beam, Flink backend support on Ibis, separation of processing and querying capabilities in streaming databases such as RisingWave and Materialize, etc.). The performance gap between batch and streaming in the traditional sense is closing quickly. At the time of writing, Confluent also announced a 2024 plan to work on Flink mixed execution mode.

Serving Latency

In this architecture, the results are served via a request-response paradigm. We define serving latency as the time between initiating the read request for online or offline features and the caller fully receiving the results. Similar ideas work in an event-driven architecture, but we’ll leave it here since serving latency is not the main focus today.

Cost divides into three areas, discussed in detail below. Another noteworthy cost source is operations, but we’ll leave that outside the scope of this article.

Computation Cost

Low latency feature computation is about performing transformations and aggregations as raw data arrives. The computation cost is related to the processing load and state management load. Below are the impacting factors:

Processing load:

  • Events ingestion speed and variance/spike pattern
  • Complexity of transformation/aggregation
  • Results emission frequency and percentage of data recomputation
  • UDFs, which may incur cross-runtime RPC and serialization overhead
  • Data serialization/deserialization
  • Low-level computation overhead, such as disk <> memory <> CPU register <> instruction set bus utilization

State management load:

  • Per-event size
  • Window length
  • Total keyspace and in-state keyspace
  • State lookup/IO efficiency
  • Memory pressure and IO latency in the state store

Storage Cost

The discussed architecture uses many storage systems, ranging from streaming transport and data warehouses/lakehouses to online KV stores, offline columnar stores, and even in-memory computation engines for local experimentation. The biggest cost saver is choosing the right storage engine for the right job. Each engine makes different tradeoffs, and the impacting factors are the following:

  • Underlying data structure and optimized access patterns (r/w)
  • ACID requirements
  • Volume/scale
  • Keyspace
  • Cold vs. hot storage
  • Row vs. column orientation
  • Resilience/redundancy overhead

Data Movement Cost

Data constantly moves across distributed systems boundaries from point A to point B. Streaming technology is critical to keep the movement “continuous.” Some factors to consider:

  • Data close to compute vs. data close to storage or both
  • Data consistency, delivery semantics, and requirements on row-level updates or retractions
  • What simple transformations are to be applied while data is in motion
  • Data transmitted within the same VPC will be cheaper. Data transmitted over the internet backbone will be more expensive (cross-cloud provider, cross-region)
  • TLS encryption requirements
  • Privacy governance requirements

Objective & Subjective Correctness

Correctness is objectively necessary for many modern data computation needs. There have been many developments in recent years to improve engine resilience, fault tolerance, exactly-once processing, and exactly-once delivery semantics.

Correctness can also be use-case specific and requires subjective judgment for tradeoff decisions. For example, in a fraud prevention scenario, an ML model needs to decide whether a transaction is fraudulent, typically within 100 ms of when it occurs. As a result, it’s much more critical to have accurate results at the transaction time instead of a few minutes later. (We’ll see a concrete example shortly in the second section demonstrating how to make such tradeoffs.) Please note that in such an eventually consistent distributed system, despite things being “eventually” consistent, it’s important to define the tolerance boundary so that we can measure the “subjective correctness” objectively.

Optimization Framework

When dealing with optimization, we should evaluate options and tradeoffs systematically. Below is a simple framework to assist the thinking process. In most cases, people care about how to speed things up and save costs.

To speed things up:

  • Use more power. So you get to do more in less time.
  • Process less. So you get to finish on time.
  • Spend less time waiting. So you get to finish earlier.
  • Use a better execution plan. So you get work done faster and smarter.

To lower costs:

  • Do fewer redundant or unnecessary things. So you can spend less on compute and storage.
  • Use more cost-effective hardware/technology. So you can shift work from expensive resources to cheap ones.
  • Approximate for the right use cases. So you get to do less work.

Fraud Detection Feature Optimization with FlinkSQL

The Scenario & Problem Statement

Let’s work on a hypothetical scenario. As a data scientist, you have a task to implement a streaming feature for a fraud detection model. We’ll constrain our toolbox to a data-scientist-friendly interface like Flink SQL. So, lower-level Table APIs or DataStream APIs are not allowed.

We model the scenario after a mid-sized fintech company: about 10MM active credit cards, 30% of them active daily, 1–2% active hourly during peak, and, on average, ~10 transactions or fewer per card per day.

If we have our math correct, we’ll see about 350 transactions per second, but for the sake of the demonstration, we’ll simulate 1000 transactions per second. We’ll also see roughly 200k new unique cards every hour window. The above table shows the transaction schema; every event represents a transaction that will flow into a Kafka topic.
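
The arithmetic behind these estimates can be checked with a short sketch. The constants restate the scenario numbers above; the 2% peak ratio is the upper end of the stated 1–2% range, and the variable names are made up for illustration.

```python
ACTIVE_CARDS = 10_000_000
DAILY_ACTIVE_PERCENT = 30
TX_PER_CARD_PER_DAY = 10       # upper bound from the scenario
PEAK_HOURLY_ACTIVE_PERCENT = 2  # upper end of the 1-2% range
SECONDS_PER_DAY = 86_400

daily_active_cards = ACTIVE_CARDS * DAILY_ACTIVE_PERCENT // 100
daily_transactions = daily_active_cards * TX_PER_CARD_PER_DAY
avg_tps = daily_transactions / SECONDS_PER_DAY
peak_hourly_cards = ACTIVE_CARDS * PEAK_HOURLY_ACTIVE_PERCENT // 100

print(f"{daily_transactions:,} transactions/day")       # 30,000,000 transactions/day
print(f"{avg_tps:.0f} transactions/second on average")  # 347 transactions/second on average
print(f"{peak_hourly_cards:,} unique cards per hour")   # 200,000 unique cards per hour
```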

The ask:

Compute every credit card’s average transaction amount from the last two-hour window. We want the computation freshness to be around one second or less.

Before checking out the solution, let’s take a second to think about what FlinkSQL you would write.

First Implementation

Above is a simple sliding window implementation to satisfy the requirements. Let’s dive deeper into two aspects.

Table-Valued Function

A table-valued function (TVF) is what divides elements/events into windows, which puts it at the heart of processing infinite streams using the SQL interface. Windows split the stream into “buckets” of finite size, over which we can perform computations. Apache Flink provides a few TVFs out of the box, including Tumble Windows, Hop Windows, and Cumulate Windows. In our case, a sliding/hopping window is the best option to solve our problem.

Sliding/Hop Window

The window size is 2 hours to capture all transaction events in the past 2 hours.

The window slide is 1 sec to satisfy the 1-sec computation freshness requirement.

Once the job stabilizes after 2 hours, the keyspace managed in state will average around 200k credit cards. (Don’t worry, we’ll dive into why in a second.)

Each window, on average, will contain fewer than ten elements to compute for the AVG() aggregation. (Because each credit card, on average, incurs fewer than ten transactions per day.)

Please try to remember these parameters because they will be important when making optimization decisions.

Take a guess: how much will this feature cost you?

$8,400 is the amortized monthly cost projection based on our benchmark experiment. We benchmarked on the M6i instance type managed by Flink Kubernetes Operator in an AWS EKS cluster. The streaming job uses 224 cores and runs hot at 90.70% CPU utilization for this experiment.

As a data scientist, you might be surprised, given the job is relatively simple and the scale is nothing close to crazy. You know you will need not just one but hundreds, if not thousands, of features to power an ML model. The staggering cost could easily reach millions of dollars. It’s undoubtedly infeasible.

You may also have an intuition here that the main culprit is related to the 1-second window slide. You are right about it, but let’s first understand why it’s needed before we explore optimization options.

In the diagram below, let’s first look at how a sliding window with a longer slide would behave. We have four consecutive windows with a window size of 15 units and a slide of 5 units in length.

Interestingly, even though the five events (e1, … e5) fall within the window size, no window firing captures all five events (i.e., w1 captures e1-e4, and w2 captures only e3-e5). As a result, from shortly after e5 enters processing until w2 fires, the computation result is inaccurate. Hence, the longer the window slide, the more likely the results are to be inaccurate at any given time.

We can compensate by shortening the window slide to 1 unit to bound the inaccuracy, as shown below.
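
The effect of the slide length can be reproduced with a toy model of slide-aligned windows. The event timestamps are hypothetical, chosen so that all five events fit within 15 units, and the helper names are made up for illustration.

```python
def hop_windows(size, slide, first_start, last_start):
    """Return [start, end) windows whose starts are aligned to the slide."""
    return [(s, s + size) for s in range(first_start, last_start + 1, slide)]


def captured(events, window):
    """Names of the events whose timestamp falls inside the half-open window."""
    start, end = window
    return [name for name, ts in events.items() if start <= ts < end]


# Hypothetical event times: all five fall within a 15-unit span (2..16).
events = {"e1": 2, "e2": 4, "e3": 7, "e4": 12, "e5": 16}

# Slide of 5 units: no window contains all five events.
coarse = {w: captured(events, w) for w in hop_windows(15, 5, 0, 5)}
print(coarse)  # {(0, 15): ['e1', 'e2', 'e3', 'e4'], (5, 20): ['e3', 'e4', 'e5']}

# Slide of 1 unit: one of the finer-grained windows captures everything.
fine = [w for w in hop_windows(15, 1, 0, 5) if len(captured(events, w)) == 5]
print(fine)  # [(2, 17)]
```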

Nice, two birds with one stone. We solved both the computation latency and accuracy problems using a shorter window slide. But this comes at a significant cost. Let’s drill into why.

In most streaming engines, for a sliding window, within each group key (credit card), a window is created as the first element enters the computation DAG. After that, a new window is created at the sliding cadence (i.e., every second). Each window is closed (or fired) when the watermark reaches window size (i.e., 2 hours) from the initial creation of the respective window. The aggregation computation is triggered at window firing time.

After the job stabilizes, within 1 hour, we are talking about 3600 window materializations for every single credit card. If we look at a slice in time, we will expect about 200,000 window firings within any second, once per key. That’s a lot of computations, and the majority are redundant! Assuming each aggregation takes one millisecond and with perfect parallelization, that still requires about 200 cores or threads. It’s worth noting in this scenario that even though we manage many windows in the states, the number of elements is small, so memory is not yet a bottleneck. The main reason behind the high cost is the frequent window firings and redundant aggregations.
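
The numbers in this paragraph follow from a few multiplications. The sketch below restates them; the 1 ms per aggregation is the assumption made above, and the count of overlapping windows per key is derived here rather than quoted from the benchmark.

```python
KEYS_IN_STATE = 200_000        # cards active within the 2-hour window
WINDOW_SIZE_S = 2 * 3600
SLIDE_S = 1
AGG_COST_MS = 1                # assumed cost of one aggregation

firings_per_key_per_hour = 3600 // SLIDE_S
overlapping_windows_per_key = WINDOW_SIZE_S // SLIDE_S  # derived, not from the post
firings_per_second = KEYS_IN_STATE // SLIDE_S
# Each firing occupies a core for AGG_COST_MS, so this is core-seconds per second.
cores_needed = firings_per_second * AGG_COST_MS / 1000

print(firings_per_key_per_hour)     # 3600
print(overlapping_windows_per_key)  # 7200
print(firings_per_second)           # 200000
print(cores_needed)                 # 200.0
```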

So far, besides the cost concerns, we are good with the solution. But are we? What if we encounter edge conditions such as skew?

The scenario is not uncommon. Imagine a fraudster trying to run 10,000 transactions on a stolen credit card over one minute. The long tail skew will introduce a few additional complexities.

Inaccuracy

Due to the high frequency of event ingestion, each window will likely include changes from more than one transaction (more likely hundreds within one second). Getting exact results at a per-transaction level is therefore challenging because events are bundled in window firings.

State management and redundant computations

We store relevant fields of the raw individual events (optimized by event pruning) in the state backend for each group key. If the skewed keys contain a large number of transaction events, state will spill from memory to disk, and every single computation will need additional IO time. On top of that, every window firing for the same key would perform mostly redundant aggregations, given that new events entering the window comprise only a small percentage of the total population within a long window. Combined, these two effects may put the job into an operational downward spiral. There are a few previously proposed solutions (Cutty, Scotty) to enable aggregate sharing via window slicing. We won’t drill into details because these techniques are unfortunately not available in the Flink SQL abstraction. Cascading Window Aggregation, however, is available in Flink SQL.

Algorithmic Complexity

It’s not uncommon for fraud detection features to require longer windows, say six months or a year. So, if we increase the window size, it will impact two parameters:

  1. The keyspace will increase linearly. As we mentioned earlier, the total keyspace is 10MM, and over a much longer window, we’ll likely see most of the keys managed in the states.
  2. The number of transactions within each window will also increase linearly. We’ll likely see hundreds, if not thousands, of transactions over a year for each card.

So, combining both linear changes, we’ll see a superlinear increase in required computation power with respect to window length. It’s obviously not scalable or cost-feasible. The benchmark chart below demonstrates this conclusion (blue line). The chart also hints that a linear solution exists (red line). So, let’s look into a second implementation.

Second Implementation

In this second implementation, we use an OVER Aggregation window. It’s different from TVF. OVER aggregates compute an aggregated value for every input element over a range of ordered elements. In this case, upon every event (or element) entering the computation DAG, it looks up an event-time-ordered list of elements from the last two hours for the current partition key and then fires the aggregation.
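
To make the semantics concrete, here is a plain-Python model of a per-key, time-ranged OVER aggregate. It is only a sketch of the behavior described above, not Flink code: it assumes input already ordered by event time, treats both window bounds as inclusive, and uses made-up card IDs and amounts.

```python
from collections import defaultdict, deque

WINDOW_S = 2 * 3600


def over_average(transactions, window_s=WINDOW_S):
    """Yield (card, ts, trailing_avg) once per input event.

    `transactions` must be ordered by event time; each item is (card, ts, amount).
    """
    history = defaultdict(deque)  # per-card events still inside the window
    for card, ts, amount in transactions:
        events = history[card]
        events.append((ts, amount))
        # Drop events that fell out of the trailing window for this key.
        while events[0][0] < ts - window_s:
            events.popleft()
        yield card, ts, sum(a for _, a in events) / len(events)


txs = [
    ("card-1", 0, 10.0),
    ("card-2", 30, 50.0),
    ("card-1", 3600, 30.0),
    ("card-1", 7201, 20.0),  # the event at ts=0 is now older than 2 hours
]
results = list(over_average(txs))
for row in results:
    print(row)
```

One result is produced per input event, so the firing rate tracks the ingestion rate rather than the number of keys in state.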

As we mentioned earlier, the simulation ingests about 1,000 transactions per second. Immediately, this implementation reduces from 200,000 window firings to about 1,000 firings per second. We should expect a massive cost reduction.

So yes. That’s precisely what we see: the second implementation achieved more than 100x cost reduction. The benchmark ran on the same setup but only used 2 CPU cores, running at 0.49% CPU utilization with moderate memory pressure.

But what’s the catch? There is always a catch.

The problem is we introduced new inaccuracies in the system. Let’s take a look at the above diagram. As discussed, each OVER window fires when the event enters the pipeline, so the result is always as fresh as possible immediately after the firing. However, we don’t have a mechanism to trigger firing when events leave the window.

Let’s look at the highlighted arrow. At this point, e1 and e2 have already left the window, and ideally, the correct computation result should capture e3, e4, and e5. However, OVER aggregate only fires when an event enters the pipeline, so the computation result is potentially incorrect when queried with a delay.

I do want to point out this is an easy problem to solve if we can leverage low-level Flink API capabilities such as custom windows, triggers, event time timers, etc. The ideal solution can be a custom window that fires when an event both enters and exits the window. In that case, we’ll look at about 2,000 computations per second on average, which is still significantly cheaper. However, we are working on this task based on the assumption of a “data-scientist-friendly” approach, so our hands are bound to Flink SQL. Many of these “advanced” capabilities are unavailable on FlinkSQL, so let’s figure out a different alternative.

Before we get into the details, let’s look at the benchmark numbers. The main takeaway is that sliding windows with a one-sec slide trade cost for accuracy, while OVER aggregates trade accuracy & freshness for cost.

Here is another attempt at the solution:

Let’s visualize the solution in the diagram below.

The blue windows are OVER windows, and the yellow windows are Sliding windows. We combine them to get the best of both worlds while leaving a knob for users to specify inaccuracy tolerance. This demonstration shows that results are always fresh and correct immediately after a transaction triggers a firing. Inaccuracy can exist between when an older event exits the window and when any subsequent window fires, but in this case, it’s guaranteed that results won’t be stale for more than 60 seconds.
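
The staleness bound can be illustrated with a small single-key model. This is a simplified sketch, not the Flink SQL from the post: it assumes the served value is whatever the most recent firing produced, models the sliding window as firing at every multiple of the tolerance, and reuses one trailing-average function for both window types. All names and numbers are hypothetical.

```python
def trailing_average(events, at_ts, window_s):
    """Average amount of events with at_ts - window_s <= ts <= at_ts, or None."""
    amounts = [a for ts, a in events if at_ts - window_s <= ts <= at_ts]
    return sum(amounts) / len(amounts) if amounts else None


def served_average(events, query_ts, window_s, tolerance_s=None):
    """Value a reader sees at query_ts: the result of the most recent firing."""
    arrivals = [ts for ts, _ in events if ts <= query_ts]
    if not arrivals:
        return None
    last_firing = max(arrivals)  # the OVER aggregate fires on every arrival
    if tolerance_s is not None:
        # A sliding window with slide == tolerance_s also fires on a fixed cadence.
        last_firing = max(last_firing, query_ts // tolerance_s * tolerance_s)
    return trailing_average(events, last_firing, window_s)


WINDOW_S = 2 * 3600
events = [(0, 100.0), (10, 200.0), (7000, 30.0)]  # (event time in s, amount)
query_ts = 7300  # the first two events have already left the 2-hour window

exact = trailing_average(events, query_ts, WINDOW_S)
over_only = served_average(events, query_ts, WINDOW_S)
combined = served_average(events, query_ts, WINDOW_S, tolerance_s=60)
print(exact, over_only, combined)  # 30.0 110.0 30.0
```

With the OVER aggregate alone, the reader still sees the average from the last arrival. Adding the periodic firing refreshes the value within one tolerance interval of an event expiring.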

Based on some napkin math, the job with an inaccuracy tolerance of 60 seconds will generate about 4,000 computations per second. The cost should be no more than $80–100/month. Users can make appropriate tradeoffs between cost and correctness using an inaccuracy tolerance knob.
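
The post does not spell out the napkin math, so the following is one plausible reconstruction: the OVER aggregate fires once per transaction, and the sliding window fires once per key per slide. It lands in the same ballpark as the figure above. The key count and transaction rate come from the scenario; everything else is an assumption.

```python
KEYS_IN_STATE = 200_000
TX_PER_SECOND = 1_000


def firings_per_second(tolerance_s):
    """OVER firings on arrival plus one sliding-window firing per key per slide."""
    return TX_PER_SECOND + KEYS_IN_STATE / tolerance_s


options = {
    "sliding window, 1 s slide": KEYS_IN_STATE / 1,
    "OVER aggregate only": TX_PER_SECOND,
    "custom window (fires on enter and exit)": 2 * TX_PER_SECOND,
    "OVER + sliding window, 60 s tolerance": firings_per_second(60),
}
for name, rate in options.items():
    print(f"{name}: {rate:,.0f} firings/s")
```

Under these assumptions, raising the tolerance to 300 seconds would bring the combined rate down to roughly 1,667 firings per second, which is how the knob trades correctness for cost.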

What Do We Learn From This Example?

In this example, we’ve explored tradeoffs between computation latency, computation cost, and subjective correctness. An abstraction such as FlinkSQL offers only limited tradeoff flexibility. However, we can still capture optimization intent using declarative knobs (e.g., the inaccuracy tolerance knob) to guide users through the optimization or abstract the details in a managed optimization layer.

Streaming engines are mature, and streaming SQL is gaining traction in the industry, yet there is still a significant gap between what the engine is capable of and what the streaming SQL abstraction can do. The gap is gradually closing, but standardization will take time. However, industry use cases won’t wait, so we have much work to do to provide gap-closing solutions.

That said, we need an open distributed optimization layer that can work seamlessly across a modern composable data ecosystem. There are many benefits beyond the demonstrated example. Let’s discuss more about it in the final section.

Optimizations Across a Composable Data Fabric

The above diagram illustrates a modern application architecture. The upper portion showcases the transactional aspect of the data landscape, where data originating from devices and microservices funnels into transactional data systems (e.g., Postgres, CockroachDB, Cassandra, etc.). On the bottom half, we have the analytical part of the ecosystem (e.g., Iceberg, Delta, Presto, Pinot, Tableau, etc.) that powers BI, analytics, and more recently, AI/ML use cases. In the middle, the streaming transport and processing systems (e.g., Kafka, Flink, etc.) are the central nervous system that connects both parts.

Many different technologies compose this complex data architecture; data is duplicated in many storages, and cross-accessing, querying, and processing remain difficult. I want to raise two questions:

  1. What does it take to have a unified, easier way to access all these data more cohesively?
  2. What are the opportunities to perform optimization across layers of distributed systems like this?

To answer these questions, let’s first explore two recent foundational developments that can make a solution possible.

Stream-Table Duality

The concept of stream-table duality allows us to use relational algebra to capture relational expressions as IRs across all processing paradigms: batch, streaming, transactional, and partially analytical. The relational expression represented as an IR is not a new technology. It’s the foundation in many databases to capture logic declaration, build query plans, and perform optimizations based on table statistics. The development of streaming technology as a central nervous system and additions of streaming semantics (e.g., watermark, windows, etc.) into the IR open the door for cross-system optimizations.

On top of that, relational expressions/IRs allow us to translate from/to many different SQL dialects easily. In some cases, it’s possible to convert Python library code, such as a pandas implementation, to and from SQL declarations. It opens the door for easier access without being forced into any specific language or dialect.

Data Contract and Governance

Recent advancements in Data Contracts and Governance facilitate schema management across systems. Tools like Kafka’s schema registry and Apache Iceberg’s schema evolution implementations exemplify this. Using those APIs can enable cross-system schema forward or backward compatibility enforcement. So when upstream schema changes, we have a way to propagate, block the change, or have mechanisms to version the data. It is a critical piece to help a data fabric layer work across engines.

Let’s attempt to answer the first question after establishing a common ground on the two foundations. What does it take to have a unified, easier way to access all these data more cohesively?

An IR capable of capturing major processing and querying semantics enables a transpilable SQL interface that works across many storage and processing engines. If you haven’t yet, check out Calcite and ibis. Additionally, with data contracts, data and processing (within streams or tables) can be semantically connected in a lineage, enabling a unified interface for data discovery, evolution, and governance coordination.

Optimizations Across Composable Data Systems

For the second question, we’ll touch upon a few overarching examples to clarify our points. However, we won’t delve too deep, as that would necessitate its own dedicated post.

The concept of optimizing across a modular data fabric appears as described above. After establishing the business logic workloads or query declarations, we can translate them into IRs and process them via a unified processing layer. Depending on the optimization context, various storage and computing engines can execute all or a portion of the workload. Underneath the hood, it will figure out how to break down the IR and optimize the runtime executions across systems.

To make the idea more concrete, I will use three examples.

Distributed Predicate Push Down/Up

We’ll use Apache Iceberg as a first example. Iceberg uses two levels of metadata (manifest files and manifest list) to track the files in a snapshot. Iceberg first filters manifests for fast scan planning using the partition value ranges in the manifest list. Then, it reads each manifest to get data files. With this scheme, the manifest list acts as an index over the manifest files, making planning possible without reading all manifests. This structuring of data allows us to do efficient scan planning (scan planning is the process of finding the files in a table that are needed for a query) via metadata filtering and data filtering.
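
The two-level pruning can be sketched with a toy in-memory model. This is not the Iceberg API or file format: the classes, the integer "day" partition values, and the file names are all hypothetical, and only min/max range overlap is modeled.

```python
from dataclasses import dataclass, field


@dataclass
class DataFile:
    path: str
    day_min: int  # lowest partition value (e.g. day number) in the file
    day_max: int


@dataclass
class Manifest:
    path: str
    day_min: int  # partition value range summarized in the manifest list
    day_max: int
    files: list = field(default_factory=list)


def overlaps(lo, hi, query_lo, query_hi):
    """True when the closed ranges [lo, hi] and [query_lo, query_hi] intersect."""
    return lo <= query_hi and query_lo <= hi


def plan_scan(manifest_list, query_lo, query_hi):
    """Return data file paths to read, plus how many manifests were opened."""
    opened = 0
    selected = []
    for manifest in manifest_list:
        # Level 1: prune whole manifests using only the manifest-list summary.
        if not overlaps(manifest.day_min, manifest.day_max, query_lo, query_hi):
            continue
        opened += 1
        # Level 2: read the manifest and prune individual data files.
        selected += [f.path for f in manifest.files
                     if overlaps(f.day_min, f.day_max, query_lo, query_hi)]
    return selected, opened


manifest_list = [
    Manifest("m1", 1, 10, [DataFile("a.parquet", 1, 5), DataFile("b.parquet", 6, 10)]),
    Manifest("m2", 11, 20, [DataFile("c.parquet", 11, 15), DataFile("d.parquet", 16, 20)]),
]
files, manifests_opened = plan_scan(manifest_list, query_lo=12, query_hi=14)
print(files, manifests_opened)  # ['c.parquet'] 1
```

A predicate on days 12 to 14 opens one of the two manifests and selects one of the four data files, which is the saving that predicate pushdown is after.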

Consider a data scientist aiming to test a new feature using a subset of warehouse data. It’s much more efficient to push the predicate down to Iceberg and only retrieve relevant data files. The smaller set of data files may fit into a memory-backed local pandas DataFrame, making the experimentation more productive. It enables the data scientist to experiment cheaply and quickly before committing the full workload to production. When the experimentation result is satisfactory, the data scientist can directly push the same declaration into production for processing at scale without having to rewrite the logic on a different engine.

Consider expanding on the idea with another example. Given that we can push predicates down to Iceberg, could we apply similar principles in microservices ecosystems? In a large deployment, it’s common for microservices to generate logs amounting to petabytes. While most logs are not helpful for daily operations, some become critical when performing live ad-hoc analyses, e.g., retrieving all user session logs during customer support.

Storing all these logs just for occasional use cases might be inefficient. Now, envision employing a similar strategy: by integrating a predicate push protocol within each microservice, we could selectively subscribe to specific events and process them in real-time. This approach could lead to cost savings in the millions of dollars.

Unified Streaming Storage

Apache Kafka is becoming more popular as a streaming transport. Apache Iceberg is becoming more popular as a streaming data warehouse. There is an opportunity to get both engines to work together.

One practical application is in supporting backfill. Running Kafka at scale is significantly more expensive than storing Parquet files on S3. Hence, having the ability to continuously stream raw events from Kafka into a data warehouse table can aid in reducing Kafka retention costs while still offering flexibility for backfilling.

During a backfill, it’s possible to perform stream or batch processing on Iceberg. If your use case has a strict freshness requirement (< mins), you can optionally switch to the streaming source after the backfill catches up.

The above diagram shows the data flow between Kafka and Iceberg and unified reading from a source abstraction. The uppercase letters in the diagram denote the reader/writer schema compatibility flow.

Composable & Unified Processing

A unified and composable processing world is closer than you think. We’ll start with an ML example.

For traditional ML training dataset generation, maintaining train-predict consistency is critical. As a result, point-in-time correct join is a must for correctness.

Traditionally, training datasets are generated by batch processing on a cadence (typically daily or weekly). It’s a common practice to use bucket joins on the partition level to enable efficient range joins. (The challenge with a point-in-time join is that the join is performed between rows with the closest timestamps, not on an exact match. It incurs many inefficient table scans if not optimized. If you want to learn more, here is a good read.)
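
A minimal sketch of the join semantics, assuming "point-in-time correct" means each label row picks up the latest feature value observed at or before the label's timestamp, so no future information leaks into training. The data and names are hypothetical, and a binary search over per-key sorted history stands in for the bucketed range join.

```python
from bisect import bisect_right


def point_in_time_join(labels, features):
    """For each (key, ts, label), attach the latest feature value with feature_ts <= ts.

    `features` maps key -> list of (feature_ts, value) sorted by feature_ts.
    """
    joined = []
    for key, ts, label in labels:
        history = features.get(key, [])
        timestamps = [f_ts for f_ts, _ in history]
        idx = bisect_right(timestamps, ts)  # number of feature rows at or before ts
        value = history[idx - 1][1] if idx > 0 else None
        joined.append((key, ts, label, value))
    return joined


features = {"card-1": [(100, 12.5), (200, 40.0), (300, 18.0)]}
labels = [("card-1", 250, 1), ("card-1", 50, 0), ("card-2", 250, 0)]
rows = point_in_time_join(labels, features)
for row in rows:
    print(row)
```

The label at ts=250 gets the feature value from ts=200, not the later one from ts=300. Labels with no earlier feature row get None.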

Let’s imagine a world where models constantly degrade, features drift over time, and we are moving into the future state of “continual training” to keep your model fresh. Instead of performing table-to-table joins, we must now move into performing stream-to-table joins. A more efficient join algorithm would leverage a watermark that forms a more granular bucket for joining between the stream and table. In this case, the batch tables need to support a watermark implementation.

The difference between efficiently running point-in-time join on a daily cadence or continuously is all about picking an appropriate algorithm. We’ll inevitably move away from the categorical divide between batch and streaming.

It doesn’t stop here. There is a plethora of optimization opportunities across the boundaries of hardware and software architecture paradigms. A unified and composable processing world can be very exciting!

I’ll conclude the blog post with a final point.

I would love to hear feedback and new ideas from you!

Acknowledgments

Many team members from Claypot contributed to the ideas, implementations, and benchmark results mentioned in this post. This blog post is dedicated to all your great work. Thank you: Chloe, Tony, Jiupeng, Deepyaman, Mehmet, Jiting, and Chip. I also want to thank Bingfeng Xia, Sharon Xie, Sherin Thomas, and Yingjun Wu for reviewing this post.

Co-founder at Claypot AI. Building a real-time ML platform for every data-driven organization.