I am testing an application that should compile many projects / files.
I have a ConucrrentBag that should work with Parallel.
private readonly ConcurrentBag<string> m_files;
My call for parallel:
Parallel.ForEach(m_files, new ParallelOptions
{
MaxDegreeOfParallelism = MaxProcesses,
}, currFile => ProcessSingle(currFile.ToString()));
Sum MaxProcess is LogicalCpu * 2.
When I compile 140 projects, until the end Parallel will start linearly fewer threads. At least the last 4 projects use only one thread. This is not nice, but everything is in order.
Now my problem is:
When I collect about 14000+ projects (this is COBOL-SOURCE ;-) and a really big system). Recent modules will not be compiled because Parallel.ForEach does not launch new themes for this. There is currently no live labor. But concurrentBag still has 140 elements.
Does anyone know how to solve this?
: . ( ) ...
:
ConcurrentBag , Parallel.ForEach.
, SingleProcess:
private void ProcessSingle(string item)
{
Monitor.Enter(lockingObj);
if (m_files.TryTake(out item))
{
if (CompilingModules <= 0)
{
OnQueueStarted(new EventArgs());
}
CompilingModules++;
Monitor.Exit(lockingObj);
OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String));
OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Dequeued, ItemQueueObject.String));
using (CobolCompiler compiler = new CobolCompiler())
{
compiler.OutputDataReceived += (sender, e) => OnOutputDataReceived(e);
compiler.Compile(item);
Thread.Sleep(2000);
if (compiler.LinkFailure)
{
if (ObjWithoutDll.ContainsKey(item))
{
if (ObjWithoutDll[item] <= 2)
{
m_files.Add(item);
OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String));
ObjWithoutDll[item]++;
}
else
{
OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.LinkError, ItemQueueObject.String));
ObjWithoutDll.Remove(item);
}
}
else
{
ObjWithoutDll.Add(item, 0);
m_files.Add(item);
OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String));
}
}
else
{
if (compiler.DllExisting)
{
ObjWithoutDll.Remove(item);
}
OnQueueItemStateChanged(compiler.DllExisting ? new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String) : new ItemQueueEventArgs(item, null, ItemQueueType.Failed, ItemQueueObject.String));
}
}
Monitor.Enter(lockingObj);
CompiledModules++;
if (CompiledModules % 300 == 0)
{
Thread.Sleep(60000);
}
CompilingModules--;
if (CompilingModules <= 0 && m_files.Count <= 0)
{
try
{
Process prReschk = new Process();
FileInfo batch = new FileInfo(@"batches\reschkdlg.cmd");
if (!batch.Exists)
{
Assembly _assembly = Assembly.GetExecutingAssembly();
StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\reschkdlg.cmd"));
}
if (!File.Exists(Config.Instance.WorkingDir + @"reschkdlg.exe"))
{
File.Copy(Config.Instance.VersionExeDirectory + @"reschkdlg.exe", Config.Instance.WorkingDir + @"reschkdlg.exe");
}
prReschk.StartInfo.FileName = @"cmd.exe";
prReschk.StartInfo.Arguments = @"/c " + batch.FullName + " " + Config.Instance.Version.Replace(".", "") + " " + @"*" + " " + Config.Instance.WorkingDir;
prReschk.StartInfo.CreateNoWindow = true;
prReschk.StartInfo.UseShellExecute = false;
prReschk.Start();
prReschk.Close();
prReschk.Dispose();
}
catch
{
}
OnQueueFinished(new EventArgs());
}
}
Monitor.Exit(lockingObj);
}
Codesnippet CobolCompiler:
public void Compile ( ) {
file = file.ToLower();
Process prCompile = new Process();
Dir = Directory.CreateDirectory(c.WorkingDir + random.Next() + "\\");
try
{
CleanUpFolder(true, file);
Monitor.Enter(lockingObj);
if (filesToCopy == null)
{
CopySource(Dir.FullName);
}
Monitor.Exit(lockingObj);
FileInfo batch = new FileInfo(@"batches\compile.cmd");
if (!batch.Exists)
{
Assembly _assembly = Assembly.GetExecutingAssembly();
StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\compile.cmd"));
_textStreamReader.Dispose();
}
prCompile.StartInfo.FileName = @"cmd.exe";
prCompile.StartInfo.Arguments = @"/c " + batch.FullName + " " + c.Version.Replace(".", "") + " " + file.Remove(file.LastIndexOf('.')) + " " + Dir.FullName + " " + Dir.FullName.Remove(Dir.FullName.IndexOf(@"\"));
prCompile.StartInfo.CreateNoWindow = true;
prCompile.StartInfo.UseShellExecute = false;
prCompile.StartInfo.RedirectStandardOutput = true;
prCompile.StartInfo.RedirectStandardError = true;
prCompile.StartInfo.WorkingDirectory = Assembly.GetExecutingAssembly().Location.Remove(Assembly.GetExecutingAssembly().Location.LastIndexOf("\\") + 1);
prCompile.EnableRaisingEvents = true;
prCompile.OutputDataReceived += prCompile_OutputDataReceived;
prCompile.ErrorDataReceived += prCompile_OutputDataReceived;
prCompile.Start();
prCompile.BeginErrorReadLine();
prCompile.BeginOutputReadLine();
prCompile.WaitForExit();
prCompile.Close();
prCompile.Dispose();
CleanUpFolder(false, file);
if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".dll") || File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".exe"))
{
dllExisting = true;
linkFailure = false;
}
else
{
if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".obj"))
{
linkFailure = true;
}
dllExisting = false;
}
}
catch (ThreadAbortException)
{
if (prCompile != null)
{
prCompile.Kill();
prCompile.Dispose();
}
}
catch (Win32Exception)
{
}
catch (Exception)
{
dllExisting = false;
}
while (true)
{
try
{
if (Directory.Exists(Dir.FullName))
{
Directory.Delete(Dir.FullName, true);
break;
}
else
{
break;
}
}
catch
{
}
}
}
private void CopySource(string Destination)
{
filesToCopy = new StringCollection();
foreach (string strFile in Directory.GetFiles(c.WorkingDir))
{
string tmpStrFile = strFile.ToLower();
foreach (string Extension in c.Extensions)
{
if (tmpStrFile.Contains(Extension))
{
filesToCopy.Add(tmpStrFile);
}
}
}
if (filesToCopy.Count > 0)
{
foreach (string strFile in filesToCopy)
{
File.Copy(strFile, Destination + strFile.Remove(0, strFile.LastIndexOf("\\")));
}
}
}
private void CleanUpFolder(bool PreCleanup, string Filename)
{
if (!PreCleanup)
{
foreach (string strFile in Directory.GetFiles(Dir.FullName, Filename.Remove(Filename.LastIndexOf(".") + 1) + "*"))
{
FileInfo fileToMove = new FileInfo(strFile);
if (fileToMove.Name.ToLower().Contains(Filename.Remove(Filename.LastIndexOf("."))))
{
File.Copy(strFile, c.WorkingDir + fileToMove.Name, true);
}
}
}
foreach (string filename in Directory.GetFiles(Config.Instance.WorkingDir, Filename.Remove(Filename.LastIndexOf("."))+".*"))
{
bool foundExt = c.Extensions.Contains(filename.Remove(0, filename.LastIndexOf(".") + 1));
if (PreCleanup)
{
if(!foundExt)
{
File.Delete(filename);
}
}
else
{
if (!Config.Instance.SaveLspFile && filename.Contains(".lsp"))
{
File.Delete(filename);
}
if (!Config.Instance.SaveLstFile && filename.Contains(".lst"))
{
File.Delete(filename);
}
}
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
Dir = null;
}
disposed = true;
}
}
~CobolCompiler()
{
Dispose (false);
}
. .
100%. RAM 270 . 35 .
, , .
Edit:
, .
ProcessSingle , , dll.
, 14000 ( ) Parallel.ForEach. 14000 ForEach xxx , .: - (
. prReschk WaitForExit. - Ressources 14000 .
ConcurrentBag :( , .