TPL Dataflow: LinkTo with multiple consumers not working

I have a BufferBlock to which I send messages:

// Source block that reads a file row by row and posts each row into an
// internal BufferBlock<string>.
// NOTE: this is abbreviated pseudo-code from the question, not compilable C#.
public class DelimitedFileBlock : ISourceBlock<string>
{
    // Inner block that holds the posted rows.
    private ISourceBlock<string> _source;
    // The buffer is bounded to 10000 items.
    _source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 });

    // Read the file one row at a time (pseudo-code).
    While(!eof)
        row = read one row 
    // Retry loop: whenever Post returns false (the row was not accepted),
    // wait five seconds and try again. Intended to give slow consumers time to catch up.
    while(!(_source as BufferBlock<string>).Post<string>(row))
    {
        Thread.Sleep(5000);
    }
}

This is a 5 GB file with 24 million lines.

Now I have a Target block that uses an ActionBlock:

// Custom target block that forwards each offered message to an inner ActionBlock.
// NOTE: abbreviated excerpt from the question; braces and the rest of the method are omitted.
public class SolaceTargetBlock : ITargetBlock<string>
       // Inner block the offered messages are posted to.
       // NOTE(review): declared as ActionBlock<IBasicDataContract> while a string is posted below;
       // presumably a conversion was omitted from the excerpt - confirm.
       private ActionBlock<IBasicDataContract> _publishToSolaceBlock;

       // Called when a message is offered to this target block.
       public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept)
    {
        // Post the offered value to the inner block; 'success' records whether it was accepted.
        bool success = _publishToSolaceBlock.Post(messageValue);

Now in the console application I specify:

 // Three consumers, each constructed with the same settings:
 // MaxDegreeOfParallelism = 10 and BoundedCapacity = 1.
 SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });

 // The source block that reads the CSV file.
 DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);

I limited the BoundedCapacity to 1 for testing purposes.

and now I connect these three consumers with my source using LinkTo:

 // Link the single source to each of the three consumers.
 delimitedFileBlock.LinkTo(solaceTargetBlock1);      
 delimitedFileBlock.LinkTo(solaceTargetBlock2);      
 delimitedFileBlock.LinkTo(solaceTargetBlock3);      

Execution reaches the Thread.Sleep(5000) statement after 10,003 lines, and from then on the Post call in the while loop always returns false.

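I assumed that, because of LinkTo, the solaceTargetBlocks would pick up the next message as soon as they finished the previous one, but LinkTo does not seem to be draining the BufferBlock. How can I distribute the load across multiple consumers, then? Do I have to write my own distribution logic?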

+5
1

Post DataflowBlock<T> ( )

, ,

, ( , ).

, :

, , , Post, SendAsync, SendAsync.

, ( ) , , , .

Now, the BoundedCapacity values on the BufferBlock<T> and the ActionBlock<TInput> (http://msdn.microsoft.com/en-us/library/hh194684.aspx) explain the number you are seeing, as follows:

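  • The BufferBlock<T> has a BoundedCapacity of 10000; once 10,000 items are queued, Post returns false immediately (see above), whereas SendAsync would wait asynchronously until there is room.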

  • Each ActionBlock<TInput> has a BoundedCapacity of 1, and there are three of them.

10000 + (1 * 3) = 10000 + 3 = 10,003

, .

MaxDegreeOfParallelism ExecutionDataFlowBlockOptions ActionBlock<TInput>.

ActionBlock<TInput> MaxDegreeOfParallelism 1; , , . , ActionBlock<T> , .

If you want the ActionBlock<TInput> to process as many items concurrently as it can, set its MaxDegreeOfParallelism to DataflowBlockOptions.Unbounded.

, - ActionBlock<TInput>, , , , .

- , , , MaxDegreeOfParallelism.

-, , BoundedCapacity.

, " , - "; , , , . , .

, , . :

// The source: every line read from the file is posted here.
// No BoundedCapacity is set, so the buffer is unbounded.
var delimitedFileBlock = new BufferBlock<string>();

// The action each action block runs for every string it receives.
Action<string> action = 
    s => { /* Do something with the string here. */ };

// Create the action blocks. Assuming that the action is thread-safe,
// there is no need to process one item at a time or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock2 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock3 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link the source to each of the action blocks declared above.
delimitedFileBlock.LinkTo(solaceActionBlock1);
delimitedFileBlock.LinkTo(solaceActionBlock2);
delimitedFileBlock.LinkTo(solaceActionBlock3);

// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
    // Read the row.
    string row = ...;

    // Post the row that was just read.
    delimitedFileBlock.Post(row);
}

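In fact, a single ActionBlock<TInput> is enough here, because it processes items concurrently on its own (provided the action is thread-safe), so the three blocks can be collapsed into one (again with MaxDegreeOfParallelism set to Unbounded):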

// The source: every line read from the file is posted here.
// No BoundedCapacity is set, so the buffer is unbounded.
var delimitedFileBlock = new BufferBlock<string>();

// The action the action block runs for every string it receives.
Action<string> action = 
    s => { /* Do something with the string here. */ };

// Create the single action block. Assuming that the action is thread-safe,
// there is no need to process one item at a time or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link the source to the action block declared above.
delimitedFileBlock.LinkTo(solaceActionBlock1);

// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
    // Read the row.
    string row = ...;

    // Post the row that was just read.
    delimitedFileBlock.Post(row);
}
+8

All Articles