More thoughts on TaskSchedulers and TaskFactories

I would never have expected it, but it seems that in my application, the custom TaskScheduler I came up with the other day actually performs so well that I now use it as the default in most places. That is, I use a couple of schedulers where I need finer-grained control of the work being scheduled. Of course controlling the Task scheduling makes more sense in some places than others, and perhaps is not needed in many applications.

Since my software is shared, the point of this post is to explain how I have used the various TaskSchedulers in the shared code, and how it has worked for me. I’m by no means an expert, but having started playing with async code around the 2nd Async CTP, I hope that my lessons learned may be of value to others.

Get the latest code here.

I wrote last time that you probably won’t normally need anything besides the default TaskScheduler. But if you have several concurrent asynchronous operations running and need to define explicitly what order they run in, or occasionally need to pre-empt one running asynchronous operation with another, then it makes sense to use different schedulers for different kinds of Tasks.

What can you get out of this?
Hopefully, it should prove to be a useful example of how different TaskSchedulers can be used for scheduling different work items concurrently.

I’m using a few of the sample schedulers from the Microsoft sample code. That is:

  • QueuedTaskScheduler
  • WorkStealingTaskScheduler
  • StaTaskScheduler
  • and my own ParallelTaskScheduler
  • Almost forgot… A UI-Synchronized TaskScheduler, via TaskScheduler.FromCurrentSynchronizationContext

The QueuedTaskScheduler runs on top of another scheduler, allowing you to add priority queues to it. Since much of my application’s async work is to build thumbnails, I created three priority queues on top of my own ParallelTaskScheduler, although I normally only use two of them.

I’ve written about this before, so I won’t go into too much detail here, but the main thing is that thumbnails are added in batches. I found that when using my own scheduler for this, it worked well, but the most recently added batches somehow got higher priority than those already being added. (When adding thumbnails in several batches for hundreds or thousands of files at a time, this caused thumbnails added later, only visible if you scroll down in the application, to load their images before those at the top.)

It so happened that using a higher priority queue for thumbnails based on their “Showing” (where Showing is a property on the thumbnail control that returns true only when the control is both visible and its bounds are within the visible portion of the screen), solved the issue. That is, I thought it would not be enough (because only the first batch being added should be showing) but it worked out that it always causes all the controls being added to fetch their images and update in the correct order.

What happens now in the application when a user traverses a directory is:

  • Thumbnails are added in batches via the UI-synchronized scheduler. As the batches are added, each one’s Path property is set, which will cause the asynchronous loading of its image.
  • Setting the Path so soon would cause an issue if the thumbnails were added synchronously: They would not paint anything initially because they would already be loading their images. Adding using the UI-synchronized scheduler actually slows things down, but also allows them to paint placeholder images before they load their images. (unless the images are already cached)
  • The images are loaded in parallel batches in more or less the order most wanted. This is done efficiently. That is, it performs fairly well on my home machine, with only 1.5GB RAM and 2 processor cores, and absolutely flies on my work machine, with 8 cores and 8GB RAM.
  • In the meantime, if a user selects one or more files, a WorkStealingTaskScheduler is used to update the status strip with info on the selection. (Or a StaTaskScheduler if they clicked a video, because a call is using DirectShowLib.) Actually it probably doesn’t really matter what scheduler is used for this, as long as it is not the same one adding thumbnails. (which would leave the status strip without an update until after all the thumbnails finish loading their images.)
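
The routing described in the list above can be sketched roughly as follows. This is only a minimal sketch under stated assumptions, not the application’s code: the BCL’s ConcurrentExclusiveSchedulerPair stands in for the sample schedulers, and the names thumbnailScheduler, selectionScheduler, LoadThumbnail and DescribeSelection are hypothetical. The only point it makes is that the two kinds of work are queued to different schedulers.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerRoutingSketch
{
    // Stand-in (assumption): a concurrency-limited scheduler for thumbnail work.
    private static readonly TaskScheduler thumbnailScheduler =
        new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, Environment.ProcessorCount).ConcurrentScheduler;

    // Stand-in (assumption): a separate scheduler for selection/status work.
    private static readonly TaskScheduler selectionScheduler =
        new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, 1).ConcurrentScheduler;

    // Hypothetical thumbnail job; a real one would decode and scale an image.
    public static Task<string> LoadThumbnail(string path)
    {
        return Task.Factory.StartNew(
            () => "thumb:" + path,
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            thumbnailScheduler);
    }

    // Hypothetical status-strip job, queued to its own scheduler so it
    // does not sit behind the thumbnail backlog in the same queue.
    public static Task<string> DescribeSelection(string path)
    {
        return Task.Factory.StartNew(
            () => "selected:" + path,
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            selectionScheduler);
    }

    public static async Task Main()
    {
        // Queue a backlog of thumbnail work first.
        Task<string>[] thumbnails = Enumerable.Range(0, 100)
            .Select(i => LoadThumbnail("file" + i))
            .ToArray();

        // The selection update goes through the other scheduler.
        string status = await DescribeSelection("file3");
        Console.WriteLine(status);

        string[] loaded = await Task.WhenAll(thumbnails);
        Console.WriteLine(loaded.Length + " thumbnails loaded");
    }
}
```

How quickly the selection task actually runs still depends on the underlying schedulers and thread pool, so treat this as an illustration of the routing rather than a timing guarantee.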

To help make the various schedulers accessible (except for the UI-synchronized one, which is inherited from my base Form), I created a class that contains properties of the various scheduler types, like so:

namespace System.Threading.Tasks.Schedulers
{
    public class TaskSchedulers : IDisposable
    {
        #region Fields

        private static volatile bool cleanupScheduled;

        private bool disposed;

        private QueuedTaskScheduler queued;

        private TaskScheduler highPriority;

        private TaskScheduler lowerPriority;

        private TaskScheduler normalPriority;

        private TaskScheduler sta = new StaTaskScheduler(Environment.ProcessorCount);

        private TaskScheduler workStealing = new WorkStealingTaskScheduler();

        private static readonly TaskSchedulers schedulers = new TaskSchedulers();

        #endregion Fields

        #region Constructors

        ~TaskSchedulers()
        {
            Dispose(false);
        }

        public TaskSchedulers()
        {
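            // Layer the priority queues on top of the shared ParallelTaskScheduler.
            queued = new QueuedTaskScheduler(Parallel, Environment.ProcessorCount);

            // Lower numbers are serviced first, so queue 0 has the highest priority.
            normalPriority = queued.ActivateNewQueue(1);
            highPriority = queued.ActivateNewQueue(0);
            lowerPriority = queued.ActivateNewQueue(2);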

            if (!cleanupScheduled)
            {
                cleanupScheduled = true;
                System.Windows.Forms.Application.ApplicationExit += (sender, e) => schedulers.Dispose();
            }
        }

        #endregion Constructors

        #region Properties

        public TaskScheduler HighPriority
        {
            get { return highPriority; }
        }

        public TaskScheduler LowerPriority
        {
            get { return lowerPriority; }
        }

        public TaskScheduler NormalPriority
        {
            get { return normalPriority; }
        }

        public static TaskScheduler Parallel
        {
            get { return Romy.Core.Tasks.Scheduler; }
        }

        public TaskScheduler Queued
        {
            get { return queued; }
        }

        public TaskScheduler Sta
        {
            get { return sta; }
        }

        public TaskScheduler WorkStealing
        {
            get { return workStealing; }
        }

        public static TaskSchedulers Schedulers
        {
            get { return TaskSchedulers.schedulers; }
        }

        #endregion Properties

        #region Methods

        #region Public Methods

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        #endregion Public Methods

        #region Protected Methods

        protected virtual void Dispose(bool disposing)
        {
            if (!disposed)
            {
                if (disposing)
                {
                    var cleanup = new Action<dynamic>(s =>
                    {
                        if (s != null)
                        {
                            var disposable = s as IDisposable;

                            if (disposable != null)
                                disposable.Dispose();

                            s = null;
                        }
                    });

                    cleanup(queued);
                    cleanup(sta);
                    cleanup(workStealing);
                }

                disposed = true;
            }
        }

        #endregion Protected Methods

        #endregion Methods
    }
}

The cleanup action above is unnecessary, of course. Assigning null to the lambda’s parameter only clears the local copy, not the field that was passed in. It’s a leftover from some code when I was playing around with the idea of easily disposing objects, without being sure they are disposable. Its usefulness is dubious…

I also use my ParallelTaskScheduler explicitly from some lower-level code. For example, when finding files, or when using anything in the UI that operates on all the files of a directory, I call ForEachAsync, an extension method from a sample by Stephen Toub. The original calls Task.Run, but I have changed my version as follows:

        /// <summary>A ForEachAsync implementation. Based on a sample in an article by Stephen Toub,
        /// <a href="http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx">
        /// Implementing a simple ForEachAsync, part 2</a>.</summary>
        /// <remarks>
        /// I've changed this from calling <b>Task.Run</b> to call <b>Task.Factory.StartNew</b> in order to have it use my 
        /// custom ParallelTaskScheduler rather than the default. (This was intended to be an experimental change, but I 
        /// decided to leave it like this.)</remarks>
        public static Task ForEachAsync<T>(this IEnumerable<T> source,
            int maxDegreeOfParallelism,
            Func<T, Task> body)
        {
            return Task.WhenAll(
                from partition in Partitioner.Create(source).GetPartitions(maxDegreeOfParallelism)
                select Task.Factory.StartNew(async () =>
                {
                    using (partition)
                        while (partition.MoveNext())
                            await body(partition.Current);
                }, CancellationToken.None,
                TaskCreationOptions.DenyChildAttach | TaskCreationOptions.HideScheduler,
                new ParallelTaskScheduler(maxDegreeOfParallelism)).Unwrap());
        }

        /// <summary>An asynchronous ForAsync implementation.</summary>
        /// <remarks>It simply creates an <b>Enumerable.Range</b> and wraps <b>ForEachAsync</b>.</remarks>
        public static Task ForAsync(int fromInclusive, int toExclusive, int maxDegreeOfParallelism, Func<int, Task> body)
        {
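            // Enumerable.Range takes (start, count), not (start, end), so the
            // exclusive upper bound is converted to a count here.
            return Enumerable.Range(
                fromInclusive, Math.Max(0, toExclusive - fromInclusive)).
                ForEachAsync(maxDegreeOfParallelism, async i => await body(i));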
        }
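
A usage sketch may help here. It assumes the default scheduler via Task.Run, as in Stephen Toub’s original, because the custom ParallelTaskScheduler is not part of the BCL; the class name ForEachAsyncSketch and the method SumOfSquaresAsync are hypothetical. Note that Enumerable.Range takes a start and a count, so an exclusive upper bound has to be converted to a count.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncSketch
{
    // Same shape as the method above, but on the default scheduler
    // (assumption: no custom scheduler is available in this sketch).
    public static Task ForEachAsync<T>(this IEnumerable<T> source,
        int maxDegreeOfParallelism,
        Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(maxDegreeOfParallelism)
            select Task.Run(async () =>
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    // Hypothetical caller: squares each value in [fromInclusive, toExclusive)
    // with at most four partitions running concurrently.
    public static async Task<int> SumOfSquaresAsync(int fromInclusive, int toExclusive)
    {
        var squares = new ConcurrentBag<int>();

        await Enumerable.Range(fromInclusive, toExclusive - fromInclusive)
            .ForEachAsync(4, async i =>
            {
                await Task.Yield();   // Stand-in for real asynchronous work.
                squares.Add(i * i);   // ConcurrentBag is safe to add to from several threads.
            });

        return squares.Sum();
    }

    public static async Task Main()
    {
        // 1*1 + 2*2 + 3*3 + 4*4
        Console.WriteLine(await SumOfSquaresAsync(1, 5)); // prints 30
    }
}
```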

I also recently added a global TaskFactory, so that it can contain convenience shortcut methods, similar to Task.Run, which I will be able to access via Tasks.Run, like so:

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Schedulers;

namespace Romy.Core
{
    public static class Tasks
    {
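        // Declared before the factory: static field initializers run in textual order,
        // so the scheduler must exist before the factory's initializer reads it.
        // (With a null scheduler, TaskFactory falls back to TaskScheduler.Current.)
        private static readonly TaskScheduler scheduler = new ParallelTaskScheduler();

        private static readonly TaskFactory factory = new TaskFactory(CancellationToken.None,
           TaskCreationOptions.DenyChildAttach | TaskCreationOptions.HideScheduler,
           TaskContinuationOptions.ExecuteSynchronously, scheduler);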

        public static TaskFactory Factory
        {
            get { return Tasks.factory; }
        }

        public static TaskScheduler Scheduler
        {
            get { return Tasks.scheduler; }
        }

        public static Task Run(Func<Task> body)
        {
            return factory.StartNew(async () => await body()).Unwrap();
        }
    }
}
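
The reason Run needs the Unwrap call can be sketched as follows. This is a minimal illustration that assumes TaskScheduler.Default in place of the custom ParallelTaskScheduler; the names UnwrapSketch and DelayedAnswer are hypothetical. StartNew with an asynchronous delegate returns a Task<Task<T>> whose outer task completes as soon as the delegate reaches its first await, so it is the unwrapped task that represents the whole operation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class UnwrapSketch
{
    // Same options as the factory above, but on the default scheduler (assumption).
    private static readonly TaskFactory factory = new TaskFactory(
        CancellationToken.None,
        TaskCreationOptions.DenyChildAttach | TaskCreationOptions.HideScheduler,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);

    // Generic counterpart of Tasks.Run: start the delegate on the factory's
    // scheduler and return a task for the inner asynchronous operation.
    public static Task<int> Run(Func<Task<int>> body)
    {
        return factory.StartNew(body).Unwrap();
    }

    // Hypothetical asynchronous work.
    public static async Task<int> DelayedAnswer()
    {
        await Task.Delay(10);
        return 42;
    }

    public static async Task Main()
    {
        // Without Unwrap, the result is a nested task.
        Task<Task<int>> nested = factory.StartNew(() => DelayedAnswer());
        Task<int> inner = await nested;   // Only the delegate's start has completed here.
        Console.WriteLine(await inner);   // prints 42

        // With Unwrap, callers await a single task.
        Console.WriteLine(await Run(DelayedAnswer)); // prints 42
    }
}
```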

About Jerome

I am a senior C# developer in Johannesburg, South Africa. I am also a recovering addict, who spent nearly eight years using methamphetamine. I write on my recovery blog about my lessons learned and sometimes give advice to others who have made similar mistakes, often from my viewpoint as an atheist, and I also write some C# programming articles on my programming blog.