How to implement a queue that can be processed by multiple threads?

I think I am doing it wrong. I am creating threads that are supposed to crunch some data from a shared queue. My problem is that the program is slow and a memory hog, and I suspect that the queue may not be as shared as I had hoped. I suspect this because I added a line to my code that prints the size of the queue: if I start 2 threads, I get two outputs with completely different numbers that seem to increase independently. (I thought it could be the same number jumping around, say from 100 to 2 and so on, but after watching it, it shows 105 and 5, and the two change at different rates. If I have 4 threads, I see 4 different numbers.)

Here is a snippet of the relevant parts. At the top of the program I create a static class holding the data I want in the queue:

static class queue_class {
        int number;
        int[] data;
        queue_class(int number,  int[] data) {
            this.number = number;
            this.data = data;
        }
    }

Then I create the queue and afterwards send some jobs to the Callable:

static class process_threaded implements Callable<Void> {
    // queue with contexts to process
    private Queue<queue_class> queue;

    process_threaded(queue_class request) {
        queue = new ArrayDeque<queue_class>();
        queue.add(request);
    }

    public Void call() {
        while(!queue.isEmpty()) {
            System.out.println("in contexts queue with a size of " + queue.size());
            queue_class current = queue.poll();
            //get work and process it, if it work great then the solution goes elsewhere
            //otherwise, depending on the data, its either discarded or parts of it is added back to queue
            queue.add(new queue_class(k, data_list)); 

As you can see, there are 3 options for the data: it is sent on if the data is good, dropped if it is completely bad, or sent back to the queue. I think the queues keep growing when data is sent back, but I suspect that each thread is working on its own queue rather than a shared one.

Is this assumption correct and am I doing it wrong?

2 answers

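Your assumption is correct: each thread has its own queue (you create it with new ArrayDeque), because the queue is built in the constructor of your Callable. (Why is it a Callable<Void> rather than a Runnable?)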
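
To make the diagnosis concrete, here is a minimal single-threaded sketch. The names OwnQueueDemo, PrivateWorker and SharingWorker are hypothetical and not from the question. PrivateWorker builds its queue in the constructor, as process_threaded does, while SharingWorker receives a queue that the caller created once.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class OwnQueueDemo {
    // Mirrors the question: every worker builds a private queue in its constructor.
    static final class PrivateWorker {
        final Queue<Integer> queue = new ArrayDeque<>();
        PrivateWorker(int request) { queue.add(request); }
    }

    // Alternative: the queue is created once by the caller and handed to every worker.
    static final class SharingWorker {
        final Queue<Integer> queue;
        SharingWorker(Queue<Integer> shared, int request) {
            this.queue = shared;
            shared.add(request);
        }
    }

    static int[] privateSizes() {
        PrivateWorker a = new PrivateWorker(1);
        PrivateWorker b = new PrivateWorker(2);
        a.queue.add(3); // only a sees this element
        return new int[] { a.queue.size(), b.queue.size() };
    }

    static int[] sharedSizes() {
        // ArrayDeque is fine here only because this demo runs on one thread.
        Queue<Integer> shared = new ArrayDeque<>();
        SharingWorker a = new SharingWorker(shared, 1);
        SharingWorker b = new SharingWorker(shared, 2);
        a.queue.add(3); // both workers see this element
        return new int[] { a.queue.size(), b.queue.size() };
    }

    public static void main(String[] args) {
        int[] p = privateSizes();
        int[] s = sharedSizes();
        System.out.println("private queues: " + p[0] + " and " + p[1]);
        System.out.println("shared queue:   " + s[0] + " and " + s[1]);
    }
}
```

The private workers report different sizes, which matches the diverging numbers in the question. Once real threads are involved, the shared queue would also have to be a thread-safe implementation.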

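But first, do you really need the queue? You could use an ExecutorService, and instead of adding work to a queue, your Callables (or Runnables, whichever you use) submit new Callables to the executor whenever part of the data has to be processed again. The executor keeps its own internal work queue and hands the jobs to its threads.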

Something like this:

static class process_threaded implements Runnable {
    // Reference to an executor
    private final ExecutorService exec;
    // Reference to the job counter
    private final AtomicInteger jobCounter;
    // Request to process
    private queue_class request;

    process_threaded( ExecutorService exec, AtomicInteger counter, queue_class request) {
        this.exec = exec;
        this.jobCounter = counter;
        this.jobCounter.incrementAndGet(); // Assuming that you will always
                                           // submit the process_threaded to
                                           // the executor if you create it.
        this.request = request;
    }

    public void run() {
        //get work and process **request**; if it works, the solution goes elsewhere
        //otherwise, depending on the data, it is either discarded or parts of it are submitted back to the executor
        exec.submit( new process_threaded( exec, jobCounter, new queue_class(k, data_list) ) );

        // Can do some more work

        // Always run before returning: counter update and notify the launcher
        synchronized(jobCounter){
            jobCounter.decrementAndGet();
            jobCounter.notifyAll();
        }
    }
}

Edit:

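To know when everything is finished, the launcher has to wait until the number of outstanding jobs drops to 0. That is what the AtomicInteger is for: each job increments it when it is created, and decrements it and notifies the launcher when it is done. The launcher could look like this: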

void theLauncher() throws InterruptedException {

    AtomicInteger jobCounter = new AtomicInteger( 0 );

    ExecutorService exec = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors());

    exec.submit( new process_threaded( exec, jobCounter, someProcessRequest ) );
    // Can submit some other things here of course...

    // Wait for jobs to complete:
    while( jobCounter.get() > 0 ){
        synchronized( jobCounter ){ // wait() must be called while holding the monitor of jobCounter
            if( jobCounter.get() > 0 )
                jobCounter.wait();
        }
    }

    // Now you can shutdown:
    exec.shutdown();
}
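
For reference, here is a self-contained sketch of the same idea that can be run as is. It is only an illustration under assumptions: the class name ResubmitDemo, the Job class, the extra processed counter and the rule "a request larger than 1 is split into two smaller requests" are all hypothetical stand-ins for the real work, which the question does not show.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class ResubmitDemo {
    static final class Job implements Runnable {
        private final ExecutorService exec;
        private final AtomicInteger pending;   // jobs created but not yet finished
        private final AtomicInteger processed; // jobs that have run (for the demo only)
        private final int n;                   // hypothetical stand-in for the request data

        Job(ExecutorService exec, AtomicInteger pending, AtomicInteger processed, int n) {
            this.exec = exec;
            this.pending = pending;
            this.processed = processed;
            this.n = n;
            pending.incrementAndGet(); // counted at creation, before the parent finishes
        }

        @Override
        public void run() {
            try {
                processed.incrementAndGet();
                if (n > 1) { // assumed rule: split the request and resubmit both halves
                    exec.submit(new Job(exec, pending, processed, n / 2));
                    exec.submit(new Job(exec, pending, processed, n - n / 2));
                }
            } finally {
                synchronized (pending) { // always update the counter and wake the launcher
                    pending.decrementAndGet();
                    pending.notifyAll();
                }
            }
        }
    }

    static int runAll(int n) {
        ExecutorService exec =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        AtomicInteger pending = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();
        exec.submit(new Job(exec, pending, processed, n));
        try {
            synchronized (pending) {
                while (pending.get() > 0) {
                    pending.wait(); // releases the monitor until a job calls notifyAll()
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdown();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("jobs processed: " + runAll(8));
    }
}
```

Because a child job is counted in its constructor, before its parent decrements the counter, the counter cannot reach 0 while work is still outstanding.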

Have you looked at ConcurrentLinkedQueue? From the javadocs:

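An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection.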
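
A rough sketch of how several threads could share one such queue follows. Everything except ConcurrentLinkedQueue itself is an assumption made for illustration: the class name SharedQueueDemo, the Integer work items, the splitting rule, and the pending counter. The counter is needed because an empty queue does not mean the work is done: another thread may still be processing an item and about to add pieces back.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedQueueDemo {
    static int runWorkers(int threads, int firstItem) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>(); // created once, shared by all
        AtomicInteger pending = new AtomicInteger();   // items queued or being processed
        AtomicInteger processed = new AtomicInteger(); // items handled (for the demo only)

        pending.incrementAndGet();
        queue.add(firstItem);

        Runnable worker = () -> {
            while (pending.get() > 0) {
                Integer item = queue.poll();
                if (item == null) {   // queue empty, but other threads are still working
                    Thread.yield();
                    continue;
                }
                processed.incrementAndGet();
                if (item > 1) {       // assumed rule: put two smaller pieces back
                    pending.addAndGet(2); // count them before they become visible
                    queue.add(item / 2);
                    queue.add(item - item / 2);
                }
                pending.decrementAndGet(); // this item is finished
            }
        };

        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(worker);
            pool[i].start();
        }
        try {
            for (Thread t : pool) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("items processed: " + runWorkers(4, 8));
    }
}
```

Idle workers spin here with Thread.yield(), which keeps the sketch short but wastes CPU. A BlockingQueue, or the ExecutorService approach from the other answer, would avoid the busy-waiting.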
