For messages that have been delivered but not yet acknowledged, Pulsar can use the BacklogQuota configuration to decide how large the backlog may grow and which policy to apply when it overflows.
See the official documentation: Message retention and expiry.
As long as the backlog size stays within the quota, nothing needs to be done; once it exceeds the quota, the configured retention policy is applied.
BacklogQuota supports three retention policies:

- producer_request_hold: ① disconnect all producers; ② check the quota when a new producer is created, and if the backlog exceeds the quota, reject the request and let it wait until timeout (the exception is only thrown when get() is called on the asynchronously returned future).
- producer_exception: ① disconnect all producers; ② check the quota when a new producer is created, and if the backlog exceeds the quota, reject the request and throw an exception immediately.
- consumer_backlog_eviction: drop the oldest messages in the backlog.

BacklogQuotaManager

Pulsar uses BacklogQuotaManager to handle the backlog; its key functions are described below.
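Before diving into the real broker code, the policy-to-action mapping can be condensed into a minimal sketch. This is a simplified, hypothetical model, not Pulsar's actual classes; only the enum constant names match Pulsar's `BacklogQuota.RetentionPolicy`:

```java
public class BacklogPolicySketch {
    // Constant names match Pulsar's BacklogQuota.RetentionPolicy; the action
    // strings are illustrative summaries only.
    enum RetentionPolicy { producer_request_hold, producer_exception, consumer_backlog_eviction }

    static String onQuotaExceeded(RetentionPolicy policy) {
        switch (policy) {
            case consumer_backlog_eviction:
                return "drop oldest backlog entries";      // broker-side eviction
            case producer_exception:
                return "disconnect producers, fail fast";  // client sees the failure immediately
            case producer_request_hold:
            default:
                return "disconnect producers, hold";       // client blocks until timeout or get()
        }
    }

    public static void main(String[] args) {
        System.out.println(onQuotaExceeded(RetentionPolicy.consumer_backlog_eviction));
    }
}
```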
handleExceededBacklogQuota

This function handles the case where the backlog exceeds the quota. For the consumer_backlog_eviction policy it calls dropBacklog(persistentTopic, quota); for the producer_exception and producer_request_hold policies it calls disconnectProducers(persistentTopic).
```java
public void handleExceededBacklogQuota(PersistentTopic persistentTopic) {
    TopicName topicName = TopicName.get(persistentTopic.getName());
    String namespace = topicName.getNamespace();
    String policyPath = AdminResource.path(POLICIES, namespace);

    BacklogQuota quota = getBacklogQuota(namespace, policyPath);
    log.info("Backlog quota exceeded for topic [{}]. Applying [{}] policy", persistentTopic.getName(),
            quota.getPolicy());
    switch (quota.getPolicy()) {
    case consumer_backlog_eviction:
        dropBacklog(persistentTopic, quota);
        break;
    case producer_exception:
    case producer_request_hold:
        disconnectProducers(persistentTopic);
        break;
    default:
        break;
    }
}
```
The dropBacklog(persistentTopic, quota) and disconnectProducers(persistentTopic) functions are described below.
dropBacklog(persistentTopic, quota)

The dropBacklog(persistentTopic, quota) function discards the oldest messages in the backlog once the quota is exceeded. "Discarding" here actually means moving the starting marker of unacknowledged messages (the position of the slowest consumer on the topic) forward.
```java
private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) {
    double reductionFactor = 0.9;
    double targetSize = reductionFactor * quota.getLimit();

    ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
    long backlogSize = mLedger.getEstimatedBacklogSize();

    ManagedCursor previousSlowestConsumer = null;
    while (backlogSize > targetSize) {
        ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
        if (slowestConsumer == null) {
            break;
        }
        double messageSkipFactor = ((backlogSize - targetSize) / backlogSize);

        if (slowestConsumer == previousSlowestConsumer) {
            break;
        }

        long entriesInBacklog = slowestConsumer.getNumberOfEntriesInBacklog();
        int messagesToSkip = (int) (messageSkipFactor * entriesInBacklog);
        try {
            if (messagesToSkip == 0) {
                break;
            }
            slowestConsumer.skipEntries(messagesToSkip, IndividualDeletedEntries.Include);
        } catch (Exception e) {
            log.error("Error skipping [{}] messages from slowest consumer : [{}]", messagesToSkip,
                    slowestConsumer.getName());
        }

        backlogSize = mLedger.getEstimatedBacklogSize();
        previousSlowestConsumer = slowestConsumer;
    }
}
```
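The skip arithmetic in dropBacklog can be checked with a small standalone sketch. The numbers below are hypothetical; the 0.9 reduction factor is the one hard-coded in dropBacklog above:

```java
public class DropBacklogMath {
    // Reproduces dropBacklog's skip computation with illustrative numbers.
    public static void main(String[] args) {
        long limit = 100_000;                       // hypothetical BacklogQuota limit in bytes
        double targetSize = 0.9 * limit;            // reductionFactor = 0.9, as in dropBacklog
        long backlogSize = 150_000;                 // estimated backlog size, over the limit
        long entriesInBacklog = 1_000;              // entries behind the slowest cursor

        // Fraction of the backlog that must go to get back under the target size.
        double messageSkipFactor = (backlogSize - targetSize) / backlogSize;
        int messagesToSkip = (int) (messageSkipFactor * entriesInBacklog);

        System.out.println(messagesToSkip);         // 0.4 * 1000 entries -> 400 skipped
    }
}
```

One pass skips 400 of the 1,000 entries; the loop then re-estimates the backlog size and repeats until it drops below the 90,000-byte target or the slowest cursor stops moving.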
This relies on getEstimatedBacklogSize(), a function in the ManagedLedgerImpl class that estimates the size of the backlog.
```java
public long getEstimatedBacklogSize() {
    PositionImpl pos = getMarkDeletePositionOfSlowestConsumer();

    while (true) {
        if (pos == null) {
            return 0;
        }
        long size = 0;
        final long slowestConsumerLedgerId = pos.getLedgerId();

        synchronized (this) {
            size = getTotalSize();
            size -= ledgers.values().stream().filter(li -> li.getLedgerId() < slowestConsumerLedgerId)
                    .mapToLong(li -> li.getSize()).sum();
        }

        LedgerInfo ledgerInfo = null;
        synchronized (this) {
            ledgerInfo = ledgers.get(pos.getLedgerId());
        }
        if (ledgerInfo == null) {
            if (pos.compareTo(getMarkDeletePositionOfSlowestConsumer()) == 0) {
                return size;
            }
            pos = getMarkDeletePositionOfSlowestConsumer();
            continue;
        }
        long numEntries = pos.getEntryId();
        if (ledgerInfo.getEntries() == 0) {
            size -= consumedLedgerSize(currentLedgerSize, currentLedgerEntries, numEntries);
            return size;
        } else {
            size -= consumedLedgerSize(ledgerInfo.getSize(), ledgerInfo.getEntries(), numEntries);
            return size;
        }
    }
}
```
```java
private long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consumedEntries) {
    if (ledgerEntries <= 0) {
        return 0;
    }
    long averageSize = ledgerSize / ledgerEntries;
    return consumedEntries >= 0 ? (consumedEntries + 1) * averageSize : 0;
}
```
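To make the estimate concrete, here is the same average-size formula applied to hypothetical numbers in a self-contained copy of the function:

```java
public class ConsumedLedgerSizeSketch {
    // Mirrors consumedLedgerSize: estimate consumed bytes from the average entry size.
    static long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consumedEntries) {
        if (ledgerEntries <= 0) {
            return 0;
        }
        long averageSize = ledgerSize / ledgerEntries;
        // Entry ids start at 0, so a mark-delete position at id N covers N + 1 entries.
        return consumedEntries >= 0 ? (consumedEntries + 1) * averageSize : 0;
    }

    public static void main(String[] args) {
        // 10 entries totalling 1000 bytes -> 100 bytes each on average; the slowest
        // cursor at entry id 4 means 5 entries (about 500 bytes) are consumed.
        System.out.println(consumedLedgerSize(1000, 10, 4));
    }
}
```

Note the estimate assumes uniformly sized entries; within a ledger of wildly varying message sizes it is only approximate, which is why the method is called getEstimatedBacklogSize.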
disconnectProducers(persistentTopic)

The disconnectProducers(persistentTopic) function disconnects the producers when the backlog exceeds the quota under the producer_request_hold and producer_exception policies.
```java
private void disconnectProducers(PersistentTopic persistentTopic) {
    List<CompletableFuture<Void>> futures = Lists.newArrayList();
    ConcurrentOpenHashSet<Producer> producers = persistentTopic.getProducers();

    producers.forEach(producer -> {
        futures.add(producer.disconnect());
    });

    FutureUtil.waitForAll(futures).thenRun(() -> {
        log.info("All producers on topic [{}] are disconnected", persistentTopic.getName());
    }).exceptionally(exception -> {
        log.error("Error in disconnecting producers on topic [{}] [{}]", persistentTopic.getName(), exception);
        return null;
    });
}
```
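The FutureUtil.waitForAll call here is, in essence, a thin wrapper over CompletableFuture.allOf. The disconnect-and-wait pattern can be sketched with plain JDK futures, using already-completed futures as stand-ins for producer.disconnect():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DisconnectAllSketch {
    public static void main(String[] args) {
        // Stand-ins for the futures returned by producer.disconnect()
        // (assumption: each disconnect completes successfully).
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            futures.add(CompletableFuture.completedFuture(null));
        }
        // Equivalent of FutureUtil.waitForAll(futures): completes only when all do.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenRun(() -> System.out.println("all producers disconnected"))
                .join();
    }
}
```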
BacklogQuota checks

BacklogQuota is checked in two ways: periodically, and before a producer is created.

Periodic check

When BrokerService starts, it calls startBacklogQuotaChecker(), which periodically executes monitorBacklogQuota(). Whenever the backlog exceeds the quota, the situation is handled through BacklogQuotaManager.
```java
public void monitorBacklogQuota() {
    forEachTopic(topic -> {
        if (topic instanceof PersistentTopic) {
            PersistentTopic persistentTopic = (PersistentTopic) topic;
            if (isBacklogExceeded(persistentTopic)) {
                getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("quota not exceeded for [{}]", topic.getName());
                }
            }
        }
    });
}
```
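The periodic driver started by startBacklogQuotaChecker can be sketched with a plain ScheduledExecutorService. In Pulsar the interval comes from broker configuration; the 50 ms period below is purely to keep this sketch fast:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class QuotaCheckerSketch {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch ranTwice = new CountDownLatch(2);

        // Stand-in for monitorBacklogQuota(): in Pulsar this walks every
        // persistent topic and hands over-quota ones to BacklogQuotaManager.
        Runnable monitorBacklogQuota = () -> {
            System.out.println("checking backlog quotas");
            ranTwice.countDown();
        };

        scheduler.scheduleAtFixedRate(monitorBacklogQuota, 0, 50, TimeUnit.MILLISECONDS);
        ranTwice.await();       // let the check fire at least twice, then stop
        scheduler.shutdownNow();
    }
}
```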
Check before producer creation

On ServerCnx, the broker-side endpoint facing the client, a BacklogQuota check is performed after a producer-creation request is received and before the producer is created.
```java
if (topic.isBacklogQuotaExceeded(producerName)) {
    IllegalStateException illegalStateException = new IllegalStateException(
            "Cannot create producer on topic with backlog quota exceeded");
    BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
    if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
        ctx.writeAndFlush(Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededError,
                illegalStateException.getMessage()));
    } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
        ctx.writeAndFlush(Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededException,
                illegalStateException.getMessage()));
    }
    producerFuture.completeExceptionally(illegalStateException);
    producers.remove(producerId, producerFuture);
    return;
}
```
Only the producer_request_hold and producer_exception policies are handled here; consumer_backlog_eviction is handled only by the periodic check.

For the producer_request_hold policy, an Error is returned. When ClientCnx receives the Error it does not terminate the request immediately; a ProducerBlockedQuotaExceededError exception is thrown only when the future times out or get() is called on it.
```java
protected void handleError(CommandError error) {
    checkArgument(state == State.Ready);

    log.warn("{} Received error from server: {}", ctx.channel(), error.getMessage());
    long requestId = error.getRequestId();
    if (error.getError() == ServerError.ProducerBlockedQuotaExceededError) {
        log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic",
                ctx.channel());
    }

    CompletableFuture<ProducerResponse> requestFuture = pendingRequests.remove(requestId);
    if (requestFuture != null) {
        requestFuture.completeExceptionally(getPulsarClientException(error.getError(), error.getMessage()));
    } else {
        log.warn("{} Received unknown request id from server: {}", ctx.channel(), error.getRequestId());
    }
}
```
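This deferred-failure behavior is plain CompletableFuture semantics: completing the pending request future exceptionally produces nothing visible until the caller blocks on it. A minimal sketch, with a stand-in exception instead of Pulsar's actual exception class:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class DeferredErrorSketch {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> producerFuture = new CompletableFuture<>();

        // Stand-in for ClientCnx.handleError completing the pending request
        // exceptionally after the broker reports ProducerBlockedQuotaExceededError.
        producerFuture.completeExceptionally(
                new IllegalStateException("producer blocked: backlog quota exceeded"));

        try {
            producerFuture.get(); // the failure only surfaces here, on get()
        } catch (ExecutionException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```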
For the producer_exception policy, a ProducerBlockedQuotaExceededException is thrown directly.
This concludes the walkthrough of how the BacklogQuota mechanism is implemented in the Pulsar codebase.