producer 发送数据到 broker,如果 producer 端设置 acks = -1,同时 broker 侧配置 min.insync.replicas = 2,这时 broker 会创建 DelayedProduce,leader broker 会等待消息复制到其他副本中,或者超时后返回。
//kafka.server.ReplicaManager#appendRecords
/**
 * Append messages to the leader replicas of the given partitions and, when required, wait for
 * them to be replicated to other replicas.
 *
 * The response callback is triggered either on timeout or once the required acks are satisfied.
 * If the callback function itself is already synchronized on some object, pass that object as
 * `delayedProduceLock` to avoid deadlock.
 *
 * @param timeout                 timeout handed to the DelayedProduce operation
 * @param requiredAcks            acks requested by the producer; checked with isValidRequiredAcks
 *                                (per the original annotation the accepted values are -1, 0 and 1)
 * @param internalTopicsAllowed   forwarded unchanged to appendToLocalLog
 * @param isFromClient            forwarded unchanged to appendToLocalLog
 * @param entriesPerPartition     records to append, keyed by partition
 * @param responseCallback        receives the per-partition responses, either immediately or
 *                                through the DelayedProduce operation it is handed to
 * @param delayedProduceLock      optional lock forwarded to the DelayedProduce operation
 * @param processingStatsCallback receives the per-partition processing stats of the local append
 */
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  isFromClient: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()): Unit = {
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    // Write the records to the local log first.
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    // For every partition, pair the offset that has to be reached with the response to send back.
    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition ->
        ProducePartitionStatus(
          result.info.lastOffset + 1, // required offset
          new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // response status
    }

    // Build a strict map: mapValues would hand the callback a lazy view that re-applies the
    // function on every access.
    processingStatsCallback(localProduceResults.map { case (topicPartition, result) =>
      topicPartition -> result.info.recordsProcessingStats
    })

    // Per the original annotation this is, roughly, the acks = -1 case; the exact conditions
    // live in delayedProduceRequestRequired.
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // we can respond immediately; again use a strict map rather than a mapValues view
      val produceResponseStatus = produceStatus.map { case (topicPartition, status) =>
        topicPartition -> status.responseStatus
      }
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
    }
    responseCallback(responseStatus)
  }
}
什么时候触发 DelayedProduce 的 tryComplete 呢?
follower 一直从 leader 拉取消息,并上报自身的 LEO(log end offset),leader 根据各副本的 LEO 更新 HW,并且触发 DelayedProduce.tryComplete。
// kafka.cluster.Partition#updateReplicaLogReadResult
/**
 * Record a replica's log read result and try to complete the delayed operations it may unblock.
 *
 * @param replica       the replica whose read result is being recorded
 * @param logReadResult the read result applied to the replica
 * @return true if the leader's low watermark increased or maybeExpandIsr returned true
 */
def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean = {
  // The low watermark is only computed while delayed DeleteRecords requests exist;
  // otherwise -1 stands in for it.
  def lowWatermarkSnapshot(): Long =
    if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L

  val followerId = replica.brokerId

  // Sample the low watermark on both sides of the update, since the replica's
  // logStartOffset may have moved.
  val lwBefore = lowWatermarkSnapshot()
  replica.updateLogReadResult(logReadResult)
  val lwAfter = lowWatermarkSnapshot()
  val lwAdvanced = lwAfter > lwBefore

  // Always evaluated: expands the ISR to include this replica if it is not a member yet.
  val hwAdvanced = maybeExpandIsr(followerId, logReadResult)

  val watermarkMoved = lwAdvanced || hwAdvanced
  // some delayed operations may be unblocked after HW or LW changed
  if (watermarkMoved) {
    tryCompleteDelayedRequests()
  }

  debug(s"Recorded replica $followerId log end offset (LEO) position ${logReadResult.info.fetchOffsetMetadata.messageOffset}.")
  watermarkMoved
}
// kafka.cluster.Partition#tryCompleteDelayedRequests
/**
 * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
 *
 * Asks the replica manager to check the delayed fetch, produce and delete-records operations
 * registered under this partition's key, in that order.
 */
private def tryCompleteDelayedRequests(): Unit = {
  val requestKey = new TopicPartitionOperationKey(topicPartition)
  replicaManager.tryCompleteDelayedFetch(requestKey)
  replicaManager.tryCompleteDelayedProduce(requestKey)
  replicaManager.tryCompleteDelayedDeleteRecords(requestKey)
}
// kafka.server.ReplicaManager#tryCompleteDelayedProduce
/**
 * Try to complete some delayed produce requests with the request key;
 * this can be triggered when:
 *
 * 1. The partition HW has changed (for acks = -1)
 * 2. A follower replica's fetch operation is received
 *    NOTE(review): this case was originally qualified with "for acks > 1"; appendRecords is
 *    annotated as accepting only acks of -1, 0 or 1, so that qualifier looks stale — confirm.
 *
 * @param key the key whose watched delayed produce operations are checked
 */
def tryCompleteDelayedProduce(key: DelayedOperationKey): Unit = {
  val completed = delayedProducePurgatory.checkAndComplete(key)
  debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed))
}
Original: https://www.cnblogs.com/allenwas3/p/13141070.html
Author: 偶尔发呆
Title: kafka 的 DelayedProduce
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/534019/
转载文章受原作者版权保护。转载请注明原作者出处!