Event Stream Processing - Coding introduction with core.async
Introduction
In my initial post I outlined a small bare-bones order management system that could act as a motivation for Event Stream Processing.
In this post I want to show some code for how we might implement the processes around the Stock Tracker.
Data first
Let us just quickly outline the data in our model:
This is certainly a simple model but not necessarily useless and reduces the clutter around the examples.
Start with a printer
At the REPL we define a generic printer for data…
The go-loop construct establishes a go block with a forever loop. The loop will recur until the if-let condition fails while running a read (<!) on the channel.
A read from a closed channel (once any buffered values have been drained) returns nil, which makes the if-let condition fail. So once we close the channel the loop will quietly end.
Next we create a channel with the chan function and pass it to the printer to create a running instance.
The printer is now sitting in the background waiting on data from that open channel and we have our first core.async program up and running.
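As a minimal sketch of what such a printer and its channel might look like: the names printer and print-ch and the message text are assumptions made for illustration, not necessarily the author's code.

```clojure
(require '[clojure.core.async :as async :refer [chan go-loop <! close!]])

;; Hypothetical generic printer: prints every value read from `ch`
;; and stops once the channel is closed (the read returns nil).
(defn printer
  [ch]
  (go-loop []
    (if-let [v (<! ch)]
      (do (println "Received:" v)
          (recur))
      (println "Channel closed, printer stopping"))))

;; Create an unbuffered channel and start a printer on it.
(def print-ch (chan))
(def printer-done (printer print-ch))
```

go-loop returns a channel of its own, which closes when the loop exits, so printer-done can be used to wait for the printer to finish.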
Let’s play with it a little at the REPL and then shut it down:
put! and take! are used for interacting with the channels outside of a go block or a go-loop.
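A short sketch of put! and take! in use, assuming a hypothetical channel named demo-ch with a buffer of one. Both calls are asynchronous, so a promise is used here to capture the value handed to the take! callback.

```clojure
(require '[clojure.core.async :as async :refer [chan put! take! close!]])

;; Hypothetical demo channel with room for one value.
(def demo-ch (chan 1))
(def taken (promise))

;; put! does not block; it returns true while the channel is open.
(put! demo-ch {:item "widget"})

;; take! hands the value to a callback instead of returning it.
(take! demo-ch (fn [v] (deliver taken v)))
(println "Took:" (deref taken 1000 :timed-out))

;; After close!, further puts are refused.
(close! demo-ch)
(println "Put after close accepted?" (put! demo-ch {:item "gadget"}))
```

Inside a go block the parking operations >! and <! would be used instead.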
OK, so far so good. We have a basic example so let’s look at the next program that will do something useful in the system…
Next - the end
The most trivial consumer in the diagram is the Supplier Notifier which is at the end of the stream process flow in this case.
That process listens on the Supplier Order channel and calls the supplier API.
Let’s fake up a call to the service…
Next let’s get our Supplier Order channel consumer defined so that it can invoke the API…
Nothing surprising there. We substitute the printing function with the call to the API.
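One way this could look, as a sketch only: notify-supplier is a hypothetical stand-in for the real supplier API that records and prints each order, and supplier-notifier has the same loop shape as the printer.

```clojure
(require '[clojure.core.async :as async :refer [chan go-loop <! close!]])

;; Hypothetical fake of the supplier API: records and prints each order.
(def notified (atom []))

(defn notify-supplier
  [order]
  (println "Notifying supplier:" order)
  (swap! notified conj order)
  :ok)

;; Consumer: calls the (fake) API for every order read from `ch`
;; and exits when the channel is closed.
(defn supplier-notifier
  [ch]
  (go-loop []
    (when-let [order (<! ch)]
      (notify-supplier order)
      (recur))))
```

Keeping the API call in its own function means the fake can later be swapped for a real client without touching the loop.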
Let’s get the channels created and hooked up:
The incredibly observant amongst you will have noticed that Order Reconciliation will also listen to the supplier order channel. So this time we have used mult to create a multiple of the supplier-order-ch.
mult enables several clients to listen to the same channel when they call the tap function on a new channel. tap, as the name implies, copies the data verbatim from the original channel onto the new channel that we are using for our notifier.
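To make the mult and tap relationship concrete, here is a small sketch. Only supplier-order-ch is named in the text; supplier-order-mult, notifier-ch and reconciliation-ch are assumed names, and the taps are given a buffer of one so that the demo does not need a running consumer.

```clojure
(require '[clojure.core.async :as async :refer [chan mult tap >!! <!! close!]])

;; Source channel and its mult.
(def supplier-order-ch (chan))
(def supplier-order-mult (mult supplier-order-ch))

;; Each listener taps the mult with its own channel (tap returns that channel).
(def notifier-ch (tap supplier-order-mult (chan 1)))
(def reconciliation-ch (tap supplier-order-mult (chan 1)))

;; One put on the source is copied to every tap.
(>!! supplier-order-ch {:item "widget" :quantity 10})
(println "Notifier sees:      " (<!! notifier-ch))
(println "Reconciliation sees:" (<!! reconciliation-ch))
```

Once a channel has a mult, consumers should read from the taps rather than from the source channel directly, since the mult itself consumes the source.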
Let’s give that a quick spin:
Warm up complete
So that’s the first edge of the system complete.
The interesting general point so far is that we have been able to develop and test this all out in the REPL.
We want to keep that up as we go forward to model the Stock Tracker:
In this case we see our first transducer: the filter function that is attached to the channel. The filter here acts as a simple barrier on the output channel.
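A sketch of how a filter transducer on the output channel might work. The map keys :stock-level and :low-water-mark, the predicate and the channel names are assumptions for illustration. Note that a channel with a transducer must be given a buffer.

```clojure
(require '[clojure.core.async :as async
           :refer [chan go-loop <! >! >!! <!! close! alts!! timeout]])

;; Hypothetical business rule: stock has fallen below its low water mark.
(defn below-low-water-mark?
  [{:keys [stock-level low-water-mark]}]
  (< stock-level low-water-mark))

;; The tracker simply forwards stock data; the filter on `out` does the work.
(defn stock-tracker
  [in out]
  (go-loop []
    (if-let [stock (<! in)]
      (do (>! out stock)
          (recur))
      (close! out))))

(def stock-ch (chan))
;; A transducer requires a buffered channel, hence the 1.
(def order-ch (chan 1 (filter below-low-water-mark?)))
(stock-tracker stock-ch order-ch)

;; Breached: the value passes the filter.
(>!! stock-ch {:item "widget" :stock-level 2 :low-water-mark 5})
(println "Order raised:" (<!! order-ch))

;; Not breached: the value is dropped, so the read times out.
(>!! stock-ch {:item "gadget" :stock-level 9 :low-water-mark 5})
(let [[v port] (alts!! [order-ch (timeout 200)])]
  (println (if (= port order-ch) (str "Order raised: " v) "No order raised")))
```

Putting the rule on the channel keeps the tracker itself free of business logic; the alternative would be an explicit when inside the loop.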
That’s it. Our Stock Tracker is complete. Let’s test it out…
Here we set up the channels in the same manner as our last case with the addition of our printer to have eyes on whether the filter is doing its job.
Let’s run a couple of test cases through:
In the second case our low water mark has not been breached so the data is dropped.
Streaming hot
Finally let’s hook up all of the components and run through the same test cases:
We don’t need the printer this time around since the notify function will report its invocation.
And here we see it flow through:
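Pulling the pieces together, the full wiring might be sketched as follows. As before, the data shape, the fake notify-supplier and all names other than supplier-order-ch are assumptions for illustration.

```clojure
(require '[clojure.core.async :as async
           :refer [chan go-loop <! >! >!! close! mult tap]])

;; Hypothetical fake of the supplier API: records and prints each order.
(def notifications (atom []))
(defn notify-supplier [order]
  (println "Supplier notified:" order)
  (swap! notifications conj order))

;; Hypothetical business rule for the filter.
(defn below-low-water-mark? [{:keys [stock-level low-water-mark]}]
  (< stock-level low-water-mark))

(defn stock-tracker [in out]
  (go-loop []
    (if-let [stock (<! in)]
      (do (>! out stock) (recur))
      (close! out))))

(defn supplier-notifier [ch]
  (go-loop []
    (when-let [order (<! ch)]
      (notify-supplier order)
      (recur))))

;; Wiring: stock-ch -> tracker -> supplier-order-ch (filtered) -> mult -> notifier
(def stock-ch (chan))
(def supplier-order-ch (chan 1 (filter below-low-water-mark?)))
(def supplier-order-mult (mult supplier-order-ch))
(def notifier-ch (tap supplier-order-mult (chan)))

(stock-tracker stock-ch supplier-order-ch)
(def notifier-done (supplier-notifier notifier-ch))

;; Same two test cases: one breach, one not.
(>!! stock-ch {:item "widget" :stock-level 2 :low-water-mark 5})
(>!! stock-ch {:item "gadget" :stock-level 9 :low-water-mark 5})

;; Closing the input shuts the whole pipeline down in order.
(close! stock-ch)
```

Closing stock-ch makes the tracker close supplier-order-ch, which in turn closes the tap, so the notifier exits after handling anything still in flight.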
Summary
We have seen the stock data flow through a tracker with a simple filter, which ensured that notifications occurred only when a specific business rule fired.
And this all happened in real time with minimal resource usage. Scaling down is often just as important as scaling up.
The code is all on my GitHub.
Next next next
In the next post I will show how the Stock Management Tracker combines data from more than one channel and maintains local state within the go block.
And finally - Thanks!
Thanks for making it through. I have a better understanding of core.async after writing this and I hope that’s true for you too!
Zing me or ping me if this was useful via Twitter or in the comments below.