Event Stream Processing - Managing time series with core.async
Introduction
In my last post I went through the code for a data flow: a product stock-level tracker built by combining channels and maintaining state around the go-loop.
In this post I want to show how to segregate data from a stream into time series.
In this use case we want to notify the supplier if we have a spike in demand. This spike example is fairly trivial, but we will see that core.async can handle many millions of events per minute and provide fine-grained time series with very little code.
Data first
As usual in this series we will outline the simple data we use in our model:
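The original data snippet is not reproduced here, so as a rough sketch an order event might be a plain map like the one below. The field names (:product-id, :quantity) are my assumptions for illustration, not the post's actual schema:

```clojure
;; Hypothetical order event: a plain map with a product id and a quantity.
;; Field names are illustrative assumptions, not the post's actual schema.
(def sample-order
  {:product-id "SKU-42"
   :quantity   3})

;; A stream of such orders is then just a sequence of these maps.
(def sample-orders
  (repeatedly 5 (fn [] {:product-id "SKU-42"
                        :quantity   (inc (rand-int 10))})))
```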
Timing - thinking out of the box
core.async does not provide any model for managing time in the library. Adding this support, however, is relatively simple and surprisingly concise. Well, maybe not that surprising by now.
The development of this model was prompted by a short conversation with @jgdavey in the core.async channel on Slack:

"If you were to implement yourself, you could make a go that simply pulls from a channel, and adds to another as a [timestamp item] pair, then finally pushes into an unbounded chan that has a transducer that filters based on age of that timestamp." – Joshua Davey
I couldn’t get my head around the suggestion at first but I decided to give it a try - what could possibly go wrong? In the end mine was a different take on Joshua’s idea but it served as inspiration.
Oh, and Joshua has his own implementation which is interesting in its own right.
Timing orders
We saw this function in a previous example; it generates infinite streams of timestamped data, so it satisfies the first part: producing pairs of time and data as a tuple.
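The generator itself is not shown in this extract; a minimal sketch of the idea, assuming a channel-based design and names of my own choosing (timed-item-ch, gen-fn), might look like this:

```clojure
(require '[clojure.core.async :as a])

;; Hedged sketch: push an infinite stream of [timestamp item] pairs onto a
;; channel, one pair per item produced by gen-fn. The names here are
;; illustrative assumptions, not the post's actual code.
(defn timed-item-ch
  [gen-fn]
  (let [out (a/chan)]
    (a/go-loop []
      ;; >! returns false once the channel is closed, ending the loop
      (when (a/>! out [(System/currentTimeMillis) (gen-fn)])
        (recur)))
    out))
```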
Timing windows
We have a multi-arity function for creating time windows. With one argument it will produce contiguous windows of X seconds. With two arguments the windows will overlap.
We use state on the go-loop to maintain the interval contiguousness (I checked, yes that is the right word).
We also have a few contractual checks using :pre conditions to ensure that the arguments are somewhat sane.
Time in Windows
There are many ways to skin this particular cat. But before I go on I just want to say that no cats were actually skinned.
I moved away from Joshua’s concept and decided to place the data into the window by means of a vector of items.
We will see that this enables further aggregations based on the data accumulated in each window, which is a reasonably common requirement.
To complete the code, here is the function to generate the windows
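The function is missing from this extract, so here is a sketch of how such a multi-arity window generator could look. The name window-ch and the window keys (:from, :to, :open?, :items) are my assumptions; the go-loop recurs on the computed start time rather than re-reading the clock, which is what keeps the intervals contiguous:

```clojure
(require '[clojure.core.async :as a])

;; Hedged sketch of a window generator (the post's real function may differ).
;; One argument: contiguous windows of `size` seconds.
;; Two arguments: windows of `size` seconds started every `slide` seconds,
;; so consecutive windows overlap.
(defn window-ch
  ([size] (window-ch size size))
  ([size slide]
   ;; contractual checks on the arguments, as mentioned above
   {:pre [(pos? size) (pos? slide) (<= slide size)]}
   (let [out (a/chan)]
     (a/go-loop [from (System/currentTimeMillis)]
       ;; emit the next window, then wait one slide interval;
       ;; recurring on (+ from ...) keeps the intervals contiguous
       (a/>! out {:from  from
                  :to    (+ from (* 1000 size))
                  :open? true
                  :items []})
       (a/<! (a/timeout (* 1000 slide)))
       (recur (+ from (* 1000 slide))))
     out)))
```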
Merging time and data
This function creates a go-loop that combines data from the channels on which the windows and the time-stamped data are emitted.
When data arrives on the item-ch, the items are added to the appropriate window(s) using add-timed-item-to-windows, and we recur on the result.
When data arrives on the window-ch, the new window is added to the list. We will show the maintain-active-windows function shortly.
Finally, the transducer limits the output to closed, non-empty windows.
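The merge loop is not shown in this extract. Below is a sketch under my own assumptions (function name, window keys, an inlined closing step instead of the post's maintain-active-windows factoring): it alts over the two channels, keeps the vector of windows as loop state, and puts windows through an output channel whose transducer passes only closed, non-empty windows:

```clojure
(require '[clojure.core.async :as a])

;; Hedged sketch of the merge loop; names and details are my assumptions.
(defn merge-windows-and-items
  [window-ch item-ch]
  ;; the transducer lets only closed, non-empty windows reach consumers
  (let [out (a/chan 8 (filter (fn [w] (and (not (:open? w))
                                           (seq (:items w))))))]
    (a/go-loop [windows []]
      (let [[v ch] (a/alts! [window-ch item-ch])]
        (cond
          ;; a source channel closed: shut down
          (nil? v)
          (a/close! out)

          ;; a new window arrived: close and emit any finished windows,
          ;; then track the new one
          (= ch window-ch)
          (let [now    (:from v)
                closed (filter (fn [w] (<= (:to w) now)) windows)
                open   (remove (fn [w] (<= (:to w) now)) windows)]
            (doseq [w closed]
              (a/>! out (assoc w :open? false)))
            (recur (conj (vec open) v)))

          ;; a [timestamp item] pair arrived: conj the item onto every
          ;; open window whose span covers the timestamp
          :else
          (let [[ts item] v]
            (recur (mapv (fn [w]
                           (if (and (:open? w)
                                    (<= (:from w) ts) (< ts (:to w)))
                             (update w :items conj item)
                             w))
                         windows))))))
    out))
```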
Time management
Again, no cats skinned but there are choices. In this case I chose to maintain the windows by tracking a boolean and then reaping the windows after a certain time threshold. Here is the code abstracted out into a small function:
I chose a retention period of 500ms on the basis that 1000ms is the minimum window size in this design. This way I am always guaranteed to clean up on each new time window.
It also means that there is some small additional time for catching stragglers although that is the limit of any effort to deal with that particular problem. In general stragglers are silently dropped.
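As a sketch of this housekeeping (the function name matches the post; the window keys and exact semantics are my assumptions): a window is marked closed once the current time passes its end, and reaped entirely once it has been closed for longer than the retention period.

```clojure
;; Hedged sketch of the reaping logic described above. With a minimum
;; window size of 1000 ms, a 500 ms retention guarantees cleanup on every
;; new window while giving stragglers a small grace period.
(def retention-ms 500)

(defn maintain-active-windows
  [windows now]
  (->> windows
       (remove (fn [w] (<= (+ (:to w) retention-ms) now))) ; reap expired
       (mapv   (fn [w] (if (<= (:to w) now)                ; close finished
                         (assoc w :open? false)
                         w)))))
```

Because it is a pure function of the windows and the clock, it slots straight into the go-loop's recur without any shared state.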
Data management
The code for adding items to the windows is fairly boilerplate, but it is presented here so that you can have a more complete view:
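The original listing is not in this extract; a sketch of what it plausibly does (the function name matches the post, the window keys are my assumptions) is to destructure the [timestamp item] pair and conj the item onto every open window whose span covers the timestamp:

```clojure
;; Hedged sketch: a pure function from windows + timed item to new windows.
;; All state lives on the calling go-loop, so nothing global is needed.
(defn add-timed-item-to-windows
  [windows [ts item]]
  (mapv (fn [w]
          (if (and (:open? w) (<= (:from w) ts) (< ts (:to w)))
            (update w :items conj item)
            w))
        windows))
```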
The thing I like about this code is that we have avoided creating global state.
Demo time
Aggregations
Here is a small general-purpose aggregator that takes a function to operate on each window:
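The aggregator itself is missing from this extract; one minimal way to build such a thing (names and shape are my assumptions) is to pipe the window channel through a mapping transducer, with a plain function, such as a quantity total for spotting a demand spike, doing the real work:

```clojure
(require '[clojure.core.async :as a])

;; Hedged sketch: lift a plain function over every window read from in-ch.
(defn aggregate-windows
  "Returns a channel of (f window) for every window taken from in-ch."
  [f in-ch]
  (a/pipe in-ch (a/chan 1 (map f))))

;; Example aggregation: total quantity across a window's accumulated items.
(defn total-quantity [w]
  (reduce + 0 (map :quantity (:items w))))
```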
Summary
So that’s our final experiment with aggregating over time series.
The amount of code is pleasantly small and we can see many possibilities for playing with time series data.
The code is all on my GitHub.
Conclusions
This really has been me scraping the surface of core.async by trying to scratch a few itches. I found the model quite straightforward to use and extremely powerful. I will continue to experiment with the library and write up some samples as further revelations unfold!
And finally - Thanks!
Thanks for making it through, especially if you ploughed through the series!
I have a better understanding of core.async after writing this and I hope that’s true for you too!
Zing me or ping me if this was useful via Twitter or in the comments below.