Parallel.Invoke - dynamically creating more "threads"

I am teaching myself Parallel.Invoke, and parallel processing in general, for use in my current project. I need a push in the right direction to understand how you can dynamically / intelligently allocate more parallel "threads" as needed.

As an example, say you are parsing large log files. This involves reading from a file, some sort of parsing of the returned lines, and finally writing to a database.

To me, this is a typical problem that could benefit from parallel processing.

As a simple first pass, the following code implements this:

Parallel.Invoke(
  ()=> readFileLinesToBuffer(),
  ()=> parseFileLinesFromBuffer(),
  ()=> updateResultsToDatabase()    
);

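Behind the scenes:

  • readFileLinesToBuffer() reads each line and stores it in a buffer.
  • parseFileLinesFromBuffer() consumes lines from that buffer and puts the parsed results into a second buffer, which updateResultsToDatabase() in turn consumes.

The code above assumes that each of the three steps takes the same amount of time/resources. But suppose parseFileLinesFromBuffer() is a long-running process, so instead of running just one instance of that method you want to run two in parallel.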

How can the code intelligently decide to do this, based on whatever bottlenecks it detects?

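Conceptually, I can see how some approach of monitoring the buffer sizes might work, e.g. spawning a new "thread" to drain a buffer faster when it grows... but I suspect this type of issue was already considered when the TPL was designed.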

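Some sample code would be great, but I really just need a hint as to which concepts to investigate next. Is System.Threading.Tasks.TaskScheduler perhaps the key?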
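The setup described in the question (a buffer between each stage, with two copies of the slow parse step) can be sketched as below. This is a minimal sketch under assumptions: the buffers are taken to be BlockingCollection<string> instances (the question doesn't say what the buffer is), the stage bodies are placeholders rather than real file or database code, and LogPipeline, Run and parserCount are hypothetical names. Note that it still fixes the number of parsers up front; it does not decide it automatically.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class LogPipeline
{
    // Runs the three stages with `parserCount` copies of the parse stage.
    // Hypothetical sketch: buffers are BlockingCollection<string>, stage bodies are placeholders.
    public static int Run(string[] lines, int parserCount)
    {
        if (parserCount < 1) throw new ArgumentOutOfRangeException(nameof(parserCount));

        var rawLines = new BlockingCollection<string>(boundedCapacity: 100);
        var parsedLines = new BlockingCollection<string>(boundedCapacity: 100);
        int parsersLeft = parserCount;
        int saved = 0;

        var actions = new Action[parserCount + 2];

        // Stage 1 (readFileLinesToBuffer): fill the first buffer, then mark it complete.
        actions[0] = () =>
        {
            foreach (var line in lines) rawLines.Add(line); // blocks while the buffer is full
            rawLines.CompleteAdding();
        };

        // Stage 2 (parseFileLinesFromBuffer): identical workers share the first buffer.
        for (int i = 1; i <= parserCount; i++)
        {
            actions[i] = () =>
            {
                foreach (var line in rawLines.GetConsumingEnumerable())
                    parsedLines.Add(line.Trim().ToUpperInvariant()); // placeholder "parsing"

                // The last parser to finish closes the second buffer.
                if (Interlocked.Decrement(ref parsersLeft) == 0)
                    parsedLines.CompleteAdding();
            };
        }

        // Stage 3 (updateResultsToDatabase): drain the second buffer.
        actions[parserCount + 1] = () =>
        {
            foreach (var row in parsedLines.GetConsumingEnumerable())
                saved++; // placeholder for a database write
        };

        Parallel.Invoke(actions);
        return saved;
    }
}
```

The bounded capacity makes a fast reader wait when the parsers fall behind, which is one concrete form of the "watch the buffer sizes" idea. Keep in mind that Parallel.Invoke does not promise to run every action at the same time; with blocking stages like these it relies on the thread pool adding threads, so for long-running stages dedicated tasks (TaskCreationOptions.LongRunning) may be a better fit.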


Have you looked at Reactive Extensions (Rx)?

http://msdn.microsoft.com/en-us/data/gg577609.aspx

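Rx is a library from Microsoft. From its description:

The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.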

It is available on NuGet:

https://nuget.org/packages/Rx-Main/1.0.11226

Here is a small example that simulates your scenario with Rx (in a WinForms app):

First, a simple DTO that carries a line together with the ID of the thread that produced it:

class MyItem
{
    public string Line { get; set; }
    public int CurrentThread { get; set; }
}

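Next, the methods that simulate each step. Each one calls Thread.Sleep to simulate work and records Thread.CurrentThread.ManagedThreadId. Note that ProcessLine takes 4 seconds, which makes it the bottleneck: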

private IEnumerable<MyItem> ReadLinesFromFile(string fileName)
{
    var source = from e in Enumerable.Range(1, 10)
                 let v = e.ToString()
                 select v;

    foreach (var item in source)
    {
        Thread.Sleep(1000);
        yield return new MyItem { CurrentThread = Thread.CurrentThread.ManagedThreadId, Line = item };
    }
}

private MyItem UpdateResultToDatabase(string processedLine)
{
    Thread.Sleep(700);
    return new MyItem { Line = "s" + processedLine, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}

private MyItem ProcessLine(string line)
{
    Thread.Sleep(4000);
    return new MyItem { Line = "p" + line, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}

Next, a helper that adds each result to a ListView:

private void DisplayResults(MyItem myItem, Color color, string message)
{
    this.listView1.Items.Add(
        new ListViewItem(
            new[]
            {
                message, 
                myItem.Line ,
                myItem.CurrentThread.ToString(), 
                Thread.CurrentThread.ManagedThreadId.ToString()
            }
        )
        {
            ForeColor = color
        }
    );
}

Finally, the pipeline itself, wired up with the Rx API:

private void PlayWithRx()
{
    // initialize the observable with the lines read from the file
    var source = this.ReadLinesFromFile("some file").ToObservable(Scheduler.TaskPool);

    source.ObserveOn(this).Subscribe(x =>
    {
        // for each line read, we update the UI
        this.DisplayResults(x, Color.Red, "Read");

        // for each line read, we subscribe the line to the ProcessLine method
        var process = Observable.Start(() => this.ProcessLine(x.Line), Scheduler.TaskPool)
            .ObserveOn(this).Subscribe(c =>
            {
                // for each line processed, we update the UI
                this.DisplayResults(c, Color.Blue, "Processed");

                // for each processed line, run the final step (UpdateResultToDatabase) on the task pool;
                // finally, update the UI once the processed line has been saved to the database
                var persist = Observable.Start(() => this.UpdateResultToDatabase(c.Line), Scheduler.TaskPool)
                    .ObserveOn(this).Subscribe(z => this.DisplayResults(z, Color.Black, "Saved"));
            });
    });
}

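Running it fills the ListView with "Read", "Processed" and "Saved" rows, each showing the ID of the thread that produced the item and the ID of the thread that displayed it.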


Using async/await, you could do something like this:

public async Task ProcessFileAsync(string filename)
{
    var lines = await ReadLinesFromFileAsync(filename);
    var parsed = await ParseLinesAsync(lines);
    await UpdateDatabaseAsync(parsed);
}

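Then start one such pipeline per file:

var tasks = filenames.Select(ProcessFileAsync).ToArray();

and wait for the tasks with Task.WaitAll, await Task.WhenAll, etc., whichever fits your code.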
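A self-contained version of the async/await approach is sketched below. ReadLinesFromFileAsync, ParseLinesAsync and UpdateDatabaseAsync are not defined in the answer, so here they are hypothetical stubs that use Task.Delay to simulate I/O; ProcessFileAsync is changed to return the number of rows written (an assumption, so the result can be checked), and ProcessAllAsync is a hypothetical helper that awaits Task.WhenAll.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncFileProcessor
{
    // Hypothetical stand-ins for the answer's helper methods; Task.Delay simulates I/O.
    static async Task<string[]> ReadLinesFromFileAsync(string filename)
    {
        await Task.Delay(10);
        return new[] { filename + ":1", filename + ":2" };
    }

    static async Task<string[]> ParseLinesAsync(string[] lines)
    {
        await Task.Delay(10);
        return lines.Select(l => l.ToUpperInvariant()).ToArray();
    }

    static async Task<int> UpdateDatabaseAsync(string[] parsed)
    {
        await Task.Delay(10);
        return parsed.Length; // pretend this is the number of rows written
    }

    // The answer's per-file pipeline: each step awaits the previous one.
    public static async Task<int> ProcessFileAsync(string filename)
    {
        var lines = await ReadLinesFromFileAsync(filename);
        var parsed = await ParseLinesAsync(lines);
        return await UpdateDatabaseAsync(parsed);
    }

    // Starts one pipeline per file, lets them run concurrently, and sums the rows written.
    public static async Task<int> ProcessAllAsync(IEnumerable<string> filenames)
    {
        Task<int>[] tasks = filenames.Select(ProcessFileAsync).ToArray();
        int[] rowsPerFile = await Task.WhenAll(tasks);
        return rowsPerFile.Sum();
    }
}
```

Note that this parallelizes across files: within one file the three steps still run one after another.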


Take a look at BlockingCollection.

You have one producer doing something like:

while (true) {
    var data = ReadData();
    blockingCollection1.Add(data);
}

and one or more consumers doing something like:

while (true) {
    var data = blockingCollection1.Take();
    var processedData = ProcessData(data);
    blockingCollection2.Add(processedData);
}

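...and likewise for the next stage. With this approach you choose how many consumers to start for each stage.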
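The while (true) loops above never finish on their own. A minimal runnable sketch of the same producer/consumer shape is below, assuming CompleteAdding is used to signal the end of the data so that consumers iterating GetConsumingEnumerable() stop once the collection is empty. ReadData and ProcessData are replaced by trivial placeholders, and ProducerConsumerDemo, Run and consumerCount are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ProducerConsumerDemo
{
    // ReadData/ProcessData from the answer are replaced by trivial placeholders.
    public static int[] Run(int itemCount, int consumerCount)
    {
        var blockingCollection1 = new BlockingCollection<int>(boundedCapacity: 10);
        var blockingCollection2 = new BlockingCollection<int>();

        // Producer: adds items, then signals that no more will come.
        Task producer = Task.Run(() =>
        {
            for (int i = 1; i <= itemCount; i++)
                blockingCollection1.Add(i);   // blocks while the bounded buffer is full
            blockingCollection1.CompleteAdding();
        });

        // Consumers: each takes items until the collection is both empty and completed.
        Task[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (int data in blockingCollection1.GetConsumingEnumerable())
                    blockingCollection2.Add(data * data); // placeholder for ProcessData
            }))
            .ToArray();

        Task.WaitAll(consumers);
        producer.Wait();
        blockingCollection2.CompleteAdding();

        // Consumers finish in arbitrary order, so sort before returning.
        return blockingCollection2.OrderBy(x => x).ToArray();
    }
}
```

Raising consumerCount for the slow stage is the manual way of giving the bottleneck more "threads".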

You can also let the TPL decide the number of consumers by using Parallel.ForEach:

Parallel.ForEach(blockingCollection1.GetConsumingPartitioner(),
                 data => {
                          var processedData = ProcessData(data);
                          blockingCollection2.Add(processedData);
                 });

(Note that you need to use GetConsumingPartitioner() here, not GetConsumingEnumerable().)
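GetConsumingPartitioner() is not a BlockingCollection member in the BCL; it is an extension method from the ParallelExtensionsExtras samples. On .NET 4.5 or later, a built-in alternative is Partitioner.Create with EnumerablePartitionerOptions.NoBuffering, which hands Parallel.ForEach one item at a time instead of having workers wait to fill chunks. A minimal sketch, with ProcessData replaced by a placeholder and PartitionerDemo/Run as hypothetical names:

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionerDemo
{
    public static int[] Run(int itemCount)
    {
        var blockingCollection1 = new BlockingCollection<int>();
        var blockingCollection2 = new BlockingCollection<int>();

        Task producer = Task.Run(() =>
        {
            for (int i = 1; i <= itemCount; i++)
                blockingCollection1.Add(i);
            blockingCollection1.CompleteAdding(); // lets Parallel.ForEach finish
        });

        // NoBuffering: each worker takes a single item rather than waiting to fill a chunk.
        var partitioner = Partitioner.Create(
            blockingCollection1.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        // The TPL decides how many workers process the items.
        Parallel.ForEach(partitioner, data =>
        {
            blockingCollection2.Add(data * 2); // placeholder for ProcessData
        });

        producer.Wait();
        blockingCollection2.CompleteAdding();
        return blockingCollection2.OrderBy(x => x).ToArray();
    }
}
```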
