How to use System.IO.Pipelines and System.Threading.Channels APIs to speed up processing
BlockingCollection involves blocking; there are no task-based APIs to perform work asynchronously. Channels is all about asynchrony; there are no synchronously-blocking APIs — Stephen Toub. Okay, enough talking. Show me the code!
Base Implementation
Let’s start from the base implementation (i.e., before using System.Threading.Channels APIs); the code is simply adding each line processing method to the Task list and await on all of them.
Note that code on this post is slightly adjusted for benchmarks purpose; eliminating unrelated parts as much as possible.
[Benchmark(Baseline=true)]
public async TaskProcessTasksAsync()
{
while (true) {
ReadResult result = await_reader.ReadAsync().ConfigureAwait(false);
ReadOnlySequence <byte> buffer= result.Buffer;
AddToTaskList(ref buffer);
_reader.AdvanceTo ( buffer.Start, buffer.End);
if (result.IsCompleted) break;
}
await Task.WhenAll(_tasks).ConfigureAwait(false);
await _reader.CompleteAsync().ConfigureAwait(false);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void AddToTaskList ( ref ReadOnlySequence <byte> buffer)
{
string str=null;
if (buffer.IsSingleSegment)
{
var span=buffer.FirstSpan;
int consumed;
while (span.Length>0)
{
var newLine = span.IndexOf(NewLine);
if (newLine==-1) break;
var line=span.Slice(0, newLine);
str = Encoding.UTF8.GetString(line);
// add to Task list
_tasks.Add ( ProcessLineCoreAsync ( new MyData
{
Content=str, No=++_counter
}
));
consumed=line.Length+NewLine.Length;
span=span.Slice(consumed);
buffer=buffer.Slice(consumed);
}
}
else
{
var sequenceReader = new SequenceReader <byte>(buffer);
while (!sequenceReader.End)
{
while (sequenceReader.TryReadTo
( out ReadOnlySequence <byte> line, NewLine))
{
str=Encoding.UTF8.GetString(line);
// add to Task list
_tasks.Add ( ProcessLineCoreAsync ( new MyData
{
Content=str, No = ++_counter
}
));
}
buffer=buffer.Slice(sequenceReader.Position);
sequenceReader.Advance(buffer.Length);
}
}
}
We can see the task list growing as we process the line.
Task list
Channel Writer (Producer)
Here is the producer part.
[Benchmark]
public async Task ProcessTasksUsingChannelAsync()
{
while (true)
{
ReadResult result =await _reader.ReadAsync().ConfigureAwait(false);
ReadOnlySequence <byte> buffer=result.Buffer;
WriteToChannel ( ref buffer);
_reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
break;
}
// mark the channel as being complete, meaning no more items will
be written to it
._channel.Writer.TryComplete();
// await the Task that completes when no more data will ever be
available to be read from this channel.
await _channel.Reader.Completion.ConfigureAwait(false);
// wait the ProcessLineCoreAsync to finish
await Task.WhenAll(_channelReaderTasks).ConfigureAwait(false);
await _reader.CompleteAsync().ConfigureAwait(false);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void WriteToChannel ( ref ReadOnlySequence <byte> buffer)
{
string str = null;
if (buffer.IsSingleSegment)
{
var span = buffer.FirstSpan;
int consumed;
while (span.Length>0)
{
var newLine = span.IndexOf(NewLine);
if (newLine == -1) break;
var line=span.Slice(0, newLine);
str = Encoding.UTF8.GetString(line);
// write to the channel
_=_channel.Writer.WriteAsync ( new MyData
{
Content = str, No = ++_counter
});
consumed=line.Length+NewLine.Length;
span=span.Slice(consumed);
buffer=buffer.Slice(consumed);
}
}
else
{
var sequenceReader = new SequenceReader <byte>(buffer);
while (!sequenceReader.End)
{
while (sequenceReader.TryReadTo
(out ReadOnlySequence <byte> line, NewLine))
{
str=Encoding.UTF8.GetString(line);
// write to the channel
_=_channel.Writer.WriteAsync ( new MyData
{
Content = str, No = ++_counter
}); }
buffer = buffer.Slice(sequenceReader.Position);
sequenceReader.Advance(buffer.Length);
}
}}
To help spot the changes quickly, here is the diff:
Base implementation vs Channel implementation
Should be pretty easy to spot the differences.
Channel Reader (Consumer)
Finally, the consumer part.
private const int ChannelReaderCount = 3;
public override void IterationSetup()
{
// ...
_channel=Channel.CreateUnbounded<MyData>( new
UnboundedChannelOptions()
{
SingleReader=ChannelReaderCount==1 });
for (var i=0; i < ChannelReaderCount; i++)
{
_channelReaderTasks.Add(DoProcessLine(async (s)
=>_=awaitProcessLineCoreAsync(s).ConfigureAwait(false)));
}
}
private async Task DoProcessLine(Func < MyData, Task <string>> func)
{
var channelReader = _channel.Reader;
await foreach
(varitem in channelReader.ReadAllAsync().ConfigureAwait(false))
{
_=await func(item).ConfigureAwait(false);
}
}
Here I defined 3 channel readers and set SingleReader to false (by evaluating ChannelReaderCount == 1). This way, we will have 3 consumers that will process the line concurrently. This can be observed from the Visual Studio Parallel Stacks window.
Parallel Stacks Window
Tune this value and measure until you get the best performance. Start small, increment the value until you reach to the point where it will give you slower results; it’s the point where you have too many active Task resources, possibly too many context switches.
Benchmarks Result
Okay, let’s see the benchmarks result.
Here is the gist version:
That’s the benchmarks result. What about my case? Well, I saved another 10 minutes, so about 20 minutes faster in total!
Conclusion
If you have huge text files in size containing hundreds of thousands of lines to be processed, consider to use System.IO.Pipelines for reading and parsing the lines, and combine it with System.Threading.Channels APIs to spread the workload concurrently, asynchronously.
Source: Medium
The Tech Platform
Comments