kafka源码分析之kafka的consumer的负载均衡管理

GroupCoordinator 说明,主要是消费者的连接建立,offset的更新操作。管理所有的consumer与对应的group的信息。Group的metadata的信息,consumer对应的offset的更新操作。
实例创建与启动 consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
consumerCoordinator.startup()

创建实例,
def create(config: KafkaConfig,
zkUtils: ZkUtils,
replicaManager: ReplicaManager): GroupCoordinator = {
读取与记录group的offset相关的配置信息:
1,配置项offset.metadata.max.bytes,默认值4096.用于配置offset的请求的最大请求的消息大小。
2,配置项offsets.load.buffer.size,默认值5MB,用于在读取offset信息到内存cache时,用于读取缓冲区的大小。
3,配置项offsets.retention.minutes,默认值24小时,针对一个offset的消费记录的最长保留时间。
4,配置项offsets.retention.check.interval.ms,默认值600秒,用于定期检查offset过期数据的检查周期。
5,配置项offsets.topic.num.partitions,默认值50,offset记录的topic的partition个数。
6,配置项offsets.topic.replication.factor,默认3,用于配置offset记录的topic的partition的副本个数。
7,配置项offsets.commit.timeout.ms,默认值5秒,用于配置提交offset的最长等待时间。
8,配置项offsets.commit.required.acks,默认值-1,用于配置提交offset的请求的ack的值。
9,配置项group.min.session.timeout.ms,默认值6秒,
10,配置项group.max.session.timeout.ms,默认值30秒,用于配置session的超时时间。

val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
offsetsTopicNumPartitions = config.offsetsTopicPartitions,
offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
val groupConfig = GroupConfig(
groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)

new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager,
zkUtils)
}

更新此topic对应的配置文件,主要修改日志清理部分的配置。
修改这个topic的segment的大小为100MB每一个。默认的非内置的topic的segment的大小为1GB.
def offsetsTopicConfigs: Properties = {
val props = new Properties
props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
props.put(LogConfig.SegmentBytesProp,
offsetConfig.offsetsTopicSegmentBytes.toString)
props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
props
}

生成GroupCoordinator中用于对offset进行操作的组件,GroupMetadataManager实例。
---------------------------
用于存储每个group消费的partition对应的offset
private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
用于存储当前所有的消费者的信息,每个消费者中包含有多少个client进行消费等
private val groupsCache = new Pool[String, GroupMetadata]
如果正在对topic中的内容进行加载时,还没有加载到cache中,这个集合中存储有每个group与partition的名称。
private val loadingPartitions: mutable.Set[Int] = mutable.Set()
这个集合中存储有当前所有的group中已经cache到内存的partition的消费者信息,表示这个group的offse可以被读取。
private val ownedPartitions: mutable.Set[Int] = mutable.Set()

从zk中对应的这个记录消费者信息的topic中读取这个topic的partition信息与副本信息。
/* number of partitions for the consumer metadata topic */
private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount

/* Single-thread scheduler to handling offset/group metadata cache loading and unloading */
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")

根据定时检查offset过期的时间周期,执行过期offset删除的操作,deleteExpiredOffsets函数。
scheduler.startup()
scheduler.schedule(name = "delete-expired-consumer-offsets",
fun = deleteExpiredOffsets,
period = config.offsetsRetentionCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)


启动GroupCoordinator实例时,生成的相关信息:
def startup() {
info("Starting up.")

定义用于处理client与group心跳超时的控制单元。
heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]
("Heartbeat", brokerId)
定义用于处理group加入的超时控制单元。
joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]
("Rebalance", brokerId)

设置当前的coordinator的实例为活动状态。
isActive.set(true)
info("Startup complete.")
}

Group元数据partition的leader上线操作 这个操作在对应元数据管理的partition的leader发生变化后,被选择成为新的leader的节点上会进行触发,或者一个broker启动时,也会触发这个动作。

这个onGroupLoaded函数用于处理在group的加载后执行的动作,这个回调函数主要完成对当前的所有的member进行心跳超时的监听动作,生成一个DelayedHeartbeat实例用于监听对member的心跳超时。
private def onGroupLoaded(group: GroupMetadata) {
group synchronized {
info(s"Loading group metadata for ${group.groupId} with generation
${group.generationId}")
assert(group.is(Stable))
group.allMemberMetadata.foreach(
completeAndScheduleNextHeartbeatExpiration(group, _)
)
}
}

当group的消费的topic的partition在当前的broker中被选举成leader时,触发的函数。
def handleGroupImmigration(offsetTopicPartitionId: Int) {
这里直接通过groupManager中的loadGroupsForPartition对partition进行加载。
groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
}

接下来看看这个loadGroupsForPartition函数的处理流程:

/**
* Asynchronously read the partition from the offsets topic and populate the cache
*/
def loadGroupsForPartition(offsetsPartition: Int,
onGroupLoaded: GroupMetadata => Unit) {
val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName,
offsetsPartition)
执行这个loadGroupsForPartition函数内的内部函数loadGroupsAndOffsets函数,来对这个partition的数据进行加载。
scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)

接下来看看这个加载partition数据的函数的处理逻辑:
def loadGroupsAndOffsets() {
info("Loading offsets and group metadata from " + topicPartition)

首先,如果要加载的partition已经在loadingPartitions集合中存在了,表示这个partition已经在执行加载操作,直接return回去,不进行处理,否则把这个partition加入到loadingPartitions中,这个表示是正在执行加载操作的partition的集合。
这里的offsetsPartition表示的是存储元数据与offset的内置topic的partition.
loadingPartitions synchronized {
if (loadingPartitions.contains(offsetsPartition)) {
info("Offset load from %s already in progress.".format(topicPartition))
return
} else {
loadingPartitions.add(offsetsPartition)
}
}

val startMs = SystemTime.milliseconds
try {
从LogManager中得到这个partition对应的Log实例,
replicaManager.logManager.getLog(topicPartition) match {
case Some(log) =>
如果在当前的机器上有这个partition的副本,那么这个Log实例就一定存在,得到这个Log中最小的segment的最小的offset.
var currOffset = log.logSegments.head.baseOffset
根据每次加载的数据量,生成一个加载数据的buffer.
val buffer = ByteBuffer.allocate(config.loadBufferSize)
inWriteLock
(offsetExpireLock) {
val loadedGroups = mutable.Map[String, GroupMetadata]()
val removedGroups = mutable.Set[String]()

开始进行迭代读取这个partition的log中的消息,直到读取到offset等于当前partition的最大的offset为迭代结束 。这里加载到的highWatermark的offset是当前副本同步到的最新的大小。
这个highWatermark根据对应的partition的follower的副本的同步,每次同步会更新这个副本的logEndOffset的值,而这个highWatermark的值是所有的副本中logEndOffset最小的一个值。
while (currOffset < getHighWatermark(offsetsPartition)
&& !shuttingDown.get()) {
buffer.clear()
读取指定大小的数据,并把消息存储到生成的buffer中。
val messages = log.read(currOffset,
config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
messages.readInto(buffer, 0)
根据这个buffer生成用于消息读取的message的集合。
val messageSet = new ByteBufferMessageSet(buffer)

根据读取出来的消息集合进行迭代,处理每一条读取到的消息。这里调用的是messageSet的iterator的函数。
messageSet.foreach { msgAndOffset =>
require(msgAndOffset.message.key != null,
"Offset entry key should not be null")

解析出这一条消息的key值,并根据key值的类型做对应的处理流程。
val baseKey = GroupMetadataManager.readMessageKey(
msgAndOffset.message.key)

如果读取到的消息是一个consumer记录的offset的消费信息的记录,
if (baseKey.isInstanceOf[OffsetKey]) {
// load offset
val key = baseKey.key.asInstanceOf[GroupTopicPartition]
这里检查下读取到的offset的metadata的记录的value部分是否为null,如果为null,表示这条offset已经过期被清理掉,从offsetsCache中移出这条offset的记录。
if (msgAndOffset.message.payload == null) {
if (offsetsCache.remove(key) != null)
trace("Removed offset for %s due to tombstone entry.".format(key))
else
trace("Ignoring redundant tombstone for %s.".format(key))
}
下面的else部分表示offset读取到的消息是一条正常的消息,把这条存储consumer offset记录的消息写入到offsetsCache集合中,如果这条offset的commit时,指定了过期时间时,那么这个消息直接使用这个过期时间,否则使用这个offset commit时的时间加上配置的过期延时来设置这个offset的过期时间。
else {
val value = https://www.it610.com/article/GroupMetadataManager.readOffsetMessageValue(
msgAndOffset.message.payload)
putOffset(key, value.copy (
expireTimestamp = {
if (value.expireTimestamp == org.apache.kafka.common
.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
value.commitTimestamp + config.offsetsRetentionMs
else
value.expireTimestamp
}
))
trace("Loaded offset %s for %s.".format(value, key))
}
}
如果读取到的消息是一个对group的sync的操作后存储的group的元数据的消息
else {
// load group metadata
val groupId = baseKey.key.asInstanceOf[String]
这个部分表示是一条存储group的metadata的消息,检查这个group的value是否为null,为null表示这个group已经被删除,把这个group添加到removedGroups集合中,否则把这个group与对应的group的metadata信息添加到loadedGroups集合中。
val groupMetadata = https://www.it610.com/article/GroupMetadataManager.readGroupMessageValue(
groupId, msgAndOffset.message.payload)
if (groupMetadata != null) {
trace(s"Loaded group metadata for group
${groupMetadata.groupId} with generation
${groupMetadata.generationId}")

removedGroups.remove(groupId)
loadedGroups.put(groupId, groupMetadata)
} else {
loadedGroups.remove(groupId)
removedGroups.add(groupId)
}
}

currOffset = msgAndOffset.nextOffset
}
}

如果读取完成这个partition的所有的日志后,有需要添加到cache的group的元数据信息时,迭代这个集合,取出每一个group的元数据并添加到groupsCache的cache集合中。同时设置每一个group的所有的member的心跳超时监听。
loadedGroups.values.foreach { group =>
val currentGroup = addGroup(group)
if (group != currentGroup)
debug(s"Attempt to load group
${group.groupId} from log with generation
${group.generationId} failed " +
s"because there is already a cached group with generation
${currentGroup.generationId}")
else
onGroupLoaded(group)
}

如果读取完成对这个partition的所有的日志后,有已经被删除掉的group时,迭代这个已经删除的group的集合,检查每一个已经删除的group是否还在groupsCache的cache,
removedGroups.foreach { groupId =>
val group = groupsCache.get(groupId)
if (group != null)
throw new IllegalStateException(s"Unexpected unload of acitve group
${group.groupId} while " +
s"loading partition ${topicPartition}")
}
}

if (!shuttingDown.get())
info("Finished loading offsets from %s in %d milliseconds."
.format(topicPartition, SystemTime.milliseconds - startMs))
case None =>
warn("No log found for " + topicPartition)
}
}
catch {
case t: Throwable =>
error("Error in loading offsets from " + topicPartition, t)
}
finally {
当对用于存储group的元数据与consumer对partition的消费记录的offset的加载完成后,把这个partition添加到ownedPartitions集合中表示完成partition的数据加载,
并从loadingPartitions集合中移出,loadingPartitions中如果存在partition表示这个parition中存储的group信息暂时是无法被访问的。
loadingPartitions synchronized {
ownedPartitions.add(offsetsPartition)
loadingPartitions.remove(offsetsPartition)
}
}
}
}

Group元数据partition的leader下线操作 当一个group对应的元数据的partition的leader对应的broker节点下线,或者leader发生切换时,对原来的leader的partition需要执行下线的操作。

下线操作后的回调函数:
在下线一个group对应的partition的leader时,会得到这个partition中所有的group,并根据这个group的元数据调用当前的回调函数。
处理流程:
1,更新这个group的状态为Dead的状态,表示group在当前的节点已经死亡。
2,根据group更新状态前原来的状态,进行回调处理:
2,1,如果group下线前的状态是PreparingRebalance,调用这个group中所有的consumer的joinCallback的回调函数,向对应的consumer写入一个NOT_COORDINATOR_FOR_GROUP错误。
2,2,如果group下线前的状态是Stable | AwaitingSync,调用这个group中所有的consumer的syncCallback的回调函数,向对应的consumer写入一个NOT_COORDINATOR_FOR_GROUP错误。
private def onGroupUnloaded(group: GroupMetadata) {
group synchronized {
info(s"Unloading group metadata for ${group.groupId}
with generation ${group.generationId}")
val previousState = group.currentState
group.transitionTo(Dead)

previousState match {
case Dead =>
case PreparingRebalance =>
for (member <- group.allMemberMetadata) {
if (member.awaitingJoinCallback != null) {
member.awaitingJoinCallback(joinError(member.memberId,
Errors.NOT_COORDINATOR_FOR_GROUP.code))
member.awaitingJoinCallback = null
}
}
joinPurgatory.checkAndComplete(GroupKey(group.groupId))

case Stable | AwaitingSync =>
for (member <- group.allMemberMetadata) {
if (member.awaitingSyncCallback != null) {
member.awaitingSyncCallback(Array.empty[Byte],
Errors.NOT_COORDINATOR_FOR_GROUP.code)
member.awaitingSyncCallback = null
}
heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId,
member.memberId))
}
}
}
}

下线leader时的入口函数:这个函数直接调用了groupManager中的removeGroupsForPartition函数。
def handleGroupEmigration(offsetTopicPartitionId: Int) {
groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
}

接下来看看GroupMetadataManager中的removeGroupsForPartition函数流程:
这个函数中根据要执行下线操作的partition的下标与对应用于group下线的回调处理函数进行处理。
*/
def removeGroupsForPartition(offsetsPartition: Int,
onGroupUnloaded: GroupMetadata => Unit) {

首先得到要下线的partition的TopicPartition的实例。
val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName,
offsetsPartition)
发起对下线group与offset的元数据的下线处理,这里直接调用当前的内部函数removeGroupsAndOffsets的函数。
scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)

下面定义了具体下线操作的操作流程的函数removeGroupsAndOffsets。
def removeGroupsAndOffsets() {
var numOffsetsRemoved = 0
var numGroupsRemoved = 0

loadingPartitions synchronized {
首先从ownedPartitions集合中移出这个partition.表示这个partition已经下线,所有的对应这个partition的group来请求当前的节点时,都将无法访问。
// we need to guard the group removal in cache in the loading partition lock
// to prevent coordinator's check-and-get-group race condition
ownedPartitions
.remove(offsetsPartition)

迭代offsetsCache的cache集合,找到这个offset的元数据中group对应的partition与下线的partition是相同的partition的所有的记录并从cache中移出这些记录,并记录移出的offset的条数(用于打印日志)。
offsetsCache
.keys.foreach { key =>
if (partitionFor(key.group) == offsetsPartition) {
offsetsCache.remove(key)
numOffsetsRemoved += 1
}
}

groupsCache的cache的集合中找到group元数据对应的存储partition与下线的partition相同的所有的group的元数据记录,从cache中移出这些个group并执行下线group的回调函数,记录下线group的元数据的个数(用于打印日志)。
// clear the groups for this partition in the cache
for (group <- groupsCache.values) {
if (partitionFor(group.groupId) == offsetsPartition) {
onGroupUnloaded(group)
groupsCache.remove(group.groupId, group)
numGroupsRemoved += 1
}
}
}

if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower
transition."
.format(numOffsetsRemoved, TopicAndPartition(
GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))

if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower
transition."
.format(numGroupsRemoved, TopicAndPartition(
GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
}
}

定期删除过期的offset记录 根据配置项offsets.retention.check.interval.ms,默认值600秒,用于定期检查offset过期数据的检查周期,通过GroupMetadataManager中的deleteExpiredOffsets函数来进行处理。


private def deleteExpiredOffsets() {
debug("Collecting expired offsets.")
val startMs = SystemTime.milliseconds

val numExpiredOffsetsRemoved = inWriteLock(offsetExpireLock) {
这里得到过期时间小于当前的时间的所有的offset的cache的集合。
val expiredOffsets = offsetsCache.filter {
case (groupTopicPartition, offsetAndMetadata) =>
offsetAndMetadata.expireTimestamp < startMs
}

debug("Found %d expired offsets.".format(expiredOffsets.size))

把过期的offset cache的集合从offsetsCache集合中移出,同时生成一个新的消息集合,这个消息的value都是null值,表示这是一个可以被删除的点,并按partition的id值进行分组。
val tombstonesForPartition = expiredOffsets.map {
case (groupTopicAndPartition, offsetAndMetadata) =>
这里找到这个groupid对应的存储offset的partition的partition的id值。
val offsetsPartition = partitionFor(groupTopicAndPartition.group)
trace("Removing expired offset and metadata for %s: %s"
.format(groupTopicAndPartition, offsetAndMetadata))

offsetsCache.remove(groupTopicAndPartition)

val commitKey = GroupMetadataManager.offsetCommitKey(
groupTopicAndPartition.group,
groupTopicAndPartition.topicPartition.topic,
groupTopicAndPartition.topicPartition.partition)

(offsetsPartition, new Message(bytes = null, key = commitKey))
}.groupBy { case (partition, tombstone) => partition }


tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
从副本的allPartitions集合中找到对应group的partition的实例。
val partitionOpt = replicaManager.getPartition(
GroupCoordinator.GroupMetadataTopicName, offsetsPartition)

如果这个partition存在,生成向这个partition写入的消息,并向topic中写入这个offset记录的消息,这个消息的offset部分的存储是一个null值。
partitionOpt.map { partition =>
val appendPartition = TopicAndPartition(
GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
val messages = tombstones.map(_._2).toSeq

trace("Marked %d offsets in %s for deletion."
.format(messages.size, appendPartition))

try {
// do not need to require acks since even if the tombsone is lost,
// it will be appended again in the next purge cycle
partition.appendMessagesToLeader(
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec,
messages: _*))
tombstones.size
}
catch {
case t: Throwable =>
error("Failed to mark %d expired offsets for deletion in %s."
.format(messages.size, appendPartition), t)
// ignore and continue
0
}
}
}.sum
}

info("Removed %d expired offsets in %d milliseconds."
.format(numExpiredOffsetsRemoved, SystemTime.milliseconds - startMs))
}

处理group的加入 接收并处理请求
当一个consumer的实例生成时,第一次执行消费操作时,会向group对应的broker发起一个joinGroup的请求,这个请求由GroupCoordinator实例中的如下函数来进行处理。
def handleJoinGroup(groupId: String,
memberId: String,
clientId: String,
clientHost: String,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback) {

首先判断coordinator实例是否被启动,如果没有启动,
直接向client端响应GROUP_COORDINATOR_NOT_AVAILABLE消息。
if (!isActive.get) {
responseCallback(joinError(memberId,
Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
} else if (!validGroupId(groupId)) {
这里检查groupId是否为空字符串或者是一个null值,如果是,直接向client端响应INVALID_GROUP_ID代码。
responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID.code))
} else if (!isCoordinatorForGroup(groupId)) {
这里检查groupId对应的topic存储的partition是否在当前的ownerPartitions的集合中,如果不在,表示这个joinGroup的请求对应的leader发生了变化 ,需要重新连接,直接向client端响应NOT_COORDINATOR_FOR_GROUP代码。
responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
} else if (isCoordinatorLoadingInProgress(groupId)) {
这里检查groupId对应的partition是否还处于加载的状态(loadingPartitions集合中存在),如果这个partition还没有加载完成,直接向client端响应GROUP_LOAD_IN_PROGRESS代码。
responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS.code))
} else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
如果超时时间不在配置的范围内时,直接向client端响应INVALID_SESSION_TIMEOUT代码。
responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code))
} else {
如果流程执行到这个操作时,表示开始进行group的join处理。
首先先通过groupManager从groups集合中得到对应的groupId的定义信息,如果这个信息不存在,表示这个group是第一次加入,否则表示这是一个已经存在的group
// only try to create the group if the group is not unknown AND
// the member id is UNKNOWN, if member is specified but group does not
// exist we should reject the request
var group = groupManager.getGroup(groupId)
if (group == null) {
if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
如果要进行的join的groupId是一个新加入的group,也就是还没有这个groupId对应的元数据时,传入的memberId的值必须是一个UNKNOWN_MEMBER_ID值,否则直接向client端响应UNKNOWN_MEMBER_ID代码。
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
} else {
这种情况是groupId第一次加入,先生成一个GroupMetadata信息,一个GroupMetadata包含有这个groupId的名称与group对应的协议类型(consumer)
group = groupManager.addGroup(new GroupMetadata(groupId, protocolType))

执行doJoinGroup操作,来添加这个group.
doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs,
protocolType, protocols, responseCallback)
}
} else {
这种情况下,groupId是一个已经存在的group信息,把这个groupId对应的元数据直接传入到doJoinGroup函数中进行处理。
doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs,
protocolType, protocols, responseCallback)
}
}
}

接下来看看doJoinGroup函数的流程:

private def doJoinGroup(group: GroupMetadata,
memberId: String,
clientId: String,
clientHost: String,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback) {
group synchronized {
if (group.protocolType != protocolType ||
!group.supportsProtocols(protocols.map(_._1).toSet)) {
如果group是一个已经存在的group时,这个检查用于检查上一次group的协议于当前consumer传入此group的join的协议是否相同,如果不相同,直接向client端响应INCONSISTENT_GROUP_PROTOCOL代码。
// if the new member does not support the group protocol, reject it
responseCallback(joinError(memberId,
Errors.INCONSISTENT_GROUP_PROTOCOL.code))
}
这里检查下memberId的值是否为UNKNOWN_MEMBER_ID,如果不是时,同时这个memberId不在group中存在时,直接向client端响应UNKNOWN_MEMBER_ID代码。这个处理主要也是用在重新请求时(leader切换后的重新请求)。
else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID
&& !group.has(memberId)) {
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
} else {


如果流程执行到这里时,开始根据当前group的状态执行相应的处理。
group.currentState match {

case Dead =>
如果group已经是一个Dead的状态的group表示这个group已经死亡,直接返回UNKNOWN_MEMBER_ID代码。
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))


case PreparingRebalance =>
如果当前的group的状态是正在准备执行Rebalance操作,
根据memberId是否是UNKNOWN_MEMBER_ID值执行新的member加入到group的操作或者执行对group中存在的member的更新操作。
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost,
protocols, group, responseCallback)
} else {
val member = group.get(memberId)
updateMemberAndRebalance(group, member, protocols, responseCallback)
}

case AwaitingSync =>
如果当前的group的状态是一个AwaitingSync 表示是已经在执行rebalance的操作,这个时候对于新加入的member时,会先把原有的member全部失效。
根据memberId是否是UNKNOWN_MEMBER_ID值执行新的member加入到group的操作,或者说当前的member是一个重复的请求,也就是说他们的通信协议都相同,直接根据当前请求的member是否是leader来返回JoinGroupResult,最后一种情况,这种情况表示请求的member的协议发生变化,执行对对group中存在的member的更新操作。
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost,
protocols, group, responseCallback)
} else {
val member = group.get(memberId)
if (member.matches(protocols)) {
responseCallback(JoinGroupResult(
members = if (memberId == group.leaderId) {
group.currentMemberMetadata
} else {
Map.empty
},
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocol,
leaderId = group.leaderId,
errorCode = Errors.NONE.code))
} else {
// member has changed metadata, so force a rebalance
updateMemberAndRebalance(group, member, protocols, responseCallback)
}
}


如果group的状态是Stable的状态时,表示这是一个新加入的Group,
case Stable =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
如果memberId是UNKNOWN_MEMBER_ID值表示consumer是一个新生成的consumer,执行添加成员并进行balance的操作。
// if the member id is unknown, register the member to the group
addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost,
protocols, group, responseCallback)
} else {
这种情况表示是一个成员被重新加入到group中来,
val member = group.get(memberId)
if (memberId == group.leaderId || !member.matches(protocols)) {
updateMemberAndRebalance(group, member, protocols, responseCallback)
} else {
responseCallback(JoinGroupResult(
members = Map.empty,
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocol,
leaderId = group.leaderId,
errorCode = Errors.NONE.code))
}
}
}

如果流程执行到这里后,目前group的状态还是PreparingRebalance状态时,通过这个joinGroup操作的DelayedJoin的操作实例,检查是否是completed的状态,如果不是时,执行tryComplete函数,如果这个函数执行成功时,会从对应的Watchers中移出这个DelayedJoin的操作。
if (group.is(PreparingRebalance))
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
}

处理新的member加入group
当一个consumer执行joinGroup操作时,如果这个consumer对应的groupId的在metadata中不存在时(或者说是一个已经存在的group,但是来的是一个新加入的memberId的成员时),同时这个memberId是UNKNOWN_MEMBER_ID值时,表示这是一个新加入的groupId,这时会生成一个GroupMetadata的实例,这个实例的状态是Stable。
在处理这个操作时,通过GroupCoordinator中的addMemberAndRebalance函数来进行处理。
private def addMemberAndRebalance(sessionTimeoutMs: Int,
clientId: String,
clientHost: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback) = {

根据当前请求的clientId的值加上一个UUID的值,生成一个memberId值,并生成MemberMetadata的信息,这个信息中包含有clientId与client对应的host,client的partition的名称。
// use the client-id with a random id suffix as the member-id
val memberId = clientId + "-" + group.generateMemberIdSuffix
val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost,
sessionTimeoutMs, protocols)
member.awaitingJoinCallback = callback

把这个新生成的member添加到group中。在向groupMetadata中添加成员时,如果group中的leader成员不存在时,把当前添加的member的id当成是这个group的leaderId.
group.add(member.memberId, member)

对这个group触发partition的rebalance操作。如果要执行rebalance操作,group的状态必须是Stable或者AwaitingSync状态,新添加的group的状态默认是Stable.
maybePrepareRebalance函数判断状态是否在指定的状态集合后,会执行prepareRebalance函数,这里的处理见下面的对member的加入与更新后group的rebalance操作。

maybePrepareRebalance(group)
member
}
处理对group中存在的member的更新
这个在consumer执行joinGroup操作时,这个consumer对应的groupId的在metadata中必须存在,同时这个memberId是一个已经存在的值,表示这个member在group中已经存在,这个groupId必须是一个已经存在的groupId。在执行joinGroup操作时,通过GroupCoordinator中的updateMemberAndRebalance函数来进行处理。
private def updateMemberAndRebalance(group: GroupMetadata,
member: MemberMetadata,
protocols: List[(String, Array[Byte])],
callback: JoinCallback) {
这里跟新添加一个member到group中处理不同的是直接修改这个已经存在的member的protocols与callback的函数。
member.supportedProtocols = protocols
member.awaitingJoinCallback = callback
检查并发起准备rebalance的操作。
maybePrepareRebalance(group)
}

private def maybePrepareRebalance(group: GroupMetadata) {
group synchronized {
对这个group触发partition的rebalance操作。如果要执行rebalance操作,group的状态必须是Stable或者AwaitingSync状态,新添加的group的状态默认是Stable.
这里的处理见下面的对member的加入与更新后group的rebalance操作。
if (group.canRebalance)
prepareRebalance(group)
}
}

group的rebalance操作
这个操作在一个新的group被生成后加入了一个member,在group中新加入了一个member,对一个已经存在的member进行了更新操作时,这个操作会被触发。
接下来看看触发rebalance操作的函数:
private def prepareRebalance(group: GroupMetadata) {
如果当前的group的状态是AwaitingSync状态时,表示当前的group已经在执行rebalance的操作,这里取消所有的member的成员的分配的partition,并向client端回写REBALANCE_IN_PROGRESS代码。
// if any members are awaiting sync, cancel their request and have them rejoin
if (group.is(AwaitingSync))
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)

更新group的状态为PreparingRebalance状态,表示准备执行rebalance的操作。
group.transitionTo(PreparingRebalance)
info("Preparing to restabilize group %s with old generation %s"
.format(group.groupId, group.generationId))

【kafka源码分析之kafka的consumer的负载均衡管理】开始监听join的加入。
val rebalanceTimeout = group.rebalanceTimeout
val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
val groupKey = GroupKey(group.groupId)
joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}

执行join加入的监听:
首先调用joinPurgatory实例中的tryCompleteElseWatch函数
1,先执行如下的步骤:
var isCompletedByMe = operation synchronized operation.tryComplete()
if (isCompletedByMe)
return true
这里调用了DelayedJoin中的tryComplete的函数,
coordinator.tryCompleteJoin(group, forceComplete)
1,1这里直接通过调用GroupCoordinator中的tryCompleteJoin函数:
在这个函数中直接判断group的member的集合中是否有callback回调函数是null的集合,如果有,表示有需要进行reJoin的member的集合,直接返回false,否则调用forceComplete函数,并得到返回值,。
group synchronized {
if (group.notYetRejoinedMembers.isEmpty)
forceComplete()
else false
}
1,2在group不没有member的成员的callback的回调函数是null的情况下,先看看forceComplete的函数定义:
如果当前的completed的值是false,设置成true成功的话,执行onComplete的函数,并返回true,
否则的话,表示当前的completed的值已经被修改成了true,这里这个函数返回false.
if (completed.compareAndSet(false, true)) {
// cancel the timeout timer
cancel()
onComplete()
true
} else {
false
}
1,3在completed被成功从false设置为true后,onComplete函数实际上调用GroupCoordinator中的onCompleteJoin函数。
def onCompleteJoin(group: GroupMetadata) {
group synchronized {
这里首先得到需要进行rejoin操作的成员的集合(callback的回调函数是空的集合)
val failedMembers = group.notYetRejoinedMembers
如果说当前的group中没有成员或者说callback是null的成员集合有值时,
先从group中移出掉这个没有callback的成员。
移出完成没有callback的成员集合后,如果group现在的成员集合是个空集合,设置group的状态为Dead,
同时从groupManager中移出这个group.
if (group.isEmpty || !failedMembers.isEmpty) {
failedMembers.foreach { failedMember =>
group.remove(failedMember.memberId)
// TODO: cut the socket connection to the client
}

// TODO KAFKA-2720: only remove group in the background thread
if (group.isEmpty) {
group.transitionTo(Dead)
groupManager.removeGroup(group)

info("Group %s generation %s is dead and removed"
.format(group.groupId, group.generationId))
}
}
if (!group.is(Dead)) {
如果group的状态不是Dead的状态,对group的generation的值进行初始化并递加。
并设置当前的group的状态为AwaitingSync状态。
group.initNextGeneration()
info("Stabilized group %s generation %s".format(group.groupId,
group.generationId))

迭代这个group中的所有的成员,
如果成员的memberId的值与group.leaderId的值相同,那么向这个成员的client端响应这个当前的group的所有的成员的集合,否则只响应这个member的memberId与generationId,与leader对应的memberId的值。
for (member <- group.allMemberMetadata) {
assert(member.awaitingJoinCallback != null)
val joinResult = JoinGroupResult(
members=if (member.memberId == group.leaderId)
{ group.currentMemberMetadata } else { Map.empty },
memberId=member.memberId,
generationId=group.generationId,
subProtocol=group.protocol,
leaderId=group.leaderId,
errorCode=Errors.NONE.code)

member.awaitingJoinCallback(joinResult)
member.awaitingJoinCallback = null
completeAndScheduleNextHeartbeatExpiration(group, member)
}
}
}
}

2,回到joinPurgatory实例中的tryCompleteElseWatch函数,开始根据1步中的返回结果进行处理:
如果返回的值是true,表示group中没有failedMembers的成员,同时completed属性把值从false设置成了true,这里返回的就是true.
if (isCompletedByMe)
return true

2,1流程执行到这里,表示当前的group中包含有failedMembers的成员,或者completed的值现在已经是true了没有更新成功,
var watchCreated = false
for(key <- watchKeys) {
如果当前的completed的值是true,直接返回。
if (operation.isCompleted())
return false

把这个key与operation添加到监听器中,这个情况只有completed的值是false,这种情况是一开始进入到join操作的时候,就发现group中包含有failedMembers的成员
watchForOperation(key, operation)

if (!watchCreated) {
由于流程没有return,这里设置watchCreated的值为true,因为已经把operation加入到了watchs中。
watchCreated = true
estimatedTotalOperations.incrementAndGet()
}
}

3,这里再执行一次coordinator.tryCompleteJoin(group, forceComplete)函数,如果函数返回为true时,表示这个这个处理完成,直接返回true,
isCompletedByMe = operation synchronized operation.tryComplete()
if (isCompletedByMe)
return true

3,1,这种情况检查completed的值是否是true,如果不是,把这个operation添加到timer中,timer中会在超时时执行coordinator.onCompleteJoin(group)。
if (! operation.isCompleted()) {
timeoutTimer.add(operation)
if (operation.isCompleted()) {
// cancel the timer task
operation.cancel()
}
}

Consumer中分配partition后的group同步 在对同一个group进行消费的多个consumer中,consumer在执行完成joinGroup操作后,会通过这个部分的handleSyncGroup函数来进行处理,
这个函数中传入的groupAssignment参数如果是follower的consumer时传入为空的集合。
def handleSyncGroup(groupId: String,
generation: Int,
memberId: String,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback) {
if (!isActive.get) {
responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
} else if (!isCoordinatorForGroup(groupId)) {
responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code)
} else {
得到对应的group并对这个group执行doSyncGroup的函数调用来完成操作。
val group = groupManager.getGroup(groupId)
if (group == null)
responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
else
doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
}
}

接下来看看这个doSyncGroup函数具体流程:

private def doSyncGroup(group: GroupMetadata,
generationId: Int,
memberId: String,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback) {
var delayedGroupStore: Option[DelayedStore] = None

group synchronized {
if (!group.has(memberId)) {
responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
} else if (generationId != group.generationId) {
responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code)
} else {
group.currentState match {
case Dead =>
responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)

case PreparingRebalance =>
responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code)

case AwaitingSync =>
如果是执行syncGroup的操作时,那么这个时候,group的状态只有在AwaitingSync的状态时才执行处理,其它情况下,不执行相关的操作。
这里先根据当前的memberId在对应的member中的回调函数进行设置,这个回调函数并不会立即响应,只有在memberId是leader的consumer时,处理完成后才统一对所有的consumer进行响应。
group.get(memberId).awaitingSyncCallback = responseCallback
completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))


if (memberId == group.leaderId) {
只有在memberId是所有的consumer的leader时,才执行的处理流程。
info(s"Assignment received from leader for group
${group.groupId} for generation ${group.generationId}")

这里得到所有的已经分配的consumer的partition的分配信息。
// fill any missing members with an empty assignment
val missing = group.allMembers -- groupAssignment.keySet
val assignment = groupAssignment ++ missing.map(_ ->
Array.empty[Byte]).toMap

这里把已经分配的consumer的partition的分配集合与group通过GroupMetadataManager中的perpareStoreGroup进行处理,
prepareStoreGroup的处理流程:
这里主要先生成一个DelayedStore实例,并通过groupManager中的store函数来执行这个实例,这里直接通过调用replicaManager中的appendMessages函数来进行处理并这个group所在的topic的partition中写入一条group的metadata的信息,这个信息的key就是group的id,value是group实例与group实例对应的member的partition的分配信息。
处理完成的回调操作:
1,如果处理完成后,group的状态还是AwaitingSync同时generationId的值没有发生变化,表示执行出错,这个时候,重新执行rebalance的操作。
2,group的sync操作完成处理,更新group的状态为Stable状态,更新group中对应已经分配过partition的consumer的member中的assignment的值为分配的partition的信息,同时这个时候向所有的member对应的consumer的client端进行回调操作,向client端写入对应的分配信息。
delayedGroupStore = Some(groupManager.prepareStoreGroup(group,
assignment, (errorCode: Short) =>
{
group synchronized {
if (group.is(AwaitingSync) && generationId == group.generationId) {
if (errorCode != Errors.NONE.code) {
resetAndPropagateAssignmentError(group, errorCode)
maybePrepareRebalance(group)
} else {
setAndPropagateAssignment(group, assignment)
group.transitionTo(Stable)
}
}
}
}))
}

case Stable =>
// if the group is stable, we just return the current assignment
val memberMetadata = https://www.it610.com/article/group.get(memberId)
responseCallback(memberMetadata.assignment, Errors.NONE.code)
completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
}
}
}

delayedGroupStore.foreach(groupManager.store)
}

Consumer的心跳处理 接收consumer的心跳报告
当consumer向对应的leader broker报告心跳时,通过GroupCoordinator实例中的handleHeartbeat的函数进行处理。
在这个函数中,根据当前的group的对应的状态,进行处理,如果group对应的partition的leader正在切换或者leader已经挂掉向老的leader节点发起了请求,或者group中对应的member已经超时被移出掉时直接响应对应的错误代码,让consumer执行相对的重新rejoin或者重新连接新的coordinator的leader的操作,否则执行下面代码的部分操作:
更新这上member的最后一次心跳的时间,并添加这个心跳的超时监听。
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE.code)

Consumer的心跳超时监听
在每次对group对应的member进行操作时,或者每个consumer定时向coorniator发起心跳时,会重新注册这个超时监听,
通过completeAndScheduleNextHeartbeatExpiration函数。

接下来看看这个函数实现流程:
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata,
member: MemberMetadata) {
设置这个consumer的成员的最后一次更新时间为最后一次对这个member的操作时间,
// complete current heartbeat expectation
member.latestHeartbeat = SystemTime.milliseconds

根据这个memberId与group的id生成一个MemberKey的值,检查是否有老的delay已经存在,把已经存在的DelayedHeartbeat的操作完成的,从监听器中移出。
val memberKey = MemberKey(member.groupId, member.memberId)
这里如果已经有一个重复的心跳超时监听存在,先完成上一个心跳的超时监听,当前的心跳时间被更新,已经大于了上一次的心跳更新时间。
比较的关键代码:
{
member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
member.latestHeartbeat + member.sessionTimeoutMs这一部分代码表示得到这一次心跳的超时时间,
heartbeatDeadline表示是当前的operation(也就是上次更新心跳时计算出来的超时时间),
中间使用大于号,表示如果是第一次的时候,进行监听,等待超时或者下一次心跳过来。
}

heartbeatPurgatory.checkAndComplete(memberKey)

根据当前的心跳时间也就是当前时间加上session的超时时间,设置这个心跳的过期时间为这个时间,
生成一个DelayedHeartbeat的实例。并完成这个实例或者加入到监听器中。
// reschedule the next heartbeat expiration deadline
val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
val delayedHeartbeat = new DelayedHeartbeat(this, group, member,
newHeartbeatDeadline, member.sessionTimeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}

处理心跳超时的具体处理流程:
1,在heartbeatPurgatory中的tryCompleteElseWatch函数中首先执行DelayedHeartbeat的tryComplete函数,这个函数直接调用GroupCoortinator实例中的tryCompleteHeartbeat函数。
def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata,
heartbeatDeadline: Long, forceComplete: () => Boolean) = {
group synchronized {
如果说member是一个已经退出的member或者memeber中的joinCallback或者syncCallback函数不空null或者最后一次心跳的时间加上session的超时时间大于了上一次心跳的时间加上session的超时时间的值(这是表示心跳在未超时的时间内重复过来时,结束掉上一次的监听),执行forceComplete函数,这个函数设置completed属性的值为true,表示需要这个task完成返回true,否则其它情况返回false,
也就是说,这个监听器会一直等待,直到这个心跳超时或者下一次的心跳过来。
if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
forceComplete()
else false
}
}

2,根据1中执行tryComplete得到的返回值,如果返回的是true,表示成功完成(这种情况通常是第二次心跳过来时,这里会返回true),直接返回,否则执行下一个操作。
var isCompletedByMe = operation synchronized operation.tryComplete()
if (isCompletedByMe)
return true

3,把这个DelayHeartbeat的实例添加到Watchers中,并添加到Timer中,如果没有更多的心跳请求过来时,在timer的函数中,在指定的超时时间达到后会设置这个实例的forceComplete函数来设置这个实例的completed的函数,从timer中移出这个实例,并执行这个member的过期操作,从group中移出这个member,如果group的状态是Stable或者是AwaitingSync状态时,重新执行rebalance的操作。

    推荐阅读