理清leveldb的recover流程对于理解leveldb如何保证数据正确性和一致性(即使在节点崩溃的情况下)是非常有帮助的。
首先从DB::Open函数开始:它先构造一个DBImpl实例,然后调用该实例的Recover方法。
// Opens the database called `dbname`, running recovery first.
//
// On success a heap-allocated DBImpl is stored in *dbptr and OK is returned.
// On failure *dbptr stays NULL, the partially constructed DBImpl is deleted,
// and the failing status is returned.
Status DB::Open(const Options& options, const std::string& dbname,
                DB** dbptr) {
  *dbptr = NULL;

  DBImpl* const db = new DBImpl(options, dbname);
  db->mutex_.Lock();

  // Recover() is where create_if_missing / error_if_exists are honoured.
  VersionEdit recovery_edit;
  bool need_new_manifest = false;
  Status status = db->Recover(&recovery_edit, &need_new_manifest);

  if (status.ok() && db->mem_ == NULL) {
    // No memtable exists after recovery: open a fresh log file and pair it
    // with a new memtable.
    const uint64_t fresh_log_number = db->versions_->NewFileNumber();
    WritableFile* log_file;
    status = options.env->NewWritableFile(
        LogFileName(dbname, fresh_log_number), &log_file);
    if (status.ok()) {
      recovery_edit.SetLogNumber(fresh_log_number);
      db->logfile_ = log_file;
      db->logfile_number_ = fresh_log_number;
      db->log_ = new log::Writer(log_file);
      db->mem_ = new MemTable(db->internal_comparator_);
      db->mem_->Ref();
    }
  }

  if (status.ok() && need_new_manifest) {
    // Once recovery is done no older logs are needed any more.
    recovery_edit.SetPrevLogNumber(0);
    recovery_edit.SetLogNumber(db->logfile_number_);
    status = db->versions_->LogAndApply(&recovery_edit, &db->mutex_);
  }

  if (status.ok()) {
    db->DeleteObsoleteFiles();
    db->MaybeScheduleCompaction();
  }
  db->mutex_.Unlock();

  if (!status.ok()) {
    delete db;
    return status;
  }

  assert(db->mem_ != NULL);
  *dbptr = db;
  return status;
}
// Restores the database state from disk.
//
// In order, this:
//   1. creates the DB directory (errors ignored) and takes the DB lock file;
//   2. creates a new DB, or rejects the open, according to
//      options_.create_if_missing and options_.error_if_exists;
//   3. recovers the VersionSet via versions_->Recover();
//   4. replays, oldest first, every log file whose number is >= the
//      descriptor's log number (or equal to its previous-log number).
//
// `edit` and `save_manifest` are forwarded to RecoverLogFile();
// `save_manifest` is also handed to versions_->Recover().
//
// Returns the first non-OK status encountered, Corruption if a file that the
// current version set considers live is absent from the directory, and OK
// otherwise.
//
// REQUIRES: mutex_ is held.
Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == NULL);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }
  }

  // Recover the version set from the descriptor file.
  s = versions_->Recover(save_manifest);
  if (!s.ok()) {
    return s;
  }
  SequenceNumber max_sequence(0);

  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {
    return s;
  }

  // Start from the set of files the version set considers live and erase
  // each number that is actually present in the directory; whatever remains
  // is missing.
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected);
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      expected.erase(number);
      // Log files numbered >= versions_->LogNumber() were not yet folded
      // into the current version, so they have to be replayed.
      if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
        logs.push_back(number);
    }
  }
  if (!expected.empty()) {
    char buf[50];
    snprintf(buf, sizeof(buf), "%d missing files; e.g.",
             static_cast<int>(expected.size()));
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    // The second argument tells RecoverLogFile whether this is the last log.
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  if (versions_->LastSequence() < max_sequence) {
    versions_->SetLastSequence(max_sequence);
  }

  return Status::OK();
}
下面关注一下versions_->Recover()函数:
// Rebuilds the version state from the descriptor (MANIFEST) file that the
// CURRENT file points to.
//
// Every record in the descriptor is decoded as a VersionEdit and applied to
// a Builder; the resulting Version is installed and the file-number,
// sequence-number and log-number counters are restored from the values seen
// in the edits.
//
// *save_manifest is set to true when ReuseManifest() reports that the
// existing manifest cannot be reused; it is left untouched otherwise.
//
// Returns:
//   - the read error if CURRENT or the descriptor cannot be opened/read;
//   - Corruption if CURRENT does not end with a newline, or the descriptor
//     lacks a next-file, log-number or last-sequence entry;
//   - InvalidArgument if an edit names a comparator different from the one
//     in use;
//   - OK otherwise.
Status VersionSet::Recover(bool *save_manifest) {
  // Captures the first corruption reported by the log reader into *status.
  struct LogReporter : public log::Reader::Reporter {
    Status* status;
    virtual void Corruption(size_t bytes, const Status& s) {
      if (this->status->ok()) *this->status = s;
    }
  };

  // Read "CURRENT" file, which contains a pointer to the current manifest file
  std::string current;
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok()) {
    return s;
  }
  if (current.empty() || current[current.size()-1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  current.resize(current.size() - 1);

  // Path of the manifest file.
  std::string dscname = dbname_ + "/" + current;
  SequentialFile* file;
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok()) {
    return s;
  }

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  // The Builder acts as a staging area that accumulates version edits.
  Builder builder(this, current_);

  {
    LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
    Slice record;
    std::string scratch;
    // The manifest is stored in the log format and is read with checksum
    // verification enabled.
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      // Each record holds one VersionEdit; decode it into `edit`.
      s = edit.DecodeFrom(record);
      if (s.ok()) {
        // Refuse to continue if the recorded comparator differs from ours.
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name()) {
          s = Status::InvalidArgument(
              edit.comparator_ + " does not match existing comparator ",
              icmp_.user_comparator()->Name());
        }
      }

      if (s.ok()) {
        // The edit decoded successfully; fold it into the builder.
        builder.Apply(&edit);
      }

      // Meaning of log_number_ in a version edit: for the current version,
      // every log file numbered below this value may be deleted, because its
      // contents have already been safely written to sstables.
      if (edit.has_log_number_) {
        log_number = edit.log_number_;
        have_log_number = true;
      }

      // Legacy field from older versions; can be ignored here.
      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      // Next file number available for allocation.
      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      // Most recent sequence number recorded in the descriptor.
      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
  }
  delete file;
  file = NULL;

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    // Kept only for compatibility with older versions; this entry is no
    // longer used.
    if (!have_prev_log_number) {
      prev_log_number = 0;
    }

    MarkFileNumberUsed(prev_log_number);
    MarkFileNumberUsed(log_number);
  }

  if (s.ok()) {
    Version* v = new Version(this);
    builder.SaveTo(v);
    // Install recovered version
    // (Finalize computes the best compaction for v.)
    Finalize(v);
    AppendVersion(v);
    // Reserve a file number for the manifest file.
    manifest_file_number_ = next_file;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;

    // See if we can reuse the existing MANIFEST file.
    if (ReuseManifest(dscname, current)) {
      // No need to save new manifest
    } else {
      *save_manifest = true;
    }
  }

  return s;
}