A Multi-Threaded Background Processing Pipeline for .NET

In this article I’ll be illustrating an architecture for a background processing pipeline in .NET.

I use a similar architecture in my PhD project to process large amounts of Kinect data with OpenCV on a background thread. In my application each frame contains approximately 9 MB of data and Kinect runs at 30 fps. That’s about 270 MB per second flowing through the pipeline! This pipeline pattern allows that volume of data to be processed without impacting the UI thread or the rate at which Kinect frames are received.

The pipeline code and a sample project can be found on GitHub at https://github.com/ginocoates/BackgroundPipeline.

Pipeline Classes

The diagram below shows the main classes in the pipeline. .NET generics are used to allow the pipeline to process any kind of frame object. The pipeline uses a BlockingCollection<T>, which wraps a ConcurrentQueue<T> by default, as the background queue. The BlockingCollection blocks the consuming thread until an item is added to the collection. The underlying ConcurrentQueue allows thread-safe access to the background queue from both the UI and background threads.

The pipeline holds a collection of modules implementing the IPipelineModule interface. At run-time, frames added to the background queue are passed to the Process method of each of the registered modules.
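As a rough illustration of the module contract described above, the sketch below shows what such an interface and the per-frame dispatch might look like. The article only names IPipelineModule and its Process method; the exact signature, the CountingModule class and the ModuleRunner helper are assumptions made for this example and are not taken from the repository.

```csharp
using System.Collections.Generic;

// Assumed shape of the module contract. The article names IPipelineModule
// and Process, but this exact signature is a guess.
public interface IPipelineModule<T>
{
    void Process(T frame);
}

// Hypothetical module that simply counts the frames it has seen.
public sealed class CountingModule<T> : IPipelineModule<T>
{
    public int FramesSeen { get; private set; }

    public void Process(T frame)
    {
        FramesSeen++;
    }
}

// Hypothetical helper showing how one frame is handed to every
// registered module, in registration order.
public sealed class ModuleRunner<T>
{
    private readonly List<IPipelineModule<T>> modules = new List<IPipelineModule<T>>();

    public void Register(IPipelineModule<T> module)
    {
        modules.Add(module);
    }

    public void ProcessFrame(T frame)
    {
        foreach (var module in modules)
        {
            module.Process(frame);
        }
    }
}
```

Keeping the modules behind a single interface means the pipeline itself never needs to know what a module does with a frame.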

The pipeline also uses a PipelineTimer class to calculate run-time metrics such as the number of frames processed. Event notifications are sent to consumers periodically and can be used, for example, to display the processing frame rate on screen.
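To make the frame-rate metric concrete, here is a minimal sketch of one way such a counter could work. It is not the PipelineTimer from the repository: the FrameRateCounter class and its members are hypothetical, and the caller is assumed to supply the length of the sampling interval.

```csharp
using System;
using System.Threading;

// Hypothetical frame-rate counter, not the repository's PipelineTimer.
public sealed class FrameRateCounter
{
    private int frames;

    // Called by the background thread each time a frame has been processed.
    public void FrameProcessed()
    {
        Interlocked.Increment(ref frames);
    }

    // Called periodically (e.g. from a timer callback). Returns the frames per
    // second over the elapsed interval and resets the count for the next one.
    public double Sample(TimeSpan elapsed)
    {
        int count = Interlocked.Exchange(ref frames, 0);
        return count / elapsed.TotalSeconds;
    }
}
```

Interlocked is used because the count is incremented on the background thread but read and reset from whichever thread raises the periodic notification.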

Finally the pipeline raises a number of other useful run-time events:

  • FrameStart – The pipeline is just about to process a frame.
  • FrameComplete – The pipeline has completed processing a frame.
  • QueueComplete – The pipeline queue has been fully processed.

[Figure: Classes]

How it Works

The sequence diagram below shows the basic operation of the pipeline.

[Figure: pipeline-basic]

  • The BackgroundPipeline.Start method is called, which creates the BlockingCollection, starts the timer and creates the background thread.
  • Once created, the thread loops, trying to take items from the queue until BlockingCollection.IsCompleted is true. Inside the loop, the BlockingCollection blocks the thread until there is something to process. Once TryTake returns a frame, the frame is passed to the Process method of each module registered with the pipeline.
  • The consuming application then calls BackgroundPipeline.Enqueue to add work to the pipeline. Any work added will unblock the background thread.
  • To end processing, the consuming application calls BackgroundPipeline.Stop, which stops the timer and calls BlockingCollection.CompleteAdding. After this, no more work can be added to the pipeline. The background thread processes any remaining frames and exits the loop.
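The Start, Enqueue and Stop steps above can be sketched in a few lines. This is a simplified stand-in, not the BackgroundPipeline class itself: MiniPipeline and its members are hypothetical, the timer and cancellation support are left out, GetConsumingEnumerable is used in place of a TryTake loop for brevity, and Stop is assumed to wait for the queue to drain.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical, stripped-down pipeline illustrating the lifecycle only.
public sealed class MiniPipeline<T>
{
    private readonly Action<T> processFrame;
    private BlockingCollection<T> queue;
    private Task worker;

    public MiniPipeline(Action<T> processFrame)
    {
        this.processFrame = processFrame;
    }

    public void Start()
    {
        // BlockingCollection<T> wraps a ConcurrentQueue<T> by default.
        queue = new BlockingCollection<T>();

        worker = Task.Factory.StartNew(() =>
        {
            // Blocks while the queue is empty; the enumeration ends once
            // CompleteAdding has been called and the queue has been drained.
            foreach (var frame in queue.GetConsumingEnumerable())
            {
                processFrame(frame);
            }
        }, TaskCreationOptions.LongRunning);
    }

    // Adding an item unblocks the background thread.
    public void Enqueue(T frame)
    {
        queue.Add(frame);
    }

    // Stops accepting work, then waits for the remaining frames to be processed.
    public void Stop()
    {
        queue.CompleteAdding();
        worker.Wait();
    }
}
```

Calling Enqueue after Stop throws an InvalidOperationException, because BlockingCollection rejects additions once CompleteAdding has been called.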

Note that the background thread is created using Task.Factory.StartNew with TaskCreationOptions.LongRunning, which hints to the scheduler that the work should run on a dedicated thread rather than a thread pool thread. The code to create the thread and process the items in the queue is shown below. Inside the loop we call BlockingCollection.TryTake with an infinite timeout (-1), so the call blocks while the queue is empty:


processingTask = Task.Factory.StartNew(() =>
{
    while (FrameQueue != null && !FrameQueue.IsCompleted && !cancellationToken.IsCancellationRequested)
    {
        try
        {
            T frame;

            // Blocks until a frame is available (-1 = infinite timeout);
            // throws OperationCanceledException if the token is cancelled.
            if (FrameQueue.TryTake(out frame, -1, cancellationToken.Token))
            {
                ProcessFrame(frame);
            }
        }
        catch (OperationCanceledException)
        {
            Debug.WriteLine("Queue processing cancelled.");
        }

    }

    OnQueueComplete();

}, cancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);

Sample Application

In the git repo I’ve provided a sample WPF application that gets data from a Kinect sensor and processes it using the background queue. The goal of the sample is to ensure that the background processing does not impact the performance of the UI or the throughput of Kinect frames. The sample displays on screen the FPS of the UI thread and of the background pipeline, along with the number of frames still to be processed.

[Figure: benchmark-sample]

The sample provides a dummy module that simulates a background load by looping for a pre-defined period of time; that is, the Process method simply busy-waits for 40 ms before continuing:


var end = DateTime.Now + TimeSpan.FromMilliseconds(40);
while (DateTime.Now < end) ;

This module also computes the interval between KinectFrames as they arrive.
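As a variation on the snippet above, the sketch below combines the simulated load with a simple interval measurement. It swaps DateTime.Now for Stopwatch, which generally offers finer timing resolution. The SimulatedLoadModule class is hypothetical, and measuring the interval between successive Process calls is an assumption; the sample in the repository may compute it differently, for example from frame timestamps.

```csharp
using System;
using System.Diagnostics;

// Hypothetical dummy module: spins for a fixed time and records the
// interval between successive Process calls.
public sealed class SimulatedLoadModule
{
    private readonly TimeSpan load;
    private readonly Stopwatch sinceLastFrame = new Stopwatch();

    // Time between the start of the previous call and the start of this one
    // (zero for the first frame, because the stopwatch has not started yet).
    public TimeSpan LastInterval { get; private set; }

    public SimulatedLoadModule(TimeSpan load)
    {
        this.load = load;
    }

    public void Process()
    {
        LastInterval = sinceLastFrame.Elapsed;
        sinceLastFrame.Restart();

        // Busy-wait to simulate CPU-bound work.
        var busy = Stopwatch.StartNew();
        while (busy.Elapsed < load) { }
    }
}
```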

When Stop is clicked, the sample creates a report.csv file containing the FPS calculated for the UI and the background pipeline. The diagram below shows the main classes used in the sample.

 

[Figure: Sample]

Note that the KinectFrame class stores the Kinect data in unmanaged memory, and is therefore IDisposable. By doing this we can stave off expensive Gen2 garbage collections when dealing with large amounts of data. Note also that the sample targets the x64 platform so that we can allocate more than 2 GB of memory at run-time.
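To illustrate the idea, here is a minimal sketch of a disposable buffer backed by unmanaged memory. It is not the KinectFrame class from the repository: UnmanagedFrameBuffer and its members are hypothetical. The motivation is that managed arrays of roughly 85,000 bytes or more are allocated on the large object heap, which is only collected as part of a Gen2 collection, so keeping multi-megabyte frames out of the managed heap avoids that pressure.

```csharp
using System;
using System.Runtime.InteropServices;

// Hypothetical frame buffer that keeps its data outside the managed heap.
public sealed class UnmanagedFrameBuffer : IDisposable
{
    public IntPtr Data { get; private set; }
    public int Size { get; private set; }

    public UnmanagedFrameBuffer(int sizeInBytes)
    {
        Size = sizeInBytes;
        Data = Marshal.AllocHGlobal(sizeInBytes);
    }

    // Copies a managed array into the unmanaged buffer.
    public void CopyFrom(byte[] source)
    {
        if (Data == IntPtr.Zero) throw new ObjectDisposedException("UnmanagedFrameBuffer");
        if (source.Length > Size) throw new ArgumentException("Source is larger than the buffer.");
        Marshal.Copy(source, 0, Data, source.Length);
    }

    public byte ReadByte(int offset)
    {
        if (Data == IntPtr.Zero) throw new ObjectDisposedException("UnmanagedFrameBuffer");
        if (offset < 0 || offset >= Size) throw new ArgumentOutOfRangeException("offset");
        return Marshal.ReadByte(Data, offset);
    }

    // Frees the unmanaged memory; safe to call more than once.
    public void Dispose()
    {
        if (Data != IntPtr.Zero)
        {
            Marshal.FreeHGlobal(Data);
            Data = IntPtr.Zero;
        }
        GC.SuppressFinalize(this);
    }

    // Safety net in case Dispose is never called.
    ~UnmanagedFrameBuffer()
    {
        Dispose();
    }
}
```

In a pipeline like this one, a natural place to call Dispose would be after the last module has finished with the frame, though where the sample actually does so is not stated in the article.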

Pipeline Performance

The chart below illustrates the performance of the sample app over 60 seconds. Metrics for the fps for Kinect (RenderFPS) and pipeline are shown, along with the number of frames yet to be processed in the queue. The Interval metric is the amount of time between Kinect frames being processed by the dummy module. This metric allows us to monitor if there are large gaps in the data which might indicate a performance problem when getting frames from Kinect. Total process memory used and number of Gen1 and Gen2 collections are also shown.

[Figure: pipeline-performance-basic]

As you can see, the UI thread runs at a constant 30 fps, while the background pipeline processes frames at 25 fps (consistent with the 40 ms simulated load per frame) without impacting the performance of the UI. The interval between frames processed is also constant throughout the capture. This is important as it means we are not losing any Kinect frames due to performance bottlenecks in the background. Gen2 collections are also kept to a minimum and don’t impact performance of the application.

Conclusion

The pipeline presented allows background processing of intensive operations while not impacting UI thread performance. The pipeline uses .NET concurrent collections to achieve thread safe access to the queue and enable processing in the background. Intervals between Kinect frames in the queue were constant, indicating that the pipeline processing does not impact the performance of the thread interacting with Kinect.

For more information on the performance of .NET Concurrent Collections, see this MSDN article:

Performance of Concurrent Collections in .NET 4

The code for the pipeline and sample can be found on GitHub at https://github.com/ginocoates/BackgroundPipeline.

If you want to know more about programming with the Kinect SDK, read the docs or pick up this book on Amazon.com.
