ThreadPool Deviations - Thread Creation Exceeding SetMaxThreads

  • I have an I/O-intensive operation.
  • I want a maximum of 5 threads working simultaneously.
  • I have 8000 tasks to queue and complete.
  • Each task takes approximately 15-20 seconds to complete.

I looked at ThreadPool and tried this:

        ThreadPool.SetMaxThreads(5, 0);

        List<task> tasks = GetTasks();

        int toProcess = tasks.Count;
        ManualResetEvent resetEvent = new ManualResetEvent(false);

        for (int i = 0; i < tasks.Count; i++)
        {
            ReportGenerator worker = new ReportGenerator(tasks[i].Code, id);
            ThreadPool.QueueUserWorkItem(x =>
            {
                worker.Go();
                if (Interlocked.Decrement(ref toProcess) == 0)
                    resetEvent.Set();
            });
        }

        resetEvent.WaitOne();

I can't understand why my code runs more than 5 threads at a time. I tried SetMaxThreads and SetMinThreads, but it still runs more than 5 threads.

What's happening? What am I missing? Should I do it differently?

Thanks.

+5
4 answers

The Task Parallel Library can help you:

List<task> tasks = GetTasks();

Parallel.ForEach(tasks, new ParallelOptions { MaxDegreeOfParallelism = 5 },
    task =>
    {
        ReportGenerator worker = new ReportGenerator(task.Code, id);
        worker.Go();
    });

What does MaxDegreeOfParallelism do?
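One way to convince yourself that MaxDegreeOfParallelism really caps the number of simultaneous workers is to count them. The following is only a sketch: ParallelismDemo and MeasurePeakConcurrency are hypothetical names, and Thread.Sleep stands in for worker.Go().

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelismDemo
{
    // Runs itemCount simulated jobs with MaxDegreeOfParallelism = maxParallel
    // and returns the highest number of jobs observed running at the same time.
    public static int MeasurePeakConcurrency(int itemCount, int maxParallel)
    {
        object gate = new object();
        int running = 0; // jobs currently inside the loop body
        int peak = 0;    // highest value of running seen so far
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        Parallel.ForEach(Enumerable.Range(0, itemCount), options, item =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }

            Thread.Sleep(20); // stand-in for worker.Go() (assumption)

            lock (gate)
            {
                running--;
            }
        });

        return peak;
    }
}
```

Calling ParallelismDemo.MeasurePeakConcurrency(40, 5) should never report more than 5, although it may report fewer, since the setting is an upper bound rather than a target.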

+3

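SetMaxThreads has a limitation: you can never set the maximum lower than the number of processors on the system. If you have 8 processors, setting it to 5 is the same as not calling the function at all.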
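ThreadPool.SetMaxThreads reports rejection through its bool return value, which the code in the question ignores. The documentation states that neither limit can be set below the number of processors, so passing 0 for completionPortThreads, as the question does, is rejected on any machine. A minimal sketch for checking this (PoolLimits and TryCapWorkers are hypothetical names):

```csharp
using System;
using System.Threading;

public static class PoolLimits
{
    // Tries to cap the pool at the given number of worker threads while
    // leaving the I/O completion-port limit as it is. Returns whatever
    // SetMaxThreads returned: false means the request was rejected and
    // the pool limits are unchanged.
    public static bool TryCapWorkers(int workers)
    {
        int currentWorkers, currentIo;
        ThreadPool.GetMaxThreads(out currentWorkers, out currentIo);
        return ThreadPool.SetMaxThreads(workers, currentIo);
    }
}
```

Comparing the requested value with Environment.ProcessorCount before calling, and checking the return value afterwards, shows whether the cap actually took effect. Note that a successful call changes the limit for the whole process, not just for your work items.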

+3

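I think there is a different way to approach this. (Forgive me if I accidentally Java-ize some of the syntax.)

The main thread has a list of things to do in "tasks". Instead of queuing a separate work item for each task, which is not efficient with so many items, create the desired number of threads and have each of them request tasks from the list as needed.

First, add a field to the class that holds this code, to use as a pointer into the list. Also add a constant for the maximum thread count.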

// New fields in your class definition
private int taskStackPointer;
private const int MAX_THREADS = 5;

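Next, add a method that hands out the next task. It must be synchronized so that only one thread can be inside it at a time: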

// Make sure that only one thread has access at a time
[MethodImpl(MethodImplOptions.Synchronized)] 
public task getNextTask()
{
    if( taskStackPointer < tasks.Count )
        return tasks[taskStackPointer++];
    else
        return null;
}

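Note the tasks[taskStackPointer++] expression. It returns the task at the current position and then advances the pointer, so each call hands out "the next one". Because the method is synchronized, no two threads receive the same task.

Then declare an interface for it: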

public interface TaskDispatcher
{
    // The implementing class applies the synchronization (see getNextTask above).
    task getNextTask();
}

Change the ReportGenerator constructor so that it takes the dispatcher:

public ReportGenerator( TaskDispatcher td, int idCode )
{
    ...
}

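Inside ReportGenerator, instead of handling a single task, keep calling td.getNextTask() and processing the result until it returns null.

Finally, start the workers with something like this: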

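taskStackPointer = 0;
for (int i = 0; i < MAX_THREADS; i++)
{
    ReportGenerator worker = new ReportGenerator(this, id);
    // Go() blocks until the dispatcher runs out of tasks, so give each worker its own thread.
    new Thread(worker.Go).Start();
}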

This way, no more than MAX_THREADS workers ever run at once.

(Note: I am not sure I am using "[MethodImpl(MethodImplOptions.Synchronized)]" correctly. I am more used to Java than C#.)
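The same idea, a fixed set of threads that each keep asking for the next task, can be sketched in C# without a hand-written synchronized method by letting ConcurrentQueue do the hand-out. This swaps the dispatcher interface above for a queue, so treat it as an illustration rather than the answer's exact design. FixedWorkers and Run are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class FixedWorkers
{
    // Processes every item using exactly workerCount dedicated threads.
    // Each thread keeps pulling the next item until the queue is empty.
    public static void Run<T>(IEnumerable<T> items, int workerCount, Action<T> process)
    {
        // TryDequeue plays the role of the synchronized getNextTask().
        var queue = new ConcurrentQueue<T>(items);
        var threads = new List<Thread>();

        for (int i = 0; i < workerCount; i++)
        {
            var thread = new Thread(() =>
            {
                T item;
                while (queue.TryDequeue(out item))
                {
                    process(item);
                }
            });
            threads.Add(thread);
            thread.Start();
        }

        // Wait for all workers to drain the queue and exit.
        foreach (var thread in threads)
        {
            thread.Join();
        }
    }
}
```

For the question's case the call would look roughly like FixedWorkers.Run(tasks, 5, t => new ReportGenerator(t.Code, id).Go()), assuming the original two-argument ReportGenerator constructor. Because these are dedicated threads rather than pool threads, the thread pool limits do not come into play at all.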

+1

Your task list has 8k items in it because you told the code to put them there:

List<task> tasks = GetTasks();

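That said, this number has nothing to do with how many threads are in use.

There are various ways to find out how many threads are in use. Perhaps the simplest is to break into the application with the debugger and look at the Threads window. You not only get a count, you also see what each thread is doing (or not doing), which leads me to...

There is a lot to discuss about what your tasks are doing, and how you arrived at a number to "throttle" the thread pool. In most cases the thread pool does the right thing.

Now, to answer your specific question...

To control the number of concurrent tasks explicitly, consider a simple implementation that changes your task collection from a List to a BlockingCollection (which uses a ConcurrentQueue internally by default) and uses the following code to "consume" the work: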

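var parallelOptions = new ParallelOptions
{
    MaxDegreeOfParallelism = 5
};

// Pass the options object declared above (the original referenced an undefined "options").
Parallel.ForEach(collection.GetConsumingEnumerable(), parallelOptions, x =>
{
    // Do work here...
});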

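MaxDegreeOfParallelism is an upper bound: raise or lower it to allow more or fewer concurrent tasks, but the runtime may still use fewer than that.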
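For completeness, here is a sketch of the producer side that the snippet above leaves out. It assumes the work can be represented by a string code per task. ThrottledConsumer and ProcessAll are hypothetical names, and the process delegate stands in for creating a ReportGenerator and calling Go().

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledConsumer
{
    // Queues every code into a BlockingCollection, then consumes them with
    // at most maxParallel concurrent workers. Returns the number processed.
    public static int ProcessAll(IEnumerable<string> codes, int maxParallel, Action<string> process)
    {
        int processed = 0;

        using (var collection = new BlockingCollection<string>())
        {
            foreach (var code in codes)
            {
                collection.Add(code);
            }

            // Without CompleteAdding, GetConsumingEnumerable would block
            // forever once the collection is empty.
            collection.CompleteAdding();

            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

            Parallel.ForEach(collection.GetConsumingEnumerable(), parallelOptions, code =>
            {
                process(code);
                Interlocked.Increment(ref processed);
            });
        }

        return processed;
    }
}
```

Here everything is queued up front for simplicity. In a real producer/consumer setup the Add calls would typically run on another thread while the Parallel.ForEach is already consuming, with CompleteAdding called once the producer is done.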

Further reading:

Parallel.ForEach

BlockingCollection

+1
