My Parallel TaskScheduler revisited

Recall my earlier post, A configurable TaskScheduler that can be changed by the user at runtime.

Yesterday, just out of curiosity, I used ILSpy (I don’t use Reflector because it is no longer free) to decompile the built-in ThreadPoolTaskScheduler class. Some of its code almost jumped out and smacked me in the face! This class, which is the type behind TaskScheduler.Default, does one thing I had forgotten all about when it runs a Task: it checks whether the Task’s TaskCreationOptions include LongRunning, and if so, runs that Task on a dedicated thread.
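You don’t need ILSpy to see this behaviour from the outside. Below is a small probe (LongRunningProbe is a hypothetical name, not part of my code) that asks TaskScheduler.Default to run a task with and without LongRunning, and reports whether the body ran on a ThreadPool thread. Treat it as a sketch: LongRunning is documented only as a hint to the scheduler, so getting a separate thread is an implementation detail of the built-in scheduler rather than a guarantee.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical probe: where does TaskScheduler.Default run a task?
public static class LongRunningProbe
{
    // Returns true if the task body executed on a ThreadPool thread.
    public static bool RanOnThreadPool(TaskCreationOptions options)
    {
        Task<bool> task = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            options,
            TaskScheduler.Default);   // the built-in default scheduler

        return task.Result;           // blocks until the probe task has run
    }
}
```

Passing TaskCreationOptions.None should report a ThreadPool thread, while TaskCreationOptions.LongRunning should not.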

Since my classes didn’t respect the LongRunning option and treated all tasks equally, some refactoring was in order. Now long running tasks are catered for, in a similar manner to the default scheduler.

You can download the code here.

To recap, and backwards, just because I like the arse-about-face approach, this is what it all comes down to as far as the user is concerned:

[Figure: RomyView application options dialog (TaskSchedulerOptions)]

This means that normally my ParallelThreadPoolTaskScheduler is used, unless the user chooses not to use the ThreadPool, in which case the ParallelDedicatedThreadsTaskScheduler comes into play. The behaviour can then be customized further by choosing to use foreground threads. (And then wondering why the application sometimes takes very long to exit. Hint: even the most useless background task now runs on a foreground thread, and the process does not exit until every foreground thread has finished.)
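The exit delay comes down to two defaults, which this hypothetical ThreadKindProbe class (not part of my code) checks: a thread created with new Thread is a foreground thread until IsBackground is set, whereas ThreadPool threads are always background threads. That is why the dedicated-threads scheduler has to set IsBackground explicitly when foreground threads are not wanted.

```csharp
using System.Threading;

// Hypothetical probe for the foreground/background defaults.
public static class ThreadKindProbe
{
    // A Thread created with 'new Thread' is a foreground thread unless IsBackground is set.
    public static bool NewThreadIsBackgroundByDefault()
    {
        var thread = new Thread(() => { });
        return thread.IsBackground;
    }

    // Reports IsBackground as seen from inside a ThreadPool work item.
    public static bool PoolThreadIsBackground()
    {
        bool isBackground = false;
        using (var done = new ManualResetEventSlim(false))
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                isBackground = Thread.CurrentThread.IsBackground;
                done.Set();   // also publishes isBackground to the waiting thread
            });
            done.Wait();
        }
        return isBackground;
    }
}
```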

To cater for long running tasks, I added a new method to the underlying interface that the base class implements. (With a signature almost identical to the existing method.)

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>This is the interface implemented by <see cref="ParallelTaskSchedulerBase"/>, which declares 
    /// <b>ParallelInvoke</b> as abstract, forcing derived classes to implement it. <b>LongRunningTasksInvoke</b> is 
    /// implemented there as a virtual method that, by default, runs long-running tasks on dedicated threads, similarly 
    /// to the default TaskScheduler. This may of course be overridden.</summary>
    /// <remarks><b>ParallelDedicatedThreadsTaskScheduler</b> overrides <b>LongRunningTasksInvoke</b> to run Tasks on dedicated 
    /// threads, setting each as a foreground or background thread according to its <b>UseForegroundThreads</b> property.</remarks>
    public interface IParallelInvoker
    {
        void ParallelInvoke(Action<Task> action, params Task[] tasks);

        void LongRunningTasksInvoke(Action<Task> action, params Task[] tasks);
    }
}
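To make the contract concrete, here is a minimal, hypothetical implementer (SequentialInvoker is not part of my code) that simply runs the action for each task, in order, on the calling thread. The interface is repeated so the sketch compiles on its own. It gives up the parallelism the real schedulers exist for, so it is only useful as a deterministic stand-in, for example when debugging.

```csharp
using System;
using System.Threading.Tasks;

// Repeated from the post so that this sketch compiles on its own.
public interface IParallelInvoker
{
    void ParallelInvoke(Action<Task> action, params Task[] tasks);

    void LongRunningTasksInvoke(Action<Task> action, params Task[] tasks);
}

// Hypothetical implementer: runs the action for each task, in order, on the calling thread.
public sealed class SequentialInvoker : IParallelInvoker
{
    public void ParallelInvoke(Action<Task> action, params Task[] tasks)
    {
        if (action == null) throw new ArgumentNullException("action");
        if (tasks == null) throw new ArgumentNullException("tasks");

        foreach (var task in tasks) action(task);
    }

    // Long-running tasks get no special treatment here: same sequential loop.
    public void LongRunningTasksInvoke(Action<Task> action, params Task[] tasks)
    {
        ParallelInvoke(action, tasks);
    }
}
```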

The class diagram now looks like this:

[Figure: TaskSchedulers class diagram]

Next, since all the methods have almost the same signature, and the implementation comes in two flavours (ThreadPool or Thread), I moved the relevant code into a helper class with a couple of extension methods.

using System.Collections.Generic;
using System.Linq;

namespace System.Threading.Tasks.Schedulers
{
    internal static class TasksExtensions
    {
        public static void InvokeActionOnTasksThreaded(this IEnumerable<Task> tasks, Action<Task> action, bool isBackground = true)
        {
            if (tasks == null) throw new ArgumentNullException("tasks");
            if (action == null) throw new ArgumentNullException("action");

            var exceptions = new List<Exception>();
            var threads = new List<Thread>();

            // Start one dedicated thread per task, collecting exceptions instead of losing them.
            foreach (var task in tasks)
            {
                var thread = new Thread(() =>
                {
                    try { action(task); }
                    catch (Exception ex)
                    { lock (exceptions) exceptions.Add(ex); }
                }) { IsBackground = isBackground };

                threads.Add(thread);
                thread.Start();
            }

            foreach (var thread in threads)
                thread.Join();

            if (exceptions.Count > 0) throw new AggregateException(exceptions);
        }

        public static void InvokeActionOnTasksThreadPooled(this IEnumerable<Task> tasks, Action<Task> action)
        {
            if (tasks == null) throw new ArgumentNullException("tasks");
            if (action == null) throw new ArgumentNullException("action");

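            // Materialize once; otherwise the sequence is enumerated twice (Count() and foreach).
            var taskArray = tasks.ToArray();
            var count = taskArray.Length;
            if (count == 0) return;

            using (ManualResetEvent mre = new ManualResetEvent(false))
            {
                // Each work item decrements this; the last one to finish signals the event.
                var remaining = count;
                var exceptions = new List<Exception>();

                foreach (var task in taskArray)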
                {
                    ThreadPool.UnsafeQueueUserWorkItem(_ =>
                    {
                        try { action(task); }
                        catch (Exception ex)
                        { lock (exceptions) exceptions.Add(ex); }
                        finally
                        {
                            if (Interlocked.Decrement(ref remaining) == 0)
                                mre.Set();
                        }
                    }, null);
                }

                mre.WaitOne();

                if (exceptions.Count > 0) throw new AggregateException(exceptions);
            }
        }
    }
}
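The ThreadPool flavour above hand-rolls a countdown with Interlocked.Decrement and a ManualResetEvent. As a sketch of an alternative, the framework’s CountdownEvent packages the same idea. CountdownInvoker is a hypothetical name and this is not what my scheduler code currently does; it is just an option with the same contract (wait for every task, then rethrow collected exceptions as one AggregateException).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical alternative to InvokeActionOnTasksThreadPooled, built on CountdownEvent.
public static class CountdownInvoker
{
    public static void InvokeOnThreadPool(IEnumerable<Task> tasks, Action<Task> action)
    {
        if (tasks == null) throw new ArgumentNullException("tasks");
        if (action == null) throw new ArgumentNullException("action");

        var taskArray = tasks.ToArray();
        if (taskArray.Length == 0) return;

        var exceptions = new List<Exception>();

        // Starts at the number of work items; each Signal() decrements it.
        using (var countdown = new CountdownEvent(taskArray.Length))
        {
            foreach (var task in taskArray)
            {
                ThreadPool.UnsafeQueueUserWorkItem(_ =>
                {
                    try { action(task); }
                    catch (Exception ex) { lock (exceptions) exceptions.Add(ex); }
                    finally { countdown.Signal(); }
                }, null);
            }

            countdown.Wait();   // returns once the count reaches zero
        }

        if (exceptions.Count > 0) throw new AggregateException(exceptions);
    }
}
```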

I copied this part from my last post because I am lazy…

I took what I have learned in writing a custom TaskScheduler type, and defined an abstract class that can be used to derive custom schedulers that schedule their tasks to be run in parallel. The IParallelInvoker interface is unnecessary, but it helps clarify that the important method is the abstract ParallelInvoke method on the base class. Derived classes must then implement it to define how their tasks will be run in parallel.

I have derived two scheduler types.

  • The ParallelThreadPoolTaskScheduler runs its tasks in parallel on the managed ThreadPool.
  • The ParallelDedicatedThreadsTaskScheduler creates Thread instances, and can be configured to either run them in the foreground or in the background.

I then created a custom TaskFactory class, the ParallelTaskFactory, which introduces two properties: UseThreadPool, which sets which scheduler the Scheduler property returns, and UseForeGroundThreads, which configures whether the ParallelDedicatedThreadsTaskScheduler instance runs its threads in the foreground or the background.

Note that the TaskFactory.Scheduler property is not virtual, so it was necessary to use the new keyword to hide (and reimplement) the inherited property.
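Hiding has one consequence worth spelling out: anything that sees the object as a plain TaskFactory, including the StartNew overloads that take no scheduler argument, still uses the scheduler passed to the base constructor. The sketch below shows which one wins. HidingFactory and HidingDemo are hypothetical names, and the two schedulers of a ConcurrentExclusiveSchedulerPair are used simply as two distinct TaskScheduler instances.

```csharp
using System.Threading.Tasks;

// Hypothetical factory mirroring the pattern: the base constructor gets one scheduler,
// while a 'new' Scheduler property hides TaskFactory.Scheduler and returns another.
public class HidingFactory : TaskFactory
{
    public static readonly ConcurrentExclusiveSchedulerPair Pair = new ConcurrentExclusiveSchedulerPair();

    public HidingFactory() : base(Pair.ConcurrentScheduler) { }

    public new TaskScheduler Scheduler
    {
        get { return Pair.ExclusiveScheduler; }
    }
}

public static class HidingDemo
{
    // Reports the scheduler that a StartNew call without an explicit scheduler actually used.
    public static TaskScheduler SchedulerUsedByStartNew(HidingFactory factory)
    {
        return factory.StartNew(() => TaskScheduler.Current).Result;
    }
}
```

So when the hiding property is the one that matters, it has to be passed to StartNew explicitly.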

All of that is still true. What changed is the new virtual LongRunningTasksInvoke method on the base class, which the ParallelDedicatedThreadsTaskScheduler overrides, and all the implementations are simpler thanks to the extension methods.

The base class now looks like this:

using System.Collections.Generic;
using System.Linq;

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>Base class for a TaskScheduler that, when processing work items, 
    /// groups them into batches, and processes the Tasks of each batch in parallel. 
    /// The <see cref="ParallelInvoke"/> method is abstract so that derived classes 
    /// must specify how they run the Tasks in parallel.</summary>
    public abstract class ParallelTaskSchedulerBase : TaskScheduler, IParallelInvoker
    {
        #region Fields

        [ThreadStatic]
        private static bool currentThreadIsProcessingItems;

        private int maxDegreeOfParallelism;

        private volatile int runningOrQueuedCount;

        private readonly LinkedList<Task> tasks = new LinkedList<Task>();

        #endregion Fields

        #region Constructors

        protected ParallelTaskSchedulerBase(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1)
                throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");

            this.maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        protected ParallelTaskSchedulerBase() : this(Environment.ProcessorCount) { }

        #endregion Constructors

        #region Properties

        public override int MaximumConcurrencyLevel
        {
            get { return maxDegreeOfParallelism; }
        }

        #endregion Properties

        #region Methods

        #region Public Methods

        public virtual void LongRunningTasksInvoke(Action<Task> action, params Task[] tasks)
        {
            tasks.InvokeActionOnTasksThreaded(action, true);
        }

        public abstract void ParallelInvoke(Action<Task> action, params Task[] tasks);

        #endregion Public Methods

        #region Protected Methods

        protected override bool TryDequeue(Task task)
        {
            lock (tasks) return tasks.Remove(task);
        }

        protected override bool TryExecuteTaskInline(Task task,
            bool taskWasPreviouslyQueued)
        {
            if (!currentThreadIsProcessingItems) return false;

            if (taskWasPreviouslyQueued) TryDequeue(task);

            return base.TryExecuteTask(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            var lockTaken = false;
            try
            {
                Monitor.TryEnter(tasks, ref lockTaken);
                if (lockTaken) return tasks.ToArray();
                else throw new NotSupportedException();
            }
            finally { if (lockTaken) Monitor.Exit(tasks); }
        }

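        protected override void QueueTask(Task task)
        {
            var startProcessing = false;

            // Check and increment under the same lock that RunTasks uses to decrement,
            // so concurrent callers cannot exceed maxDegreeOfParallelism.
            lock (tasks)
            {
                tasks.AddLast(task);

                if (runningOrQueuedCount < maxDegreeOfParallelism)
                {
                    runningOrQueuedCount++;
                    startProcessing = true;
                }
            }

            // Note: RunTasks runs synchronously on the thread that queued the task.
            if (startProcessing) RunTasks();
        }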

        #endregion Protected Methods

        #region Private Methods

        private void RunTasks()
        {
            var taskList = new List<Task>();
            var longRunningTaskList = new List<Task>();

            currentThreadIsProcessingItems = true;
            try
            {
                while (true)
                {
                    lock (tasks)
                    {
                        if (tasks.Count == 0)
                        {
                            runningOrQueuedCount--;
                            break;
                        }

                        var t = tasks.First.Value;

                        if ((t.CreationOptions & TaskCreationOptions.LongRunning) != TaskCreationOptions.None)
                            longRunningTaskList.Add(t);
                        else
                            taskList.Add(t);

                        tasks.RemoveFirst();
                    }
                }

                if (longRunningTaskList.Count > 0)
                    LongRunningTasksInvoke(t => base.TryExecuteTask(t), longRunningTaskList.ToArray());

                if (taskList.Count > 0)
                {
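                    // Group by position in the list: O(n), unlike calling IndexOf for every task.
                    var batches = taskList
                        .Select((task, index) => new { task, index })
                        .GroupBy(x => x.index / maxDegreeOfParallelism, x => x.task);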

                    foreach (var batch in batches)
                        ParallelInvoke(t => base.TryExecuteTask(t), batch.ToArray());
                }
            }
            finally { currentThreadIsProcessingItems = false; }
        }

        #endregion Private Methods

        #endregion Methods
    }
}
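The selection step in RunTasks can be tried out in isolation. This hypothetical TaskBatching.Partition helper (not part of my code) mirrors it: tasks whose CreationOptions include LongRunning go into one list, and the rest are cut into batches of at most batchSize, using the (item, index) overload of Select rather than IndexOf, which has to rescan the list for every task.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class TaskBatching
{
    // Splits queued tasks the way RunTasks does: LongRunning tasks go to one list,
    // the rest are grouped into batches of at most 'batchSize'.
    public static void Partition(IEnumerable<Task> queued, int batchSize,
        out List<Task> longRunning, out List<Task[]> batches)
    {
        if (queued == null) throw new ArgumentNullException("queued");
        if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize");

        longRunning = new List<Task>();
        var regular = new List<Task>();

        foreach (var t in queued)
        {
            if ((t.CreationOptions & TaskCreationOptions.LongRunning) != 0)
                longRunning.Add(t);
            else
                regular.Add(t);
        }

        // GroupBy keeps keys in order of first appearance, so batches stay in queue order.
        batches = regular
            .Select((task, index) => new { task, index })
            .GroupBy(x => x.index / batchSize, x => x.task)
            .Select(g => g.ToArray())
            .ToList();
    }
}
```

With five ordinary tasks, one LongRunning task and a batch size of 2, this yields one long-running task and batches of 2, 2 and 1.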

Here is the ParallelThreadPoolTaskScheduler. (Its name also changed.)

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>A TaskScheduler that, when processing work items, groups 
    /// them into batches, and processes the Tasks of each batch in parallel. 
    /// This <b>ParallelInvoke</b> implementation runs the work on background 
    /// threads, using the ThreadPool.</summary>
    public sealed class ParallelThreadPoolTaskScheduler : ParallelTaskSchedulerBase
    {
        /// <summary>Invoke the specified Tasks, executing the specified 
        /// action for each on the ThreadPool in parallel.</summary>
        public override void ParallelInvoke(Action<Task> action, params Task[] tasks)
        {
            tasks.InvokeActionOnTasksThreadPooled(action);
        }
    }
}

And here’s the other one…

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>A TaskScheduler that, when processing work items, groups 
    /// them into batches, and processes the Tasks of each batch in parallel. 
    /// This <b>ParallelInvoke</b> implementation runs the work on dedicated 
    /// threads, and can be configured to use foreground threads.</summary>
    public sealed class ParallelDedicatedThreadsTaskScheduler : ParallelTaskSchedulerBase
    {
        #region Fields

        private bool useForegroundThreads;

        #endregion Fields

        #region Properties

        public bool UseForegroundThreads
        {
            get { return useForegroundThreads; }
            set { useForegroundThreads = value; }
        }

        #endregion Properties

        #region Methods

        #region Public Methods

        public override void LongRunningTasksInvoke(Action<Task> action, params Task[] tasks)
        {
            tasks.InvokeActionOnTasksThreaded(action, !UseForegroundThreads);
        }

        public override void ParallelInvoke(Action<Task> action, params Task[] tasks)
        {
            InternalTasksInvoke(action, tasks);
        }

        #endregion Public Methods

        #region Private Methods

        /// <summary>Invoke the specified Tasks, executing the specified action for each 
        /// on a dedicated thread, in parallel. <b>UseForegroundThreads</b> determines 
        /// whether threads are run in the foreground or background.</summary>
        private void InternalTasksInvoke(Action<Task> action, Task[] tasks)
        {
            tasks.InvokeActionOnTasksThreaded(action, !UseForegroundThreads);
        }

        #endregion Private Methods

        #endregion Methods
    }
}

Last time I wrote this (and it still applies):

And lastly, here is the code for my static Tasks class, and my custom TaskFactory, which are both in one file. Note that I had declared all the scheduler types in the code as TaskScheduler types originally (I figured it makes better polymorphic sense), but changed them a few minutes ago because declaring them using the actual types shows the relationships properly on the class diagram.

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Schedulers;

namespace Romy.Core
{
    /// <summary>Set up some common options for async Tasks. The <b>TaskSchedulers.Parallel</b>
    /// scheduler references the static <b>Scheduler</b> property of this class. The value of
    /// <b>Factory.UseThreadPool</b> defines which <b>ParallelTaskSchedulerBase</b> implementation 
    /// is used, and <b>Factory.UseForeGroundThreads</b> defines whether dedicated threads will be 
    /// foreground or background threads (ThreadPool threads are always background threads).</summary>
    public static class Tasks
    {
        private static ParallelTaskFactory factory = new ParallelTaskFactory();

        public static ParallelTaskFactory Factory
        {
            get { return Tasks.factory; }
        }

        public static TaskScheduler Scheduler
        {
            get { return factory.Scheduler; }
        }

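        public static Task Run(Func<Task> body)
        {
            // Pass the scheduler explicitly: StartNew overloads without one use the scheduler
            // given to the TaskFactory constructor, not the hiding Scheduler property.
            return factory.StartNew(async () => await body(), CancellationToken.None,
                factory.CreationOptions, Scheduler).Unwrap();
        }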

        public static Task<T> Run<T>(Func<Task<T>> body, CancellationToken cancellationToken)
        {
            return factory.StartNew(async () => await body(), cancellationToken, TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness, Scheduler).Unwrap();
        }
    }

    /// <summary>A TaskFactory to define which TaskScheduler will 
    /// be used for most async Tasks in the application.</summary>
    public class ParallelTaskFactory : TaskFactory
    {
        private bool useForeGroundThreads;

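        // Default to the ThreadPool scheduler, matching the scheduler passed to the base constructor.
        private bool useThreadPool = true;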

        private static Lazy<ParallelDedicatedThreadsTaskScheduler> dedicatedThreadsScheduler = new Lazy<ParallelDedicatedThreadsTaskScheduler>(() => new ParallelDedicatedThreadsTaskScheduler());

        private static Lazy<ParallelThreadPoolTaskScheduler> threadPoolScheduler = new Lazy<ParallelThreadPoolTaskScheduler>(() => new ParallelThreadPoolTaskScheduler());

        public ParallelTaskFactory()
            : base(CancellationToken.None,
                TaskCreationOptions.DenyChildAttach | TaskCreationOptions.HideScheduler,
                TaskContinuationOptions.ExecuteSynchronously, ThreadPoolScheduler) { }

        public bool UseForeGroundThreads
        {
            get { return useForeGroundThreads; }
            set
            {
                useForeGroundThreads = value;
                DedicatedThreadsScheduler.UseForegroundThreads = useForeGroundThreads;
            }
        }

        public bool UseThreadPool
        {
            get { return useThreadPool; }
            set { useThreadPool = value; }
        }

        public static ParallelDedicatedThreadsTaskScheduler DedicatedThreadsScheduler
        {
            get { return ParallelTaskFactory.dedicatedThreadsScheduler.Value; }
        }

        public new TaskScheduler Scheduler
        {
            get { return useThreadPool ? (TaskScheduler)ThreadPoolScheduler : (TaskScheduler)DedicatedThreadsScheduler; }
        }

        public static ParallelThreadPoolTaskScheduler ThreadPoolScheduler
        {
            get { return ParallelTaskFactory.threadPoolScheduler.Value; }
        }
    }
}
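The Run methods rely on StartNew with an async lambda, which returns a Task&lt;Task&gt; (or Task&lt;Task&lt;T&gt;&gt;); Unwrap turns that into a single task that completes when the async body does. Here is a stripped-down sketch of the pattern, assuming TaskScheduler.Default can stand in for my schedulers (UnwrapDemo.RunOn is a hypothetical name). Without Unwrap, waiting on the outer task would only cover the lambda up to its first incomplete await.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    // StartNew with an async lambda yields Task<Task<T>>: the outer task completes as soon as
    // the lambda returns its inner task. Unwrap() gives a proxy that completes with the inner task.
    public static Task<T> RunOn<T>(TaskScheduler scheduler, Func<Task<T>> body)
    {
        Task<Task<T>> outer = Task.Factory.StartNew(
            async () => await body(),   // async wrapper, as in Tasks.Run
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            scheduler);

        return outer.Unwrap();
    }
}
```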

And now my code is a bit fairer for long running tasks.


About Jerome

I am a senior C# developer in Johannesburg, South Africa. I am also a recovering addict, who spent nearly eight years using methamphetamine. I write on my recovery blog about my lessons learned and sometimes give advice to others who have made similar mistakes, often from my viewpoint as an atheist, and I also write some C# programming articles on my programming blog.
This entry was posted in Programming.
