I am working on a project where I need to receive Twitter messages using TwitterAPI, process them and store them in a database. I am using Producer / Consumer BlockingQueue, where the elements act as follows:
- Producer: Retrieves Twitter messages using the TwitterAPI and stores it in BlockingQueue.
- Consumer: takes an item from the queue, processes it, and stores it in the database.
Here is the main class:
BlockingQueue<TwitterMessage> sharedQueue = new ArrayBlockingQueue<TwitterMessage>(1);
Thread prodThread = new Thread(new TwitterStreamProducer(sharedQueue));
Thread consThread = new Thread(new TwitterStreamConsumer(sharedQueue));
prodThread.start();
consThread.start();
The manufacturer processes the TwitterAPI response and adds the object to the queue.
@Override
public void run() {
while (true) {
try {
message = extractData();
sharedQueue.put(message);
System.out.println("Produced: " + message.getTwitterMessage());
} catch (Exception ex) {
Logger.getLogger(TwitterStreamProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
The consumer does the following:
private final BlockingQueue<TwitterMessage> sharedQueue;
private TwitterProcessor twitterProcessor;
private TwitterMessage twitterMessage;
public TwitterStreamConsumer(BlockingQueue<TwitterMessage> sharedQueue) {
this.sharedQueue = sharedQueue;
twitterProcessor = new TwitterProcessor();
}
@Override
public void run() {
while (true) {
try {
twitterMessage = this.twitterProcessor.process(sharedQueue.take());
if (twitterMessage.getTwitterMessage().length() > 1) {
System.out.printf("Consumed: %s\n", twitterMessage.getTwitterMessage());
}
} catch (InterruptedException ex) {
Logger.getLogger(TwitterStreamConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
As I expected, the following will look:
Produced: …twittermessage1…
Consumed: …twittermessage1…
Produced: …twittermessage2…
Consumed: …twittermessage2…
Produced: …twittermessage3…
Consumed: …twittermessage3…
...
However, the result I get is the following:
Produced: …twittermessage1…
Produced: …twittermessage2… <= problem
Consumed: …twittermessage1…
Produced: …twittermessage3…
Consumed: …twittermessage3…
Consumed: …twittermessage3… <= problem
Produced: …twittermessage4… <= problem
Produced: …twittermessage5…
Consumed: …twittermessage5…
, , , . ( )
EDIT1
:
Produced: @1StevenGeorgiou thanks for the follow
Processed: follow
Produced: @nmagliozzi6 @_PatrickKealy_ but of course!!!!!
Produced: @taylorgaglia Thanks Tayl 😊 miss you tooo
Processed: tayl miss
Produced: Hate this who to follow tab in
Processed: hate follow tabshow pathet peopl accid ad act fast unfollow
EDIT2
"System.identityHashCode(sharedQueue.take()), :
Produced: …
Consumed: 1206857787
Produced: …
Consumed: 1206857787
…
- , ?
!