/** Suffix marking a file that is scheduled to be deleted. */
val DeletedFileSuffix: String = ".deleted"
/**
 * Perform an asynchronous delete on the given file if it exists (otherwise do nothing).
 *
 * The segment's files are first renamed with the `Log.DeletedFileSuffix` suffix; the actual
 * deletion is then handed to the scheduler as a "delete-file" task, using
 * `config.fileDeleteDelayMs` as the delay.
 *
 * @param segment the log segment whose files should be deleted
 * @throws KafkaStorageException if the file can't be renamed and still exists
 */
private def asyncDeleteSegment(segment: LogSegment): Unit = {
  // Rename before scheduling, so the files already carry the deleted suffix
  // while the delayed task is still pending.
  segment.changeFileSuffixes("", Log.DeletedFileSuffix)

  // Deferred task: log the deletion, then remove the segment.
  def deleteSeg(): Unit = {
    info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
    segment.delete()
  }

  scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
}
/** * Change the suffix for the index and log file for this log segment */ defchangeFileSuffixes(oldSuffix: String, newSuffix: String) {
/**
 * Wrap an I/O failure in a KafkaStorageException whose message names the affected file type,
 * the old and new suffixes, and the base offset of this segment.
 *
 * @param fileType label of the file being renamed, inserted verbatim into the message
 * @param e the underlying I/O failure, kept as the cause
 */
def kafkaStorageException(fileType: String, e: IOException): KafkaStorageException =
  new KafkaStorageException(
    s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)