Kafka Producer Sync-Mode Message Sending: Source Code Analysis



First, here are the key methods involved:
// Entry point: handles a single message or a batch of messages
def handle(events: Seq[KeyedMessage[K,V]])

// Dispatches the serialized KeyedMessage data
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]]

// Partitions and collates the messages
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]]

/**
* Constructs and sends the produce request based on a map from (topic, partition) -> messages
*
* @param brokerId the broker that will receive the request
* @param messagesPerTopic the messages as a map from (topic, partition) -> messages
* @return the set (topic, partitions) messages which incurred an error sending or processing
*/
private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
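
Of these signatures, the return type of partitionAndCollate is the densest: a two-level grouping of leader broker id -> (topic, partition) -> messages. The following self-contained sketch builds the same shape with simplified stand-in types and an assumed hash-based routing rule (both ours, not Kafka's), just to make the nesting concrete:

import scala.collection.mutable.{ArrayBuffer, HashMap}

object CollateShapeDemo extends App {
  // Simplified stand-ins for KeyedMessage and TopicAndPartition (illustrative only)
  case class Msg(topic: String, key: String, value: String)
  case class TopicPartition(topic: String, partition: Int)

  // Assumed routing rule for this demo: hash the key to a partition and
  // pretend brokers 0 and 1 lead alternating partitions
  def route(m: Msg, numPartitions: Int): (Int, Int) = {
    val partition = (m.key.hashCode & 0x7fffffff) % numPartitions
    val leaderBrokerId = partition % 2
    (partition, leaderBrokerId)
  }

  val msgs = Seq(Msg("t1", "a", "v1"), Msg("t1", "b", "v2"), Msg("t2", "a", "v3"))
  // Same nesting as partitionAndCollate: broker id -> topic-partition -> messages
  val collated = new HashMap[Int, HashMap[TopicPartition, ArrayBuffer[Msg]]]
  for (m <- msgs) {
    val (partition, broker) = route(m, numPartitions = 4)
    val perBroker = collated.getOrElseUpdate(broker, new HashMap)
    perBroker.getOrElseUpdate(TopicPartition(m.topic, partition), new ArrayBuffer) += m
  }
  collated.foreach { case (broker, byTp) => println(s"broker $broker -> $byTp") }
}

In the real implementation the leader broker comes from cached topic metadata rather than a formula, but the resulting structure is exactly this nesting.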

When sending in sync mode, Kafka delivers messages to the brokers through DefaultEventHandler's handle method; the payload can be a single message or a List of several.
Taking the List case as an example:
After DefaultEventHandler receives the messages (which may span multiple topics), it calls dispatchSerializedData, which in turn uses partitionAndCollate to walk the messages and pick a partition for each. If a message's key is non-null, the partitioning class configured via partitioner.class is invoked to compute a partitionIndex; otherwise the partitionId cached for the topic in sendPartitionPerTopicCache is used (and if no entry exists, a random available partition is chosen and cached). The leaderBrokerId owning that partitionIndex is then looked up, so each message ends up assigned to the leader broker of exactly one partition of one topic.
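
For concreteness, here is a hedged sketch of what such a partitioner.class could look like against the 0.8-era generic Partitioner[K] trait used by this handler. The class name MyModPartitioner and the modulo rule are illustrative assumptions; the VerifiableProperties constructor parameter is there because the old producer instantiates the configured class reflectively with the producer's properties:

import kafka.producer.Partitioner
import kafka.utils.VerifiableProperties

// Hypothetical example partitioner; the name and the modulo rule are ours, not Kafka's.
class MyModPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
  // Must return a value in [0, numPartitions - 1]; getPartition() below throws
  // UnknownTopicOrPartitionException for anything outside that range.
  // (key.hashCode & 0x7fffffff) mirrors Kafka's Utils.abs to avoid a negative modulo.
  def partition(key: String, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions
}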



package kafka.producer.async

import kafka.common._
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.producer._
import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging, SystemTime}
import scala.util.Random
import scala.collection.{Seq, Map}
import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
import java.util.concurrent.atomic._
import kafka.api.{TopicMetadata, ProducerRequest}

class DefaultEventHandler[K,V](config: ProducerConfig,
                               private val partitioner: Partitioner[K],
                               private val encoder: Encoder[V],
                               private val keyEncoder: Encoder[K],
                               private val producerPool: ProducerPool,
                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
  extends EventHandler[K,V] with Logging {

  val isSync = ("sync" == config.producerType)

  val correlationId = new AtomicInteger(0)
  val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)

  private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
  private var lastTopicMetadataRefreshTime = 0L
  private val topicMetadataToRefresh = Set.empty[String]
  private val sendPartitionPerTopicCache = HashMap.empty[String, Int]

  private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)

  // Entry point: handles a single message or a batch of messages
  def handle(events: Seq[KeyedMessage[K,V]]) {
    val serializedData = serialize(events)
    serializedData.foreach { keyed =>
      val dataSize = keyed.message.payloadSize
      producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
      producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
    }
    var outstandingProduceRequests = serializedData
    var remainingRetries = config.messageSendMaxRetries + 1
    val correlationIdStart = correlationId.get()
    debug("Handling %d events".format(events.size))
    while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
      if (topicMetadataRefreshInterval >= 0 &&
          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        topicMetadataToRefresh.clear
        lastTopicMetadataRefreshTime = SystemTime.milliseconds
      }
      // Dispatch the serialized data
      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
      if (outstandingProduceRequests.size > 0) {
        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
        // back off and update the topic metadata cache before attempting another send operation
        Thread.sleep(config.retryBackoffMs)
        // get topics of the outstanding produce requests and refresh metadata for those
        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        remainingRetries -= 1
        producerStats.resendRate.mark()
      }
    }
    if(outstandingProduceRequests.size > 0) {
      producerStats.failedSendRate.mark()
      val correlationIdEnd = correlationId.get()
      error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
        .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","), correlationIdStart, correlationIdEnd-1))
      throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
    }
  }

  // Dispatches the serialized data
  private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
    // Assign partitions to the messages according to the partitioning rules
    val partitionedDataOpt = partitionAndCollate(messages)
    partitionedDataOpt match {
      case Some(partitionedData) =>
        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
        try {
          // Iterate over each leader broker id and send it its messages
          for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
            if (logger.isTraceEnabled)
              messagesPerBrokerMap.foreach(partitionAndEvent =>
                trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
            val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
            // Send the messages to brokerid
            val failedTopicPartitions = send(brokerid, messageSetPerBroker)
            failedTopicPartitions.foreach(topicPartition => {
              messagesPerBrokerMap.get(topicPartition) match {
                case Some(data) => failedProduceRequests.appendAll(data)
                case None => // nothing
              }
            })
          }
        } catch {
          case t: Throwable => error("Failed to send messages", t)
        }
        failedProduceRequests
      case None => // all produce requests failed
        messages
    }
  }

  def serialize(events: Seq[KeyedMessage[K,V]]): Seq[KeyedMessage[K,Message]] = {
    val serializedMessages = new ArrayBuffer[KeyedMessage[K,Message]](events.size)
    events.map{e =>
      try {
        if(e.hasKey)
          serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = e.key, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
        else
          serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
      } catch {
        case t: Throwable =>
          producerStats.serializationErrorRate.mark()
          if (isSync) {
            throw t
          } else {
            // currently, if in async mode, we just log the serialization error. We need to revisit
            // this when doing kafka-496
            error("Error serializing message for topic %s".format(e.topic), t)
          }
      }
    }
    serializedMessages
  }

  // Partitions and collates the messages
  def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
    // A map of leader broker id ==> (topic-partition -> messages)
    val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
    try {
      for (message <- messages) {
        // Get the partition list for the message's topic
        val topicPartitionsList = getPartitionListForTopic(message)
        // Pick a partition index for the message; if its key is non-null, the configured partitioner.class is invoked
        val partitionIndex = getPartition(message.topic, message.key, topicPartitionsList)
        // Look up the partition-and-leader info for partitionIndex
        val brokerPartition = topicPartitionsList(partitionIndex)

        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
        // Resolve the leader broker id for partitionIndex
        val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)

        // The batch destined for this leader broker; messages are ultimately sent
        // per leader broker, per topic, per partition id
        var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
        ret.get(leaderBrokerId) match {
          case Some(element) =>
            dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
          case None =>
            dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
            ret.put(leaderBrokerId, dataPerBroker)
        }

        val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
        // The messages to send for each partition of the topic
        var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
        dataPerBroker.get(topicAndPartition) match {
          case Some(element) =>
            dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
          case None =>
            dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
            dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
        }
        // Append the message to this broker's topic-partition batch
        dataPerTopicPartition.append(message)
      } // end for
      Some(ret)
    } catch {
      // Swallow recoverable exceptions and return None so that they can be retried.
      case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
      case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
      case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
    }
  }

  private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
    debug("Broker partitions registered for topic: %s are %s"
      .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
    val totalNumPartitions = topicPartitionsList.length
    if(totalNumPartitions == 0)
      throw new NoBrokersForPartitionException("Partition key = " + m.key)
    topicPartitionsList
  }

  /**
   * Retrieves the partition id and throws an UnknownTopicOrPartitionException if
   * the value of partition is not between 0 and numPartitions-1
   * @param key the partition key
   * @param topicPartitionList the list of available partitions
   * @return the partition id
   */
  private def getPartition(topic: String, key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
    val numPartitions = topicPartitionList.size
    if(numPartitions <= 0)
      throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
    val partition =
      if(key == null) {
        // If the key is null, we don't really need a partitioner
        // So we look up in the send partition cache for the topic to decide the target partition
        val id = sendPartitionPerTopicCache.get(topic)
        id match {
          case Some(partitionId) =>
            // directly return the partitionId without checking availability of the leader,
            // since we want to postpone the failure until the send operation anyways
            partitionId
          case None =>
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId
        }
      } else
        partitioner.partition(key, numPartitions)
    if(partition < 0 || partition >= numPartitions)
      throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
        "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
    trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
    partition
  }

  /**
   * Constructs and sends the produce request based on a map from (topic, partition) -> messages
   *
   * @param brokerId the broker that will receive the request
   * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
   * @return the set (topic, partitions) messages which incurred an error sending or processing
   */
  private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
    if(brokerId < 0) {
      warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
      messagesPerTopic.keys.toSeq
    } else if(messagesPerTopic.size > 0) {
      val currentCorrelationId = correlationId.getAndIncrement
      val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
        config.requestTimeoutMs, messagesPerTopic)
      var failedTopicPartitions = Seq.empty[TopicAndPartition]
      try {
        val syncProducer = producerPool.getProducer(brokerId)
        debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
        val response = syncProducer.send(producerRequest)
        debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
        if(response != null) {
          if (response.status.size != producerRequest.data.size)
            throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
          if (logger.isTraceEnabled) {
            val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
            successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
              trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
          }
          val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
          failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
          if(failedTopicPartitions.size > 0) {
            val errorString = failedPartitionsAndStatus
              .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
                                    (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
              .map{
                case(topicAndPartition, status) =>
                  topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName
              }.mkString(",")
            warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
          }
          failedTopicPartitions
        } else {
          Seq.empty[TopicAndPartition]
        }
      } catch {
        case t: Throwable =>
          warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
            .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)
          messagesPerTopic.keys.toSeq
      }
    } else {
      List.empty
    }
  }

  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
    /** enforce the compressed.topics config here.
      * If the compression codec is anything other than NoCompressionCodec,
      *   Enable compression only for specified topics if any
      *   If the list of compressed topics is empty, then enable the specified compression codec for all topics
      * If the compression codec is NoCompressionCodec, compression is disabled for all topics
      */
    val messagesPerTopicPartition = messagesPerTopicAndPartition.map {
      case (topicAndPartition, messages) =>
        val rawMessages = messages.map(_.message)
        (topicAndPartition,
          config.compressionCodec match {
            case NoCompressionCodec =>
              debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
              new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
            case _ =>
              config.compressedTopics.size match {
                case 0 =>
                  debug("Sending %d messages with compression codec %d to %s"
                    .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                  new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
                case _ =>
                  if(config.compressedTopics.contains(topicAndPartition.topic)) {
                    debug("Sending %d messages with compression codec %d to %s"
                      .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                    new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
                  } else {
                    debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
                      .format(messages.size, topicAndPartition, config.compressedTopics.toString))
                    new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
                  }
              }
          })
    }
    messagesPerTopicPartition
  }

  def close() {
    if (producerPool != null)
      producerPool.close
  }
}
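
To tie the pieces together, here is a minimal usage sketch of the old Scala producer API (Kafka 0.8.x). With producer.type=sync, producer.send() invokes DefaultEventHandler.handle() on the calling thread, so send failures surface synchronously as exceptions. The broker addresses, topic name, and partitioner class below are illustrative assumptions:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object SyncSendExample extends App {
  val props = new Properties()
  props.put("metadata.broker.list", "broker1:9092,broker2:9092") // assumed broker addresses
  props.put("producer.type", "sync")   // makes DefaultEventHandler.isSync true
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("key.serializer.class", "kafka.serializer.StringEncoder")
  props.put("partitioner.class", "com.example.MyModPartitioner") // the hypothetical class sketched earlier
  props.put("request.required.acks", "1")

  val producer = new Producer[String, String](new ProducerConfig(props))

  // In sync mode this blocks until handle() succeeds or exhausts
  // message.send.max.retries, in which case it throws FailedToSendMessageException.
  val messages = (1 to 3).map(i => new KeyedMessage[String, String]("test-topic", "key-" + i, "value-" + i))
  producer.send(messages: _*)
  producer.close()
}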


