Queued Actions / Delegates for Asynchronous Execution

Is there anything within the framework that would let me execute a queue of delegates asynchronously?

That is, I want the delegates to execute one at a time, in the order in which they were queued, but I want the whole process to run asynchronously. The queue is also not fixed: additional delegates will be added periodically, and each should be processed as soon as it reaches the front of the queue.

I don't need to use a Queue in particular; that is just how I would describe the desired behavior.

I could write something myself to do this, but if there is something built in that I could use instead, that would be better.

I looked briefly at ThreadPool.QueueUserWorkItem, because it lets you queue work in order, but I could not find a satisfactory way to prevent more than one delegate from executing at a time.

+3
2 answers

Is there anything within the framework that would let me execute a queue of delegates asynchronously?

I would implement this as a custom task scheduler. You can then queue and run your delegates as tasks, which gives you all the benefits of exception handling, cancellation, and async/await.

Implementing a task scheduler that executes your delegates in serial order is fairly simple using BlockingCollection. The SerialTaskScheduler below is a simplified version of Stephen Toub's StaTaskScheduler:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21628490
{
    // Test
    class Program
    {
        static async Task DoWorkAsync()
        {
            using (var scheduler = new SerialTaskScheduler())
            {
                var tasks = Enumerable.Range(1, 10).Select(i =>
                    scheduler.Run(() =>
                    {
                        var sleep = 1000 / i;
                        Thread.Sleep(sleep);
                        Console.WriteLine("Task #" + i + ", sleep: " + sleep);
                    }, CancellationToken.None));

                await Task.WhenAll(tasks);
            }
        }

        static void Main(string[] args)
        {
            DoWorkAsync().Wait();
            Console.ReadLine();
        }
    }

    // SerialTaskScheduler
    public sealed class SerialTaskScheduler : TaskScheduler, IDisposable
    {
        Task _schedulerTask;
        BlockingCollection<Task> _tasks;
        Thread _schedulerThread;

        public SerialTaskScheduler()
        {
            _tasks = new BlockingCollection<Task>();

            _schedulerTask = Task.Run(() =>
            {
                _schedulerThread = Thread.CurrentThread;

                foreach (var task in _tasks.GetConsumingEnumerable())
                    TryExecuteTask(task);
            });
        }

        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }

        protected override bool TryExecuteTaskInline(
            Task task, bool taskWasPreviouslyQueued)
        {
            return _schedulerThread == Thread.CurrentThread &&
                TryExecuteTask(task);
        }

        public override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }

        public void Dispose()
        {
            if (_schedulerTask != null)
            {
                _tasks.CompleteAdding();
                _schedulerTask.Wait();
                _tasks.Dispose();
                _tasks = null;
                _schedulerTask = null;
            }
        }

        public Task Run(Action action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task Run(Func<Task> action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }

        public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }
    }
}

Output:

Task #1, sleep: 1000
Task #2, sleep: 500
Task #3, sleep: 333
Task #4, sleep: 250
Task #5, sleep: 200
Task #6, sleep: 166
Task #7, sleep: 142
Task #8, sleep: 125
Task #9, sleep: 111
Task #10, sleep: 100
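
To isolate the mechanism the scheduler above relies on, here is a minimal sketch of just the single-consumer loop over a BlockingCollection, without the TaskScheduler plumbing. The names SerialActionQueue, Enqueue and SerialActionQueueDemo are made up for illustration. Unlike the scheduler, this sketch gives you no per-delegate Task to await and does not handle exceptions thrown by a delegate.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not part of the framework): a single background
// consumer drains the queue, so delegates run one at a time in FIFO order.
public sealed class SerialActionQueue : IDisposable
{
    private readonly BlockingCollection<Action> _actions = new BlockingCollection<Action>();
    private readonly Task _consumer;

    public SerialActionQueue()
    {
        // LongRunning hints that the loop should get its own thread,
        // because it blocks while waiting for new items.
        _consumer = Task.Factory.StartNew(() =>
        {
            foreach (var action in _actions.GetConsumingEnumerable())
                action(); // an exception here would end the loop (not handled in this sketch)
        }, TaskCreationOptions.LongRunning);
    }

    // May be called at any time, from any thread, until Dispose is called.
    public void Enqueue(Action action)
    {
        _actions.Add(action);
    }

    // Stops accepting new work and waits for the queued delegates to finish.
    public void Dispose()
    {
        _actions.CompleteAdding();
        _consumer.Wait();
        _actions.Dispose();
    }
}

public static class SerialActionQueueDemo
{
    public static List<int> Run()
    {
        var results = new List<int>();
        using (var queue = new SerialActionQueue())
        {
            for (var i = 1; i <= 5; i++)
            {
                var n = i; // copy the loop variable for the closure
                queue.Enqueue(() => results.Add(n));
            }
        } // Dispose blocks here until all five delegates have run
        return results;
    }
}
```

Because only one consumer ever takes items from the collection, the delegates cannot overlap, and new delegates can be enqueued while earlier ones are still running.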
+4

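You can use TPL Dataflow's ActionBlock and queue up instances of a class that holds a Delegate and its parameters. The ActionBlock then executes those items one at a time, asynchronously.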

var block = new ActionBlock<Item>(item => item.Action.DynamicInvoke(item.Parameters));

class Item
{
    public Delegate Action { get; private set; }
    public object[] Parameters { get; private set; }

    public Item(Delegate action, object[] parameters)
    {
        Action = action;
        Parameters = parameters;
    }
}

A simpler option is to use an ActionBlock of Action and capture the parameters in a closure:

var block = new ActionBlock<Action>(action => action());

block.Post(() => Console.WriteLine(message));
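
For completeness, here is a small self-contained sketch of the ActionBlock approach that also shows how to wait for the queue to drain. The names ActionBlockDemo, RunAsync and log are invented for this example, and depending on the target framework you may need to reference the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // MaxDegreeOfParallelism defaults to 1, so posted actions run
        // one at a time, in the order in which they were posted.
        var block = new ActionBlock<Action>(action => action());

        foreach (var message in new[] { "first", "second", "third" })
        {
            var captured = message; // explicit copy for the closure
            block.Post(() => log.Add(captured));
        }

        // Signal that no more items will be posted, then wait
        // asynchronously until everything already queued has run.
        block.Complete();
        await block.Completion;
        return log;
    }
}
```

Post returns immediately, so the caller is never blocked by a running delegate. Complete and Completion are only needed when you want to shut the queue down and wait for the remaining work.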
+2
