Java BlockingQueue produces / consumes incorrectly

I am working on a project where I need to receive Twitter messages using TwitterAPI, process them and store them in a database. I am using Producer / Consumer BlockingQueue, where the elements act as follows:

  • Producer: Retrieves Twitter messages using the TwitterAPI and stores it in BlockingQueue.
  • Consumer: takes an item from the queue, processes it, and stores it in the database.

Here is the main class:

    // Creating shared object
    BlockingQueue<TwitterMessage> sharedQueue = new ArrayBlockingQueue<TwitterMessage>(1);

    // Creating Producer and Consumer Thread
    Thread prodThread = new Thread(new TwitterStreamProducer(sharedQueue));
    Thread consThread = new Thread(new TwitterStreamConsumer(sharedQueue));

    // Starting producer and Consumer thread
    prodThread.start();
    consThread.start();

The manufacturer processes the TwitterAPI response and adds the object to the queue.

@Override
public void run() {
    while (true) {
        try {
            message = extractData(); // extract data from TwitterAPI response and return TwitterMessage object
            sharedQueue.put(message);

            System.out.println("Produced: " + message.getTwitterMessage());
        } catch (Exception ex) {
            Logger.getLogger(TwitterStreamProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

The consumer does the following:

private final BlockingQueue<TwitterMessage> sharedQueue;
private TwitterProcessor twitterProcessor;
private TwitterMessage twitterMessage;

public TwitterStreamConsumer(BlockingQueue<TwitterMessage> sharedQueue) {
    this.sharedQueue = sharedQueue;
    twitterProcessor = new TwitterProcessor();
}

@Override
public void run() {
    while (true) {
        try {
            twitterMessage = this.twitterProcessor.process(sharedQueue.take());
            if (twitterMessage.getTwitterMessage().length() > 1) {
                System.out.printf("Consumed: %s\n", twitterMessage.getTwitterMessage());
            }
        } catch (InterruptedException ex) {
            Logger.getLogger(TwitterStreamConsumer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

As I expected, the following will look:

Produced: …twittermessage1… 
Consumed: …twittermessage1… 
Produced: …twittermessage2… 
Consumed: …twittermessage2… 
Produced: …twittermessage3… 
Consumed: …twittermessage3…
...

However, the result I get is the following:

Produced: …twittermessage1…
Produced: …twittermessage2…  <= problem
Consumed: …twittermessage1…
Produced: …twittermessage3…
Consumed: …twittermessage3…
Consumed: …twittermessage3…  <= problem
Produced: …twittermessage4…  <= problem
Produced: …twittermessage5…
Consumed: …twittermessage5…

, , , . ( )

EDIT1 :

Produced: @1StevenGeorgiou thanks for the follow #ff
Processed: follow
Produced: @nmagliozzi6 @_PatrickKealy_ but of course!!!!!
Produced: @taylorgaglia Thanks Tayl 😊 miss you tooo
Processed: tayl miss
Produced: Hate this who to follow tab in #twitter it shows the most pathetic people you know. Accidently added one I had to act fast to unfollow
Processed: hate follow tabshow pathet peopl accid ad act fast unfollow

EDIT2 "System.identityHashCode(sharedQueue.take()), :

Produced: …
Consumed: 1206857787
Produced: …
Consumed: 1206857787

- , ?

!

+3
2

: undefined. , . , , (), .

/, , . / , . ( , ), - - "" .

: , LinkedBlockingQueue, .
ExecutorService, Runnables.

+1

BlockingQueue / , , :

public static void main(String[] args) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
    new Thread(new Producer(queue)).start();
    new Thread(new Consumer(queue)).start();
}

private static class Producer implements Runnable {

    private static final String[] MSGS = {
        "msg1", "msg2", "msg3", "msg4", "msg5",
        "msg6", "msg7", "msg8", "msg9", "msg10"
    };

    final BlockingQueue<String> sharedQueue;

    public Producer(BlockingQueue<String> queue) {
        sharedQueue = queue;
    }

    @Override
    public void run() {
        for (String msg : MSGS) {
            try {
                sharedQueue.put(msg);
                // yield the producer thread, so that the consumer could win the CPU
                System.out.println("Produced: " + msg);
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.out.println("Producer was interrupted: " + msg);
            }
        }
    }

}

private static class Consumer implements Runnable {

    final BlockingQueue<String> sharedQueue;

    public Consumer(BlockingQueue<String> queue) {
        sharedQueue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String toProcess = sharedQueue.take();
                System.out.println("Consumed: " + toProcess);
            }
        } catch (InterruptedException e) {
            System.out.println("Consumer was interrupted!");
        }
    }

}

, , ( , , twittermessage1, ) twitterProcessor.

0

All Articles