Confusion about Storm's acker and guaranteed message processing
I am now learning about Storm's guaranteed message processing, and I am confused by some of the concepts in this part.
To guarantee that a message emitted by a spout is fully processed, Storm uses an acker. Each time a spout emits a tuple, the acker assigns an "ack val", initialized to 0, to store the status of the tuple tree. Each time the downstream bolts of this tuple emit a new tuple or ack an "old" tuple, the tuple ID is XORed with the "ack val". The acker only needs to check whether the "ack val" is 0 to know whether the tuple has been fully processed. Let's look at the code below:
    public class WordReader implements IRichSpout {
        ... ...
        while ((str = reader.readLine()) != null) {
            this.collector.emit(new Values(str), str);
            ... ...
        }
The code above is the spout from the word count program in the "Getting Started with Storm" tutorial. In the emit method, the second parameter, "str", is the message ID. I am confused about this parameter:
1) As I understand it, each time a tuple (i.e., a message) is emitted, whether in a spout or in a bolt, it should be Storm's responsibility to assign a 64-bit message ID to that message. Is that correct? Or is "str" here just a human-readable alias for the message?
2) Whatever the answer to 1) is, "str" here would be the same word in two different messages, because a text file should contain many duplicate words. If that is true, how does Storm differentiate between the messages? And what is the meaning of this parameter?
3) In some code, I see spouts use the following to set the message ID in the spout's emit method:
    public class RandomIntegerSpout extends BaseRichSpout {
        private long msgId = 0;
        // abridged: the emit call sits inside nextTuple()
        collector.emit(new Values(..., ++msgId), msgId);
    }
This is closer to what I think it should be: the message ID should be totally different across different messages. But for this code, my confusion is: what happens to the private field "msgId" across different executors? Because each executor has its own msgId initialized to 0, the messages in different executors will all be named 1, 2, 3, and so on. How can Storm differentiate these messages?
I am a novice with Storm, so maybe these questions are naive. I hope someone can help me figure this out. Thanks!
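For reference, the XOR bookkeeping described in the question can be sketched in plain Java. This is only a toy model of a single tuple tree, not Storm's actual acker code; the class name `AckValSketch`, its methods, and the tuple IDs are hypothetical and chosen purely for illustration.

```java
// Toy model of the acker's bookkeeping for ONE tuple tree.
// Illustration only; this is not Storm's actual implementation.
class AckValSketch {
    private long ackVal = 0L; // starts at 0, as described in the question

    // A tuple was emitted into the tree: XOR its ID into the ack val.
    void emitted(long tupleId) { ackVal ^= tupleId; }

    // A tuple was acked: XOR the same ID in again, which cancels it out.
    void acked(long tupleId) { ackVal ^= tupleId; }

    // True when the ack val is 0, i.e. emitted and acked IDs have cancelled.
    boolean fullyProcessed() { return ackVal == 0L; }

    public static void main(String[] args) {
        // Hypothetical tuple IDs; real IDs would be 64-bit values.
        long a = 0x5AL, b = 0x33L, c = 0x0FL;
        AckValSketch tree = new AckValSketch();
        tree.emitted(a);   // spout emits tuple a
        tree.emitted(b);   // a bolt emits b, anchored to a
        tree.emitted(c);   // the same bolt emits c, anchored to a
        tree.acked(a);     // the bolt acks a
        System.out.println(tree.fullyProcessed()); // false: b and c are pending
        tree.acked(b);
        tree.acked(c);
        System.out.println(tree.fullyProcessed()); // true: everything cancelled
    }
}
```

Because XOR is its own inverse, the order in which emits and acks arrive does not matter; only whether each ID has been seen twice.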
About message IDs in general: internally it might be a 64-bit value, but this 64-bit value is computed as a hash of the msgId object provided in emit() within the spout. So you can hand in any object as a message ID (the probability that two objects hash to the same value is close to zero).
About using str: I think that in this example, str contains a line (and not a word), and it is unlikely that the document contains the exact same line twice (except for empty lines, of which there might be many).
About the counter as message ID: you are absolutely right in your observation. If multiple spouts are running in parallel, this would give message ID conflicts and would break fault tolerance.
If you want to "fix" the counter approach, each counter should be initialized differently (best, from 1...#SpoutTasks). You can use the taskId for this (which is unique and can be accessed via the TopologyContext provided in Spout.open()). Basically, you get all taskIds of the parallel spout tasks, sort them, and assign each spout task its ordering number. Furthermore, you need to increment by the "number of parallel spouts" instead of by 1.
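The counter fix described above could look roughly like the following sketch. It is plain Java with no Storm dependency, so that it runs on its own; the class `StripedMsgIdSketch` and its method names are hypothetical. In a real spout, the task list and the task's own ID would presumably be read in open() from the TopologyContext (for example via getComponentTasks(getThisComponentId()) and getThisTaskId()).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of a per-task message ID generator; all names are hypothetical.
class StripedMsgIdSketch {
    private long next;       // next ID this task will hand out
    private final long step; // number of parallel spout tasks

    // taskIds: IDs of all tasks of this spout; myTaskId: this task's own ID.
    StripedMsgIdSketch(List<Integer> taskIds, int myTaskId) {
        List<Integer> sorted = new ArrayList<>(taskIds);
        Collections.sort(sorted);
        int index = sorted.indexOf(myTaskId); // 0-based ordering number
        if (index < 0) {
            throw new IllegalArgumentException("myTaskId is not in taskIds");
        }
        this.next = index + 1;     // counters start at 1...#SpoutTasks
        this.step = sorted.size(); // increment by the number of spout tasks
    }

    // Returns the next message ID; no other task produces the same value.
    long nextId() {
        long id = next;
        next += step;
        return id;
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(7, 3, 5); // assumed task IDs
        StripedMsgIdSketch first = new StripedMsgIdSketch(tasks, 3);
        StripedMsgIdSketch second = new StripedMsgIdSketch(tasks, 5);
        // The task with the smallest ID yields 1, 4, 7; the next one 2, 5, 8.
        System.out.println(first.nextId() + " " + first.nextId() + " " + first.nextId());
        System.out.println(second.nextId() + " " + second.nextId() + " " + second.nextId());
    }
}
```

With three spout tasks, the sequences 1, 4, 7, ... and 2, 5, 8, ... and 3, 6, 9, ... never overlap, so each message ID stays unique across tasks as long as the number of spout tasks does not change.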