A custom TaskScheduler in C#

Yesterday I expressed my confusion around TaskSchedulers, wondering what happens when I have several different TaskScheduler implementations in my application. It turns out this is a non-issue.

A TaskScheduler is just a simple abstraction for the low-level object that runs your tasks; it implements the logic that decides when, where, and in what order the underlying work runs. Normally the default implementation is all you need: it intelligently decides whether to run tasks on the ThreadPool or inline (on the current thread), and it uses work-stealing (a way of load balancing) across threads. You only need to replace it if you want lower-level control of the underlying threads; for example, you want tasks to run in a particular order, or you want some to have priority over others.

There is precious little documentation of TaskSchedulers online, so it turns out the easiest way to understand them is just to write one yourself. That is, implement the abstract TaskScheduler class. And that is exactly what I did today.
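At its simplest, that means overriding just three members. Here is a minimal sketch to show the shape of the class (the name is made up, and it is not the scheduler described below; it just hands every task straight to the ThreadPool):

using System.Collections.Generic;
using System.Linq;

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>A bare-bones TaskScheduler, for illustration only: it queues every 
    /// task to the ThreadPool and never inlines previously queued tasks.</summary>
    public class MinimalTaskScheduler : TaskScheduler
    {
        protected override void QueueTask(Task task)
        {
            // Called by the TPL whenever a Task is scheduled to this scheduler.
            ThreadPool.QueueUserWorkItem(_ => TryExecuteTask(task));
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // Called when the TPL wants to run a Task on the current thread (e.g. during Task.Wait).
            if (taskWasPreviouslyQueued) return false;
            return TryExecuteTask(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            // Only used by the debugger; this sketch doesn't track its queued tasks.
            return Enumerable.Empty<Task>();
        }
    }
}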

Introducing the ParallelTaskScheduler. I wrote it in a few minutes, plugged it into my application in place of the scheduler I had been using to populate my cache files, and it works like a charm.

And I kid you not… Thanks to the huge amount of high-quality sample parallel programming code from Microsoft, this literally took only a few minutes to write. (There are several custom TaskScheduler implementations in the Parallel Extensions project, which is part of the samples linked to.)

All mine does is group the tasks queued to it into batches, whose size is determined by the maxDegreeOfParallelism parameter passed to its constructor, and then, for each batch, run its tasks on the ThreadPool in parallel. More detail below.

using System.Collections.Generic;
using System.Linq;

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>Custom TaskScheduler that processes work items in batches, where 
    /// each batch is processed by a ThreadPool thread, in parallel.</summary>
    /// <remarks>
    /// This is used as the default scheduler in several places in this solution, by, 
    /// for example, calling it directly in <see cref="TaskExtensions.ForEachAsync"/>, 
    /// or by accessing the relevant property of the static <see cref="TaskSchedulers"/> 
    /// class.</remarks>
    public class ParallelTaskScheduler : TaskScheduler
    {
        // True while the current thread is inside RunTasks, so that inline 
        // execution is only allowed on threads already processing our work items.
        [ThreadStatic]
        private static bool currentThreadIsProcessingItems;

        // The maximum number of tasks to run in parallel per batch.
        private readonly int maxDegreeOfParallelism;

        // The number of ThreadPool work items currently queued or running.
        private volatile int runningOrQueuedCount;

        // The tasks queued to this scheduler, in the order they were queued.
        private readonly LinkedList<Task> tasks = new LinkedList<Task>();

        public ParallelTaskScheduler(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1)
                throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");

            this.maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        public ParallelTaskScheduler() : this(Environment.ProcessorCount) { }

        public override int MaximumConcurrencyLevel
        {
            get { return maxDegreeOfParallelism; }
        }

        protected override bool TryDequeue(Task task)
        {
            lock (tasks) return tasks.Remove(task);
        }

        protected override bool TryExecuteTaskInline(Task task,
            bool taskWasPreviouslyQueued)
        {
            if (!currentThreadIsProcessingItems) return false;
            
            if (taskWasPreviouslyQueued) TryDequeue(task);

            return base.TryExecuteTask(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            var lockTaken = false;
            try
            {
                Monitor.TryEnter(tasks, ref lockTaken);
                if (lockTaken) return tasks.ToArray();
                else throw new NotSupportedException();
            }
            finally { if (lockTaken) Monitor.Exit(tasks); }
        }

        protected override void QueueTask(Task task)
        {
            lock (tasks)
            {
                tasks.AddLast(task);

                // Only start another ThreadPool work item if we are below the 
                // concurrency limit; the check and increment stay inside the lock 
                // so the count can't be corrupted by concurrent queuers.
                if (runningOrQueuedCount < maxDegreeOfParallelism)
                {
                    runningOrQueuedCount++;
                    RunTasks();
                }
            }
        }

        /// <summary>Runs the queued work on the ThreadPool.</summary>
        /// <remarks>
        /// This TaskScheduler is similar to the <see cref="LimitedConcurrencyLevelTaskScheduler"/> 
        /// sample implementation, until it reaches this method. At this point, rather than pulling 
        /// one Task at a time from the list, all currently queued Tasks are pulled, split into 
        /// batches of up to maxDegreeOfParallelism, and each batch is run in parallel (via PLINQ) 
        /// from a single ThreadPool work item.</remarks>
        private void RunTasks()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                List<Task> taskList = new List<Task>();

                currentThreadIsProcessingItems = true;
                try
                {
                    while (true)
                    {
                        lock (tasks)
                        {
                            if (tasks.Count == 0)
                            {
                                runningOrQueuedCount--;
                                break;
                            }

                            var t = tasks.First.Value;
                            taskList.Add(t);
                            tasks.RemoveFirst();
                        }
                    }

                    if (taskList.Count == 1)
                    {
                        base.TryExecuteTask(taskList[0]);
                    }
                    else if (taskList.Count > 0)
                    {
                        var batches = taskList.GroupBy(
                            task => taskList.IndexOf(task) / maxDegreeOfParallelism);

                        foreach (var batch in batches)
                        {
                            batch.AsParallel().ForAll(task =>
                                base.TryExecuteTask(task));
                        }
                    }
                }
                finally { currentThreadIsProcessingItems = false; }

            }, null);
        }
    }
}

There are three abstract methods that must be implemented. The only really important one is QueueTask. All mine does is:

  • Add the task to an internal list.
  • If fewer than maxDegreeOfParallelism work items are already in flight, queue a ThreadPool work item that removes and executes all the tasks queued at that moment in time.

The actual running of the tasks is dead simple: break them into batches, then run each batch's tasks in parallel (via PLINQ's AsParallel().ForAll), processing the batches one after the other.
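To make the batching step concrete, here is the grouping trick from RunTasks in isolation (a standalone sketch with made-up values): each item's index is divided by the batch size, so consecutive items land in the same batch.

using System;
using System.Linq;

class BatchingDemo
{
    static void Main()
    {
        var items = Enumerable.Range(0, 10).ToList();
        const int maxDegreeOfParallelism = 4;

        // Same trick as in RunTasks: index / batch size gives the batch number.
        var batches = items.GroupBy(item => items.IndexOf(item) / maxDegreeOfParallelism);

        foreach (var batch in batches)
            Console.WriteLine("Batch {0}: {1}", batch.Key, string.Join(", ", batch));

        // Prints:
        // Batch 0: 0, 1, 2, 3
        // Batch 1: 4, 5, 6, 7
        // Batch 2: 8, 9
    }
}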

The other abstract methods are:

  • TryExecuteTaskInline: To execute a task on the current thread.
  • GetScheduledTasks: Return an enumerable of tasks queued to the scheduler.

My TryExecuteTaskInline implementation also removes the task from the internal list when necessary, which is why I implemented TryDequeue as well (a virtual method). A pity really – I had wanted to use a ConcurrentBag<T> for my internal list, which would have made the processing of tasks random (since it’s unordered), but then I would have had no way of removing specific tasks from the queue.

Update (2013-06-26): This is the slightly optimized and improved version:

  • QueueTask: The initial version was a little too keen to start running the threads, which often resulted in ThreadPool work items running only a single Task each. That should only happen when there is just one task being scheduled, or when dividing the tasks into batches leaves a remainder of one.
     
  • GetScheduledTasks: This was implemented as…
        lock (tasks) return tasks.ToArray();
    But that could hang the debugger if a lock is owned by a frozen thread. Thus it now uses Monitor.TryEnter, the same as the Microsoft sample code.

This code performs surprisingly well, and I’m now using it in most of the places in my RomyView application where I specify a TaskScheduler.
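Specifying it looks something like this (a minimal, self-contained sketch; the tasks' work is made up for illustration):

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Schedulers;

class Program
{
    static void Main()
    {
        // Cap the scheduler at four parallel tasks (any value >= 1 works).
        var scheduler = new ParallelTaskScheduler(4);

        // Any StartNew/ContinueWith overload that accepts a TaskScheduler can use it.
        Task[] tasks = Enumerable.Range(0, 10)
            .Select(i => Task.Factory.StartNew(
                () => Console.WriteLine("Task {0} on thread {1}",
                    i, Thread.CurrentThread.ManagedThreadId),
                CancellationToken.None,
                TaskCreationOptions.None,
                scheduler))
            .ToArray();

        Task.WaitAll(tasks);
    }
}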


As you can see, TaskSchedulers are quite simple beasts… My confusion yesterday was unwarranted: a Task is handed to its TaskScheduler there and then, as you queue it, and how (and when) the scheduler actually runs it is completely up to you, if you implement your own.
