在Kafka中,当有新消费者加入或者订阅的topic数发生变化时,会触发Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费。Rebalance的过程如下:
第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。
第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。
所以对于Rebalance来说,Coordinator起着至关重要的作用,那么怎么查看消费者对应的Coordinator呢,我们知道某个消费者组对应__consumer_offsets中的哪个Partation是通过hash计算出来的:partation=hash("test_group_1")%50=28,表示test_group_1这个消费者组属于28号partation,通过命令:
./kafka-topics.sh --zookeeper 192.168.33.11:2181 --describe --topic __consumer_offsets
可以找到28号Partation所对应的信息:
从而可以知道coordinator对应的broker为1
在Rebalance期间,消费者会出现无法读取消息,造成整个消费者群组一段时间内不可用,假设现在消费者组当中有A,代码逻辑执行10s,如果消费者组在消费的过程中consumer B加入到了该消费者组,并且B的代码逻辑执行20s,那么当A处理完后先进入Rebalance状态等待,只有当B也处理完后,A和B才真正通过Rebalance重新分配,这样显然A在等待的过程中浪费了资源。
消费者A:
1 """ 2 consumer_rebalance_a.py a消费者 3 """ 4 import pickle 5 import uuid 6 import time 7 from kafka import KafkaConsumer 8 from kafka.structs import TopicPartition, OffsetAndMetadata 9 from kafka import ConsumerRebalanceListener10 11 consumer = KafkaConsumer(12 bootstrap_servers=['192.168.33.11:9092'],13 group_id="test_group_1",14 client_id="{}".format(str(uuid.uuid4())),15 enable_auto_commit=False,16 key_deserializer=lambda k: pickle.loads(k),17 value_deserializer=lambda v: pickle.loads(v)18 )19 20 # 用来记录最新的偏移量信息.21 consumer_offsets = {}22 23 24 class MineConsumerRebalanceListener(ConsumerRebalanceListener):25 def on_partitions_revoked(self, revoked):26 """27 再均衡开始之前 下一轮poll之前触发28 :param revoked:29 :return:30 """31 print('再均衡开始之前被自动触发.')32 print(revoked, type(revoked))33 consumer.commit_async(offsets=consumer_offsets)34 35 def on_partitions_assigned(self, assigned):36 """37 再均衡完成之后 即将下一轮poll之前 触发38 :param assigned:39 :return:40 """41 print('在均衡完成之后自动触发.')42 print(assigned, type(assigned))43 44 45 consumer.subscribe(topics=('round_topic',), listener=MineConsumerRebalanceListener())46 47 48 def _on_send_response(*args, **kwargs):49 """50 提交偏移量涉及回调函数51 :param args: 52 :param kwargs:53 :return:54 """55 if isinstance(args[1], Exception):56 print('偏移量提交异常. {}'.format(args[1]))57 else:58 print('偏移量提交成功')59 60 61 try:62 start_time = time.time()63 while True:64 # 再均衡其实是在poll之前完成的65 consumer_records_dict = consumer.poll(timeout_ms=100)66 67 # 处理逻辑.68 for k, record_list in consumer_records_dict.items():69 for record in record_list:70 print("topic = {},partition = {},offset = {},key = {},value = {}".format(71 record.topic, record.partition, record.offset, record.key, record.value)72 )73 74 consumer_offsets[75 TopicPartition(record.topic, record.partition)76 ] = OffsetAndMetadata(77 record.offset + 1, metadata='偏移量.'78 )79 80 try:81 consumer.commit_async(callback=_on_send_response)82 time.sleep(10)83 except Exception as e:84 print('commit failed', str(e))85 86 except Exception as e:87 print(str(e))88 finally:89 try:90 # 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交.91 consumer.commit()92 print("同步补救提交成功")93 except Exception as e:94 consumer.close()
消费者B:
1 """ 2 consumer b.py 消费者B 3 """ 4 5 import pickle 6 import uuid 7 import time 8 from kafka import KafkaConsumer 9 from kafka.structs import TopicPartition, OffsetAndMetadata 10 from kafka import ConsumerRebalanceListener 11 12 consumer = KafkaConsumer( 13 bootstrap_servers=['192.168.33.11:9092'], 14 group_id="test_group_1", 15 client_id="{}".format(str(uuid.uuid4())), 16 enable_auto_commit=False, # 设置为手动提交偏移量. 17 key_deserializer=lambda k: pickle.loads(k), 18 value_deserializer=lambda v: pickle.loads(v) 19 ) 20 21 consumer_offsets = {} # 用来记录最新的偏移量信息. 22 23 24 class MineConsumerRebalanceListener(ConsumerRebalanceListener): 25 def on_partitions_revoked(self, revoked): 26 """ 27 再均衡开始之前 下一轮poll之前触发 28 :param revoked: 29 :return: 30 """ 31 print('再均衡开始之前被自动触发.') 32 print(revoked, type(revoked)) 33 consumer.commit_async(offsets=consumer_offsets) 34 35 def on_partitions_assigned(self, assigned): 36 """ 37 再均衡完成之后 即将下一轮poll之前 触发 38 :param assigned: 39 :return: 40 """ 41 42 print('在均衡完成之后自动触发.') 43 print(assigned, type(assigned)) 44 45 46 consumer.subscribe(topics=('round_topic',), listener=MineConsumerRebalanceListener()) 47 48 49 def _on_send_response(*args, **kwargs): 50 """ 51 提交偏移量涉及回调函数 52 :param args: 53 :param kwargs: 54 :return: 55 """ 56 57 if isinstance(args[1], Exception): 58 print('偏移量提交异常. {}'.format(args[1])) 59 else: 60 print('偏移量提交成功') 61 62 63 try: 64 start_time = time.time() 65 while True: 66 # 再均衡其实是在poll之前完成的 67 consumer_records_dict = consumer.poll(timeout_ms=100) 68 69 record_num = 0 70 for key, record_list in consumer_records_dict.items(): 71 for record in record_list: 72 record_num += 1 73 print("---->当前批次获取到的消息个数是:{}".format(record_num)) 74 75 # 处理逻辑. 76 for k, record_list in consumer_records_dict.items(): 77 for record in record_list: 78 print("topic = {},partition = {},offset = {},key = {},value = {}".format( 79 record.topic, record.partition, record.offset, record.key, record.value) 80 ) 81 82 consumer_offsets[ 83 TopicPartition(record.topic, record.partition) 84 ] = OffsetAndMetadata(record.offset + 1, metadata='偏移量.') 85 86 try: 87 # 轮询一个batch 手动提交一次 88 consumer.commit_async(callback=_on_send_response) 89 time.sleep(20) 90 except Exception as e: 91 print('commit failed', str(e)) 92 93 except Exception as e: 94 print(str(e)) 95 finally: 96 try: 97 # 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交. 98 consumer.commit() 99 print("同步补救提交成功")100 except Exception as e:101 consumer.close()
消费者A和消费者B是同一个消费者组(test_group_1)的两个消费者,用time.sleep的方式模拟执行时间,A:10s,B:20s;首先A开始消费,当B新加入消费者组的时候会触发Rebalance,可以通过实现再均衡监听器(RebalanceListener)中的on_partitions_revoked和on_partitions_assigned方法来查看再均衡触发前后的partition变化情况,依次启动消费者A和B之后:
消费者A:再均衡开始之前被自动触发.{TopicPartition(topic='round_topic', partition=0), TopicPartition(topic='round_topic', partition=1), TopicPartition(topic='round_topic', partition=2)}<-------------------------------------------------------------------------------->在均衡完成之后自动触发.{TopicPartition(topic='round_topic', partition=0), TopicPartition(topic='round_topic', partition=1)} <----------------------------------------消费者B:再均衡开始之前被自动触发.set() <-------------------------------------------------------------------------------->在均衡完成之后自动触发.{TopicPartition(topic='round_topic', partition=2)} <----------------------------------------
在等待B的逻辑执行完后,A和B进入再均衡状态;再均衡前A处于partition 0、1、 2三个分区,B不占有任何partition;当再均衡结束后,A占有partition 0、1,B占有partition 2;然后A和B分别开始消费对应的partition。
在上述消费者A和B的代码中重写了RebalanceListener,主要是为了在发生再均衡之前提交最后一个已经处理记录的偏移量,因为再均衡时消费者将失去对一个分区的所有权,如果消费者已经消费了当前partition还没提交offset,这时候发生再均衡会使得消费者重新分配partition,可能使得同一个消息先后被两个消费者消费的情况,实现MineConsumerRebalanceListener再均衡前提交一次offset,确保每一个消费者在触发再均衡前提交最后一次offset:
1 class MineConsumerRebalanceListener(ConsumerRebalanceListener): 2 def on_partitions_revoked(self, revoked): 3 """ 4 再均衡开始之前 下一轮poll之前触发 5 :param revoked: 6 :return: 7 """ 8 print('再均衡开始之前被自动触发.') 9 print(revoked, type(revoked))10 consumer.commit_async(offsets=consumer_offsets)11 12 def on_partitions_assigned(self, assigned):13 """14 再均衡完成之后 即将下一轮poll之前 触发15 :param assigned:16 :return:17 """18 19 print('在均衡完成之后自动触发.')20 print(assigned, type(assigned))
再均衡发生的场景有以下几种:
1. 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了) 2. 订阅主题数发生变更,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance 3. 订阅主题的分区数发生变更 鉴于触发再均衡后会造成资源浪费的问题,所以我们尽量不要触发再均衡