Data buffering in a multi-threaded Java application

I have a multi-threaded application with one producer thread and several consumer threads. Data is stored in a shared thread-safe collection and is flushed to the database once there is enough data in the buffer.

From javadocs -

BlockingQueue<E>

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

take()

Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

My questions -

  • Is there another collection that has an E[] take(int n) method, that is, a blocking queue that waits until n elements are available? I want it to wait until there are 100 or 200 elements.
  • Alternatively, is there another method that I could use to solve the problem without polling?
4 answers

I think the only way is to either extend some BlockingQueue implementation or create some kind of utility method using take:

public <E> void take(BlockingQueue<E> queue, List<E> to, int max) 
        throws InterruptedException {

    for (int i = 0; i < max; i++)
        to.add(queue.take());
}

The drainTo method is not quite what you are looking for, but would it serve your purpose?

http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#drainTo(java.util.Collection, int)
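
To show why drainTo on its own does not give the blocking behaviour asked for, here is a minimal sketch, assuming a LinkedBlockingQueue of integers. The class name DrainToDemo and the helper drainAvailable are made up for illustration; only drainTo itself comes from the BlockingQueue API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DrainToDemo {
    /** Moves up to max elements into batch without blocking; returns how many were moved. */
    static int drainAvailable(BlockingQueue<Integer> queue, List<Integer> batch, int max) {
        return queue.drainTo(batch, max);
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<Integer> batch = new ArrayList<>();

        // Empty queue: drainTo returns immediately instead of waiting.
        System.out.println(drainAvailable(queue, batch, 100)); // prints 0

        queue.add(1);
        queue.add(2);
        queue.add(3);

        // Only three elements are available, so only three are moved.
        System.out.println(drainAvailable(queue, batch, 100)); // prints 3
        System.out.println(batch);                             // prints [1, 2, 3]
    }
}
```

So drainTo treats the count as an upper bound, not a minimum: it takes whatever is there and returns. Waiting for a full batch needs something blocking around it.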

EDIT

To take at least min elements, combining take and drainTo:

public <E> void drainTo(final BlockingQueue<E> queue, final List<E> list, final int min) throws InterruptedException
{
  int drained = 0;
  do 
  {
    if (queue.size() > 0)
      drained += queue.drainTo(list, min - drained);
    else
    {
      list.add(queue.take());
      drained++;
    }
  }
  while (drained < min);
}

I'm not sure whether the standard library has a class with a take(int n)-style method, but you should be able to wrap a standard BlockingQueue to add this function without too much hassle, right?

An alternative would be to trigger the action on the put side: as you add items to the collection, reaching a threshold you set triggers a flush.
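
A rough sketch of that threshold idea, under some assumptions: the class name ThresholdBuffer, the flusher callback (standing in for whatever writes a batch to the database) and the flushRemaining method are all hypothetical names, not anything from the standard library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ThresholdBuffer<T> {
    private final int threshold;
    private final Consumer<List<T>> flusher; // hypothetical callback, e.g. a batch insert
    private List<T> pending = new ArrayList<>();

    ThresholdBuffer(int threshold, Consumer<List<T>> flusher) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
        this.flusher = flusher;
    }

    /** Adds an item; the thread whose add fills the buffer performs the flush. */
    void add(T item) {
        List<T> full = null;
        synchronized (this) {
            pending.add(item);
            if (pending.size() >= threshold) {
                full = pending;              // hand off the full batch
                pending = new ArrayList<>(); // start a fresh one
            }
        }
        // Flush outside the lock so other threads can keep adding meanwhile.
        if (full != null) {
            flusher.accept(full);
        }
    }

    /** Flushes whatever is left, for example at shutdown. */
    void flushRemaining() {
        List<T> rest;
        synchronized (this) {
            rest = pending;
            pending = new ArrayList<>();
        }
        if (!rest.isEmpty()) {
            flusher.accept(rest);
        }
    }
}
```

With this shape nobody polls or waits: a call such as new ThresholdBuffer<Row>(100, batch -> save(batch)), where Row and save are placeholders, flushes every 100th add. The trade-off is that the flush runs on whichever thread happened to add the last item, which may or may not be what you want.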


So, this should be a thread-safe queue that lets you block on taking an arbitrary number of elements. More eyes checking the threading code would be welcome.

package mybq;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ChunkyBlockingQueue<T> {
    protected final LinkedList<T> q = new LinkedList<T>();
    protected final Object lock = new Object();

    public void add(T t) {
        synchronized (lock) {
            q.add(t);
            lock.notifyAll();
        }
    }

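    public List<T> take(int numElements) throws InterruptedException {
        synchronized (lock) {
            while (q.size() < numElements) {
                // Let InterruptedException propagate: catching it and re-setting
                // the interrupt flag here would make wait() throw again at once,
                // turning this loop into a busy spin.
                lock.wait();
            }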
            ArrayList<T> l = new ArrayList<T>(numElements);
            l.addAll(q.subList(0, numElements));
            q.subList(0, numElements).clear();
            return l;
        }
    }
}