Storm> Howto Integrate Java Callback into Spout

I am trying to integrate Storm ( see here ) into my project. I understand the concepts of topologies, spouts and bolts. But now I’m trying to understand how some things are implemented.

A) I have a polyglot environment with Java and Clojure. My Java code is a callback class in which methods trigger streaming data. Event data pushed to these methods is what I want to use as a nose.

So, the first question is how to connect the data included in these methods to the nozzle? I try i) pass backtype.storm.topology.IRichSpout, then ii) pass backtype.storm.spout.SpoutOutputCollector ( see here ) to this open function ( see here ). But I see no way to actually transfer any card or list.

B) The rest of my project is all Clojure. These methods will be a lot of data. Each event will have an identifier from 1 to 100. In Clojure, I want to split the data coming from the spout into different threads. Those, I think, will be bolts.

How to configure a Clojure bolt to receive event data from a spout, and then disable the stream based on the identifier of the incoming event?

Thanks in advance Tim

[EDIT 1]

I really overcame this problem. I finished 1) using my own IRichSpout. I then 2) connected this internal package to the incoming streaming data in my Java callback class. I am not sure if this is idiomatic. But it compiles and works without errors. However 3) I do not see the input stream data (specifically there) passing through the printstuff bolt .

To ensure the distribution of event data, is there anything concrete that I have to do in implementing or defining a turntable or defining a topology? Thank.

      ;; tie Java callbacks to a Spout that I created
      (.setSpout java-callback ibspout)

      (storm / defbolt printstuff ["word"] [tuple collector]
        (println (str "printstuff --> tuple["tuple"] > collector["collector"]"))
      )
      (storm/topology
       { "1" (storm/spout-spec ibspout)
       }
       { "3" (storm/bolt-spec  { "1" :shuffle }
                               printstuff
             )
       })

[ 2]

SO Ankur, . , Java, IBSpout, (.setTuple ibspout (.getTuple java-callback)). Java, NotSerializable. . , , printstuff. .

    public class IBSpout implements IRichSpout {

      /**
       * Storm spout stuff
       */
      private SpoutOutputCollector _collector;

      private List _tuple = new ArrayList();
      public void setTuple(List tuple) { _tuple = tuple; }
      public List getTuple() { return _tuple; }

      /**
       * Storm ISpout interface functions
       */
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
      }
      public void close() {}
      public void activate() {}
      public void deactivate() {}
      public void nextTuple() {
        _collector.emit(_tuple);
      }
      public void ack(Object msgId) {}
      public void fail(Object msgId) {}


      public void declareOutputFields(OutputFieldsDeclarer declarer) {}
      public java.util.Map  getComponentConfiguration() { return new HashMap(); }

    }

+5
2

, , . , spouts nextTuple, java- spout, , , java- , , .

, Spouts , . , , ( java- ), nextTuple.

+3

B:

, , , .

, , , , . , .

0
source

All Articles