leveldb源代码分析系列 recover流程,major compaction

理清leveldb的recover流程对于理解leveldb如何保证数据正确性和一致性(即使在节点崩溃的情况下)是非常有帮助的。
首先从Open函数开始,构造一个DBImpl实例,然后调用了其Recover方法。

// Opens the database named `dbname` and, on success, stores a heap-allocated
// DBImpl in `*dbptr`.
//
// Steps, all performed while holding impl->mutex_:
//   1. Run DBImpl::Recover(), which fills `edit` and may set `save_manifest`.
//   2. If recovery left no memtable (impl->mem_ == NULL), create a new log
//      file and a matching memtable.
//   3. If `save_manifest` was set, record the edit via LogAndApply().
//   4. On success, delete obsolete files and possibly schedule a compaction.
//
// Returns the first non-OK status encountered. On failure the DBImpl is
// deleted and `*dbptr` stays NULL.
Status DB::Open(const Options& options, const std::string& dbname,
                DB** dbptr) {
  *dbptr = NULL;

  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  if (s.ok() && impl->mem_ == NULL) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  if (s.ok() && save_manifest) {
    // No older logs needed after recovery.
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  if (s.ok()) {
    impl->DeleteObsoleteFiles();
    impl->MaybeScheduleCompaction();
  }
  impl->mutex_.Unlock();

  if (s.ok()) {
    assert(impl->mem_ != NULL);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}

// Recovers the on-disk state of the database. Must be called with mutex_ held.
//
// Steps:
//   1. Create the DB directory (errors ignored) and take the file lock.
//   2. Check for the CURRENT file: create a new DB when it is missing and
//      create_if_missing is set, otherwise fail; fail when it exists and
//      error_if_exists is set.
//   3. Recover the version state via versions_->Recover().
//   4. Replay, in ascending file-number order, every log file whose number is
//      >= versions_->LogNumber() or equal to versions_->PrevLogNumber().
//   5. Raise versions_' last sequence number to the largest one seen during
//      replay.
//
// `edit` and `save_manifest` are passed through to RecoverLogFile();
// `save_manifest` is also passed to versions_->Recover().
// Returns the first non-OK status encountered, or Status::Corruption when a
// file listed as live by versions_ is absent from the directory.
Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == NULL);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }
  }

  // Recover the version state from the descriptor (see VersionSet::Recover).
  s = versions_->Recover(save_manifest);
  if (!s.ok()) {
    return s;
  }
  SequenceNumber max_sequence(0);

  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {
    return s;
  }
  // Start from every file versions_ reports as live and erase those actually
  // found in the directory; anything left over is missing.
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected);
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      expected.erase(number);
      // Log files numbered >= versions_->LogNumber() have not yet been folded
      // into the current version, so they must be replayed.
      if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
        logs.push_back(number);
    }
  }
  if (!expected.empty()) {
    char buf[50];
    snprintf(buf, sizeof(buf), "%d missing files; e.g.",
             static_cast<int>(expected.size()));
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    // The second argument tells RecoverLogFile whether this is the last log.
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  if (versions_->LastSequence() < max_sequence) {
    versions_->SetLastSequence(max_sequence);
  }

  return Status::OK();
}

下面关注一下 versions_->Recover() 函数:
// Rebuilds the current Version from the manifest (descriptor) file.
//
// Steps:
//   1. Read the CURRENT file to obtain the manifest file name.
//   2. Read the manifest record by record through log::Reader, decode each
//      record into a VersionEdit, and apply it to a Builder, while
//      remembering the most recent log number, prev-log number, next file
//      number and last sequence that the edits carry.
//   3. Fail with Status::Corruption if the next-file, log-number or
//      last-sequence entry never appeared.
//   4. Save the accumulated state into a new Version, install it, and update
//      the VersionSet counters.
//
// Sets `*save_manifest` to true when ReuseManifest() declines to reuse the
// existing manifest; otherwise leaves it untouched.
// Returns the first non-OK status encountered.
Status VersionSet::Recover(bool *save_manifest) {
  // Records the first corruption reported by the log reader in *status.
  struct LogReporter : public log::Reader::Reporter {
    Status* status;
    virtual void Corruption(size_t bytes, const Status& s) {
      if (this->status->ok()) *this->status = s;
    }
  };

  // Read "CURRENT" file, which contains a pointer to the current manifest file
  std::string current;
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok()) {
    return s;
  }
  if (current.empty() || current[current.size()-1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  current.resize(current.size() - 1);

  // Path of the manifest file.
  std::string dscname = dbname_ + "/" + current;
  SequentialFile* file;
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok()) {
    return s;
  }

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  // The Builder acts as a staging area that accumulates version edits.
  Builder builder(this, current_);

  {
    LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
    Slice record;
    std::string scratch;
    // The manifest is stored in the log record format and is read here with
    // checksum verification enabled.
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      // Each record holds one encoded VersionEdit; decode it into `edit`.
      s = edit.DecodeFrom(record);
      if (s.ok()) {
        // Reject the manifest if it was written with a different comparator.
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name()) {
          s = Status::InvalidArgument(
              edit.comparator_ + " does not match existing comparator ",
              icmp_.user_comparator()->Name());
        }
      }

      if (s.ok()) {
        // Decoded successfully: fold the edit into the builder.
        builder.Apply(&edit);
      }

      // Meaning of log_number_ in a version edit: for the current version,
      // every log file numbered below this value can be deleted, because its
      // contents have already been safely written to sstables.
      if (edit.has_log_number_) {
        log_number = edit.log_number_;
        have_log_number = true;
      }

      // Only relevant for databases written by older versions.
      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      // Next file number available for allocation.
      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      // Last sequence number recorded by the edit.
      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
  }
  delete file;
  file = NULL;

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    // Kept for compatibility with older versions; the field is no longer used.
    if (!have_prev_log_number) {
      prev_log_number = 0;
    }

    MarkFileNumberUsed(prev_log_number);
    MarkFileNumberUsed(log_number);
  }

  if (s.ok()) {
    Version* v = new Version(this);
    builder.SaveTo(v);
    // Install recovered version
    // Finalize() computes the best compaction for v.
    Finalize(v);
    AppendVersion(v);
    // Reserve next_file as the manifest's own file number.
    manifest_file_number_ = next_file;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;

    // See if we can reuse the existing MANIFEST file.
    if (ReuseManifest(dscname, current)) {
      // No need to save new manifest
    } else {
      *save_manifest = true;
    }
  }

  return s;
}

    推荐阅读