Pulsar Source Code Reading: Backlog

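For messages that subscriptions have not yet acknowledged (the backlog), Pulsar lets you configure a BacklogQuota that determines how much backlog is retained and what happens once that limit is exceeded.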

For details, see the official documentation: Message retention and expiry

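As long as the backlog stays below the quota, nothing needs to be done; once it exceeds the quota, the broker acts according to the configured retention policy.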

BacklogQuota offers three retention policies:

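  1. producer_request_hold: ① disconnect all producers; ② check the quota whenever a new producer is created, and if the BacklogQuota is exceeded, reject the request and leave it waiting until it times out (the exception is thrown only when get() is called on the asynchronous result)
  2. producer_exception: ① disconnect all producers; ② check the quota whenever a new producer is created, and if the BacklogQuota is exceeded, reject the request and throw an exception
  3. consumer_backlog_eviction: discard the oldest part of the backlog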
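As a compact recap of the list above, here is a hypothetical Java enum. Only the constant names are taken from BacklogQuota.RetentionPolicy; RetentionPolicySketch and its helper methods are invented for illustration and are not part of Pulsar.

```java
// Hypothetical recap of the three policies. The constant names mirror
// BacklogQuota.RetentionPolicy; the helper methods do not exist in Pulsar.
enum RetentionPolicySketch {
    producer_request_hold,
    producer_exception,
    consumer_backlog_eviction;

    // Both producer_* policies disconnect the topic's existing producers.
    boolean disconnectsProducers() {
        return this != consumer_backlog_eviction;
    }

    // Only producer_exception fails a new producer right away;
    // producer_request_hold leaves the creation pending until it times out.
    boolean failsNewProducerImmediately() {
        return this == producer_exception;
    }

    // Only consumer_backlog_eviction drops the oldest part of the backlog.
    boolean dropsOldestBacklog() {
        return this == consumer_backlog_eviction;
    }

    public static void main(String[] args) {
        for (RetentionPolicySketch p : values()) {
            System.out.println(p + ": disconnects=" + p.disconnectsProducers()
                    + ", failsFast=" + p.failsNewProducerImmediately()
                    + ", drops=" + p.dropsOldestBacklog());
        }
    }
}
```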

BacklogQuotaManager

In Pulsar, BacklogQuotaManager is responsible for handling the backlog. Its key functions are described below.

handleExceededBacklogQuota

This function handles a topic whose backlog exceeds the quota: for the consumer_backlog_eviction policy it calls dropBacklog(persistentTopic, quota); for the producer_exception and producer_request_hold policies it calls disconnectProducers(persistentTopic).

public void handleExceededBacklogQuota(PersistentTopic persistentTopic) {
    TopicName topicName = TopicName.get(persistentTopic.getName());
    String namespace = topicName.getNamespace();
    String policyPath = AdminResource.path(POLICIES, namespace);

    BacklogQuota quota = getBacklogQuota(namespace, policyPath);
    log.info("Backlog quota exceeded for topic [{}]. Applying [{}] policy", persistentTopic.getName(),
            quota.getPolicy());
    switch (quota.getPolicy()) {
        case consumer_backlog_eviction:
            dropBacklog(persistentTopic, quota);
            break;
        case producer_exception:
        case producer_request_hold:
            disconnectProducers(persistentTopic);
            break;
        default:
            break;
    }
}

The two functions dropBacklog(persistentTopic, quota) and disconnectProducers(persistentTopic) are described below.

dropBacklog(persistentTopic, quota)

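dropBacklog(persistentTopic, quota) discards the oldest messages in the backlog once the quota is exceeded. "Discarding" here really means moving forward the start marker of the unacknowledged messages, i.e. the position of the slowest consumer on the topic.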

private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) {
    // Reduction factor 0.9: once dropping is done, the backlog should be at most 90% of the quota
    double reductionFactor = 0.9;
    double targetSize = reductionFactor * quota.getLimit();

    // Use an estimate of the backlog size instead of the ledgers' actual size: consumed ledgers
    // are not necessarily deleted right away, so the actual size can exceed the backlog
    ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
    long backlogSize = mLedger.getEstimatedBacklogSize(); // explained below

    ManagedCursor previousSlowestConsumer = null;
    while (backlogSize > targetSize) {
        // Cursor of the slowest consumer
        ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
        if (slowestConsumer == null) {
            break;
        }
        // Fraction of the backlog to skip
        double messageSkipFactor = ((backlogSize - targetSize) / backlogSize);
        // Same slowest cursor as in the previous iteration: the last skip failed or did not make
        // another cursor the slowest, so stop instead of skipping on it again
        if (slowestConsumer == previousSlowestConsumer) {
            break;
        }
        // Number of entries to skip on this cursor
        long entriesInBacklog = slowestConsumer.getNumberOfEntriesInBacklog();
        int messagesToSkip = (int) (messageSkipFactor * entriesInBacklog);
        try {
            if (messagesToSkip == 0) {
                break;
            }
            // Move the slowest cursor forward
            slowestConsumer.skipEntries(messagesToSkip, IndividualDeletedEntries.Include);
        } catch (Exception e) {
            log.error("Error skipping [{}] messages from slowest consumer : [{}]", messagesToSkip,
                    slowestConsumer.getName());
        }

        // Re-estimate the backlog after moving; if it is still above the target, the next
        // iteration works on whichever cursor is now the slowest
        backlogSize = mLedger.getEstimatedBacklogSize();
        previousSlowestConsumer = slowestConsumer;
    }
}
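To see how the 90% target and the skip factor interact, the loop above can be replayed on a toy model. This is a sketch under stated assumptions, not Pulsar code: DropBacklogSimulation is hypothetical, every entry is assumed to have the same size, and each cursor is reduced to its count of backlog entries, so skipping entries simply decrements that count.

```java
import java.util.Arrays;

// Toy replay of the dropBacklog() loop. Assumptions (not Pulsar code): all
// entries have the same size, and each cursor is represented only by the
// number of entries it still has in its backlog.
final class DropBacklogSimulation {

    // The topic backlog is measured from the slowest cursor (most entries left).
    static long estimateBacklog(long[] cursorBacklogs, long entrySize) {
        return Arrays.stream(cursorBacklogs).max().orElse(0) * entrySize;
    }

    // Index of the slowest cursor, or -1 if there are no cursors.
    static int slowest(long[] cursorBacklogs) {
        int idx = -1;
        for (int i = 0; i < cursorBacklogs.length; i++) {
            if (idx < 0 || cursorBacklogs[i] > cursorBacklogs[idx]) {
                idx = i;
            }
        }
        return idx;
    }

    static long[] drop(long[] cursorBacklogs, long entrySize, long limitBytes) {
        long[] cursors = cursorBacklogs.clone();
        double targetSize = 0.9 * limitBytes; // same 90% target as dropBacklog()
        long backlogSize = estimateBacklog(cursors, entrySize);
        int previousSlowest = -1;
        while (backlogSize > targetSize) {
            int slowest = slowest(cursors);
            if (slowest < 0 || slowest == previousSlowest) {
                break; // no cursor, or the cursor just moved is still the slowest
            }
            double skipFactor = (backlogSize - targetSize) / backlogSize;
            int toSkip = (int) (skipFactor * cursors[slowest]);
            if (toSkip == 0) {
                break;
            }
            cursors[slowest] -= toSkip; // stands in for skipEntries()
            backlogSize = estimateBacklog(cursors, entrySize);
            previousSlowest = slowest;
        }
        return cursors;
    }

    public static void main(String[] args) {
        // Two cursors, 10-byte entries, 500-byte quota (target 450 bytes).
        System.out.println(Arrays.toString(drop(new long[] {100, 80}, 10, 500))); // prints [45, 45]
    }
}
```

With two cursors holding 100 and 80 entries of 10 bytes and a 500-byte quota, the first pass skips 55 entries on the slowest cursor, the second cursor then becomes the slowest and skips 35, and both end at 45 entries (450 bytes, exactly the target). The model also shows a consequence of the previousSlowestConsumer check: with a single subscription the loop skips at most once, because after the first skip the same cursor is still the slowest.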

dropBacklog() relies on getEstimatedBacklogSize() in ManagedLedgerImpl to estimate the backlog size.

public long getEstimatedBacklogSize() {
    // Mark-delete position of the slowest consumer, i.e. where the unacknowledged messages start
    PositionImpl pos = getMarkDeletePositionOfSlowestConsumer();

    while (true) {
        if (pos == null) {
            return 0;
        }
        long size = 0; // backlog size
        final long slowestConsumerLedgerId = pos.getLedgerId();

        synchronized (this) {
            // Total size of all ledgers
            size = getTotalSize();
            // Subtract the ledgers before the slowest consumer's ledger: they are fully
            // consumed but may not have been deleted yet
            size -= ledgers.values().stream().filter(li -> li.getLedgerId() < slowestConsumerLedgerId)
                    .mapToLong(li -> li.getSize()).sum();
        }

        LedgerInfo ledgerInfo = null;
        synchronized (this) {
            ledgerInfo = ledgers.get(pos.getLedgerId());
        }
        if (ledgerInfo == null) {
            // The ledger pos points to has already been deleted, but the mark-delete position has
            // not been updated yet (it is only updated when a new ManagedLedger is started):
            // return the result directly
            if (pos.compareTo(getMarkDeletePositionOfSlowestConsumer()) == 0) {
                return size;
            }
            // The mark-delete position has been updated, so the ledger pos pointed to has been
            // fully cleaned up: re-read pos and retry
            pos = getMarkDeletePositionOfSlowestConsumer();
            continue;
        }

        long numEntries = pos.getEntryId();
        // The LedgerInfo of the ledger still being written records 0 entries, so use the live
        // currentLedgerSize/currentLedgerEntries counters instead; consumedLedgerSize() divides
        // by its second argument (ledgerEntries), which must not be 0
        if (ledgerInfo.getEntries() == 0) {
            size -= consumedLedgerSize(currentLedgerSize, currentLedgerEntries, numEntries);
            return size;
        } else {
            size -= consumedLedgerSize(ledgerInfo.getSize(), ledgerInfo.getEntries(), numEntries);
            return size;
        }
    }
}

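private long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consumedEntries) {
    if (ledgerEntries <= 0) {
        return 0;
    }
    // Average entry size in this ledger
    long averageSize = ledgerSize / ledgerEntries;
    // Entry IDs start at 0 and the mark-delete position points at the last consumed entry, so
    // entries 0..consumedEntries (consumedEntries + 1 of them) are consumed; -1 means none
    return consumedEntries >= 0 ? (consumedEntries + 1) * averageSize : 0;
}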
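The arithmetic of getEstimatedBacklogSize() and consumedLedgerSize() can be checked by hand with a small sketch. BacklogEstimateSketch and its Ledger class are hypothetical; the sketch assumes every ledger is closed (so the open-ledger branch using currentLedgerSize is left out) and that the slowest mark-delete position is passed in as (ledgerId, entryId).

```java
import java.util.ArrayList;
import java.util.List;

// Toy version of getEstimatedBacklogSize(). Assumptions (not Pulsar code):
// every ledger is closed, so its size and entry count are known, and the
// slowest consumer's mark-delete position is given as (ledgerId, entryId).
final class BacklogEstimateSketch {

    static final class Ledger {
        final long id;
        final long size;
        final long entries;

        Ledger(long id, long size, long entries) {
            this.id = id;
            this.size = size;
            this.entries = entries;
        }
    }

    // Same arithmetic as consumedLedgerSize(): entries 0..consumedEntries are
    // consumed, each assumed to have the ledger's average entry size.
    static long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consumedEntries) {
        if (ledgerEntries <= 0) {
            return 0;
        }
        long averageSize = ledgerSize / ledgerEntries;
        return consumedEntries >= 0 ? (consumedEntries + 1) * averageSize : 0;
    }

    static long estimate(List<Ledger> ledgers, long markDeleteLedgerId, long markDeleteEntryId) {
        // Total size minus the ledgers that lie entirely before the mark-delete ledger
        long size = ledgers.stream().mapToLong(l -> l.size).sum();
        size -= ledgers.stream().filter(l -> l.id < markDeleteLedgerId).mapToLong(l -> l.size).sum();

        Ledger current = ledgers.stream().filter(l -> l.id == markDeleteLedgerId).findFirst().orElse(null);
        if (current == null) {
            return size; // the mark-delete ledger is already gone
        }
        // Subtract the consumed part of the mark-delete ledger
        return size - consumedLedgerSize(current.size, current.entries, markDeleteEntryId);
    }

    public static void main(String[] args) {
        List<Ledger> ledgers = new ArrayList<>();
        ledgers.add(new Ledger(1, 1000, 100));
        ledgers.add(new Ledger(2, 2000, 100));
        ledgers.add(new Ledger(3, 500, 50));
        System.out.println(estimate(ledgers, 2, 49)); // prints 1500
    }
}
```

With ledgers 1, 2 and 3 of 1000, 2000 and 500 bytes (100, 100 and 50 entries) and the mark-delete position at (2, 49), ledger 1 is dropped entirely and 50 entries of ledger 2 at an average of 20 bytes (1000 bytes) are subtracted, leaving 3500 - 1000 - 1000 = 1500 bytes.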

disconnectProducers(persistentTopic)

disconnectProducers(persistentTopic) closes the connections to all producers of the topic when the backlog exceeds the quota under the producer_request_hold and producer_exception policies.

private void disconnectProducers(PersistentTopic persistentTopic) {
    List<CompletableFuture<Void>> futures = Lists.newArrayList();
    ConcurrentOpenHashSet<Producer> producers = persistentTopic.getProducers();

    producers.forEach(producer -> {
        futures.add(producer.disconnect());
    });

    FutureUtil.waitForAll(futures).thenRun(() -> {
        log.info("All producers on topic [{}] are disconnected", persistentTopic.getName());
    }).exceptionally(exception -> {
        log.error("Error in disconnecting producers on topic [{}] [{}]", persistentTopic.getName(), exception);
        return null;
    });
}
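The disconnect-all-then-log pattern can be reproduced with the JDK alone. In this sketch CompletableFuture.allOf stands in for Pulsar's FutureUtil.waitForAll, and FakeProducer is a hypothetical interface standing in for the broker's Producer; only a disconnect() method is modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch of the disconnectProducers() pattern. FakeProducer is a
// hypothetical stand-in for the broker's Producer, and CompletableFuture.allOf
// stands in for Pulsar's FutureUtil.waitForAll.
final class DisconnectSketch {

    interface FakeProducer {
        CompletableFuture<Void> disconnect();
    }

    static CompletableFuture<Void> disconnectAll(List<FakeProducer> producers) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        producers.forEach(p -> futures.add(p.disconnect()));
        // Log once every disconnect has finished; log and swallow any failure
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenRun(() -> System.out.println("All producers are disconnected"))
                .exceptionally(ex -> {
                    System.out.println("Error in disconnecting producers: " + ex);
                    return null;
                });
    }

    public static void main(String[] args) {
        AtomicInteger disconnected = new AtomicInteger();
        List<FakeProducer> producers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            producers.add(() -> {
                disconnected.incrementAndGet();
                return CompletableFuture.completedFuture(null);
            });
        }
        disconnectAll(producers).join();
        System.out.println(disconnected.get()); // prints 3
    }
}
```

Because exceptionally() swallows the failure after logging it, the returned future completes normally even if some producer failed to disconnect, which fits the fire-and-forget style of disconnectProducers(), whose result is never returned to the caller.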

BacklogQuota checks

The BacklogQuota is checked in two ways: periodically, and before a producer is created.

Periodic check

When BrokerService starts, it calls startBacklogQuotaChecker(), which periodically runs monitorBacklogQuota(); every topic whose backlog exceeds the quota is handed to BacklogQuotaManager for handling.

public void monitorBacklogQuota() {
    forEachTopic(topic -> {
        if (topic instanceof PersistentTopic) {
            PersistentTopic persistentTopic = (PersistentTopic) topic;
            if (isBacklogExceeded(persistentTopic)) {
                getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("quota not exceeded for [{}]", topic.getName());
                }
            }
        }
    });
}
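A periodic checker of this shape can be sketched with a ScheduledExecutorService. Everything here is an assumption for illustration: BacklogQuotaCheckerSketch is hypothetical, a map of topic name to backlog bytes replaces the broker's topic registry, one limit applies to all topics, and "exceeded" is taken to mean strictly greater than the limit (the broker's exact comparison inside isBacklogExceeded() is not shown above). In the real broker the interval comes from the broker configuration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical periodic backlog-quota checker. Assumptions: a map of topic ->
// backlog bytes replaces the broker's topics, a single limit applies to all
// topics, and "exceeded" means strictly greater than the limit.
final class BacklogQuotaCheckerSketch {

    // One pass in the spirit of monitorBacklogQuota(): hand every topic over
    // the limit to the handler.
    static void monitorOnce(Map<String, Long> backlogBytesByTopic, long limitBytes,
                            Consumer<String> handleExceeded) {
        backlogBytesByTopic.forEach((topic, backlog) -> {
            if (backlog > limitBytes) {
                handleExceeded.accept(topic);
            }
        });
    }

    // Runs the pass at a fixed rate, like startBacklogQuotaChecker(). Pass a
    // thread-safe map (e.g. ConcurrentHashMap) if it changes while this runs.
    static ScheduledExecutorService start(Map<String, Long> backlogs, long limitBytes,
                                          Consumer<String> handler, long intervalSeconds) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                monitorOnce(backlogs, limitBytes, handler);
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all later runs of a fixed-rate task
                e.printStackTrace();
            }
        }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
        return executor;
    }

    public static void main(String[] args) {
        Map<String, Long> backlogs = new LinkedHashMap<>();
        backlogs.put("persistent://public/default/a", 500L);
        backlogs.put("persistent://public/default/b", 2000L);
        List<String> exceeded = new ArrayList<>();
        monitorOnce(backlogs, 1000, exceeded::add);
        System.out.println(exceeded); // prints [persistent://public/default/b]
    }
}
```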

Check before creating a Producer

In ServerCnx, the broker-side endpoint that clients talk to, the BacklogQuota is checked after a request to create a producer arrives and before the producer is actually created.

if (topic.isBacklogQuotaExceeded(producerName)) {
    IllegalStateException illegalStateException = new IllegalStateException(
            "Cannot create producer on topic with backlog quota exceeded");
    BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
    if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
        // Reply with an Error
        ctx.writeAndFlush(Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededError,
                illegalStateException.getMessage()));
    } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
        // Reply with an Exception
        ctx.writeAndFlush(Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededException,
                illegalStateException.getMessage()));
    }
    producerFuture.completeExceptionally(illegalStateException);
    producers.remove(producerId, producerFuture);
    return;
}
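The decision in the ServerCnx snippet boils down to a mapping from policy to reply. ProducerAdmissionSketch and its two enums are hypothetical stand-ins for BacklogQuota.RetentionPolicy and ServerError; the sketch assumes the over-quota test has already been done elsewhere and only models which error is sent back.

```java
import java.util.Optional;

// Hypothetical stand-ins: Policy mirrors BacklogQuota.RetentionPolicy and
// ServerErrorSketch mirrors the two ServerError values used in ServerCnx.
// Whether the backlog is over quota is assumed to be computed elsewhere.
final class ProducerAdmissionSketch {

    enum Policy { producer_request_hold, producer_exception, consumer_backlog_eviction }

    enum ServerErrorSketch { ProducerBlockedQuotaExceededError, ProducerBlockedQuotaExceededException }

    // Returns the error to send back, or empty if the producer may be created.
    static Optional<ServerErrorSketch> check(Policy policy, boolean backlogExceeded) {
        if (!backlogExceeded) {
            return Optional.empty();
        }
        switch (policy) {
            case producer_request_hold:
                return Optional.of(ServerErrorSketch.ProducerBlockedQuotaExceededError);
            case producer_exception:
                return Optional.of(ServerErrorSketch.ProducerBlockedQuotaExceededException);
            default:
                // consumer_backlog_eviction does not block producers; the periodic check drops backlog instead
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(check(Policy.producer_request_hold, true));
        // prints Optional[ProducerBlockedQuotaExceededError]
    }
}
```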

Only the producer_request_hold and producer_exception policies are handled here; consumer_backlog_eviction is handled only by the periodic check.

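For the producer_request_hold policy the broker replies with an Error. handleError() in ClientCnx, shown below, completes the pending request with a ProducerBlockedQuotaExceededError, but the client does not fail the producer creation right away: the creation stays pending, and the ProducerBlockedQuotaExceededError surfaces only when the Future times out or when get() is called.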

protected void handleError(CommandError error) {
    checkArgument(state == State.Ready);

    log.warn("{} Received error from server: {}", ctx.channel(), error.getMessage());
    long requestId = error.getRequestId();
    if (error.getError() == ServerError.ProducerBlockedQuotaExceededError) {
        log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic",
                ctx.channel());
    }
    CompletableFuture<ProducerResponse> requestFuture = pendingRequests.remove(requestId);
    if (requestFuture != null) {
        requestFuture.completeExceptionally(getPulsarClientException(error.getError(), error.getMessage()));
    } else {
        log.warn("{} Received unknown request id from server: {}", ctx.channel(), error.getRequestId());
    }
}
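To make the difference between the two replies concrete, here is a hypothetical client-side model in plain Java. The Reply enum, create() and the timeout values are illustrative assumptions, not Pulsar's client API: a QUOTA_ERROR reply leaves the creation future pending until it times out, while QUOTA_EXCEPTION fails it immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not Pulsar client code: Reply stands for what the broker
// sent back when the producer was requested.
final class ProducerCreationSketch {

    enum Reply { OK, QUOTA_ERROR, QUOTA_EXCEPTION }

    static CompletableFuture<String> create(Reply reply, long timeoutMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        switch (reply) {
            case QUOTA_EXCEPTION:
                // producer_exception: fail the creation right away
                future.completeExceptionally(new IllegalStateException(
                        "Cannot create producer on topic with backlog quota exceeded"));
                break;
            case QUOTA_ERROR:
                // producer_request_hold: keep the creation pending; orTimeout (Java 9+)
                // completes it with a TimeoutException once the timeout elapses
                future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
                break;
            default:
                future.complete("producer-1");
        }
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> held = create(Reply.QUOTA_ERROR, 1000);
        System.out.println(held.isDone()); // false: the hold has not timed out yet
        try {
            held.join(); // blocks until the timeout fires
        } catch (CompletionException e) {
            System.out.println("Failed after timeout: " + e.getCause());
        }
    }
}
```

orTimeout() requires Java 9 or later; on older JDKs the same hold could be modeled with a ScheduledExecutorService that calls completeExceptionally() after the delay.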

For the producer_exception policy, the producer creation fails immediately with a ProducerBlockedQuotaExceededException.

That is how the BacklogQuota mechanism is implemented in the Pulsar code.