Speculative execution using TPL

I have a List<Task<bool>> that I want to enumerate in parallel to find the first task that completes with a result of true, without waiting for, or observing exceptions from, any of the remaining tasks.

var tasks = new List<Task<bool>>
{ 
    Task.Delay(2000).ContinueWith(x => false), 
    Task.Delay(0).ContinueWith(x => true), 
};

I tried using PLINQ to do something like:

var task = tasks.AsParallel().FirstOrDefault(t => t.Result);

This runs in parallel, but it does not return as soon as it finds a satisfying result, because accessing the Result property blocks. To make this work with PLINQ, I would have to write this awful statement:

var cts = new CancellationTokenSource();
var task = tasks.AsParallel()
    .FirstOrDefault(t =>
    {
        try 
        { 
            t.Wait(cts.Token);
            if (t.Result)
            {
                cts.Cancel();
            }

            return t.Result;
        } 
        catch (OperationCanceledException) 
        { 
            return false;
        }
    } );

I wrote an extension method that yields tasks as they complete.

public static class Exts
{
    public static IEnumerable<Task<T>> InCompletionOrder<T>(this IEnumerable<Task<T>> source)
    {
        var tasks = source.ToList();
        while (tasks.Any())
        {
            var t = Task.WhenAny(tasks);
            yield return t.Result;
            tasks.Remove(t.Result);
        }
    }
}

// and run like so
var task = tasks.InCompletionOrder().FirstOrDefault(t => t.Result);

But this seems like a common enough need that there should be a better way. Suggestions?

3 answers

Maybe something like this?

var tcs = new TaskCompletionSource<Task<bool>>();

foreach (var task in tasks)
{
    task.ContinueWith((t, state) =>
    {
        if (t.Result)
        {
            ((TaskCompletionSource<Task<bool>>)state).TrySetResult(t);
        }
    },
        tcs,
        TaskContinuationOptions.OnlyOnRanToCompletion |
        TaskContinuationOptions.ExecuteSynchronously);
}

var firstTaskToComplete = tcs.Task;
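
One caveat with the snippet above: if no task ever returns true, tcs.Task never completes. Here is a minimal sketch of a variant that completes with null once every task has finished without a true result. The class name FirstTrueSketch and the method FirstTrueOrNull are hypothetical names chosen for illustration; they are not from any library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FirstTrueSketch
{
    // Hypothetical helper: completes with the first task whose result is true,
    // or with null when every task has finished without producing true.
    public static Task<Task<bool>> FirstTrueOrNull(IEnumerable<Task<bool>> source)
    {
        var tasks = source.ToList();
        var tcs = new TaskCompletionSource<Task<bool>>();

        if (tasks.Count == 0)
        {
            tcs.TrySetResult(null);
            return tcs.Task;
        }

        int remaining = tasks.Count;

        foreach (var task in tasks)
        {
            task.ContinueWith(t =>
            {
                // Read Result only on success, so faulted or canceled
                // tasks do not rethrow inside the continuation.
                if (t.Status == TaskStatus.RanToCompletion && t.Result)
                {
                    tcs.TrySetResult(t);
                }

                // The last continuation to run reports "no winner".
                // TrySetResult is a no-op if a winner was already set.
                if (Interlocked.Decrement(ref remaining) == 0)
                {
                    tcs.TrySetResult(null);
                }
            }, TaskContinuationOptions.ExecuteSynchronously);
        }

        return tcs.Task;
    }
}
```

Each continuation attempts TrySetResult(t) before it decrements the counter, so by the time the counter reaches zero any winning task has already been recorded and the null fallback cannot overwrite it.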

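First, take a look at Rx.Net. It lets you express this as a LINQ query.

Here is a sample that runs in LinqPad with the Microsoft Rx.Net package.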

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

void Main()
{
    var tasks = new List<Task<bool>>
    { 
        Task.Delay(2000).ContinueWith(x => false), 
        Task.Delay(0).ContinueWith(x => true), 
    };

    var observable = (from t in tasks.ToObservable()
                      //Convert task to an observable
                      let o = t.ToObservable()
                      //SelectMany
                      from x in o
                      select x);


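    // Note: ToList() buffers every result into a single list that is emitted
    // once all tasks have finished, and First() blocks until that list arrives.
    var foo = observable
                .SubscribeOn(Scheduler.Default) //Run the tasks on the threadpool
                .ToList()
                .First();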

    Console.WriteLine(foo);
}

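First, I don't see why you would use PLINQ here. Enumerating a list of Tasks is cheap, so parallelizing it gains you nothing.

To get the first Task that has already completed with true, you can use the (non-blocking) IsCompleted property: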

var task = tasks.FirstOrDefault(t => t.IsCompleted && t.Result);

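If you want the Tasks ordered by completion, you need something more. If you only care about the ones that return true, you will have to filter on top of that. For this, take a look at Stephen Cleary's AsyncEx library.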
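
For illustration, here is a minimal sketch of a non-blocking way to find the first task that returns true, using only Task.WhenAny from the framework. It is an assumption about what such a helper could look like, not code from AsyncEx; the names SpeculativeSketch and FirstTrueAsync are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SpeculativeSketch
{
    // Hypothetical helper: awaits tasks in completion order and returns the
    // first one whose result is true, or null if none of them returns true.
    public static async Task<Task<bool>> FirstTrueAsync(IEnumerable<Task<bool>> source)
    {
        var pending = source.ToList();

        while (pending.Count > 0)
        {
            // WhenAny completes as soon as any pending task finishes,
            // without blocking a thread.
            Task<bool> finished = await Task.WhenAny(pending);
            pending.Remove(finished);

            // Skip faulted or canceled tasks instead of rethrowing.
            if (finished.Status == TaskStatus.RanToCompletion && finished.Result)
            {
                return finished;
            }
        }

        return null;
    }
}
```

Usage would be something like var winner = await SpeculativeSketch.FirstTrueAsync(tasks);. The remaining tasks keep running and are simply ignored. Calling WhenAny again on every iteration is quadratic in the number of tasks, which is fine for short lists but worth reconsidering for large ones.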


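Also, you could "fix" the code in the question by adding .WithMergeOptions(ParallelMergeOptions.NotBuffered) to the PLINQ query. That is not guaranteed to work, though, and it is not a good idea. The reason is that PLINQ's worker threads would still block on Result most of the time.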
