ThreadPoolExecutor with ArrayBlockingQueue

I started reading more about ThreadPoolExecutor in the Javadoc, as I use it in one of my projects. Can someone explain what the line below really means? I know what each parameter means, but I would like some of the experts here to explain it in plainer, more practical terms.

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10, true), new 
ThreadPoolExecutor.CallerRunsPolicy());

Update: problem statement

Each thread uses a unique ID between 1 and 1000, and the program must run for 60 minutes or more. Within those 60 minutes all of the IDs may be used up, so I need to reuse them. Below is the program I wrote using the executor above.

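import java.util.LinkedList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class IdPool {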
    private final LinkedList<Integer> availableExistingIds = new LinkedList<Integer>();

    public IdPool() {
        for (int i = 1; i <= 1000; i++) {
            availableExistingIds.add(i);
        }
    }

    public synchronized Integer getExistingId() {
        return availableExistingIds.removeFirst();
    }

    public synchronized void releaseExistingId(Integer id) {
        availableExistingIds.add(id);
    }
}


class ThreadNewTask implements Runnable {
    private IdPool idPool;

    public ThreadNewTask(IdPool idPool) {
        this.idPool = idPool;
    }

    public void run() {
        Integer id = idPool.getExistingId();
        try {
            someMethod(id);
        } finally {
            // Return the ID even if someMethod throws, so it is not lost.
            idPool.releaseExistingId(id);
        }
    }

    // Does this method need to be synchronized or not?
    private synchronized void someMethod(Integer id) {
        System.out.println("Task: " + id);
        // and do whatever other calculations the program needs
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        IdPool idPool = new IdPool();   
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), new ThreadPoolExecutor.CallerRunsPolicy()); 

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while(System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination        
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    }
}

: - , ? , ? .



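In reality, you do not block when a task is submitted to a ThreadPoolExecutor whose queue is full. The reason is that ThreadPoolExecutor calls the BlockingQueue.offer(T item) method, which by definition does not block. It either adds the item and returns true, or does not add it (when the queue is full) and returns false. The ThreadPoolExecutor then calls the registered RejectedExecutionHandler to deal with the situation.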

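From the javadoc:

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.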

By default, ThreadPoolExecutor.AbortPolicy() is used, which throws a RejectedExecutionException from the "submit" or "execute" method of the ThreadPoolExecutor.

try {
   executorService.execute(new Runnable() { ... });
}
catch (RejectedExecutionException e) {
   // the queue is full, and you're using the AbortPolicy as the 
   // RejectedExecutionHandler
}
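
To see the rejection happen, here is a minimal sketch with a deliberately tiny pool: one thread and one queue slot, using the default handler. The class and method names (AbortPolicyDemo, thirdTaskIsRejected) are made up for this illustration and are not part of the question's code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class AbortPolicyDemo {

    // Returns true if the third submission is rejected.
    static boolean thirdTaskIsRejected() {
        // One worker thread, one queue slot, default handler (AbortPolicy).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        // A task that keeps its thread busy until the latch is opened.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker); // handed straight to the single worker thread
            pool.execute(blocker); // waits in the single queue slot
            pool.execute(blocker); // no free thread, no free slot: handler is called
        } catch (RejectedExecutionException e) {
            rejected = true;
        } finally {
            release.countDown();   // let the two accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskIsRejected());
    }
}
```

The third call returns immediately with an exception rather than waiting for a slot, which is the non-blocking behaviour described above.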

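However, you can use other handlers to do something different, such as silently dropping the task (DiscardPolicy) or running it in the thread that called "execute" or "submit" (CallerRunsPolicy). This example lets whichever thread calls "submit" or "execute" run the task itself when the queue is full (which means that, at any given time, one additional task may be running on top of what the pool itself is running):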

ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy());
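
A small sketch of what CallerRunsPolicy does in practice, again with a one-thread, one-slot pool so that the third task overflows. CallerRunsDemo and overflowRunsOnCaller are hypothetical names chosen for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, for illustration only.
class CallerRunsDemo {

    // Returns true if the overflowing task ran on the submitting thread.
    static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        // A task that keeps its thread busy until the latch is opened.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // occupies the only queue slot
            // Pool and queue are full, so the handler runs this task
            // synchronously on the thread that called execute().
            pool.execute(() -> ranOn.set(Thread.currentThread()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("overflow ran on caller: " + overflowRunsOnCaller());
    }
}
```

In the question's program this means the loop in main is slowed down naturally: while main is busy running a task itself, it cannot submit new ones.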

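If you want to block and wait instead, you can implement your own RejectedExecutionHandler that waits until a slot is free in the queue (this is a rough sketch, not compiled or run, but you should get the idea):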

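import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockUntilAvailableSlot implements RejectedExecutionHandler {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     boolean submitted = false;
     while (! submitted) {
       if (executor.isShutdown()) {
          // isShutdown() is also true once the pool has terminated; drop the task.
          return;
       }

       try {
          // Calling executor.execute(r) here would re-enter this handler rather
          // than throw, so wait on the work queue itself: up to 100 ms for a
          // free slot, then loop and check for shutdown again.
          submitted = executor.getQueue().offer(r, 100L, TimeUnit.MILLISECONDS);
       }
       catch (InterruptedException ex) {
          // Be a good citizen: restore the interrupt flag and report the
          // task as rejected instead of silently swallowing the interrupt.
          Thread.currentThread().interrupt();
          throw new RejectedExecutionException("Interrupted while waiting for a queue slot", ex);
       }
     }
  }
}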

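It creates an ExecutorService, which manages the execution of tasks on a pool of threads. Both the core and the maximum number of threads in the pool are 10 in this case. When a thread in the pool has been idle for 1 second (1000 ms), it would normally be killed (the keep-alive timer); however, because the core and maximum sizes are the same, this never happens (once started, the 10 threads are kept around, and the pool never runs more than 10).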

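It uses an ArrayBlockingQueue with 10 slots to hold pending tasks, so once the queue is full (10 tasks waiting while all 10 threads are busy), a further submission is not queued but handed to the rejection handler.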

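When a task is rejected (here, because all threads are busy and the queue is full), the submitted Runnable is executed on the caller's thread.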
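
For reference, here is the constructor call from the question with each argument given a name, as a rough sketch. PoolSettingsDemo and the local variable names are only for illustration; the getters used to read the settings back are standard ThreadPoolExecutor methods.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class PoolSettingsDemo {

    static ThreadPoolExecutor newPool() {
        int corePoolSize = 10;      // threads kept alive even when idle
        int maximumPoolSize = 10;   // upper bound on threads; same as core here
        long keepAliveTime = 1000L; // idle timeout for threads above the core size
        int queueCapacity = 10;     // tasks that may wait while all threads are busy
        boolean fair = true;        // queue grants access to waiting threads in FIFO order
        return new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity, fair),
                new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs on the caller
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println("core threads:       " + pool.getCorePoolSize());
        System.out.println("max threads:        " + pool.getMaximumPoolSize());
        System.out.println("keep-alive (ms):    " + pool.getKeepAliveTime(TimeUnit.MILLISECONDS));
        System.out.println("free queue slots:   " + pool.getQueue().remainingCapacity());
        // false by default, so with core == max the keep-alive never removes a thread
        System.out.println("core threads expire: " + pool.allowsCoreThreadTimeOut());
        pool.shutdown();
    }
}
```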


Consider semaphores; they are meant for this kind of purpose. Please check the code below, which uses a semaphore. I am not sure it is exactly what you want, but a task will block if there are no more permits to acquire. Is the ID itself important to you?

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadNewTask implements Runnable {
    private Semaphore idPool;

    public ThreadNewTask(Semaphore idPool) {
        this.idPool = idPool;
    }

    public void run() {
//      Integer id = idPool.getExistingId();
        try {
            idPool.acquire();
        } catch (InterruptedException e) {
            // No permit was acquired, so there is nothing to release.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            someMethod(0);
        } finally {
            idPool.release();
        }
//      idPool.releaseExistingId(id);
    }

    // This method needs to be synchronized or not?
    private void someMethod(Integer id) {
        System.out.println("Task: " + id);
        // and do whatever other calculations the program needs
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        Semaphore idPool = new Semaphore(100); 
//      IdPool idPool = new IdPool();
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while (System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
    }
}
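
If the ID itself does matter, one possible variation is to keep the IDs in a blocking queue, which blocks like the semaphore when nothing is free but still hands out distinct values. This is only a sketch under that assumption; BlockingIdPool and its method names are made up here and do not appear in the question.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical ID pool for illustration: IDs 1..count, reused after release.
class BlockingIdPool {
    private final BlockingQueue<Integer> ids;

    BlockingIdPool(int count) {
        ids = new ArrayBlockingQueue<>(count);
        for (int i = 1; i <= count; i++) {
            ids.add(i);
        }
    }

    // Blocks until an ID is free, then hands it out.
    int acquire() throws InterruptedException {
        return ids.take();
    }

    // Returns an ID to the pool. add() throws IllegalStateException if the
    // queue is already full, which would mean an ID was released twice.
    void release(int id) {
        ids.add(id);
    }

    // Number of IDs currently free.
    int available() {
        return ids.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingIdPool pool = new BlockingIdPool(1000);
        int id = pool.acquire();
        try {
            System.out.println("Task: " + id);
        } finally {
            pool.release(id); // always give the ID back
        }
    }
}
```

The queue is thread-safe on its own, so no synchronized methods are needed around it.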