TL;DR – This article became so long that I decided to break it into two parts. This is part 1, where I present Dataflow in terms of the capabilities of its elements (Dataflow blocks).
Concurrency is hard.
It’s hard because it’s easy to end up in situations where your concurrent code produces different results depending on the order in which it executes.
The worst part of this is that some of these errors are very hard to reproduce because they might happen so infrequently that you won’t be able to consistently reproduce them.
There are language features like synchronization primitives (e.g. the lock keyword) whose purpose is to enable concurrent code to run in a predictable manner. However, they are hard and tedious to get right.
Recently, I’ve been working on ElectronCgi and I had to add the ability to handle requests concurrently.
ElectronCgi allows a Node.js process to interact with a .Net Core process by sending it requests that the .Net Core process handles. This is especially useful if you want to create a cross-platform application using .Net Core that has a graphical user interface. You can build the GUI using Electron (using HTML, CSS and Javascript) while having the logic written in .Net.
One limitation of the first version of ElectronCgi.DotNet was that each request would only be served after the previous request’s response had been sent to Node.js.
While researching the best way to enable ElectronCgi to serve requests concurrently I stumbled upon the Task Parallel Library’s Dataflow library. Dataflow allowed me to enable concurrent requests in ElectronCgi without using a single lock statement. It will also probably make some future features easier to implement and test.
Even though on its surface Dataflow seems very simple to use, some of its features surprised me enough times that I felt I needed to write this blog post. So here it is.
What is Dataflow
Dataflow is a library (NuGet package System.Threading.Tasks.Dataflow) where you connect “blocks” to each other in order to create a pipeline (or graph). There are different types of blocks that provide different functionality and that can operate concurrently.
When data arrives at one block it is processed and can then be sent to other blocks that are linked to it.
Here’s a simple example:
In this made-up example we can see that there are different types of “blocks” (TransformBlock, BroadcastBlock and ActionBlock) with different options (EnsureOrdered, MaxDegreeOfParallelism) that are connected together (linked), and that there might be conditions associated with the links between those blocks.
Although it’s never mentioned in the documentation I imagine that the elements of Dataflow are called blocks exactly because they lend themselves well to being drawn in the manner of the example above.
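To make the idea concrete, here’s a minimal sketch of a two-block pipeline. The block types, options and linking used here are all covered in detail below; the values and the transformation are made up for illustration:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class PipelineSketch
{
    public static async Task Main()
    {
        // A TransformBlock receives an int and produces a string
        var transform = new TransformBlock<int, string>(n => $"value {n}");

        // An ActionBlock consumes the strings and prints them
        var print = new ActionBlock<string>(s => Console.WriteLine(s));

        // Link the blocks; PropagateCompletion makes Complete() flow downstream
        transform.LinkTo(print, new DataflowLinkOptions { PropagateCompletion = true });

        transform.Post(1);
        transform.Post(2);
        transform.Complete();
        await print.Completion; // waits until both messages flow through
        // prints:
        // value 1
        // value 2
    }
}
```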
Types of blocks
Even though the official documentation for Dataflow splits the types of blocks into buffering, execution and grouping categories, I think it’s easier to start by looking at which interfaces they implement.
ITargetBlock
You can think of a block that implements ITargetBlock&lt;T&gt; as a block that can “receive” data of type T. This means that you can call .Post(data) on the block.
The simplest block that implements ITargetBlock&lt;T&gt; is ActionBlock&lt;T&gt;.
Here’s an example:
var actionBlock = new ActionBlock<int>(async request =>
{
var result = await ComplicatedComputation(request);
Console.WriteLine(result);
});
actionBlock.Post(1);
actionBlock.Post(2);
actionBlock.Post(3);
actionBlock.Post(4);
Here we are sending the values 1, 2, 3 and 4 to the ActionBlock, which will then execute the ComplicatedComputation for each of those values.
Even though this is a simple example it will allow us to describe quite a few intricacies of the Dataflow library.
The first one, which might come as a surprise, is that none of the code in the example will run concurrently. The ComplicatedComputation will run for 1, then when that finishes it will run for 2, then 3 and finally 4. If each value takes 5 seconds to complete, handling all of them will take 20 seconds.
Thankfully it’s really easy to change that. The ActionBlock constructor takes a second argument of type ExecutionDataflowBlockOptions.
ExecutionDataflowBlockOptions has a property named MaxDegreeOfParallelism whose default value is 1, meaning only one “message” is processed at a time. We can change that to some other value, for example 8, meaning that there will be at most 8 tasks running concurrently. Alternatively you can specify the constant DataflowBlockOptions.Unbounded, which doesn’t set an upper limit.
Here’s how that looks:
var actionBlock = new ActionBlock<int>(async request =>
{
var result = await ComplicatedComputation(request);
Console.WriteLine(result);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
actionBlock.Post(1);
actionBlock.Post(2);
actionBlock.Post(3);
actionBlock.Post(4);
Now if each value takes 5 seconds to complete you’d get all the results in 5 seconds as they would all run at the same time.
Another important aspect that you should keep in mind when dealing with a target block is its capacity. You can think of capacity as the maximum number of messages that are either being processed or waiting to be processed (or, in the case of a source block, waiting to be sent to another block).
To illustrate that, let’s rewrite the example above with a BoundedCapacity of 2 (BoundedCapacity is the property name used to define the capacity of a block).
var actionBlock = new ActionBlock<int>(async request =>
{
var result = await ComplicatedComputation(request);
Console.WriteLine(result);
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 2
});
actionBlock.Post(1); // goes into execution
actionBlock.Post(2); // gets in the block and waits in a queue for the first request to finish
actionBlock.Post(3); // is rejected and will never be executed
actionBlock.Post(4); // same as above
3 and 4 above get rejected because at the time .Post is called the action block won’t have had time to deal with 1 and 2.
This is a good time to mention that the Post method returns a boolean that will be true if the request is accepted and false otherwise.
This might not be ideal if you want to make sure that all requests are processed.
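As a sketch of what checking that boolean looks like, here’s a self-contained version of the BoundedCapacity = 2 example above, with Task.Delay standing in for the slow ComplicatedComputation:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class PostResultSketch
{
    public static async Task Main()
    {
        var actionBlock = new ActionBlock<int>(
            async n => await Task.Delay(1000), // stand-in for a slow computation
            new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        var accepted1 = actionBlock.Post(1); // true: goes into execution
        var accepted2 = actionBlock.Post(2); // true: waits in the queue
        var accepted3 = actionBlock.Post(3); // false: the block is at capacity

        Console.WriteLine($"{accepted1} {accepted2} {accepted3}"); // True True False

        actionBlock.Complete();
        await actionBlock.Completion;
    }
}
```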
Thankfully there’s another method you can use called SendAsync that returns Task&lt;bool&gt;. Using this method you can wait until the block is ready to accept your request. When the block eventually accepts the request, the Task resolves with true. If for some reason the block can’t accept the request (e.g. there was an exception), the Task will resolve with false. Here’s how the example above looks with it:
var actionBlock = new ActionBlock<int>(async request =>
{
var result = await ComplicatedComputation(request);
Console.WriteLine(result);
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 2
});
await actionBlock.SendAsync(1); // is processed immediately
await actionBlock.SendAsync(2); // is accepted immediately, goes into a queue "inside" the ActionBlock
await actionBlock.SendAsync(3); // waits for one of the previous requests to finish; when processed it throws an exception
var wasAccepted = await actionBlock.SendAsync(4); // wasAccepted will be false
In the example above request 3 (which throws an exception) and request 4 illustrate that if there’s an exception .SendAsync will resolve with false.
Speaking of waiting for things to finish, ITargetBlock&lt;T&gt; has a property named Completion of type Task that you can use to be notified when the target block finishes. There are two scenarios that will cause the Completion Task to complete. One is explicitly calling the Complete method on the target block, and the other is when an exception is thrown while handling a request. Here’s an example:
var actionBlock = new ActionBlock<int>(async request =>
{
var result = await ComplicatedComputation(request);
Console.WriteLine(result);
});
actionBlock.Post(1);
actionBlock.Post(2);
actionBlock.Complete();
var wasAccepted = actionBlock.Post(3); //wasAccepted will be false
await actionBlock.Completion; //blocks here until requests 1 and 2 complete
Now imagine that request 2 throws an InvalidOperationException:
var actionBlock = new ActionBlock<int>(async request =>
{
var result = await ComplicatedComputation(request);
Console.WriteLine(result);
});
actionBlock.Post(1);
actionBlock.Post(2); //throws InvalidOperationException
var wasAccepted = actionBlock.Post(3); //wasAccepted will be true (it's in the queue because BoundedCapacity's default value is Unbounded)
actionBlock.Complete();
try
{
await actionBlock.Completion; //blocks here until request 2 throws exception
}
catch (Exception ex)
{
Console.WriteLine(ex.GetType().Name); //Prints InvalidOperationException
}
ISourceBlock
A source block is a Dataflow block that implements the ISourceBlock&lt;T&gt; interface. You can think of a source block as a block that is a source of data. However, because that data must get to the block somehow, all source blocks in Dataflow are also target blocks. The important thing to keep in mind about source blocks is that they can be linked to other blocks.
The most common type of source block is probably the TransformBlock&lt;TInput, TOutput&gt;. A transform block receives an instance of TInput and produces a TOutput. Here’s an example where we have a TransformBlock&lt;Request, Response&gt; that links to an ActionBlock&lt;Response&gt;:
var transformBlock = new TransformBlock<Request, Response>(async request =>
{
var response = await ProcessRequest(request);
return response;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var actionBlock = new ActionBlock<Response>(response =>
{
Console.WriteLine(response.RequestType + " " + (response.HasError ? "Request failed" : "Request was successful"));
});
transformBlock.LinkTo(actionBlock);
transformBlock.Post(new Request {RequestType = "request type 1"});
transformBlock.Post(new Request {RequestType = "request type 2"});
transformBlock.Post(new Request {RequestType = "request type 3"});
In this simple example instances of the class Request are sent to the transform block, which performs some operation on them and “sends” them to an action block that just prints to the console whether the request failed or not. In the previous section we saw that by default target blocks only process one request at a time. Because we want our TransformBlock to process requests in parallel we’ve set the MaxDegreeOfParallelism to DataflowBlockOptions.Unbounded in the TransformBlock’s ExecutionDataflowBlockOptions.
The reason I’m highlighting this again here is that the way the Responses are sent to the linked action block will probably surprise you. Let me illustrate with an example.
Ordering
Imagine “request type 1” takes 5 seconds to be handled and “request type 2” and “request type 3” each take 1 second. It would be reasonable to expect to see the output for “request type 2” and 3 after 1 second and for “request type 1” after 5 seconds. What happens instead is that after 5 seconds you’ll see the output for “request type 1”, 2 and 3 all at the same time.
The reason this happens is that the default behavior is to preserve the order of the requests. This means that even though all requests are processed concurrently, the response for the first one is the first to be sent.
Thankfully it’s very simple to turn off this behavior and have the responses sent as soon as they are produced. To do that we just need to set the EnsureOrdered property in ExecutionDataflowBlockOptions to false:
var transformBlock = new TransformBlock<Request, Response>(async request =>
{
var response = await ProcessRequest(request);
return response;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
EnsureOrdered = false
});
var actionBlock = new ActionBlock<Response>(response =>
{
Console.WriteLine(response.RequestType + " " + (response.HasError ? "Request failed" : "Request was successful"));
});
transformBlock.LinkTo(actionBlock);
transformBlock.Post(new Request {RequestType = "request type 1"}); //takes 5 seconds to process
transformBlock.Post(new Request {RequestType = "request type 2"}); //takes 1 second to process
transformBlock.Post(new Request {RequestType = "request type 3"}); //takes 1 second to process
transformBlock.Complete();
actionBlock.Completion.Wait();
With EnsureOrdered = false we will see the output for “request type 2” and 3 after 1 second and then for “request type 1” after 5 seconds.
You might have noticed that in this last example I called .Complete() on the transform block and .Wait() on the action block’s Completion Task.
PropagateCompletion
If you run the last example as is it will never actually complete. The reason is that the Complete method is being called on the transform block while the waiting is being done on the action block’s Completion task.
Thankfully this behavior is configurable. When linking a source block to a target block we can pass a DataflowLinkOptions instance as the second argument to the LinkTo function. In this case we want to set the PropagateCompletion flag to true:
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
Now, when the Complete method is called on the transform block the action block will also complete.
Another important feature of PropagateCompletion is that it will also cause exceptions to be propagated. Imagine one of the requests throws an InvalidOperationException in the transform block and you wrapped the Wait call on your action block’s Completion Task in a try/catch block:
try
{
actionBlock.Completion.Wait();
}
catch (AggregateException ex)
{
var flatAggregateException = ex.Flatten();
Console.WriteLine(flatAggregateException.InnerException.GetType().Name);
//will print InvalidOperationException
}
If you’ve never had to deal with AggregateException or if you are wondering why I’m calling Flatten, I recommend reading this slightly outdated but still very relevant blog post: Task Exception Handling in .NET 4.5. Alternatively, if you use async/await you can catch the InvalidOperationException in the try/catch directly, because await unwraps the AggregateException and “gives you” the first exception in the aggregate.
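As a sketch, here’s what the await version looks like. The handler is made up just to force the exception, mirroring the scenario above:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class AwaitCompletionSketch
{
    public static async Task Main()
    {
        var actionBlock = new ActionBlock<int>(n =>
        {
            // made-up handler: request 2 fails
            if (n == 2) throw new InvalidOperationException();
        });

        actionBlock.Post(1);
        actionBlock.Post(2); // this one throws inside the block

        try
        {
            await actionBlock.Completion;
        }
        catch (InvalidOperationException ex)
        {
            // await unwrapped the AggregateException for us
            Console.WriteLine(ex.GetType().Name); // prints InvalidOperationException
        }
    }
}
```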
Link predicates
In the last example the action block prints to the console whether the request “failed” or not (by checking the HasError flag in the Response that the transform block produces). Imagine now that we only care about responses that don’t have errors. We simply want to discard Responses that have the HasError flag set to true.
We can use an overload of LinkTo that receives a predicate to achieve that:
var transformBlock = new TransformBlock<Request, Response>(async request =>
{
var response = await ProcessRequest(request);
return response;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
EnsureOrdered = false
});
var actionBlock = new ActionBlock<Response>(response =>
{
Console.WriteLine("Response for request type " + response.RequestType);
});
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
PropagateCompletion = true
}, response => !response.HasError); // LinkTo only for Responses with HasError != true
transformBlock.Post(new Request {RequestType = "request type 1"}); //produces a response with HasError=true
transformBlock.Post(new Request {RequestType = "request type 2"});
transformBlock.Post(new Request {RequestType = "request type 3"});
transformBlock.Complete();
actionBlock.Completion.Wait();
There’s a problem with this example though. If there are requests that produce responses with HasError = true the Completion Task will never finish, even though we have the PropagateCompletion property set to true in the DataflowLinkOptions.
I have to make a small aside here and show you how that is described in the documentation:
Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, every message must be read from the source block before the source block can process the next message. Therefore, when you use filtering to connect multiple targets to a source, make sure that at least one target block receives each message. Otherwise, your application might deadlock.
That’s not entirely accurate, since we can set EnsureOrdered to false and the messages won’t be propagated in the order in which they are received, but the part about the requirement that at least one target block receive each message is accurate. You can end up waiting forever for a Completion task to finish.
Thankfully this is very easy to solve, but for the solution to make sense you need to be aware of how a source block behaves when it is linked to several target blocks (I’ll come back to this problem later, I promise).
Multiple LinkTos
A source block can be linked to several target blocks. The first link will always be given preference over the second link, the second will always be given preference over the third, and so on.
Here’s an example where a transform block is linked to two action blocks, both with BoundedCapacity=1
and where the first action block takes 100ms to finish:
var transformBlock = new TransformBlock<Request, Response>(async request =>
{
return await ProcessRequest(request);
});
var action1 = new ActionBlock<Response>(async response =>
{
Console.WriteLine("Action1: Received response for " + response.RequestType);
await Task.Delay(100); // <------------------
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1});
var action2 = new ActionBlock<Response>(response =>
{
Console.WriteLine("Action2: Received response for " + response.RequestType);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1});
transformBlock.LinkTo(action1);
transformBlock.LinkTo(action2);
transformBlock.Post(new Request {RequestType = "request 1"}); //handled by action1
transformBlock.Post(new Request {RequestType = "request 2"}); //handled by action2
transformBlock.Post(new Request {RequestType = "request 3"}); //handled by action2
await Task.Delay(150); //enough time for action1 to become free again
transformBlock.Post(new Request {RequestType = "request 4"}); //handled by action1
In this example “request 1” will be processed by action1. Request 2 and request 3 will be processed by action2, after which we wait for 150ms (enough time to make sure that action1 is free again). Request 4 goes back to being handled by action1 because that’s the target block that was linked first.
So what does this have to do with the previous section?
DataflowBlock.NullTarget
There’s a type of block named DataflowBlock.NullTarget&lt;T&gt; that is a target block that just consumes messages and does nothing with them. It basically offers a way to discard messages.
Going back to the example where we specified a predicate (response =&gt; !response.HasError) when we linked the transform block to the action block: we can now add a second LinkTo, to a DataflowBlock.NullTarget&lt;Response&gt;, to make sure that responses with errors are consumed and we don’t end up in a situation where the transform block never completes. Here’s that example with the addition of the DataflowBlock.NullTarget&lt;Response&gt;:
var transformBlock = new TransformBlock<Request, Response>(async request =>
{
var response = await ProcessRequest(request);
return response;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
EnsureOrdered = false
});
var actionBlock = new ActionBlock<Response>(response =>
{
Console.WriteLine("Response for request type " + response.RequestType);
});
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
PropagateCompletion = true
}, response => !response.HasError); // LinkTo only for Responses with HasError != true
transformBlock.LinkTo(DataflowBlock.NullTarget<Response>());
transformBlock.Post(new Request {RequestType = "request type 1"}); //produces a response with HasError=true
transformBlock.Post(new Request {RequestType = "request type 2"});
transformBlock.Post(new Request {RequestType = "request type 3"});
transformBlock.Complete();
actionBlock.Completion.Wait();
This works because, as we saw in the previous section, the first LinkTo is always given preference over the second. The only messages that will be sent to the second block (the NullTarget) are the ones that the first target can’t consume. In this case that happens only when Response.HasError == true.
That’s the end of Part 1. In a week or so I’ll post Part 2, which will cover Buffering and Grouping. If you managed to read everything up to this point, thank you, and if you can, let me know what you thought of it in the comments.