Kafka consumer group ID and consumer rebalance issue
I am using Kafka 0.10.0 and ZooKeeper 3.4.6 on our production server. I have 20 topics, each with approximately 50 partitions, and a total of 100 consumers, each subscribed to different topics and partitions. All consumers have the same group ID. If a consumer is added to or removed from a specific topic, will the consumers attached to other topics also undergo rebalancing?
My consumer code is:

```groovy
public static void main(String[] args) {
    String groupId = "prod"
    String topicRegex = args[0]
    String consumerTimeOut = "10000"
    int n_threads = 1
    if (args && args.size() > 1) {
        ConfigLoader.init(args[1])
    } else {
        ConfigLoader.init('development')
    }
    if (args && args.size() > 2 && args[2].isInteger()) {
        n_threads = (args[2]).toInteger()
    }

    ExecutorService executor = Executors.newFixedThreadPool(n_threads)
    addShutdownHook(executor)
    String zooKeeper = ConfigLoader.conf.zookeeper.hostname
    List<Runnable> taskList = []
    for (int i = 0; i < n_threads; i++) {
        KafkaConsumer example = new KafkaConsumer(zooKeeper, groupId, topicRegex, consumerTimeOut)
        taskList.add(example)
    }
    taskList.each { task ->
        executor.submit(task)
    }
    executor.shutdown()
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
}

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId, String consumerTimeOut) {
    Properties props = new Properties()
    props.put("zookeeper.connect", a_zookeeper)
    props.put("group.id", a_groupId)
    props.put("zookeeper.session.timeout.ms", "10000")
    props.put("rebalance.backoff.ms", "10000")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("rebalance.max.retries", "10")
    // The ZooKeeper-based consumer reads auto.commit.enable;
    // enable.auto.commit is the new consumer's key and is ignored here.
    props.put("auto.commit.enable", "false")
    props.put("consumer.timeout.ms", consumerTimeOut)
    props.put("auto.offset.reset", "smallest")
    return new ConsumerConfig(props)
}

public void run(String topicRegex) {
    String threadName = Thread.currentThread().getName()
    logger.info("{} [{}] main Starting", TAG, threadName)
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>()
    List<KafkaStream<byte[], byte[]>> streams = consumer.createMessageStreamsByFilter(new Whitelist(topicRegex), 1)
    ConsumerConnector consumerConnector = consumer

    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator()
        List<Object> batchTypeObjList = []
        String topic
        String topicObjectType
        String method
        String className
        String deserialzer
        Integer batchSize = 200
        while (true) {
            boolean hasNext = false
            try {
                hasNext = consumerIterator.hasNext()
            } catch (InterruptedException interruptedException) {
                logger.error("{} [{}] Interrupted exception: {}", TAG, threadName, interruptedException.getMessage())
                throw interruptedException
            } catch (ConsumerTimeoutException timeoutException) {
                // No message within consumer.timeout.ms: flush partially filled batches
                logger.error("{} [{}] Timeout exception: {}", TAG, threadName, timeoutException.getMessage())
                topicListMap.each { eachTopic, value ->
                    batchTypeObjList = topicListMap.get(eachTopic)
                    if (batchTypeObjList != null && !batchTypeObjList.isEmpty()) {
                        def dbObject = topicConfigMap.get(eachTopic)
                        logger.debug("{} [{}] Timeout happened.. indexing remaining objects in list for topic: {}", TAG, threadName, eachTopic)
                        className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                        method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                        int sleepTime = 0
                        if (dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                            sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                        executeMethod(className, method, batchTypeObjList)
                        batchTypeObjList.clear()
                        topicListMap.put(eachTopic, batchTypeObjList)
                        sleep(sleepTime)
                    }
                }
                consumer.commitOffsets()
                continue
            } catch (Exception exception) {
                logger.error("{} [{}] Exception: {}", TAG, threadName, exception.getMessage())
                throw exception
            }
            if (hasNext) {
                def consumerObj = consumerIterator.next()
                logger.debug("{} [{}] partition name: {}", TAG, threadName, consumerObj.partition())
                topic = consumerObj.topic()
                DBObject dbObject = topicConfigMap.get(topic)
                logger.debug("{} [{}] Topic name: {}", TAG, threadName, topic)
                topicObjectType = dbObject.get(KafkaTopicConfigEntity.TOPIC_OBJECT_TYPE_KEY)
                deserialzer = KafkaConfig.DEFAULT_DESERIALIZER
                if (KafkaConfig.DESERIALIZER_MAP.containsKey(topicObjectType)) {
                    deserialzer = KafkaConfig.DESERIALIZER_MAP.get(topicObjectType)
                }
                className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                boolean isBatchJob = dbObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                if (dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY) != null)
                    batchSize = dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY)
                else
                    batchSize = 1
                Object queueObj = (Class.forName(deserialzer)).deserialize(consumerObj.message())
                int sleepTime = 0
                if (dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                    sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                if (isBatchJob == true) {
                    batchTypeObjList = topicListMap.get(topic)
                    batchTypeObjList.add(queueObj)
                    if (batchTypeObjList.size() == batchSize) {
                        executeMethod(className, method, batchTypeObjList)
                        batchTypeObjList.clear()
                        sleep(sleepTime)
                    }
                    topicListMap.put(topic, batchTypeObjList)
                } else {
                    executeMethod(className, method, queueObj)
                    sleep(sleepTime)
                }
                consumer.commitOffsets()
            }
        }
        logger.debug("{} [{}] Shutting down process ", TAG, threadName)
    }
}
```
Any help is appreciated.
Whenever a consumer leaves or joins a consumer group, the entire group undergoes a rebalance. Since the group tracks partitions across all the topics its members are subscribed to, you are right in thinking that this can lead to rebalancing of consumers that are not subscribed to the topic in question.
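The idea that the assignment is computed over every topic the group's members subscribe to can be illustrated with a small, broker-free Java sketch. It assumes range-style assignment (Kafka's default strategy in this version: per topic, subscribers are sorted and given contiguous ranges, with the first ones taking any remainder). `GroupAssignmentSketch` and `assign` are hypothetical names, not Kafka classes. Because the input is the membership of the whole group, any join or leave changes the input for every member, including members on unrelated topics.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration of range-style assignment; not Kafka's RangeAssignor class.
public class GroupAssignmentSketch {

    // Returns member -> owned "topic-partition" names, computed over every
    // topic that any member of the group subscribes to.
    public static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                                   Map<String, Set<String>> subscriptions) {
        Map<String, List<String>> result = new TreeMap<>();
        for (String member : subscriptions.keySet()) {
            result.put(member, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            // Collect this topic's subscribers in sorted order.
            List<String> subscribers = new ArrayList<>();
            for (Map.Entry<String, Set<String>> sub : subscriptions.entrySet()) {
                if (sub.getValue().contains(topic.getKey())) {
                    subscribers.add(sub.getKey());
                }
            }
            if (subscribers.isEmpty()) {
                continue;
            }
            Collections.sort(subscribers);
            int perMember = topic.getValue() / subscribers.size();
            int extra = topic.getValue() % subscribers.size();
            int next = 0;
            for (int i = 0; i < subscribers.size(); i++) {
                // Contiguous range; the first `extra` members take one more partition.
                int count = perMember + (i < extra ? 1 : 0);
                for (int p = 0; p < count; p++) {
                    result.get(subscribers.get(i)).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new TreeMap<>();
        topics.put("test1", 2);
        topics.put("test2", 9);

        Map<String, Set<String>> group = new TreeMap<>();
        group.put("consumer1", Set.of("test1"));
        System.out.println("before join: " + assign(topics, group));

        // A member interested only in test2 joins: the whole group is recomputed.
        group.put("consumer2", Set.of("test2"));
        System.out.println("after join:  " + assign(topics, group));
    }
}
```

With the question's numbers, three consumers sharing one 50-partition topic would get 17, 17 and 16 partitions under this scheme.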
Please see below a small test illustrating this point. I have a broker with 2 topics, test1 (2 partitions) and test2 (9 partitions), and I start 2 consumers, both in the same consumer group, each subscribing to only 1 of the 2 topics. As you can see, when consumer2 joins the group, consumer1 gets its partitions revoked and then reassigned, because the entire group rebalances.
```
Subscribing consumer1 to topic test1
Starting thread for consumer1
Polling consumer1
consumer1 got 0 partitions revoked!
consumer1 got 2 partitions assigned!
Polling consumer1
Polling consumer1
Polling consumer1
Subscribing consumer2 to topic test2
Starting thread for consumer2
Polling consumer2
Polling consumer1
consumer2 got 0 partitions revoked!
Polling consumer1
Polling consumer1
consumer1 got 2 partitions revoked!
consumer2 got 9 partitions assigned!
consumer1 got 2 partitions assigned!
Polling consumer2
Polling consumer1
Polling consumer2
Polling consumer1
Polling consumer2
```
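The revoke-then-reassign sequence in this log can be reproduced without a broker. The sketch below assumes the eager rebalance protocol used by Kafka 0.10 (every member gives up all of its partitions before any are handed out again) and, for simplicity, assigns each topic's partitions round-robin among that topic's subscribers. `EagerRebalanceSketch`, `join` and `leave` are hypothetical names, not Kafka APIs. The second half of `main` models one group ID per topic as two independent groups.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model of an eager rebalance; hypothetical class, not part of any Kafka API.
public class EagerRebalanceSketch {
    private final Map<String, Integer> partitionsPerTopic;
    private final Map<String, Set<String>> subscriptions = new TreeMap<>();
    private final Map<String, List<String>> owned = new TreeMap<>();
    public final List<String> log = new ArrayList<>();

    public EagerRebalanceSketch(Map<String, Integer> partitionsPerTopic) {
        this.partitionsPerTopic = new TreeMap<>(partitionsPerTopic);
    }

    public void join(String member, String topic) {
        subscriptions.put(member, Set.of(topic));
        owned.put(member, new ArrayList<>());
        rebalance();
    }

    public void leave(String member) {
        // The departing member simply drops out; everyone left is rebalanced.
        subscriptions.remove(member);
        owned.remove(member);
        rebalance();
    }

    private void rebalance() {
        // Phase 1: every member revokes everything it owns, whatever its topic.
        for (Map.Entry<String, List<String>> e : owned.entrySet()) {
            log.add(e.getKey() + " got " + e.getValue().size() + " partitions revoked!");
            e.getValue().clear();
        }
        // Phase 2: hand out every topic's partitions again, round-robin
        // among that topic's subscribers.
        for (Map.Entry<String, Integer> t : partitionsPerTopic.entrySet()) {
            List<String> subs = new ArrayList<>();
            for (Map.Entry<String, Set<String>> s : subscriptions.entrySet()) {
                if (s.getValue().contains(t.getKey())) {
                    subs.add(s.getKey());
                }
            }
            for (int p = 0; p < t.getValue() && !subs.isEmpty(); p++) {
                owned.get(subs.get(p % subs.size())).add(t.getKey() + "-" + p);
            }
        }
        for (Map.Entry<String, List<String>> e : owned.entrySet()) {
            log.add(e.getKey() + " got " + e.getValue().size() + " partitions assigned!");
        }
    }

    public static void main(String[] args) {
        // Both consumers in one group, as in the test above.
        EagerRebalanceSketch shared = new EagerRebalanceSketch(Map.of("test1", 2, "test2", 9));
        shared.join("consumer1", "test1");
        shared.join("consumer2", "test2");
        shared.log.forEach(System.out::println);

        // One group per topic: consumer2 joining never touches consumer1.
        EagerRebalanceSketch groupA = new EagerRebalanceSketch(Map.of("test1", 2));
        EagerRebalanceSketch groupB = new EagerRebalanceSketch(Map.of("test2", 9));
        groupA.join("consumer1", "test1");
        groupB.join("consumer2", "test2");
        System.out.println("separate groups, consumer1: " + groupA.log);
    }
}
```

In the shared-group case consumer1 is interrupted even though it ends up with the same two test1 partitions; calling `leave("consumer2")` interrupts it again in the same way.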