Pulsar 源码阅读: Retention
对于已经消费确认的消息,Pulsar 可以通过配置 Retention 策略决定保留的时间及大小。
具体参见官方文档:Message retention and expiry
Pulsar 源码中有三个部分与 Retention 相关。
1. PersistentTopic
在BrokerService启动之后,this.startInactivityMonitor();
操作会对不活动任务进行定期清理,其中包括 GC 操作:
1 | public void checkGC(int gcIntervalInSeconds) { |
该操作会对 Broker 中的每个 Topic 进行 GC 检查清理的操作。
其中checkGC()
为PersistentTopic
类中的实现,如下:
1 | if (isActive()) { |
shouldTopicBeRetained()
函数会对需要 retention 的数据进行检查。
1 | /** |
上面对是否到达 Retention 时间的检查用到了 AbstractTopic
类中的lastActive
字段:
1 | // Timestamp of when this topic was last seen active |
该字段在每次移除Producer
、移除订阅和执行checkGC()
的时候进行更新。这里根据我的理解对两个问题进行解释:
为什么只在移除的时候更新,而不再加入的时候更新?
lastActive指的是最后的存活时间,所以只有移除所有的producer、consumer之后才可能需要更新。
在每次移除Producer和consumer之后都进行更新,如果保证lastActive就代表了该Topic清空Producer和Consumer的时间?
shiyonglastActive并不能代表这个意思,首先lastActive只在判断是否需要进行GC和是否需要被保留的时候使用(后者是在前者之中调用的),在使用lastActive之前,会执行
isActive()
函数,该函数是对该Topic是否还有与其连接的Producer和Consumer,所有之后使用lastActive参数时已经保证了isActive()
不成立,即:该Topic上已经不存在Producer和Consumer了。
以上是BrokerService中进行Topic清理时对**RetentionTime
**的使用,这一部分是在清理 Topic 之前根据 Retention 策略决定该 Topic 是否应该被清理。
2. ManagedLedgerImpl
Ledger 有关的 Retention 特性是在**ManagedLedgerImpl
**类中实现的,下面介绍该部分。
在启动BrokerService的时候,会设置managedLedgerConfig
:
1 | managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); |
在ManagedLedgerImpl
类中,下面的函数会对已经完全被消费(所有消息已经被所有 Consumer 消费和确认过)Ledger进行周期性检查清理,
1 | void internalTrimConsumedLedgers(CompletableFuture<?> promise) |
在上面的函数执行过程中,会通过hasLedgerRetentionExpired
函数判断该Ledger是否需要被Retention。
1 | private boolean hasLedgerRetentionExpired(long ledgerTimestamp) { |
其中ledgerTimestamp
参数表示Ledger建立的时间。
3. NamespaceBase
还有一个用到Retention
的地方是在NamespaceBase
类中,可以对存储在Zookeeper中的Retention
配置信息进行get和set,它通过Namespaces
类中的Restful接口对外提供服务,client中的cmd/namepaces
也是通过调用Restful接口实现的对Broker端的配置的修改。
Retention 属于 Namespace 级别的配置,Namespace 只是一个逻辑上的概念,具体消息的存储是通过 Ledger 进行的(Ledger 是 Pulsar 中增加删除持久化信息的最小单位),所以 Retention 这一特性也是有 Ledger 部分实现的。