Pulsar 源码阅读: Retention

对于已经消费确认的消息,Pulsar 可以通过配置 Retention 策略决定保留的时间及大小。

具体参见官方文档:Message retention and expiry

Pulsar 源码中有三个部分与 Retention 相关。

1. PersistentTopic

BrokerService启动之后,this.startInactivityMonitor();操作会对不活动任务进行定期清理,其中包括 GC 操作:

1
2
3
public void checkGC(int gcIntervalInSeconds) {
forEachTopic(topic -> topic.checkGC(gcIntervalInSeconds));
}

该操作会对 Broker 中的每个 Topic 进行 GC 检查清理的操作。

其中checkGC()PersistentTopic类中的实现,如下:

1
2
3
4
5
6
7
8
9
10
11
if (isActive()) {
lastActive = System.nanoTime();
} else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(gcIntervalInSeconds)) {
// Gc interval did not expire yet
return;
} else if (shouldTopicBeRetained()) {
// Topic activity is still within the retention period
return;
} else {
...
}

shouldTopicBeRetained()函数会对需要 retention 的数据进行检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Check whether the topic should be retained (based on time), even tough there are no producers/consumers and it's
* marked as inactive.
*/
private boolean shouldTopicBeRetained() {
TopicName name = TopicName.get(topic);
try {
// 从配置缓存中读取配置信息
Optional<Policies> policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()));
// 如果没有配置信息,默认该 Topic 不需要 Retention ,清理。
// 如果有配置信息,根据是否超过设定的 Retention 时间选择是否进行清理
return policies.map(p -> p.retention_policies).map(rp -> {
long retentionTime = TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes());
return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime;
}).orElse(false);
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
}
// Don't delete in case we cannot get the policies
return true;
}

上面对是否到达 Retention 时间的检查用到了 AbstractTopic 类中的lastActive字段:

1
2
// Timestamp of when this topic was last seen active
protected volatile long lastActive;

该字段在每次移除Producer、移除订阅和执行checkGC()的时候进行更新。这里根据我的理解对两个问题进行解释:

  1. 为什么只在移除的时候更新,而不再加入的时候更新?

    lastActive指的是最后的存活时间,所以只有移除所有的producer、consumer之后才可能需要更新。

  2. 在每次移除Producer和consumer之后都进行更新,如果保证lastActive就代表了该Topic清空Producer和Consumer的时间?

    shiyonglastActive并不能代表这个意思,首先lastActive只在判断是否需要进行GC和是否需要被保留的时候使用(后者是在前者之中调用的),在使用lastActive之前,会执行isActive()函数,该函数是对该Topic是否还有与其连接的Producer和Consumer,所有之后使用lastActive参数时已经保证了isActive()不成立,即:该Topic上已经不存在Producer和Consumer了。

以上是BrokerService中进行Topic清理时对**RetentionTime**的使用,这一部分是在清理 Topic 之前根据 Retention 策略决定该 Topic 是否应该被清理。

2. ManagedLedgerImpl

Ledger 有关的 Retention 特性是在**ManagedLedgerImpl**类中实现的,下面介绍该部分。

在启动BrokerService的时候,会设置managedLedgerConfig:

1
2
managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());

ManagedLedgerImpl类中,下面的函数会对已经完全被消费(所有消息已经被所有 Consumer 消费和确认过)Ledger进行周期性检查清理,

1
void internalTrimConsumedLedgers(CompletableFuture<?> promise)

在上面的函数执行过程中,会通过hasLedgerRetentionExpired函数判断该Ledger是否需要被Retention。

1
2
3
4
5
6
7
8
private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
if (config.getRetentionTimeMillis() < 0) {
// Negative retention time equates to infinite retention
return false;
}
long elapsedMs = clock.millis() - ledgerTimestamp;
return elapsedMs > config.getRetentionTimeMillis();
}

其中ledgerTimestamp参数表示Ledger建立的时间。

3. NamespaceBase

还有一个用到Retention的地方是在NamespaceBase类中,可以对存储在Zookeeper中的Retention配置信息进行get和set,它通过Namespaces类中的Restful接口对外提供服务,client中的cmd/namepaces也是通过调用Restful接口实现的对Broker端的配置的修改。

Retention 属于 Namespace 级别的配置,Namespace 只是一个逻辑上的概念,具体消息的存储是通过 Ledger 进行的(Ledger 是 Pulsar 中增加删除持久化信息的最小单位),所以 Retention 这一特性也是有 Ledger 部分实现的。