Does Java provide an ExecutorService that lets each worker always execute on the same thread?

I am looking for an ExecutorService implementation that provides the following semantics. Each thread is occupied by a "worker" that performs some task based on an input. Each worker is guaranteed to execute on only one thread, so it should be able to maintain state from task to task without synchronization overhead, since it only ever runs on that one thread.

So, let's say I have 100 inputs and 10 workers, I would like to write something like:

for (Input input: inputs) {
    // The following code would pass input to all 10 workers,
    // each bound to their own thread,
    // and wait for them to complete.
    executor.invokeAll(input);
}

Note that each worker does a different thing with any given input. The input is not an executable block of code, just a parameter for the worker; each worker decides what to do with it. To keep things simple, the workers implement a common interface so that they can be called polymorphically with the input.

I hacked together something that works, using a Map<Worker, WorkerExecutor>, where WorkerExecutor is my thin wrapper around Executors.newSingleThreadExecutor, and only one instance of Worker runs in each thread pool. I would rather find something written by someone who knows what they are doing :-)
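For reference, the Map-based arrangement described above might look roughly like the following. This is only a sketch, not the asker's actual code: `BoundWorkers`, `CountingWorker`, and the shape of the `Worker` interface are assumptions made for illustration. Each worker gets its own single-thread executor, and `invokeAll` blocks until every worker has handled the input.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Assumed interface: each worker decides what to do with the input.
interface Worker<T> {
    void process(T input);
}

// Hypothetical helper that pins every worker to its own single-thread executor.
class BoundWorkers<T> {
    private final Map<Worker<T>, ExecutorService> executors = new LinkedHashMap<>();

    void add(Worker<T> worker) {
        executors.put(worker, Executors.newSingleThreadExecutor());
    }

    // Hands the input to every worker, then blocks until all have finished.
    void invokeAll(T input) throws InterruptedException, ExecutionException {
        List<Future<?>> pending = new ArrayList<>();
        for (Map.Entry<Worker<T>, ExecutorService> entry : executors.entrySet()) {
            Worker<T> worker = entry.getKey();
            pending.add(entry.getValue().submit(() -> worker.process(input)));
        }
        for (Future<?> future : pending) {
            future.get(); // a worker failure surfaces here as ExecutionException
        }
    }

    // Lets the worker threads exit once their queued tasks are done.
    void shutdown() {
        for (ExecutorService executor : executors.values()) {
            executor.shutdown();
        }
    }
}

// Example worker that keeps unsynchronized state between tasks.
class CountingWorker implements Worker<String> {
    int seen = 0;               // only written by this worker's own thread
    Thread owner;               // the thread that ran the first task
    boolean sameThread = true;  // stays true while the thread never changes

    @Override
    public void process(String input) {
        Thread current = Thread.currentThread();
        if (owner == null) {
            owner = current;
        }
        sameThread &= (owner == current);
        seen++;
    }
}
```

Because tasks go through `submit`, an exception thrown by a worker is captured in its `Future` rather than killing the thread, so the worker keeps running on the same thread. Reading `seen` from the calling thread after `invokeAll` returns is safe, since `Future.get()` establishes the necessary happens-before ordering.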


Potential Inefficiency I'm OK With

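The drawback of this kind of lock-step execution is that each task takes as long as its slowest worker, as in this timing diagram: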

         | Task 1    | Task 2    | Task 3    | Task 4    |
Worker 1 | =@        | =@        | =@        | =@        |
Worker 2 | ==@       | ==@       | ==@       | ==@       |
Worker 3 |   ==@     |   ==@     |   ==@     |   ==@     |
Worker 4 |    =====@ |    =====@ |    =====@ |    =====@ |

Here Worker 3 finishes early, but the next task cannot start until Worker 4 is done, so a CPU sits idle in the meantime.


Does such an ExecutorService exist?


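What you describe sounds like the actor model, where each "worker" handles its messages one at a time. Take a look at Akka or a similar actor library for the JVM.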


Something along these lines:

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// you implement this for each of your non-parallelisable jobbies
interface Worker<T> {
    public void process(T input);
}

// implementation detail
class Clerk<T> {
    private final Executor executor = Executors.newSingleThreadExecutor();
    private final Worker<T> worker;

    public Clerk(Worker<T> worker) {
        this.worker = worker;
    }

    public void process(final T input) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                worker.process(input);
            }
        });
    }
}

// make one of these, and give it all your workers, then give it input
class Workshop<T> {
    private final Set<Clerk<T>> clerks = new LinkedHashSet<Clerk<T>>();

    public void addWorker(Worker<T> worker) {
        // mutable; you love it
        clerks.add(new Clerk<T>(worker));
    }

    public void process(T input) {
        for (Clerk<T> clerk : clerks) {
            clerk.process(input);
        }
    }

    public void processAll(Iterable<T> inputs) {
        for (T input : inputs) {
            process(input);
        }
    }
}
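One thing the code above leaves open is lifecycle: `process` returns immediately, and the executors are never shut down, so their non-daemon threads keep the JVM alive. A possible variant is sketched below, assuming it is acceptable to wait once at the end rather than after every input. `ManagedClerk`, `ManagedWorkshop`, and `finish` are hypothetical names, not part of the answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Same shape as the Worker interface in the answer.
interface Worker<T> {
    void process(T input);
}

// Hypothetical variant of Clerk that can be shut down and waited on.
class ManagedClerk<T> {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Worker<T> worker;

    ManagedClerk(Worker<T> worker) {
        this.worker = worker;
    }

    // Queues the input and returns immediately.
    void process(T input) {
        executor.execute(() -> worker.process(input));
    }

    // Stops accepting new input; already queued inputs still run.
    void shutdown() {
        executor.shutdown();
    }

    // Waits for the queue to drain; false means the timeout elapsed first.
    boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
        return executor.awaitTermination(timeout, unit);
    }
}

// Hypothetical variant of Workshop with an explicit end-of-work step.
class ManagedWorkshop<T> {
    private final List<ManagedClerk<T>> clerks = new ArrayList<>();

    void addWorker(Worker<T> worker) {
        clerks.add(new ManagedClerk<>(worker));
    }

    void process(T input) {
        for (ManagedClerk<T> clerk : clerks) {
            clerk.process(input);
        }
    }

    // Shuts every clerk down, then waits for each one in turn.
    // The timeout applies per clerk, not to the whole call.
    boolean finish(long timeout, TimeUnit unit) throws InterruptedException {
        for (ManagedClerk<T> clerk : clerks) {
            clerk.shutdown();
        }
        boolean allDone = true;
        for (ManagedClerk<T> clerk : clerks) {
            allDone &= clerk.awaitDone(timeout, unit);
        }
        return allDone;
    }
}
```

With this arrangement a fast worker can run ahead of a slow one instead of idling between inputs, at the cost of not knowing when any single input has been fully processed. Note also that with `execute`, a worker that throws terminates its thread and the single-thread executor starts a replacement, so thread identity is only stable as long as workers do not throw.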

