Confusion about the Storm acker and guaranteed message processing


I am now learning about Storm's guaranteed message processing and am confused by some of the concepts in this part.

To guarantee that a message emitted by a spout is fully processed, Storm uses an acker. Each time a spout emits a tuple, the acker assigns an "ack val", initialized to 0, to store the status of the tuple tree. Each time the downstream bolts of this tuple emit a new tuple or ack an "old" tuple, the tuple ID is XORed with the "ack val". The acker only needs to check whether the "ack val" is 0 to know whether the tuple has been fully processed. Let's see the code below:

public class WordReader implements IRichSpout {
    ... ...
    while ((str = reader.readLine()) != null) {
        // The second argument, str, is the message ID.
        this.collector.emit(new Values(str), str);
        ... ...
    }

The code above is the spout from the word count program in the "Getting Started with Storm" tutorial. In the emit method, the second parameter, "str", is the message ID. I am confused by this parameter:

1) As I understand it, each time a tuple (i.e., a message) is emitted, whether in a spout or in a bolt, it should be Storm's responsibility to assign a 64-bit message ID to that message. Is that correct? Or is "str" here just a human-readable alias for the message?

2) Whatever the answer to 1) is, "str" here would be the same word in two different messages, because a text file should contain many duplicate words. If that is true, how does Storm differentiate the messages? And what is the meaning of this parameter?

3) In some code, I see spouts use the following to set the message ID in the spout's emit method:

public class RandomIntegerSpout extends BaseRichSpout {
    private long msgId = 0;
    collector.emit(new Values(..., ++msgId), msgId);
}

This is closer to what I think it should be: the message ID should be totally different across different messages. But with this code, my confusion is: what happens to the private field "msgId" across different executors? Because each executor has its own msgId initialized to 0, messages in different executors will be named 0, 1, 2, and so on. How, then, does Storm differentiate these messages?

I am a novice with Storm, so maybe these questions are naive. I hope someone can help me figure this out. Thanks!
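The "ack val" bookkeeping described in the question can be illustrated with a small toy model. This is only a sketch of the XOR idea under the question's description, not Storm's actual acker; the class name AckerSketch and its methods are hypothetical, and the tuple IDs are arbitrary example values.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the acker's XOR bookkeeping (hypothetical, not Storm's code).
class AckerSketch {
    // Maps the ID of a spout tuple (the root of a tuple tree) to its "ack val".
    private final Map<Long, Long> ackVals = new HashMap<>();

    // Start tracking a tuple tree; the ack val begins at 0.
    void track(long rootId) {
        ackVals.put(rootId, 0L);
    }

    // XOR a tuple ID into the ack val. Called once when the tuple is
    // emitted and once when it is acked, so each ID is applied twice.
    void xor(long rootId, long tupleId) {
        ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    // The tree is complete when every emitted ID has also been acked,
    // which brings the ack val back to 0. Untracked roots return false.
    boolean isFullyProcessed(long rootId) {
        return ackVals.getOrDefault(rootId, -1L) == 0L;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long root = 0x1234L;   // ID of the spout tuple (example value)
        long child = 0x5678L;  // ID of a tuple emitted by a bolt (example value)

        acker.track(root);
        acker.xor(root, root);   // spout emits the root tuple
        acker.xor(root, child);  // bolt emits a child anchored to the root
        acker.xor(root, root);   // bolt acks the root tuple
        System.out.println(acker.isFullyProcessed(root)); // prints false

        acker.xor(root, child);  // downstream bolt acks the child
        System.out.println(acker.isFullyProcessed(root)); // prints true
    }
}
```

Because x XOR x is 0, the order of emits and acks does not matter: the value returns to 0 only once every ID has been applied exactly twice. A check made before the first emit would also see 0, so a real implementation has to start from the spout tuple's ID rather than from an empty state.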

About message IDs in general: internally it might be a 64-bit value, but this 64-bit value is computed as a hash of the msgId object provided in emit() within the spout. So you can hand in any object as the message ID (the probability that two objects hash to the same value is close to zero).

About using str: I think that in this example str contains a line (and not a word), and it is unlikely that the document contains the exact same line twice (unless there are empty lines, of which there might be many).

About the counter as message ID: your observation is absolutely right. If multiple spouts are running in parallel, this would give message ID conflicts and would break fault tolerance.

If you want to "fix" the counter approach, each counter should be initialized differently (best, from 1...#SpoutTasks). You can use the taskId for this (which is unique and can be accessed via the TopologyContext provided in Spout.open()). Basically, you get all taskIds of the parallel spout tasks, sort them, and assign each spout task its ordering number. Furthermore, you need to increment by "number of parallel spouts" instead of by 1.
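The counter fix described above can be sketched as follows. This is a minimal illustration that assumes the list of spout task IDs and the current task's ID have already been obtained; in a real spout they would presumably come from the TopologyContext passed to open(), for example via getComponentTasks(getThisComponentId()) and getThisTaskId(). The class name StridedMsgIds is hypothetical, and the code is kept free of Storm dependencies so it runs on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: generates message IDs that never collide across
// the parallel tasks of one spout component.
class StridedMsgIds {
    private final long stride; // number of parallel spout tasks
    private long next;         // next ID this task will hand out

    StridedMsgIds(List<Integer> spoutTaskIds, int thisTaskId) {
        // Sort a copy of the task IDs and find this task's position.
        List<Integer> sorted = new ArrayList<>(spoutTaskIds);
        Collections.sort(sorted);
        int rank = sorted.indexOf(thisTaskId);
        if (rank < 0) {
            throw new IllegalArgumentException("unknown task ID: " + thisTaskId);
        }
        this.stride = sorted.size();
        this.next = rank + 1; // start values are 1...#SpoutTasks
    }

    // Returns the next ID and advances by the number of spout tasks.
    long nextId() {
        long id = next;
        next += stride;
        return id;
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(7, 3, 5); // example task IDs
        StridedMsgIds a = new StridedMsgIds(tasks, 3);
        StridedMsgIds b = new StridedMsgIds(tasks, 5);
        System.out.println(a.nextId() + " " + a.nextId() + " " + a.nextId()); // prints 1 4 7
        System.out.println(b.nextId() + " " + b.nextId() + " " + b.nextId()); // prints 2 5 8
    }
}
```

Inside the spout, the emit call would then use something like collector.emit(new Values(...), ids.nextId()) in place of the shared-looking ++msgId counter. Each task walks its own residue class modulo the number of tasks, so two tasks can never produce the same ID.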

