Scala system process freezes

I have an actor that uses ProcessBuilder to execute an external process:

  def act {
    while (true) {
      receive {
        case param: String => {
           val filePaths = Seq("/tmp/file1","/tmp/file2")
           val fileList = new ByteArrayInputStream(filePaths.mkString("\n").getBytes())
           val output = s"myExecutable.sh ${param}" #< fileList !!<

           doSomethingWith(output)
        }
      }
    }
  }

I run hundreds of actors working in parallel. Sometimes, for some unknown reason, the execution of a process (!!) never returns. It hangs forever, and that particular actor can no longer process new messages. Is there a way to set a timeout for this process to return and, if the timeout is exceeded, retry it?

What could cause these executions to hang forever? These commands should not take more than a few milliseconds.

Edit 1: Two important facts that I observed:

  • This problem does not occur on Mac OS X, only on Linux
  • When I do not use ByteArrayInputStream as input for execution, the program does not hang
+3
4

, ProcessBuilder :... , ...

- , . ( , ):

  • process =
  • thread = ( )
  • actor = light-weight ( ).

- . , O/S , . ( ). , CGI- - O/S-- - -. , : .

, , , . , , 10 , , 100.

?

, myExecutable.sh Scala, . , .

, /, , / .

: (1) , (, 10) (2) (, 100), ProcessIO (3), , /, , . : ; 100 , , , - .

: (1) (, 10) (2), (.. ) (3), / ProcessIO, , . : , ; .

: (1) , for , (2) (10) (3) for-loop, ProcessIO ( - )

- ?

, . ( , /). " scala " , . , Akka . scala (2.11) Akka , " scala " .

, / ( /). , , ):

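import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop}
import scala.concurrent.duration._

// Message shapes are assumed from the patterns used below.
// Worker (not shown) is the actor that runs the external process.
case class WorkRequest(client: ActorRef, jobReq: Any)
case class WorkResponse(req: WorkRequest, jobResp: Any)
case class WorkTimeout(req: WorkRequest)

class Supervisor extends Actor {
  import context.dispatcher  // execution context required by the scheduler

  // OneForOneStrategy applies each directive only to the child that failed
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume    // keep the child and its state
      case _: NullPointerException     => Restart   // replace the child with a fresh instance
      case _: IllegalArgumentException => Stop      // stop the child permanently
      case _: Exception                => Escalate  // let this supervisor's parent decide
    }

  var worker: ActorRef = context.actorOf(Props[Worker])  // creates a supervised child actor
  var pendingRequests = Set.empty[WorkRequest]

  def receive = {
    case req: WorkRequest =>
      pendingRequests += req
      worker ! req
      context.system.scheduler.scheduleOnce(10.seconds, self, WorkTimeout(req))
    case resp @ WorkResponse(req, _) =>
      pendingRequests -= req
      req.client ! resp
    case WorkTimeout(req) =>
      if (pendingRequests.contains(req)) {
        // ActorRef has no restart method, so replace the unresponsive worker.
        // stop() only takes effect once the worker finishes its current message.
        context.stop(worker)
        worker = context.actorOf(Props[Worker])
        // resend all pending requests
        pendingRequests.foreach(worker ! _)
      }
  }
}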

: . // , . , , " " - O/S- , .

+2

, , , , . - , .

, , , myExecutable.sh, , .

, , , . , , - , myExecutable.sh. , ByteArrayInputStream , , myExecutable.sh stdin. , script, , . , ByteArrayInputStream - : unicode , , , . , myExecutable.sh.

, - . , ( ForkJoin , , ) scala.sys.process (wouldn ' t - scala.sys.process , - ).

, (VisualVM - ), , . OpenJDK Scala. , .

+1

?

0

The problem is probably in either myExecutable.sh or doSomethingWith.

When it freezes, try killing all the myExecutable.sh processes.

  • If this helps, you should check myExecutable.sh.
  • If this does not help, you should check the doSomethingWith function.
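
The same check can be automated from Scala by bounding how long the external process may run and destroying it when the limit passes. The following is only a minimal sketch under stated assumptions: `TimedProcess`, `runWithTimeout`, and the chosen timeout are hypothetical names and values, not part of the question's code, and `destroy()` may not terminate processes that the script itself spawned.

```scala
import java.io.ByteArrayInputStream
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.sys.process._

// Hypothetical helper, not from the question's code.
object TimedProcess {
  /** Runs `command` with `stdin` as its standard input.
    * Returns Some(stdout) if the process exits within `timeout`
    * (the exit code is discarded), or None after destroying it on timeout.
    */
  def runWithTimeout(command: Seq[String], stdin: String, timeout: FiniteDuration): Option[String] = {
    val out = new StringBuffer  // thread-safe; filled by the process's output thread
    val input = new ByteArrayInputStream(stdin.getBytes("UTF-8"))
    // run() returns a handle immediately instead of blocking like !! does
    val proc = (command #< input).run(
      ProcessLogger(line => out.append(line).append('\n'), _ => ()))
    // exitValue() blocks until the process ends, so wait for it on another thread
    val exit = Future(blocking(proc.exitValue()))
    try {
      Await.result(exit, timeout)
      Some(out.toString)
    } catch {
      case _: TimeoutException =>
        proc.destroy()  // give up on the hung process
        None
    }
  }
}
```

In the actor, a call such as `TimedProcess.runWithTimeout(Seq("myExecutable.sh", param), filePaths.mkString("\n"), 5.seconds)` would return `None` on a hang, at which point the caller can decide whether to retry.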
0