Putting it all together – Configurable TaskSchedulers that can be changed by the user at runtime

This will be my last post about TaskSchedulers and TaskFactories. It will be easier to understand this post if you have read my previous entries on implementing a custom TaskScheduler, but that is not a requirement, as all the relevant code will be included here.

Again, the source code for the whole solution is here.

Using what I have learned recently about TaskSchedulers and TaskFactories, I decided to go one step further and create a couple of different schedulers that the user can switch between at runtime.

I’ll do this backwards for a change: below is the end result of today’s post (the part the user can see, that is, the UI for configuring the schedulers).

I’m not going to explain again how I connected the settings to a property grid, or how the context menu allows resetting the connected item’s properties; if you haven’t read my two posts on that topic, you can find them here: Part 1 and Part 2.

[Screenshot: TaskUserSettings]

I’ve added a new class diagram to the solution because I think it makes this code easier to visualize. Here it is:

[Class diagram: TaskSchedulers]

I took what I learned writing a custom TaskScheduler type and defined an abstract base class from which to derive schedulers that run their tasks in parallel. The IParallelInvoker interface isn’t strictly necessary, but it helps make clear that the important member is the abstract ParallelInvoke method on the base class, which derived classes must implement to define how their tasks are run in parallel.
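The IParallelInvoker interface itself isn’t included in the listings in this post. A minimal sketch of what it presumably looks like, assuming it declares nothing more than the ParallelInvoke signature that the base class makes abstract. The InlineInvoker class is purely hypothetical: it runs the tasks one at a time on the calling thread, only to show what an implementation is expected to do.

```csharp
using System;
using System.Threading.Tasks;

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>Assumed shape of IParallelInvoker: the single member that
    /// ParallelTaskSchedulerBase declares as abstract.</summary>
    public interface IParallelInvoker
    {
        /// <summary>Runs <paramref name="action"/> once for each of the
        /// <paramref name="tasks"/>, returning only when all of them have finished
        /// (an assumption based on how both derived schedulers behave).</summary>
        void ParallelInvoke(Action<Task> action, params Task[] tasks);
    }

    /// <summary>Hypothetical, deliberately non-parallel implementation that runs
    /// the tasks one after another on the calling thread, e.g. for unit tests.</summary>
    public sealed class InlineInvoker : IParallelInvoker
    {
        public void ParallelInvoke(Action<Task> action, params Task[] tasks)
        {
            if (action == null) throw new ArgumentNullException("action");
            if (tasks == null) throw new ArgumentNullException("tasks");

            // Sequential on purpose: this only demonstrates the contract.
            foreach (var task in tasks) action(task);
        }
    }
}
```

A real implementation, like the two derived schedulers below, would start the actions concurrently but still return only after all of them have completed.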

I have derived two scheduler types.

  • The ParallelPoolThreadsTaskScheduler runs its tasks in parallel on the managed ThreadPool.
  • The ParallelDedicatedThreadsTaskScheduler creates Thread instances, and can be configured to either run them in the foreground or in the background.
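The practical difference between the two kinds of thread: a process keeps running until all of its foreground threads have ended, while background threads do not keep it alive. ThreadPool threads are background threads, and a newly created Thread is a foreground thread unless IsBackground is set. A small sketch that checks both facts, using the same `IsBackground = !useForegroundThreads` assignment as the dedicated-threads scheduler; the ThreadKindsDemo class is hypothetical.

```csharp
using System;
using System.Threading;

public static class ThreadKindsDemo
{
    // Returns whether a ThreadPool work item runs on a background thread.
    public static bool PoolThreadIsBackground()
    {
        bool isBackground = false;
        using (var done = new ManualResetEvent(false))
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                isBackground = Thread.CurrentThread.IsBackground;
                done.Set();
            });
            done.WaitOne();
        }
        return isBackground;
    }

    // Creates (but does not start) a thread configured the way
    // ParallelDedicatedThreadsTaskScheduler configures its threads.
    public static bool DedicatedThreadIsBackground(bool useForegroundThreads)
    {
        var thread = new Thread(() => { }) { IsBackground = !useForegroundThreads };
        return thread.IsBackground;
    }

    // A new Thread is a foreground thread unless told otherwise.
    public static bool DefaultThreadIsBackground()
    {
        return new Thread(() => { }).IsBackground;
    }
}
```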

I then created a custom TaskFactory class, ParallelTaskFactory, which introduces two properties: UseThreadPool, which sets which scheduler the Scheduler property returns, and UseForeGroundThreads, which configures whether the ParallelDedicatedThreadsTaskScheduler instance runs its threads in the foreground or the background.

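Note that the TaskFactory.Scheduler property is not virtual, so it was necessary to use the new keyword to hide (and reimplement) the inherited property. Because the property is hidden rather than overridden, anything that goes through the base TaskFactory, including the StartNew overloads that take no scheduler argument, still uses the scheduler passed to the base constructor; to run on the selected scheduler, pass the Scheduler property to StartNew explicitly.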
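To see what hiding (rather than overriding) means in practice, here is a sketch with a hypothetical SwitchingTaskFactory that hides Scheduler the same way ParallelTaskFactory does. TaskScheduler.Default and the ExclusiveScheduler of a ConcurrentExclusiveSchedulerPair stand in for the two parallel schedulers, so the example compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>Hypothetical factory that hides TaskFactory.Scheduler with the new
/// keyword, the same way ParallelTaskFactory does.</summary>
public class SwitchingTaskFactory : TaskFactory
{
    private readonly TaskScheduler primary;
    private readonly TaskScheduler alternate;

    // The base TaskFactory stores `primary` and uses it for every StartNew
    // overload that does not take a TaskScheduler argument.
    public SwitchingTaskFactory(TaskScheduler primary, TaskScheduler alternate)
        : base(primary)
    {
        this.primary = primary;
        this.alternate = alternate;
    }

    public bool UseAlternate { get; set; }

    // Hides, rather than overrides, the inherited non-virtual property.
    public new TaskScheduler Scheduler
    {
        get { return UseAlternate ? alternate : primary; }
    }
}
```

Through a SwitchingTaskFactory reference, Scheduler returns the alternate scheduler when UseAlternate is true; through a TaskFactory reference, or implicitly via `StartNew(() => ...)`, the base constructor’s scheduler is still used. Passing Scheduler to the StartNew overload that accepts a TaskScheduler is what makes the switch take effect.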

Here’s the code for the base scheduler type. (It will look very familiar if you’ve read my other posts on custom schedulers.)

using System.Collections.Generic;
using System.Linq;

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>Base class for a TaskScheduler that, when processing work items, 
    /// groups them into batches, and processes the Tasks of each batch in parallel. 
    /// The <see cref="ParallelInvoke"/> method is abstract so that derived classes 
    /// must specify how they run the Tasks in parallel.</summary>
    public abstract class ParallelTaskSchedulerBase : TaskScheduler, IParallelInvoker
    {
        #region Fields

        [ThreadStatic]
        private static bool currentThreadIsProcessingItems;

        private int maxDegreeOfParallelism;

        private volatile int runningOrQueuedCount;

        private readonly LinkedList<Task> tasks = new LinkedList<Task>();

        #endregion Fields

        #region Constructors

        protected ParallelTaskSchedulerBase(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1)
                throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");

            this.maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        protected ParallelTaskSchedulerBase() : this(Environment.ProcessorCount) { }

        #endregion Constructors

        #region Properties

        public override int MaximumConcurrencyLevel
        {
            get { return maxDegreeOfParallelism; }
        }

        #endregion Properties

        #region Methods

        #region Public Methods

        public abstract void ParallelInvoke(Action<Task> action, params Task[] tasks);

        #endregion Public Methods

        #region Protected Methods

        protected override bool TryDequeue(Task task)
        {
            lock (tasks) return tasks.Remove(task);
        }

        protected override bool TryExecuteTaskInline(Task task,
            bool taskWasPreviouslyQueued)
        {
            if (!currentThreadIsProcessingItems) return false;

            if (taskWasPreviouslyQueued) TryDequeue(task);

            return base.TryExecuteTask(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            var lockTaken = false;
            try
            {
                Monitor.TryEnter(tasks, ref lockTaken);
                if (lockTaken) return tasks.ToArray();
                else throw new NotSupportedException();
            }
            finally { if (lockTaken) Monitor.Exit(tasks); }
        }

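        protected override void QueueTask(Task task)
        {
            lock (tasks)
            {
                tasks.AddLast(task);

                // Check and update the counter under the same lock that RunTasks
                // uses to decrement it, so that concurrent callers cannot race.
                if (runningOrQueuedCount >= maxDegreeOfParallelism) return;
                runningOrQueuedCount++;
            }

            // Process the queue on a pool thread, so that the thread calling
            // StartNew is not blocked while the batches run.
            ThreadPool.UnsafeQueueUserWorkItem(_ => RunTasks(), null);
        }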

        #endregion Protected Methods

        #region Private Methods

        private void RunTasks()
        {
            List<Task> taskList = new List<Task>();

            currentThreadIsProcessingItems = true;
            try
            {
                while (true)
                {
                    lock (tasks)
                    {
                        if (tasks.Count == 0)
                        {
                            runningOrQueuedCount--;
                            break;
                        }

                        var t = tasks.First.Value;
                        taskList.Add(t);
                        tasks.RemoveFirst();
                    }
                }

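                if (taskList.Count > 0)
                {
                    // Split the drained tasks into consecutive batches of at most
                    // maxDegreeOfParallelism items, based on each task's position.
                    // (Select's index avoids calling IndexOf for every task.)
                    var batches = taskList
                        .Select((task, index) => new { task, index })
                        .GroupBy(x => x.index / maxDegreeOfParallelism, x => x.task);

                    foreach (var batch in batches)
                    {
                        // Both implementations here block until the batch completes.
                        ParallelInvoke(t => base.TryExecuteTask(t), batch.ToArray());
                    }
                }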
            }
            finally { currentThreadIsProcessingItems = false; }
        }

        #endregion Private Methods

        #endregion Methods
    }
}
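RunTasks splits the drained queue into consecutive batches by dividing each task’s position by maxDegreeOfParallelism, so with a maximum degree of parallelism of four and ten queued tasks, the batches hold 4, 4 and 2 tasks. The hypothetical helper below isolates that grouping step, using Select’s index overload so the position is known without searching the list:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchingDemo
{
    // Groups items into consecutive batches of at most batchSize elements,
    // keyed by index / batchSize, preserving the original order.
    public static List<List<T>> ToBatches<T>(IList<T> items, int batchSize)
    {
        if (items == null) throw new ArgumentNullException("items");
        if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize");

        return items
            .Select((item, index) => new { item, index })
            .GroupBy(x => x.index / batchSize, x => x.item)
            .Select(g => g.ToList())
            .ToList();
    }
}
```

Because ParallelInvoke only returns once a whole batch has finished, each batch waits for its slowest task before the next batch starts.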

Here are the two derived custom schedulers:

using System.Collections.Generic;

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>A TaskScheduler that, when processing work items, groups 
    /// them into batches, and processes the Tasks of each batch in parallel. 
    /// This <b>ParallelInvoke</b> implementation runs the work on background 
    /// threads, using the ThreadPool.</summary>
    public sealed class ParallelPoolThreadsTaskScheduler : ParallelTaskSchedulerBase
    {
        /// <summary>Invoke the specified Tasks, executing the specified 
        /// Action for each on the ThreadPool in parallel.</summary>
        public override void ParallelInvoke(Action<Task> action, params Task[] tasks)
        {
            if (tasks == null) throw new ArgumentNullException("tasks");
            if (action == null) throw new ArgumentNullException("action");
            if (tasks.Length == 0) return;

            using (ManualResetEvent mre = new ManualResetEvent(false))
            {
                int remaining = tasks.Length;
                var exceptions = new List<Exception>();

                foreach (var task in tasks)
                {
                    ThreadPool.UnsafeQueueUserWorkItem(_ =>
                    {
                        try
                        {
                            action(task);
                        }
                        catch (Exception ex)
                        {
                            lock (exceptions) exceptions.Add(ex);
                        }
                        finally
                        {
                            if (Interlocked.Decrement(ref remaining) == 0) mre.Set();
                        }
                    }, null);
                }
                mre.WaitOne();
                if (exceptions.Count > 0) throw new AggregateException(exceptions);
            }
        }
    }
}

using System.Collections.Generic;

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>A TaskScheduler that, when processing work items, groups 
    /// them into batches, and processes the Tasks of each batch in parallel. 
    /// This <b>ParallelInvoke</b> implementation runs the work on dedicated 
    /// threads, and can be configured to use foreground threads.</summary>
    public sealed class ParallelDedicatedThreadsTaskScheduler : ParallelTaskSchedulerBase
    {
        private bool useForegroundThreads;

        public bool UseForegroundThreads
        {
            get { return useForegroundThreads; }
            set { useForegroundThreads = value; }
        }

        /// <summary>Invoke the specified Tasks, executing the specified 
        /// Action for each on a dedicated thread in parallel.</summary>
        public override void ParallelInvoke(Action<Task> action, params Task[] tasks)
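        {
            if (tasks == null) throw new ArgumentNullException("tasks");
            if (action == null) throw new ArgumentNullException("action");
            if (tasks.Length == 0) return;

            var exceptions = new List<Exception>();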

            List<Thread> threads = new List<Thread>();

            foreach (var task in tasks)
            {
                var thread = new Thread(t =>
                {
                    try
                    {
                        action(task);
                    }
                    catch (Exception ex)
                    {
                        lock (exceptions) exceptions.Add(ex);
                    }
                }) { IsBackground = !UseForegroundThreads };
                thread.Start();
                threads.Add(thread);
            }

            foreach (var t in threads)
                t.Join();

            if (exceptions.Count > 0) throw new AggregateException(exceptions);
        }
    }
}

Notice the pattern used for exceptions, which I took from an MSDN Magazine article by Stephen Toub. It is very important to stick to this pattern, especially if you write libraries that other developers will use. When you have several Tasks running, each worker should catch any exception thrown by its action and add it to a shared list; once they have all finished, the collected exceptions are thrown together as a single AggregateException. That way one faulting Task doesn’t break the rest, and the calling code can choose to handle (and possibly suppress) the errors simply by catching AggregateException.
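A stripped-down, standalone version of the same pattern (the AggregateDemo name is hypothetical): each worker thread catches whatever its action throws and adds it to a shared list under a lock, and the exceptions are rethrown together only after every thread has been joined.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class AggregateDemo
{
    // Runs each action on its own thread and waits for all of them. Any exceptions
    // are collected and rethrown together as one AggregateException at the end.
    public static void RunAll(params Action[] actions)
    {
        if (actions == null) throw new ArgumentNullException("actions");

        var exceptions = new List<Exception>();
        var threads = new List<Thread>();

        foreach (var action in actions)
        {
            var worker = new Thread(() =>
            {
                try
                {
                    action();
                }
                catch (Exception ex)
                {
                    // One failing action must not stop the others, so just record it.
                    lock (exceptions) exceptions.Add(ex);
                }
            }) { IsBackground = true };
            worker.Start();
            threads.Add(worker);
        }

        foreach (var t in threads) t.Join();

        if (exceptions.Count > 0) throw new AggregateException(exceptions);
    }
}
```

A caller can catch AggregateException and inspect InnerExceptions, or call AggregateException.Handle with a predicate, which rethrows only the exceptions the predicate did not mark as handled.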

And lastly, here is the code for my static Tasks class, and my custom TaskFactory, which are both in one file. Note that I had declared all the scheduler types in the code as TaskScheduler types originally (I figured it makes better polymorphic sense), but changed them a few minutes ago because declaring them using the actual types shows the relationships properly on the class diagram.

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Schedulers;

namespace Romy.Core
{
    /// <summary>Set up some common options for async Tasks. The <b>TaskSchedulers.Parallel</b>
    /// scheduler references the static <b>Scheduler</b> property of this class. The value of
    /// <b>Factory.UseThreadPool</b> defines which <b>ParallelTaskSchedulerBase</b> 
    /// implementation is used, and <b>Factory.UseForeGroundThreads</b> defines whether the 
    /// dedicated (non-ThreadPool) threads are foreground or background threads.</summary>
    public static class Tasks
    {
        private static ParallelTaskFactory factory = new ParallelTaskFactory();

        public static ParallelTaskFactory Factory
        {
            get { return Tasks.factory; }
        }

        public static TaskScheduler Scheduler
        {
            get { return factory.Scheduler; }
        }

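        public static Task Run(Func<Task> body)
        {
            // Pass the selected scheduler explicitly: the inherited StartNew overloads
            // would otherwise use the scheduler given to the base TaskFactory constructor.
            return factory.StartNew(body, factory.CancellationToken,
                factory.CreationOptions, factory.Scheduler).Unwrap();
        }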
    }

    /// <summary>A TaskFactory to define which TaskScheduler will 
    /// be used for most async Tasks in the application.</summary>
    public class ParallelTaskFactory : TaskFactory
    {
        private bool useForeGroundThreads;

        private bool useThreadPool;

        private static Lazy<ParallelDedicatedThreadsTaskScheduler> dedicatedThreadsScheduler = new Lazy<ParallelDedicatedThreadsTaskScheduler>(() => new ParallelDedicatedThreadsTaskScheduler());

        private static Lazy<ParallelPoolThreadsTaskScheduler> threadPoolScheduler = new Lazy<ParallelPoolThreadsTaskScheduler>(() => new ParallelPoolThreadsTaskScheduler());

        public ParallelTaskFactory()
            : base(CancellationToken.None,
                TaskCreationOptions.DenyChildAttach | TaskCreationOptions.HideScheduler,
                TaskContinuationOptions.ExecuteSynchronously, ThreadPoolScheduler) { }

        public bool UseForeGroundThreads
        {
            get { return useForeGroundThreads; }
            set
            {
                useForeGroundThreads = value;
                DedicatedThreadsScheduler.UseForegroundThreads = useForeGroundThreads;
            }
        }

        public bool UseThreadPool
        {
            get { return useThreadPool; }
            set { useThreadPool = value; }
        }

        public static ParallelDedicatedThreadsTaskScheduler DedicatedThreadsScheduler
        {
            get { return ParallelTaskFactory.dedicatedThreadsScheduler.Value; }
        }

        public new TaskScheduler Scheduler
        {
            get { return useThreadPool ? (TaskScheduler)ThreadPoolScheduler : (TaskScheduler)DedicatedThreadsScheduler; }
        }

        public static ParallelPoolThreadsTaskScheduler ThreadPoolScheduler
        {
            get { return ParallelTaskFactory.threadPoolScheduler.Value; }
        }
    }
}

I won’t include the user settings code, because it should be fairly obvious how to hook it up. Just add two boolean user settings, and whenever they change, set the two corresponding properties (UseThreadPool and UseForeGroundThreads) on the global TaskFactory, Tasks.Factory. That in turn selects and configures the scheduler.
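A minimal sketch of that hookup, assuming the settings object raises PropertyChanged (the designer-generated settings class derives from ApplicationSettingsBase, which does). SchedulerSettings and FactoryOptions are hypothetical stand-ins for the real settings class and for Tasks.Factory, so the example compiles on its own; the property names mirror the ones in the post.

```csharp
using System;
using System.ComponentModel;

// Hypothetical stand-in for the two boolean user settings.
public class SchedulerSettings : INotifyPropertyChanged
{
    private bool useThreadPool;
    private bool useForegroundThreads;

    public event PropertyChangedEventHandler PropertyChanged;

    public bool UseThreadPool
    {
        get { return useThreadPool; }
        set { useThreadPool = value; OnPropertyChanged("UseThreadPool"); }
    }

    public bool UseForegroundThreads
    {
        get { return useForegroundThreads; }
        set { useForegroundThreads = value; OnPropertyChanged("UseForegroundThreads"); }
    }

    private void OnPropertyChanged(string name)
    {
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs(name));
    }
}

// Hypothetical stand-in for the two ParallelTaskFactory properties.
public class FactoryOptions
{
    public bool UseThreadPool { get; set; }
    public bool UseForeGroundThreads { get; set; }
}

public static class SettingsBinding
{
    // Copies the current values once, then keeps the factory in sync on every change.
    public static void Bind(SchedulerSettings settings, FactoryOptions factory)
    {
        Apply(settings, factory);
        settings.PropertyChanged += (sender, e) => Apply(settings, factory);
    }

    private static void Apply(SchedulerSettings settings, FactoryOptions factory)
    {
        factory.UseThreadPool = settings.UseThreadPool;
        factory.UseForeGroundThreads = settings.UseForegroundThreads;
    }
}
```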

As far as performance goes, I can’t see a difference between the two schedulers, although I have not timed them. Both work better than the default scheduler in my application, and I tend to stick with the one that uses the ThreadPool, which should in theory be more resource-friendly, since it reuses pool threads instead of creating a new thread for every task.
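The overhead in question is easy to isolate. A rough sketch, not a benchmark, that times the same trivial work dispatched through the ThreadPool versus one new Thread per item; DispatchTiming is a hypothetical helper and no results are claimed here.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class DispatchTiming
{
    // Runs `count` copies of `work` as ThreadPool work items; returns the elapsed time.
    public static TimeSpan TimePool(int count, Action work)
    {
        var stopwatch = Stopwatch.StartNew();
        using (var done = new CountdownEvent(count))
        {
            for (int i = 0; i < count; i++)
            {
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try { work(); }
                    finally { done.Signal(); }
                });
            }
            done.Wait();
        }
        return stopwatch.Elapsed;
    }

    // Runs `count` copies of `work`, each on a newly created Thread; returns the elapsed time.
    public static TimeSpan TimeDedicated(int count, Action work)
    {
        var stopwatch = Stopwatch.StartNew();
        var threads = new Thread[count];
        for (int i = 0; i < count; i++)
        {
            threads[i] = new Thread(() => work()) { IsBackground = true };
            threads[i].Start();
        }
        foreach (var thread in threads) thread.Join();
        return stopwatch.Elapsed;
    }
}
```

Run each method several times and discard the first run, which includes JIT compilation and thread pool warm-up.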


If anyone does download the application and would like to compare the performance of the schedulers, the best place to do so (and the only place where you can see the effect of the schedulers directly) is when populating thumbnails that are not yet cached, either by my application or in the Windows thumbnail cache.

To clear my application’s cache, just run the ClearCache.cmd batch file from the root of the source directory.

To clear the Windows thumbnail cache, you have to run the CleanMgr application. It lets you save sets of options for it to run, called sage sets. I keep one set that only deletes the Windows thumbnails; I set it up by running CleanMgr /sageset:2 and run it with CleanMgr /sagerun:2.

Once the cached thumbnails have been cleared, the effect of the schedulers is easy to see when navigating directories with many JPG images or movie files, since each thumbnail is first drawn as a placeholder and then replaced once its image has loaded.


About Jerome

I am a senior C# developer in Johannesburg, South Africa. I am also a recovering addict, who spent nearly eight years using methamphetamine. I write on my recovery blog about my lessons learned and sometimes give advice to others who have made similar mistakes, often from my viewpoint as an atheist, and I also write some C# programming articles on my programming blog.
This entry was posted in Programming.
