I am trying to integrate Storm (see here) into my project. I understand the concepts of topologies, spouts, and bolts, but now I am trying to work out how some things are actually implemented.
A) I have a polyglot environment with Java and Clojure. My Java code is a callback class whose methods are triggered by streaming data. The event data pushed into these methods is what I want to use as a spout.
So the first question is how to connect the data coming into these methods to a spout. I am trying to i) pass in a backtype.storm.topology.IRichSpout, then ii) pass a backtype.storm.spout.SpoutOutputCollector (see here) to that spout's open function (see here). But I see no way to actually pass in any kind of map or list.
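To make the hand-off in question concrete, here is a minimal sketch of one common pattern: the callback pushes each event into a thread-safe queue, and the spout's nextTuple drains it. This is plain Java with no Storm dependency. The `Emitter` interface is a hypothetical stand-in for the `emit(List)` call on SpoutOutputCollector, and `CallbackHandOff`, `onEvent`, and the event fields are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class CallbackHandOff {
    // Hypothetical stand-in for the emit(List) call on SpoutOutputCollector.
    interface Emitter { void emit(List<Object> tuple); }

    // Thread-safe buffer shared by the callback thread and the spout thread.
    private final LinkedBlockingQueue<List<Object>> pending = new LinkedBlockingQueue<>();

    // Called from the Java callback method whenever an event arrives.
    public void onEvent(int id, String payload) {
        pending.offer(Arrays.<Object>asList(id, payload));
    }

    // Mirrors what nextTuple() would do: emit at most one buffered event, never block.
    public boolean nextTuple(Emitter collector) {
        List<Object> tuple = pending.poll();
        if (tuple == null) {
            return false; // nothing buffered right now
        }
        collector.emit(tuple);
        return true;
    }

    public static void main(String[] args) {
        CallbackHandOff handOff = new CallbackHandOff();
        List<List<Object>> emitted = new ArrayList<>();
        handOff.onEvent(7, "tick");
        handOff.onEvent(42, "tock");
        while (handOff.nextTuple(emitted::add)) { /* drain the queue */ }
        System.out.println(emitted); // prints [[7, tick], [42, tock]]
    }
}
```

In a real spout the queue would have to be reachable from both the callback and the spout instance running inside the worker, which this sketch does not address.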
B) The rest of my project is all Clojure. There will be a lot of data coming through these methods. Each event will have an identifier from 1 to 100. In Clojure, I want to split the data coming from the spout into different threads of execution. Those, I think, will be the bolts.
How do I configure a Clojure bolt to receive event data from a spout and then split off a stream based on the identifier of the incoming event?
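For context, Storm's fields grouping sends tuples that share the same value of a named field to the same bolt task, which seems close to the per-identifier split described here. The sketch below only illustrates that routing idea in plain Java. `taskFor` and the task count of 4 are hypothetical, and Storm's own hashing may differ from the simple modulo used here.

```java
import java.util.ArrayList;
import java.util.List;

public class IdRouting {
    // Picks a task index for an event id; equal ids always map to the same index.
    static int taskFor(int eventId, int numTasks) {
        return Math.floorMod(Integer.hashCode(eventId), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // hypothetical bolt parallelism
        List<List<Integer>> buckets = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            buckets.add(new ArrayList<>());
        }
        // Route every identifier from 1 to 100 to its task.
        for (int id = 1; id <= 100; id++) {
            buckets.get(taskFor(id, numTasks)).add(id);
        }
        for (int t = 0; t < numTasks; t++) {
            System.out.println("task " + t + " handles " + buckets.get(t).size() + " ids");
        }
    }
}
```

With 100 identifiers and 4 tasks, each task ends up with 25 identifiers, and a given identifier is always handled by the same task.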
Thanks in advance, Tim
[EDIT 1]
I have actually gotten past this problem. I ended up 1) implementing my own IRichSpout. I then 2) connected that spout's internal tuple to the incoming stream data in my Java callback class. I am not sure whether this is idiomatic, but it compiles and runs without errors. However, 3) I do not see the incoming stream data (which is definitely there) coming through the printstuff bolt.
To ensure that the event data gets propagated, is there anything specific that I have to do in the spout or bolt implementation, or in the topology definition? Thanks.
;; tie Java callbacks to a Spout that I created
(.setSpout java-callback ibspout)

;; bolt that just prints whatever tuple it receives
(storm/defbolt printstuff ["word"] [tuple collector]
  (println (str "printstuff --> tuple[" tuple "] > collector[" collector "]")))

;; spout "1" feeds bolt "3" through a shuffle grouping
(storm/topology
  {"1" (storm/spout-spec ibspout)}
  {"3" (storm/bolt-spec {"1" :shuffle}
                        printstuff)})
[EDIT 2]
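Following the advice of SO member Ankur, I reworked the topology. The tuple now lives in my Java callback class, and I pass it to my IBSpout with (.setTuple ibspout (.getTuple java-callback)). I do not pass in the whole Java callback object, because that gave a NotSerializable error. Everything compiles and runs, but there is still no data arriving at the printstuff bolt.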
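import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

public class IBSpout implements IRichSpout {

    /**
     * Storm spout stuff
     */
    private SpoutOutputCollector _collector;

    // Tuple handed over from the Java callback class via setTuple
    private List _tuple = new ArrayList();
    public void setTuple(List tuple) { _tuple = tuple; }
    public List getTuple() { return _tuple; }

    /**
     * Storm ISpout interface functions
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
    public void close() {}
    public void activate() {}
    public void deactivate() {}

    // Emits whatever _tuple currently holds each time Storm calls nextTuple
    public void nextTuple() {
        _collector.emit(_tuple);
    }
    public void ack(Object msgId) {}
    public void fail(Object msgId) {}

    // Note: no output fields are declared here
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    public java.util.Map getComponentConfiguration() { return new HashMap(); }
}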
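One possible explanation for the missing data, offered as an assumption rather than a diagnosis: Storm serializes spout and bolt objects when the topology is built and submitted, and the workers run deserialized copies. If setTuple is called on the local ibspout after that point, the running copy would never see the new tuple. The plain-Java sketch below shows only that copy behaviour. `FakeSpout` and `copyViaSerialization` are hypothetical stand-ins and do not use Storm at all.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SerializedCopyDemo {
    // Hypothetical stand-in for a spout that holds a tuple field, like IBSpout._tuple.
    static class FakeSpout implements Serializable {
        private static final long serialVersionUID = 1L;
        private List<Object> tuple = new ArrayList<>();
        void setTuple(List<Object> t) { tuple = t; }
        List<Object> getTuple() { return tuple; }
    }

    // Round-trips an object through Java serialization, yielding an independent copy.
    static FakeSpout copyViaSerialization(FakeSpout original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FakeSpout) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        FakeSpout local = new FakeSpout();
        FakeSpout running = copyViaSerialization(local); // the copy a worker would hold
        // The tuple is set on the local object only after the copy was taken.
        local.setTuple(new ArrayList<>(Arrays.<Object>asList(7, "tick")));
        System.out.println("local:   " + local.getTuple());   // prints local:   [7, tick]
        System.out.println("running: " + running.getTuple()); // prints running: []
    }
}
```

If this is what is happening, the data source would need to be created or looked up inside the spout's open method, on the worker side, rather than injected into the local object.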