Java - synchronizing multiple threads step by step

Suppose I have a wizard that stores a list of SlaveThread objects. At each time step, I want Master to run SlaveThreads in parallel, but at the end of the time step, I want SlaveThreads to wait for each other to complete the current time step before moving forward. In addition, I do not want to reinstall SlaveThreads at each time step. I have two possible solutions, and I don’t know how to make them work:

1) The run () method in SlaveThread is a while (true) loop. After completing one loop in SlaveThread, I will have SlaveThread, which will notify Teacher (which I don’t know how to do), and the Wizard does something like

try{
    for (int i = 0; i < numSlaveThreads; i++) {
        while (!slaveThreads[i].getCompletedThisIter()) {
        wait()
        }
      }
  System.out.println("Joined");

}

before moving on to the next step. How should I do it? How can I get one SlaveThread only master?

2) Run () in Slave is not in while (true), then I have to call start () on it at each iteration. But the state of the slave flow at this point will be terminated. How can I call start () on it again without restoring it?

+5
source share
2 answers

It is for this that there are barriers, you can implement this using CyclicBarrier or CountDownLatch . These are the synchronizers used to delay the progress of the threads until they reach the desired state, in your case, the threads completed the calculation.

Here it depends on how you want to implement:

; .

CyclicBarrier, :

// whereby count is the number of your slave threads
this.barrier = new CyclicBarrier(count); 

Runnable : barrier.await()

public class Slaves implements Runnable {

   // ...

   @Override
   public void run() {

      while(condition) {

         // computation
         // ...

         try {
            // do not proceed, until all [count] threads
            // have reached this position
            barrier.await();
         } catch (InterruptedException ex) {
            return;
         } catch (BrokenBarrierException ex) {
            return;
         }
      }
   }
}

, . , .

, , , , ( ), Runnable CyclicBarrier, , .

this.barrier = new CyclicBarrier(count,
   new Runnable() {
      @Override
      public void run() {
         // signal your master thread, update values, etc.
      }
    }
 );
+5

ExecutorService ( , ) CyclicBarrier, .

, , , . , , - ( ) :

public class Test {

    private static final ExecutorService executor = Executors.newFixedThreadPool(5);
    private static final CyclicBarrier barrier = new CyclicBarrier(5); //4 slaves + 1 master

    public static void main(String[] args) throws InterruptedException {
        Runnable master = new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        System.out.println("Starting slaves");
                        for (int i = 100; i < 500; i += 100) {
                            executor.submit(getRunnable(i));
                        }
                        barrier.await();
                        System.out.println("All slaves done");
                    }
                } catch (InterruptedException | BrokenBarrierException ex) {
                    System.out.println("Bye Bye");
                }
            }
        };

        executor.submit(master);
        Thread.sleep(2000);
        executor.shutdownNow();

    }

    public static Runnable getRunnable(final int sleepTime) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("Entering thread " + Thread.currentThread() + " for " + sleepTime + " ms.");
                    Thread.sleep(sleepTime);
                    System.out.println("Exiting thread " + Thread.currentThread());
                    barrier.await();
                } catch (BrokenBarrierException | InterruptedException ex) {
                }
            }
        };

    }
}
+3

All Articles