Last week, I wrote about the use of streaming ETL in a real-life use case. Being a big proponent of semantic layers, I thought it would be good to consider how to use a semantic layer with streaming data.
I don’t know how the internals of a streaming data warehouse like Materialize or ksqlDB work, exactly. What I explore below is a simplistic model of how systems like this could work. There is no magic in computer science - the work needs to happen. The only real question is efficiency.
Streaming data warehouses have storage, similar to batch data warehouses. Batch data warehouses can have streaming inputs, too. Last week, I talked about Snowflake and Snowpipe - the streaming way to send data to Snowflake.
A streaming data warehouse will also keep fresh data in memory for fast access, writing older data to disk or cloud storage once this memory is full. A batch data warehouse would write all data to disk or cloud storage immediately. Batch data warehouses reduce compute consumption by design - keeping data in memory incurs a continual compute cost.
For example, Snowpipe will write to Snowflake micro-partitions as soon as reasonable to do so. Snowpipe does use a small amount of compute, which customers are billed for in a similar way to the use of serverless APIs, like AWS Lambda. When I was at Lyst, we found that web event data could make it into Snowflake micro-partitions within two minutes of generation. Most of the lag is due to micro-batching of event data into chunks large enough to write a micro-partition.
It’s also possible to use DuckDB on streaming data in a similar way. Where streaming data is stored in a data lake, DuckDB queries against this external data will always run on very fresh data. This would likely have a similar lag to the Snowflake/Snowpipe configuration outlined above. Both of these are made possible by separating storage from compute.
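As a minimal sketch of the DuckDB approach - the path and column names here are purely illustrative, assuming a streaming pipeline continually lands Parquet files in the lake:

```python
import duckdb

con = duckdb.connect()

# Query whatever Parquet files the streaming pipeline has landed so far;
# each run sees the freshest files present in the lake at that moment.
tx_count, mean_tx_amt = con.execute(
    """
    SELECT count(*), avg(transaction_amount)
    FROM read_parquet('lake/transactions/*.parquet')
    """
).fetchone()

print(tx_count, mean_tx_amt)
```

Each query still scans the relevant files in full; the freshness comes from querying the lake directly rather than waiting for a batch load.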
Where compute and storage are coupled, it is also theoretically possible to do this. It involves near-constant write transactions to the database to handle the input from the streaming data. From what I’ve seen first-hand, this does not work well at scale.
So what’s the difference between batch and streaming queries? Streaming data warehouses have native queries on a stream, where all of the data relevant to a query is not scanned each time the query is run. In fact, the query is not really “run” as such; it’s more of a process that persists and reduces data as it arrives in the stream.
Let’s imagine that the data is very simple, with three fields:
user_id as string
transaction_id as string
transaction_amount as number
I’m not going to be specific about the exact types.
You could have a query which calculated the mean transaction amount and the total transaction amount. The last result is always available - let’s say it is: mean_tx_amt = 25, total_tx_amt = 200. Then you have a new record with transaction_amount of 70. Updating total_tx_amt is easy, as you can just add the 70 to the current result of 200, giving 270.
Updating mean_tx_amt isn’t so straightforward, as you need to know how many previous transactions there were. You can derive tx_count by dividing total_tx_amt by mean_tx_amt. In reality, it makes more sense to keep this denominator stored and up to date alongside the other results. Either way, it’s 8 and is incremented to 9 with the new transaction. The numerator is already stored in total_tx_amt, so the updated mean_tx_amt is 270 / 9 = 30.
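To make the arithmetic concrete, here is a minimal Python sketch of that incremental update. The class and field names are illustrative and not taken from any real streaming warehouse; the point is that only the stored aggregates are touched when a record arrives, never the historical data.

```python
class StreamingAggregate:
    """Running total, count and mean for a stream of transaction amounts."""

    def __init__(self) -> None:
        self.total_tx_amt = 0.0
        self.tx_count = 0  # the denominator, kept stored and up to date

    def update(self, transaction_amount: float) -> None:
        # Each new record only touches the stored aggregates.
        self.total_tx_amt += transaction_amount
        self.tx_count += 1

    @property
    def mean_tx_amt(self) -> float:
        return self.total_tx_amt / self.tx_count if self.tx_count else 0.0


agg = StreamingAggregate()
for amount in [10, 15, 20, 25, 25, 30, 35, 40]:  # eight prior transactions totalling 200
    agg.update(amount)

agg.update(70)  # the new record
print(agg.total_tx_amt, agg.mean_tx_amt)  # 270.0 30.0
```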
It’s easy to extend this pattern to reducing a micro-batch of new transactions. The query reduction process could run on the micro-batch, generating the results and metadata in the same format as the main query result. Then the metadata for both, like tx_count, is aggregated appropriately and the new result is calculated.
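A sketch of the micro-batch variant, assuming the main result is kept as a (total, count) pair of partial aggregates - again illustrative, not any particular warehouse’s internals:

```python
def reduce_batch(amounts: list[float]) -> tuple[float, int]:
    """Reduce a micro-batch to the same (total, count) shape as the main result."""
    return sum(amounts), len(amounts)


def merge(main: tuple[float, int], batch: tuple[float, int]) -> tuple[float, int]:
    """Aggregate the metadata of both results; the mean is derived on demand."""
    return main[0] + batch[0], main[1] + batch[1]


main_result = (200.0, 8)                    # total_tx_amt, tx_count
batch_result = reduce_batch([70.0, 30.0])   # a micro-batch of two new transactions
total, count = merge(main_result, batch_result)
print(total, count, total / count)          # 300.0 10 30.0
```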
As you can see, even simple calculations like mean() can become complicated when doing aggregations on a native streaming query. Things like calculating count(distinct user_id) become much more difficult. Like I said, there is no magic - in order to do this on streaming data, you would need to store the set of previously seen user_ids, then compare the user_ids in the new data against it to see whether the count should be incremented or not. If user_id has high cardinality, then this can become impractical, unless the time window of the query is short.
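Here is a minimal sketch of that exact approach, using a set of seen values; the memory it holds grows with the cardinality of user_id, which is precisely the problem (approximate structures such as HyperLogLog bound the memory at the cost of exactness, but that is beyond this sketch):

```python
class StreamingCountDistinct:
    """Exact count(distinct user_id) over a stream, at the cost of storing every id seen."""

    def __init__(self) -> None:
        self.seen: set[str] = set()

    def update(self, user_id: str) -> int:
        # Compare the new user_id against the stored ids; the count only
        # increments when the id has not been seen before.
        self.seen.add(user_id)
        return len(self.seen)


cd = StreamingCountDistinct()
for uid in ["u1", "u2", "u1", "u3"]:
    print(cd.update(uid))  # 1, 2, 2, 3
```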
As you can imagine, window functions become even more difficult. Anything involving sequence is more or less impossible in a streaming query using the pattern above. Partitioning for functions like sum() is possible, but more metadata has to be stored - one set of aggregates per partition. Joins aren’t practical apart from on very small data, which then ends up working like the count(distinct user_id) example given above: rather than storing and maintaining a generated dataset to do comparisons against, one is acquired from elsewhere. Things like complex CTEs and subqueries would also be impractical.
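For the partitioned case, a sketch of sum(transaction_amount) partitioned by user_id makes the extra metadata visible: one running total per partition value, so the stored state grows with the number of distinct partitions in the stream.

```python
from collections import defaultdict


class PartitionedStreamingSum:
    """sum(transaction_amount) partitioned by user_id: one running total per partition."""

    def __init__(self) -> None:
        self.totals: dict[str, float] = defaultdict(float)

    def update(self, user_id: str, transaction_amount: float) -> None:
        self.totals[user_id] += transaction_amount  # state grows with distinct user_ids


ps = PartitionedStreamingSum()
for uid, amt in [("u1", 25.0), ("u2", 70.0), ("u1", 30.0)]:
    ps.update(uid, amt)

print(dict(ps.totals))  # {'u1': 55.0, 'u2': 70.0}
```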
Where does the semantic layer come in? As I’ve written about before: the semantic layer abstracts entities, attributes and metrics from the logical data structure. So, in this instance, the definition of mean_transaction_amount as mean(transaction_amount) and what stream of data to calculate it on... would be defined in the semantic layer. When the semantic layer has a request, it could compile the request to a new streaming query, a batch query on the equivalent long term data store for the data stream (typically a data lake) or a combination of the two.
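Purely as an illustration - none of these names correspond to a real semantic layer’s API - a metric definition and the routing decision it enables might look something like this:

```python
from dataclasses import dataclass


@dataclass
class MetricDefinition:
    """Hypothetical semantic-layer metric definition."""
    name: str          # e.g. "mean_transaction_amount"
    aggregation: str   # e.g. "mean"
    column: str        # e.g. "transaction_amount"
    stream: str        # the stream to calculate it on
    lake_table: str    # the equivalent long-term store (typically a data lake)


mean_transaction_amount = MetricDefinition(
    name="mean_transaction_amount",
    aggregation="mean",
    column="transaction_amount",
    stream="transactions_stream",
    lake_table="transactions_lake",
)


def compile_request(metric: MetricDefinition, freshness_seconds: int) -> str:
    """Route a request to a streaming query, a batch query, or a combination.

    The thresholds are illustrative; the point is that the semantic layer,
    not the caller, decides where the query runs.
    """
    if freshness_seconds <= 60:
        return f"streaming query over {metric.stream}"
    if freshness_seconds >= 3600:
        return f"batch query over {metric.lake_table}"
    return (f"batch query over {metric.lake_table} "
            f"combined with a streaming tail from {metric.stream}")
```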
Where the semantic layer generates a new streaming query, this acts as a cache for the result and is continuously updated. This is different to how a semantic layer may store the results of a batch query in its own cache for performance and cost savings; those cached results become out of date when the underlying data changes. It would be ideal for a semantic layer to be able to configure the reduction cadence for streaming queries - how long, or to what size, the micro-batch accumulates before being reduced into the main result. This should remind you of Snowflake’s dynamic tables feature from last week.
The lifetime of streaming queries would need to be configurable. Unlike batch queries, which are run when wanted, streaming queries keep running until they are stopped. Semantic layer caching for batch queries tends to have invalidation periods, typically 24 hours - the equivalent for streaming would be a lifetime. You could imagine wanting a streaming query to live only during the hours that a business operates, so it could also be good to set the hours during which a specific query lives.
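A hypothetical configuration for such a streaming query might expose knobs like these (all names and values are illustrative):

```python
from dataclasses import dataclass
from datetime import time


@dataclass
class StreamingQueryConfig:
    """Hypothetical knobs a semantic layer could expose per streaming query."""
    reduction_cadence_seconds: int  # how long the micro-batch accumulates before reduction
    reduction_max_rows: int         # ...or how large it may grow before reduction
    lifetime_hours: int             # streaming analogue of a batch cache's invalidation period
    active_from: time               # only keep the query alive during business hours
    active_until: time


config = StreamingQueryConfig(
    reduction_cadence_seconds=60,
    reduction_max_rows=10_000,
    lifetime_hours=24,
    active_from=time(8, 0),
    active_until=time(23, 0),
)
```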
What are the benefits of using the semantic layer with streaming data like this? As you might expect, you get most of the benefits of using the semantic layer on batch data that I have described previously on this Substack.
The simplicity of the cache remaining in the streaming data warehouse and staying fresh is an extra boon. However, the limitations that native streaming queries have, as described above, remain.
The semantic layer could also prevent duplication of streaming queries. An equivalent request would be pointed to an existing running query. It may be the case that streaming data warehouses would prevent duplicate queries running, anyhow.
Many streaming use cases don’t require all the complexity of calculation that a batch data warehouse allows. Referring back to SpotOn’s use case from last week, calculating restaurant end-of-service figures - tips, revenue, inventory and so on - would be very straightforward.
If the cache for a semantic layer was advanced enough to become an additive cache, where new data is reduced into existing results, then the semantic layer could convert any data warehouse into a streaming data warehouse, providing functionality similar to Snowflake dynamic tables. The semantic layer cache could also reduce data from a stream, like Kafka or PubSub, into the cached results of a batch query from Snowflake, BigQuery or Databricks. This amounts to federated querying across streaming and batch.
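A sketch of what such an additive cache could look like - seeded from a batch result and then folded forward with records arriving from the stream (all names are illustrative):

```python
class AdditiveCache:
    """Hypothetical additive cache: a batch result kept fresh by reducing stream records into it."""

    def __init__(self, total: float, count: int) -> None:
        # Seeded by a batch query against the warehouse (e.g. Snowflake, BigQuery, Databricks).
        self.total = total
        self.count = count

    def reduce_stream_records(self, amounts: list[float]) -> None:
        # New records arriving from the stream (e.g. Kafka, PubSub) are folded into the result.
        self.total += sum(amounts)
        self.count += len(amounts)

    @property
    def mean(self) -> float:
        return self.total / self.count if self.count else 0.0


cache = AdditiveCache(total=200.0, count=8)  # seeded from the batch side
cache.reduce_stream_records([70.0])          # fresh stream data folded in
print(cache.total, cache.mean)               # 270.0 30.0
```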
Send me a message if you want to discuss or have any questions!