This is the second blog post on the Dataflow library. You can find the first one here.
If you are unfamiliar with Dataflow, it’s a library that allows you to write concurrent code without having to use low-level synchronization mechanisms such as locks.
In this blog post we’ll look at buffering and grouping dataflow blocks.
Buffering
The types of dataflow blocks we have been discussing up to this point (in Part 1) are called execution dataflow blocks.
Apart form ActionBlock<T>
and TransformBlock<TIn,TOut>
there’s also TransformManyBlock<TIn, TOut>
which is similar to a transform block, the only difference being that instead of the output being a single instance of TOut
it will be an IEnumerable<TOut>
.
The other two types of dataflow blocks are called buffering and grouping blocks.
In this section we’ll be looking at buffering blocks of which there are three: BufferBlock<T>
, BroadcastBlock<T>
and WriteOnceBlock<T>
.
BufferBlock
The typical usage of a buffer block is to hold messages until a consumer is ready to receive them.
Here’s how you can create a buffer block that can hold at most 5 messages of type Request
:
var bufferBlock = new BufferBlock<Request>(new DataflowBlockOptions { BoundedCapacity = 5 });
Since all the execution blocks we mentioned previously have a “buffering mechanism” built-in (through the use of the BoundedCapacity
property in ExecutionDataflowBlockOptions
), it’s hard to find examples of using a BufferBlock
that don’t feel a bit convoluted.
But there’s at least one scenario where it can be useful. When you have several source blocks (e.g. transform blocks) linked to a buffer block and that is then linked to several target blocks (e.g. action blocks). If all the target blocks are configured with BoundedCapacity=1
this can be a way to distribute the “load” from multiple producers to multiple consumers in a way that doesn’t require you to link every producer to every consumer. Here’s how that looks like visually:
Also with this setup you can add more producers and/or consumers without having to deal with linking them together. You can simply link new producers to the buffer block and link the buffer block to new consumers.
“Manual” use of a BufferBlock
One way to use a buffer block is to take advantage of the fact that all the operations in dataflow blocks are thread-safe, and use it to create a producer/consumer pattern.
Imagine you have a thread that is producing values that you want handled in another thread but you don’t want to deal with the issues of having a shared data structure (e.g. a List or even a thread-safe collection like ConcurrentBag<T>
) between those two threads.
In case you are wondering how this approach could be superior to using a member of System.Collections.Concurrent
(e.g. ConcurrentBag<T>
), here’s how: even thought the members of System.Collection.Concurrent
are thread safe, there’s no built-in mechanism in them where you can wait for a new value to arrive (and you probably want to do that in a consumer).
Here’s an example that illustrates how using a buffer block makes this scenario simple:
var bufferBlock = new BufferBlock<int>();
Task.Run(async () =>
{
bufferBlock.Post(1);
bufferBlock.Post(2);
await Task.Delay(5000); //consumer will idle here for 5 seconds freeing resources for other threads
bufferBlock.Post(3);
bufferBlock.Post(4);
bufferBlock.Complete(); //will cause OutputAvailableAsync to return false
});
var consumer = Task.Run(async () =>
{
while (await bufferBlock.OutputAvailableAsync()) //blocks here until data arrives or .Complete is called
{
var data = bufferBlock.Receive();
Console.WriteLine("Received: " + data);
}
});
consumer.Wait();
Using this approach there are no locks or convoluted ways of dealing with waiting for data to arrive at the consumer.
We wait for data to arrive by calling .OutputAvailableAsync()
on the buffer block which will asynchronously return true
when data arrives and false
when .Complete()
is called on the buffer block. It also has an overload where you can pass a cancellation token parameter.
Usually you wouldn’t see an example where the buffer block was explicitly shared like above. Instead you’d have the producer thread use the ITargetBlock<T>
and the consumer thread the ISourceBlock<T>
portions of BufferBlock<T>
, like this:
var bufferBlock = new BufferBlock<int>();
ITargetBlock<int> producer = bufferBlock;
ISourceBlock<int> consumer = bufferBlock;
Task.Run(async () =>
{
producer.Post(1);
producer.Post(2);
await Task.Delay(5000); //consumer will idle here for 5 seconds freeing resources for other threads
producer.Post(3);
producer.Post(4);
producer.Complete(); //will cause OutputAvailableAsync to return false
});
var consumerTask = Task.Run(async () =>
{
while (await consumer.OutputAvailableAsync()) //blocks here until data arrives or .Complete is called
{
var data = consumer.Receive();
Console.WriteLine("Received: " + data);
}
});
consumerTask.Wait();
BroadcastBlock and WriteOnceBlock
The other two types of buffering dataflow blocks are the WriteOnceBlock<T>
and the BroadcastBlock<T>
.
A broadcast block will send the messages it receives to all the blocks that it has links to. If more links are added from it to other blocks while the broadcast block “holds” a value it will send it to those new blocks.
To put this is more concrete terms imagine that a broadcast block is linked to block A. The broadcast block receives the value X (e.g. someone calls .Post
on it) and that value ends up being sent to block A.
After this, a new link is created between the broadcast block and block B. Block B is immediately sent value X, i.e. the broadcast block holds the values it receives and gives them to any blocks it’s linked to until it receives a new value (even if these links were created after the value was received by the broadcast block).
As soon as a broadcast block receives a new value it discards the old one.
Usually you would use a broadcast block if you want to send the same value to several other dataflow blocks without actually sending the value to each one of them individually.
The broadcast block also accepts as a parameter in its constructor, a cloning function (BroadcastBlock<T>(Func<T,T> cloningFunction)
), that allows you to create a copy of the object that you want to broadcast. It might be necessary if you have blocks that the broadcast block links to that mutate the value they receive. If that’s not the case you can pass null
as the cloning function.
The other type of buffering block is the WriteOnceBlock<T>
.
This type of block, as the name suggests, will accept only one value and reject all subsequent ones. Because it is sometimes used in scenarios where it is linked to other blocks it expects a cloning function (or null
if not required) to be passed as a parameter to its constructor.
A useful scenario for a WriteOnceBlock<T>
is one where you have multiple blocks “competing” against each other and you want the result from the one that is the quickest. Here’s how that looks like:
In the example depicted above we have an input that goes into a broadcast block and is then sent to two transform blocks which will be running two competing algorithms. The first one that produces a value will have that value accepted by the write once block, the slowest one will be rejected. We can then get the quickest value from the write once block.
Here’s how that looks like in code:
Task<TOutput> GetValueFromFastest<TInput, TOutput>(TInput input, Func<TInput, Task<TOutput>> algorithm1,
Func<TInput, Task<TOutput>> algorithm2)
{
var broadcastBlock = new BroadcastBlock<TInput>(null);
var transformBlock1 = new TransformBlock<TInput, TOutput>(algorithm1);
var transformBlock2 = new TransformBlock<TInput, TOutput>(algorithm2);
broadcastBlock.LinkTo(transformBlock1);
broadcastBlock.LinkTo(transformBlock2);
var writeOnceBlock = new WriteOnceBlock<TOutput>(null);
transformBlock1.LinkTo(writeOnceBlock);
transformBlock2.LinkTo(writeOnceBlock);
broadcastBlock.Post(input);
return writeOnceBlock.ReceiveAsync();
}
Grouping
The last type of dataflow blocks are called grouping blocks. Of this type there are the BactchBlock<T>
, the JoinBlock<T1,T2>
and the BatchJoinBlock<T1,T2>
.
BatchBlock
The most common grouping block is the BatchBlock<T>
which has a constructor that expects an int
parameter where you can set the batch size. For example this would create a batch block with a batch size of 5 Measure
s:
var batchBlock = new BatchBlock<Measure>(batchSize: 5);
I used the type Measure
in the example because I imagine that the most common use for this type of block is to accumulate a certain number of values (for example from a sensor) and then take action when a certain number of them (the batch size) is reached.
When you link a batch block to another dataflow block, the other dataflow block will “receive” an IEnumerable
of the batch block type, in this case IEnumerable<Measure>
.
Here’s an hypothetical example that prints the average of the last 5 measurements:
var batchBlock = new BatchBlock<Measure>(5);
var actionBlock = new ActionBlock<IEnumerable<Measure>>(measures =>
{
Console.WriteLine("The average value is: " + measures.Average(m => m.Value));
});
batchBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
batchBlock.Post(new Measure {Value = 1});
batchBlock.Post(new Measure {Value = 2});
batchBlock.Post(new Measure {Value = 3});
batchBlock.Post(new Measure {Value = 4});
batchBlock.Post(new Measure {Value = 5});
batchBlock.Complete();
Up until now the description we’ve made of how grouping blocks work has been pretty straightforward.
However, grouping blocks have two modes of operation: greedy and non-greedy. The default is greedy and the following two examples illustrates how it works:
var producer1 = new BufferBlock<int>(null);
var producer2 = new BufferBlock<int>(null);
var batchBlock = new BatchBlock<int>(2, new GroupingDataflowBlockOptions
{
Greedy = true // this is the default so you can omit
});
producer1.LinkTo(batchBlock);
producer2.LinkTo(batchBlock);
var actionBlock = new ActionBlock<IEnumerable<int>>(values =>
{
Console.WriteLine("The average value is: " + values.Average());
});
batchBlock.LinkTo(actionBlock);
producer1.Post(10);
producer1.Post(10);
Thread.Sleep(10); // time for the values to propagate
producer2.Post(20);
producer2.Post(20);
Here we are defining a batch value of 2 and linking two “producers”. The producers are buffer blocks that we are using solely to illustrate how the batch block behaves when there’s multiple blocks linked to it.
Through producer1
we send the value 10 twice and through producer2
we send the value 20 twice. If you run this snippet the action block will output:
The average value is: 10
The average value is: 20
That’s not surprising. We sent 10 two times from producer1
and that is enough “fill” a batch (of average 10), and then we send 20 two times and that makes the other batch.
What happens when we set the greedy flag to false
?
var batchBlock = new BatchBlock<int>(2, new GroupingDataflowBlockOptions
{
Greedy = false
});
We get this output:
The average value is: 15
The average value is: 15
What’s happening is that the batch block, when not in greedy mode, will only create batches using values from different sources.
So even though we sent 10 twice through producer1
only the first value will be reserved for the batch. When we send the value 20 thorough producer2
it will “enter” the batch making it a batch of 10 and 20, the average of which is 15. Same happens for the other two values for producer1
and producer2
.
If there are more data flow blocks linked to a non-greedy batch block than its batch size, the batch block will produce a value as soon as it has a batch-size-number of values from different data flow blocks linked to it.
If a non-greedy batch block is defined with a batch block larger than the number of data flow blocks linked to it, it will never produce a value.
JoinBlock
A join block offers two variants. One where the join block has two targets that can linked to by two dataflow blocks, and another that offers three (JoinBlock<T1, T2>
and JoinBlock<T1, T2, T3>
).
They both operate identically. When in greedy mode (the default) they will produce a tuple (Tupple<T1, T2>
or Tupple<T1, T2, T3>
) when they receive at least one value on each of the target properties they expose. Here’s an example:
var producer1 = new BufferBlock<int>();
var producer2 = new BufferBlock<int>();
var joinBlock = new JoinBlock<int, int>();
producer1.LinkTo(joinBlock.Target1);
producer2.LinkTo(joinBlock.Target2);
var actionBlock = new ActionBlock<Tuple<int, int>>(v =>
{
var (item1, item2) = v;
Console.WriteLine(item1 + ", " + item2);
});
joinBlock.LinkTo(actionBlock);
In the example above as soon as producer1
and producer2
have emitted at least one value each the join block will produce a Tuple<int, int>
with those values.
In non-greedy mode things get a little more involved. The join block will reserve values for each of its targets until it has a value for all. While the values are “reserved” they can be consumed by other data flow blocks, here’s an example that illustrates this:
var producer1 = new BufferBlock<int>();
var producer2 = new BufferBlock<int>();
var sinkActionBlock = new ActionBlock<int>(value => { Console.WriteLine("Action block consumed: " + value); });
var joinBlock = new JoinBlock<int, int>(new GroupingDataflowBlockOptions
{
Greedy = false
});
producer1.LinkTo(joinBlock.Target1);
producer2.LinkTo(joinBlock.Target2);
var printToConsoleActionBlock = new ActionBlock<Tuple<int, int>>(value =>
{
Console.WriteLine("Received value from join block: (" + value.Item1 + ", " + value.Item2 + ")" );
});
joinBlock.LinkTo(printToConsoleActionBlock);
producer1.Post(1);
producer1.Post(1);
producer2.Post(2);
Task.Delay(100).Wait(); //time for the join block consume the values
producer1.LinkTo(sinkActionBlock); //because the join block is non-greedy the second value from producer1 goes to the sink action block
producer2.Post(2); //this value will never be consumed by the join block
The output for the example is:
Received value from join block: (1, 2)
Action block consumed: 1
What this illustrates is that the join block, when not in greedy mode, will not consume the messages until it has messages available for all the targets.
This is what the output would look like with Greedy = true
:
Received value from join block: (1, 2)
Received value from join block: (1, 2)
BatchedJoinBlock
Finally, the batched join block. Event though its name suggests that it produces batches of the output of a join block, in reality this is not how this dataflow block behaves.
As a join block produces a tuple (e.g. Tuple<T1, T2>
for a JoinBlock<T1, T2>
) one might think that a batched join block produces a collection of tuples, e.g. IEnumerable<Tuple<T1, T2>>
.
This is not what a join block actually does, its result is instead Tuple<IList<T1>, IList<T2>>
. And the batch size is the sum of elements in IList<T1>
with IList<T2>
(and either list can be empty).
Also, the batched join block does not have a non-greedy mode.
An example will make things clearer.
var batchedJoinBlock = new BatchedJoinBlock<int, int>(3); //non-greedy not supported
var printToConsoleActionBlock = new ActionBlock<Tuple<IList<int>, IList<int>>>(result =>
{
foreach (var item1 in result.Item1)
{
Console.WriteLine($"From Target1: {item1}");
}
foreach (var item2 in result.Item2)
{
Console.WriteLine($"From Target2: {item2}");
}
Console.WriteLine("-------------");
});
batchedJoinBlock.LinkTo(printToConsoleActionBlock);
batchedJoinBlock.Target1.Post(1);
batchedJoinBlock.Target1.Post(2);
batchedJoinBlock.Target2.Post(3);
batchedJoinBlock.Target2.Post(4);
batchedJoinBlock.Target2.Post(5);
batchedJoinBlock.Target1.Post(6);
Here we are creating a batched join block with a batch size of three and sending 2 values to Target1
and one value to Target2
which will make up the first batch. Then we send three more values, this time 2 to Target2
and one to Target1
.
This is the ouput where you can see the two batches (separated by --------
):
From Target1: 1
From Target1: 2
From Target2: 3
-------------
From Target1: 6
From Target2: 4
From Target2: 5
And those are all the dataflow blocks from the System.Threading.Tasks.Dataflow library.
Conclusion
Dataflow is a library that allows creating concurrent code without requiring the use of low-level synchronization mechanisms such as locks. It’s easy to reason about because it lends itself well to be represented graphically.
It does have it’s share of idiosyncrasies but hopefully this blog post can help with that.