Parallel.ForEach thread at the end

I am testing an application that should compile many projects / files.

I have a ConucrrentBag that should work with Parallel.

private readonly ConcurrentBag<string> m_files;

My call for parallel:

Parallel.ForEach(m_files, new ParallelOptions
            {
                MaxDegreeOfParallelism = MaxProcesses,

            }, currFile => ProcessSingle(currFile.ToString()));

Sum MaxProcess is LogicalCpu * 2.

When I compile 140 projects, until the end Parallel will start linearly fewer threads. At least the last 4 projects use only one thread. This is not nice, but everything is in order.

Now my problem is:

When I collect about 14000+ projects (this is COBOL-SOURCE ;-) and a really big system). Recent modules will not be compiled because Parallel.ForEach does not launch new themes for this. There is currently no live labor. But concurrentBag still has 140 elements.

Does anyone know how to solve this?

: . ( ) ...

:

ConcurrentBag , Parallel.ForEach.

, SingleProcess:

private void ProcessSingle(string item)
        {
            Monitor.Enter(lockingObj);
            if (m_files.TryTake(out item))
            {
                if (CompilingModules <= 0)
                {
                    OnQueueStarted(new EventArgs());
                }
                CompilingModules++;
                Monitor.Exit(lockingObj);
                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String));

                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Dequeued, ItemQueueObject.String));
                using (CobolCompiler compiler = new CobolCompiler())
                {
                    compiler.OutputDataReceived += (sender, e) => OnOutputDataReceived(e);
                    compiler.Compile(item);
                    Thread.Sleep(2000);
                    if (compiler.LinkFailure)
                    {
                        if (ObjWithoutDll.ContainsKey(item))
                        {
                            if (ObjWithoutDll[item] <= 2)
                            {
                                m_files.Add(item);
                                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String));
                                ObjWithoutDll[item]++;
                            }
                            else
                            {
                                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.LinkError, ItemQueueObject.String));
                                ObjWithoutDll.Remove(item);
                            }
                        }
                        else
                        {
                            ObjWithoutDll.Add(item, 0);
                            m_files.Add(item);
                            OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String));
                        }
                    }
                    else
                    {
                        if (compiler.DllExisting)
                        {
                            ObjWithoutDll.Remove(item);
                        }
                        OnQueueItemStateChanged(compiler.DllExisting ? new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String) : new ItemQueueEventArgs(item, null, ItemQueueType.Failed, ItemQueueObject.String));
                    }

                }

                Monitor.Enter(lockingObj);
                CompiledModules++;
                if (CompiledModules % 300 == 0)
                {
                    Thread.Sleep(60000);
                }
                CompilingModules--;
                if (CompilingModules <= 0 && m_files.Count <= 0)
                {

                    try
                    {
                        Process prReschk = new Process();
                        FileInfo batch = new FileInfo(@"batches\reschkdlg.cmd");
                        if (!batch.Exists)
                        {
                            Assembly _assembly = Assembly.GetExecutingAssembly();
                            StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\reschkdlg.cmd"));
                        }

                        if (!File.Exists(Config.Instance.WorkingDir + @"reschkdlg.exe"))
                        {
                            File.Copy(Config.Instance.VersionExeDirectory + @"reschkdlg.exe", Config.Instance.WorkingDir + @"reschkdlg.exe");
                        }

                        prReschk.StartInfo.FileName = @"cmd.exe";
                        prReschk.StartInfo.Arguments = @"/c " + batch.FullName + " " + Config.Instance.Version.Replace(".", "") + " " + @"*" + " " + Config.Instance.WorkingDir;
                        prReschk.StartInfo.CreateNoWindow = true;
                        prReschk.StartInfo.UseShellExecute = false;
                        prReschk.Start();
                        prReschk.Close();
                        prReschk.Dispose();
                    }
                    catch
                    {
                    }

                    OnQueueFinished(new EventArgs());
                }
            }
            Monitor.Exit(lockingObj);
        }

Codesnippet CobolCompiler:

public void Compile ( )       {

        file = file.ToLower();

        Process prCompile = new Process();
        Dir = Directory.CreateDirectory(c.WorkingDir + random.Next() + "\\");

        try
        {
            // First clean up the folder
            CleanUpFolder(true, file);

            // First set lock and copy all sources
            Monitor.Enter(lockingObj);
            if (filesToCopy == null)
            {
                CopySource(Dir.FullName);
            }
            Monitor.Exit(lockingObj);

            FileInfo batch = new FileInfo(@"batches\compile.cmd");
            if (!batch.Exists)
            {
                Assembly _assembly = Assembly.GetExecutingAssembly();
                StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\compile.cmd"));
                _textStreamReader.Dispose();
            }

            prCompile.StartInfo.FileName = @"cmd.exe";
            prCompile.StartInfo.Arguments = @"/c " + batch.FullName + " " + c.Version.Replace(".", "") + " " + file.Remove(file.LastIndexOf('.')) + " " + Dir.FullName + " " + Dir.FullName.Remove(Dir.FullName.IndexOf(@"\"));
            prCompile.StartInfo.CreateNoWindow = true;
            prCompile.StartInfo.UseShellExecute = false;
            prCompile.StartInfo.RedirectStandardOutput = true;
            prCompile.StartInfo.RedirectStandardError = true;
            prCompile.StartInfo.WorkingDirectory = Assembly.GetExecutingAssembly().Location.Remove(Assembly.GetExecutingAssembly().Location.LastIndexOf("\\") + 1);
            prCompile.EnableRaisingEvents = true;
            prCompile.OutputDataReceived += prCompile_OutputDataReceived;
            prCompile.ErrorDataReceived += prCompile_OutputDataReceived;
            prCompile.Start();
            prCompile.BeginErrorReadLine();
            prCompile.BeginOutputReadLine();
            prCompile.WaitForExit();
            prCompile.Close();
            prCompile.Dispose();

            CleanUpFolder(false, file);

            if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".dll") || File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".exe"))
            {
                dllExisting = true;
                linkFailure = false;
            }
            else
            {
                if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".obj"))
                {
                    linkFailure = true;
                }
                dllExisting = false;
            }



        }
        catch (ThreadAbortException)
        {
            if (prCompile != null)
            {
                // On Error kill process
                prCompile.Kill();
                prCompile.Dispose();
            }
        }
        catch (Win32Exception)
        {
        }
        catch (Exception)
        {
            dllExisting = false;
        }

        while (true)
        {
            try
            {
                if (Directory.Exists(Dir.FullName))
                {
                    Directory.Delete(Dir.FullName, true);
                    break;
                }
                else
                {
                    break;
                }
            }
            catch
            {
            }
        }


    }
private void CopySource(string Destination)
{
    filesToCopy = new StringCollection();
    foreach (string strFile in Directory.GetFiles(c.WorkingDir))
    {
        string tmpStrFile = strFile.ToLower();

        foreach (string Extension in c.Extensions)
        {
            if (tmpStrFile.Contains(Extension))
            {
                filesToCopy.Add(tmpStrFile);
            }
        }
    }

    if (filesToCopy.Count > 0)
    {
        foreach (string strFile in filesToCopy)
        {
            File.Copy(strFile, Destination + strFile.Remove(0, strFile.LastIndexOf("\\")));
        }
    }
}

private void CleanUpFolder(bool PreCleanup, string Filename)
{
    //Copy all files from compilationfolder to working directory
    if (!PreCleanup)
    {
        foreach (string strFile in Directory.GetFiles(Dir.FullName, Filename.Remove(Filename.LastIndexOf(".") + 1) + "*"))
        {
            FileInfo fileToMove = new FileInfo(strFile);

            if (fileToMove.Name.ToLower().Contains(Filename.Remove(Filename.LastIndexOf("."))))
            {
                File.Copy(strFile, c.WorkingDir + fileToMove.Name, true);
            }
        }
    }

    //Delete useless files
    foreach (string filename in Directory.GetFiles(Config.Instance.WorkingDir, Filename.Remove(Filename.LastIndexOf("."))+".*"))
    {
        bool foundExt = c.Extensions.Contains(filename.Remove(0, filename.LastIndexOf(".") + 1));
        if (PreCleanup)
        {
            // Only delete files, which are not won't be compiled
            if(!foundExt)
            {
                File.Delete(filename);
            }
        }
        else
        {
            if (!Config.Instance.SaveLspFile && filename.Contains(".lsp"))
            {
                File.Delete(filename);
            }

            if (!Config.Instance.SaveLstFile && filename.Contains(".lst"))
            {
                File.Delete(filename);
            }
        }
    }
}

public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
    if (!disposed)
    {
        if (disposing)
        {
            Dir = null;
        }
        disposed = true;
    }
}

~CobolCompiler()
{
    Dispose (false);
}

. .

100%. RAM 270 . 35 .

, , .

Edit: , .

ProcessSingle , , dll.

, 14000 ( ) Parallel.ForEach. 14000 ForEach xxx , .: - (

. prReschk WaitForExit. - Ressources 14000 .

ConcurrentBag :( , .

+3
2

Parallel.ForEach .Net ThreadPool . , , ThreadPool . , MaxDegreeOfParallelism, , ThreadPool , .

, , , . , 140 , - ThreadPool , , CPU .

, . ProcessSingle ? ?

, -, ProcessSingle:

 System.Threading.Thread.Sleep(2000);

, ThreadPool, . , .

+2

CopySource , lockingObj, . lock (lockingObj), finally .

0

All Articles