Kafka Consumer Group Id and consumer rebalance issue


I am using Kafka 0.10.0 and ZooKeeper 3.4.6 on a production server. I have 20 topics, each with approximately 50 partitions, and a total of 100 consumers, each subscribed to different topics and partitions. All consumers share the same group id. In this case, if a consumer is added to or removed from a specific topic, will the consumers attached to other topics also undergo rebalancing?

My consumer code is:

public static void main(String[] args) {
    String groupId = "prod"
    String topicRegex = args[0]
    String consumerTimeout = "10000"
    int n_threads = 1
    if (args && args.size() > 1) {
        ConfigLoader.init(args[1])
    } else {
        ConfigLoader.init('development')
    }
    if (args && args.size() > 2 && args[2].isInteger()) {
        n_threads = (args[2]).toInteger()
    }

    ExecutorService executor = Executors.newFixedThreadPool(n_threads)
    addShutdownHook(executor)
    String zookeeper = ConfigLoader.conf.zookeeper.hostname
    List<Runnable> taskList = []
    for (int i = 0; i < n_threads; i++) {
        KafkaConsumer example = new KafkaConsumer(zookeeper, groupId, topicRegex, consumerTimeout)
        taskList.add(example)
    }
    taskList.each { task ->
        executor.submit(task)
    }
    executor.shutdown()
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
}

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId, String consumerTimeout) {
    Properties props = new Properties()
    props.put("zookeeper.connect", a_zookeeper)
    props.put("group.id", a_groupId)
    props.put("zookeeper.session.timeout.ms", "10000")
    props.put("rebalance.backoff.ms", "10000")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("rebalance.max.retries", "10")
    props.put("enable.auto.commit", "false")
    props.put("consumer.timeout.ms", consumerTimeout)
    props.put("auto.offset.reset", "smallest")
    return new ConsumerConfig(props)
}

public void run(String topicRegex) {
    String threadName = Thread.currentThread().getName()
    logger.info("{} [{}] main starting", tag, threadName)
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>()
    List<KafkaStream<byte[], byte[]>> streams = consumer.createMessageStreamsByFilter(new Whitelist(topicRegex), 1)
    ConsumerConnector consumerConnector = consumer

    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator()
        List<Object> batchTypeObjList = []
        String topic
        String topicObjectType
        String method
        String className
        String deserialzer
        Integer batchSize = 200
        while (true) {
            boolean hasNext = false
            try {
                hasNext = consumerIterator.hasNext()
            } catch (InterruptedException interruptedException) {
                logger.error("{} [{}] interrupted exception: {}", tag, threadName, interruptedException.getMessage())
                throw interruptedException
            } catch (ConsumerTimeoutException timeoutException) {
                // No message arrived within consumer.timeout.ms: flush any partially filled batches.
                logger.error("{} [{}] timeout exception: {}", tag, threadName, timeoutException.getMessage())
                topicListMap.each { eachTopic, value ->
                    batchTypeObjList = topicListMap.get(eachTopic)
                    if (batchTypeObjList != null && !batchTypeObjList.isEmpty()) {
                        def dbObject = topicConfigMap.get(eachTopic)
                        logger.debug("{} [{}] timeout happened.. indexing remaining objects in list for topic: {}", tag, threadName, eachTopic)
                        className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                        method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                        int sleepTime = 0
                        if (dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                            sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                        executeMethod(className, method, batchTypeObjList)
                        batchTypeObjList.clear()
                        topicListMap.put(eachTopic, batchTypeObjList)
                        sleep(sleepTime)
                    }
                }
                consumer.commitOffsets()
                continue
            } catch (Exception exception) {
                logger.error("{} [{}] exception: {}", tag, threadName, exception.getMessage())
                throw exception
            }
            if (hasNext) {
                def consumerObj = consumerIterator.next()
                logger.debug("{} [{}] partition name: {}", tag, threadName, consumerObj.partition())
                topic = consumerObj.topic()
                DBObject dbObject = topicConfigMap.get(topic)
                logger.debug("{} [{}] topic name: {}", tag, threadName, topic)
                topicObjectType = dbObject.get(KafkaTopicConfigEntity.TOPIC_OBJECT_TYPE_KEY)
                deserialzer = KafkaConfig.DEFAULT_DESERIALIZER
                if (KafkaConfig.DESERIALIZER_MAP.containsKey(topicObjectType)) {
                    deserialzer = KafkaConfig.DESERIALIZER_MAP.get(topicObjectType)
                }
                className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                boolean isBatchJob = dbObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                if (dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY) != null)
                    batchSize = dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY)
                else
                    batchSize = 1
                Object queueObj = (Class.forName(deserialzer)).deserialize(consumerObj.message())
                int sleepTime = 0
                if (dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                    sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                if (isBatchJob == true) {
                    // Batch mode: collect messages per topic and process them once the batch is full.
                    batchTypeObjList = topicListMap.get(topic)
                    batchTypeObjList.add(queueObj)
                    if (batchTypeObjList.size() == batchSize) {
                        executeMethod(className, method, batchTypeObjList)
                        batchTypeObjList.clear()
                        sleep(sleepTime)
                    }
                    topicListMap.put(topic, batchTypeObjList)
                } else {
                    executeMethod(className, method, queueObj)
                    sleep(sleepTime)
                }
                consumer.commitOffsets()
            }
        }
        logger.debug("{} [{}] shutting down process", tag, threadName)
    }
}

Any help appreciated.

Whenever a consumer leaves or joins a consumer group, the entire group undergoes a rebalance. Since the group tracks all partitions across all topics that its members are subscribed to, you are right in thinking that this can lead to rebalancing of consumers that are not subscribed to the topic in question.

Please see the small test below illustrating this point. I have a broker with two topics, test1 (2 partitions) and test2 (9 partitions), and I start two consumers, both in the same consumer group, each subscribing to only one of the two topics. As you can see, when consumer2 joins the group, consumer1 gets its partitions revoked and reassigned, because the entire group rebalances.
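
A rough sketch of such a test, using the 0.10 Java consumer with a ConsumerRebalanceListener that logs its callbacks (the class name, the group id "test-group", the broker address localhost:9092 and the timings are illustrative assumptions, not taken from the original setup), could look like this:

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceTest {

    // One consumer in the shared group: subscribes to a single topic and polls forever,
    // logging every rebalance callback it receives.
    static Runnable consumerTask(final String name, final String topic) {
        return new Runnable() {
            public void run() {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
                props.put("group.id", "test-group");               // same group for both consumers
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                System.out.println("Subscribing " + name + " to topic " + topic);
                consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // Fires on every member of the group during a rebalance,
                        // even when this member's own topic did not change.
                        System.out.println(name + " got " + partitions.size() + " partitions revoked!");
                    }
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        System.out.println(name + " got " + partitions.size() + " partitions assigned!");
                    }
                });
                System.out.println("Starting thread " + name);
                while (true) {
                    System.out.println("Polling " + name);
                    consumer.poll(1000);  // rebalance callbacks run inside poll()
                }
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(consumerTask("consumer1", "test1")).start();
        Thread.sleep(5000);  // give consumer1 time to join and receive its assignment
        new Thread(consumerTask("consumer2", "test2")).start();
    }
}

Running it against the two topics above produces output along these lines: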

Subscribing consumer1 to topic test1
Starting thread consumer1
Polling consumer1
consumer1 got 0 partitions revoked!
consumer1 got 2 partitions assigned!
Polling consumer1
Polling consumer1
Polling consumer1
Subscribing consumer2 to topic test2
Starting thread consumer2
Polling consumer2
Polling consumer1
consumer2 got 0 partitions revoked!
Polling consumer1
Polling consumer1
consumer1 got 2 partitions revoked!
consumer2 got 9 partitions assigned!
consumer1 got 2 partitions assigned!
Polling consumer2
Polling consumer1
Polling consumer2
Polling consumer1
Polling consumer2
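
Note that this coupling comes entirely from sharing a single group.id. If the consumers of different topics do not need to balance partitions or commit offsets as one unit, giving each topic's consumers their own group id keeps their rebalances independent of each other, for example by deriving the group name from the topic. A minimal sketch of that change (the naming scheme, zookeeperHosts and topicName are illustrative assumptions):

// Sketch: one consumer group per topic instead of a single shared "prod" group.
Properties props = new Properties();
props.put("zookeeper.connect", zookeeperHosts);  // hypothetical variable holding the ZooKeeper host list
props.put("group.id", "prod-" + topicName);      // e.g. a separate group per topic
// ...remaining settings as in createConsumerConfig() above...

With separate groups, adding or removing a consumer for one topic triggers a rebalance only within that topic's group.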
