Event Stream Processing - Managing state with core.async
Introduction
In my last post I walked through the code for a data flow built around a product stock level tracker. It used a simple filter so that notifications fired only when a specific business rule was triggered.
In that post I showed the basic mechanisms for creating and connecting channels. In this post I want to show how to combine data from multiple streams. To demonstrate this we will implement the processes around the Stock Management Tracker.
In the tracker we will join the two streams to emit a running total of stock on hand vs orders at any point in time.
Data first
Let us just quickly outline the data in our model:
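The data definitions did not survive here, so as a stand-in, here is a minimal sketch of the shapes involved; the field names (`:part`, `:qty`, `:count`, `:time`) are assumptions for illustration:

```clojure
;; Hypothetical shapes - the original post's field names may differ.

;; A stock item: a part identifier and a running count.
(def example-stock-item {:part "p1" :count 0})

;; Orders and deliveries are timestamped events against a part.
(def example-order    {:part "p1" :qty 1  :time 1654012800000})
(def example-delivery {:part "p1" :qty 10 :time 1654012801000})
```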
Once again this is certainly a simple model but is sufficient to demonstrate the power of the solution.
To help with the simulation we will throw together some sample data and create a few small functions to generate the orders and deliveries.
Parts data generation
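The original listing is not shown, so here is a hedged sketch of what the parts data might look like; the part names and the `initial-stock` helper are invented for illustration:

```clojure
;; A small catalogue of part identifiers - the names are placeholders.
(def parts #{"p1" "p2" "p3" "p4"})

;; Every part starts with a stock count of zero.
(defn initial-stock [parts]
  (set (map (fn [p] {:part p :count 0}) parts)))
```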
Order and delivery generation
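The original listing is missing; the sketch below shows one way the two generators could look, assuming a placeholder `parts` set and timestamped event maps. The channel arguments and intervals are invented:

```clojure
(require '[clojure.core.async :as async :refer [chan go-loop <! >! timeout]])

(def parts #{"p1" "p2" "p3"})  ; placeholder catalogue

(defn order-gen
  "Emit an order of qty 1 for a random part at a fixed interval -
   consistent timing and quantity."
  [ch interval-ms]
  (go-loop []
    (<! (timeout interval-ms))
    (>! ch {:part (rand-nth (vec parts)) :qty 1
            :time (System/currentTimeMillis)})
    (recur)))

(defn delivery-gen
  "Emit a delivery of a random quantity at a random interval."
  [ch max-interval-ms]
  (go-loop []
    (<! (timeout (inc (rand-int max-interval-ms))))
    (>! ch {:part (rand-nth (vec parts)) :qty (inc (rand-int 10))
            :time (System/currentTimeMillis)})
    (recur)))
```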
Here we generate infinite streams of timestamped data, which later examples will aggregate into time series.
The two generators differ only in that orders arrive at a consistent rate while deliveries are random. This is not a perfect model of the real world (yeah, I know) but it is good enough for our purpose.
Short demo
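The demo listing is missing, so here is a minimal self-contained stand-in: a go block that emits a few timestamped orders, paced with timeout, and a reader that prints them. The channel name and interval are invented:

```clojure
(require '[clojure.core.async :as async :refer [chan go-loop <! <!! >! timeout]])

(def orders (chan))

;; Emit three timestamped orders, pausing via timeout rather than Thread/sleep.
(go-loop [n 0]
  (when (< n 3)
    (<! (timeout 100))
    (>! orders {:part "p1" :qty 1 :time (System/currentTimeMillis)})
    (recur (inc n))))

;; Take the three events off the channel on the main thread.
(dotimes [_ 3]
  (println (<!! orders)))
```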
Blocking without Thread/sleep == SCAAAAALE!
One aspect to note in the above code is the timeout function. It returns a channel that closes after the given number of milliseconds; reading from it pauses the generator much like Thread/sleep would, but the go block parks rather than blocking a thread.
Not blocking means that it is OK to start hundreds or thousands of go blocks with very little CPU or RAM overhead. This is similar in effect to how NGINX is architected.
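To make the scale claim concrete, a small sketch (names invented): a thousand go blocks each parked on a timeout, all completing on core.async's small fixed thread pool:

```clojure
(require '[clojure.core.async :as async :refer [chan go >! timeout <!]])

(def results (chan 1000))

;; Start 1000 lightweight go blocks; each parks on a timeout, then reports.
(dotimes [i 1000]
  (go
    (<! (timeout (rand-int 100)))
    (>! results i)))

;; Drain: all 1000 complete using only a handful of real threads.
(println (count (repeatedly 1000 #(async/<!! results))))  ; => 1000
```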
Don’t change that channel
I’m going to show this next function in two forms. The first form shows how to branch on the channel that was read.
In this first showing of the code we see the ability to read from many channels at once using alts!, which returns both the data and the channel it was read from.
In this example we use condp to run different code based on the channel that was read. In this case we select the method used to adjust stock.
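The listing itself is missing; the sketch below reconstructs the described shape, reading both channels with alts! and branching on the returned channel with condp. The channel names and the adjustment logic are assumptions:

```clojure
(require '[clojure.core.async :as async :refer [chan go-loop alts!]])

(def orders (chan))
(def deliveries (chan))

;; Form one: read whichever channel has data and branch on the channel itself.
(go-loop []
  (let [[item ch] (alts! [orders deliveries])
        adjustment (condp = ch
                     orders     (- (:qty item))  ; an order consumes stock
                     deliveries (:qty item))]    ; a delivery adds stock
    (println "adjust" (:part item) "by" adjustment)
    (recur)))
```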
Re-Stating our intentions
The gap we have with the above code is that it will only ever report 0 or 1.
That might be useful if we have an external aggregator, but we can also aggregate in the go-loop directly:
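A hedged reconstruction of that aggregating form; the placeholder modify-stock here stands in for the version discussed in the final section, and the part names are invented:

```clojure
(require '[clojure.core.async :as async :refer [chan go-loop alts!]])

(def orders (chan))
(def deliveries (chan))

;; Placeholder helper - the post's own modify-stock appears in the final section.
(defn modify-stock [stock part f qty]
  (let [item (first (filter #(= part (:part %)) stock))]
    (conj (disj stock item) (update item :count f qty))))

;; The go-loop binding initialises the state; recur threads each new version.
(go-loop [stock (set (map (fn [p] {:part p :count 0}) #{"p1" "p2" "p3"}))]
  (let [[item ch] (alts! [orders deliveries])
        op        (condp = ch
                    orders     -   ; orders reduce stock on hand
                    deliveries +)  ; deliveries increase it
        stock'    (modify-stock stock (:part item) op (:qty item))]
    (println "stock on hand vs orders:" stock')
    (recur stock')))
```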
Using the parameters to the go-loop we can initialise state and then maintain it via recur.
In this case we create a set of parts and set each stock item count to 0 when our function starts. One can imagine more complex initialisations!
In the loop we then pass the stock collection to the modify-stock function which returns a new version of the stock collection.
Here we exploit the fact that Clojure collections are very efficient with respect to minimising the costs of each new version.
This stock list could easily be scaled to tens of thousands of parts without adding any latency cost (barring the side effects of swapping, although luckily we are no longer limited to 32 KB, 32 MB or even 32 GB of memory as in the good old days). For more information see this rich visual explanation from Jean Niklas L’orange.
Short Demo Two
Modify without side effects
To finalise the example, here is the code for modify-stock, which applies the provided function to the item’s existing count to produce its new value.
One nice aspect of using Clojure’s set data structure is that disj followed by conj gives us a succinct upsert.
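A hedged sketch of modify-stock, assuming stock is a set of `{:part ... :count ...}` maps as above; removing the old item with disj before conj-ing the updated one gives the upsert:

```clojure
(defn modify-stock
  "Return a new stock set where the item for part has f applied to its count.
   A sketch only - the field names are assumptions."
  [stock part f qty]
  (let [item    (first (filter #(= part (:part %)) stock))
        updated (update item :count f qty)]
    ;; disj removes the old version; conj inserts the new one - an upsert.
    (conj (disj stock item) updated)))

;; Example:
;; (modify-stock #{{:part "p1" :count 3}} "p1" + 2)
;; => #{{:part "p1" :count 5}}
```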
Summary
In this example we saw how to implement a stock level tracker. To achieve this we read multiple channels in one go-loop and distinguished the data by the channel it arrived on. Further, we maintained aggregations and state within the go-loop to increase power and simplicity.