Persistent globally unique DB ID in manifest (#5725)
Summary: Each DB has a globally unique ID. A DB can be physically copied around, or backed-up and restored, and the users should be identify the same DB. This unique ID right now is stored as plain text in file IDENTITY under the DB directory. This approach introduces at least two problems: (1) the file is not checksumed; (2) the source of truth of a DB is the manifest file, which can be copied separately from IDENTITY file, causing the DB ID to be wrong. The goal of this PR is solve this problem by moving the DB ID to manifest. To begin with we will write to both identity file and manifest. Write to Manifest is controlled via the flag write_dbid_to_manifest in Options and default is false. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5725 Test Plan: Added unit tests. Differential Revision: D16963840 Pulled By: vjnadimpalli fbshipit-source-id: 8a86a4c8c82c716003c40fd6b9d2d758030d92e9
This commit is contained in:
parent
44eca41add
commit
979fbdc696
@ -14,6 +14,7 @@
|
||||
|
||||
#include "db/column_family.h"
|
||||
#include "db/compaction/compaction_job.h"
|
||||
#include "db/db_impl/db_impl.h"
|
||||
#include "db/error_handler.h"
|
||||
#include "db/version_set.h"
|
||||
#include "rocksdb/cache.h"
|
||||
@ -204,8 +205,15 @@ class CompactionJobTest : public testing::Test {
|
||||
&write_controller_,
|
||||
/*block_cache_tracer=*/nullptr));
|
||||
compaction_job_stats_.Reset();
|
||||
SetIdentityFile(env_, dbname_);
|
||||
|
||||
VersionEdit new_db;
|
||||
if (db_options_.write_dbid_to_manifest) {
|
||||
DBImpl* impl = new DBImpl(DBOptions(), dbname_);
|
||||
std::string db_id;
|
||||
impl->GetDbIdentityFromIdentityFile(&db_id);
|
||||
new_db.SetDBId(db_id);
|
||||
}
|
||||
new_db.SetLogNumber(0);
|
||||
new_db.SetNextFile(2);
|
||||
new_db.SetLastSequence(0);
|
||||
|
@ -70,6 +70,44 @@ TEST_F(DBBasicTest, ReadOnlyDB) {
|
||||
ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
ASSERT_OK(Put("bar", "v2"));
|
||||
ASSERT_OK(Put("foo", "v3"));
|
||||
Close();
|
||||
|
||||
auto options = CurrentOptions();
|
||||
options.write_dbid_to_manifest = true;
|
||||
assert(options.env == env_);
|
||||
ASSERT_OK(ReadOnlyReopen(options));
|
||||
std::string db_id1;
|
||||
db_->GetDbIdentity(db_id1);
|
||||
ASSERT_EQ("v3", Get("foo"));
|
||||
ASSERT_EQ("v2", Get("bar"));
|
||||
Iterator* iter = db_->NewIterator(ReadOptions());
|
||||
int count = 0;
|
||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||
ASSERT_OK(iter->status());
|
||||
++count;
|
||||
}
|
||||
ASSERT_EQ(count, 2);
|
||||
delete iter;
|
||||
Close();
|
||||
|
||||
// Reopen and flush memtable.
|
||||
Reopen(options);
|
||||
Flush();
|
||||
Close();
|
||||
// Now check keys in read only mode.
|
||||
ASSERT_OK(ReadOnlyReopen(options));
|
||||
ASSERT_EQ("v3", Get("foo"));
|
||||
ASSERT_EQ("v2", Get("bar"));
|
||||
ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
|
||||
std::string db_id2;
|
||||
db_->GetDbIdentity(db_id2);
|
||||
ASSERT_EQ(db_id1, db_id2);
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, CompactedDB) {
|
||||
const uint64_t kFileSize = 1 << 20;
|
||||
Options options = CurrentOptions();
|
||||
@ -424,7 +462,7 @@ TEST_F(DBBasicTest, ManifestRollOver) {
|
||||
} while (ChangeCompactOptions());
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, IdentityAcrossRestarts) {
|
||||
TEST_F(DBBasicTest, IdentityAcrossRestarts1) {
|
||||
do {
|
||||
std::string id1;
|
||||
ASSERT_OK(db_->GetDbIdentity(id1));
|
||||
@ -441,8 +479,35 @@ TEST_F(DBBasicTest, IdentityAcrossRestarts) {
|
||||
Reopen(options);
|
||||
std::string id3;
|
||||
ASSERT_OK(db_->GetDbIdentity(id3));
|
||||
if (options.write_dbid_to_manifest) {
|
||||
ASSERT_EQ(id1.compare(id3), 0);
|
||||
} else {
|
||||
// id1 should NOT match id3 because identity was regenerated
|
||||
ASSERT_NE(id1.compare(id3), 0);
|
||||
}
|
||||
} while (ChangeCompactOptions());
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, IdentityAcrossRestarts2) {
|
||||
do {
|
||||
std::string id1;
|
||||
ASSERT_OK(db_->GetDbIdentity(id1));
|
||||
|
||||
Options options = CurrentOptions();
|
||||
options.write_dbid_to_manifest = true;
|
||||
Reopen(options);
|
||||
std::string id2;
|
||||
ASSERT_OK(db_->GetDbIdentity(id2));
|
||||
// id1 should match id2 because identity was not regenerated
|
||||
ASSERT_EQ(id1.compare(id2), 0);
|
||||
|
||||
std::string idfilename = IdentityFileName(dbname_);
|
||||
ASSERT_OK(env_->DeleteFile(idfilename));
|
||||
Reopen(options);
|
||||
std::string id3;
|
||||
ASSERT_OK(db_->GetDbIdentity(id3));
|
||||
// id1 should NOT match id3 because identity was regenerated
|
||||
ASSERT_EQ(id1, id3);
|
||||
} while (ChangeCompactOptions());
|
||||
}
|
||||
|
||||
@ -1370,8 +1435,8 @@ TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
|
||||
if (use_snapshots) {
|
||||
ro.snapshot = snap2;
|
||||
}
|
||||
db_->MultiGet(ro, handles_[1], keys.size(), keys.data(),
|
||||
values.data(), s.data(), false);
|
||||
db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
|
||||
s.data(), false);
|
||||
|
||||
ASSERT_EQ(values.size(), keys.size());
|
||||
ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1");
|
||||
|
@ -3158,6 +3158,11 @@ Status DBImpl::CheckConsistency() {
|
||||
}
|
||||
|
||||
Status DBImpl::GetDbIdentity(std::string& identity) const {
|
||||
identity.assign(db_id_);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBImpl::GetDbIdentityFromIdentityFile(std::string* identity) const {
|
||||
std::string idfilename = IdentityFileName(dbname_);
|
||||
const EnvOptions soptions;
|
||||
std::unique_ptr<SequentialFileReader> id_file_reader;
|
||||
@ -3184,10 +3189,10 @@ Status DBImpl::GetDbIdentity(std::string& identity) const {
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
identity.assign(id.ToString());
|
||||
identity->assign(id.ToString());
|
||||
// If last character is '\n' remove it from identity
|
||||
if (identity.size() > 0 && identity.back() == '\n') {
|
||||
identity.pop_back();
|
||||
if (identity->size() > 0 && identity->back() == '\n') {
|
||||
identity->pop_back();
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -317,6 +317,8 @@ class DBImpl : public DB {
|
||||
|
||||
virtual Status GetDbIdentity(std::string& identity) const override;
|
||||
|
||||
virtual Status GetDbIdentityFromIdentityFile(std::string* identity) const;
|
||||
|
||||
ColumnFamilyHandle* DefaultColumnFamily() const override;
|
||||
|
||||
ColumnFamilyHandle* PersistentStatsColumnFamily() const;
|
||||
@ -926,6 +928,7 @@ class DBImpl : public DB {
|
||||
protected:
|
||||
Env* const env_;
|
||||
const std::string dbname_;
|
||||
std::string db_id_;
|
||||
std::unique_ptr<VersionSet> versions_;
|
||||
// Flag to check whether we allocated and own the info log file
|
||||
bool own_info_log_;
|
||||
|
@ -231,12 +231,19 @@ Status DBImpl::ValidateOptions(const DBOptions& db_options) {
|
||||
|
||||
Status DBImpl::NewDB() {
|
||||
VersionEdit new_db;
|
||||
Status s = SetIdentityFile(env_, dbname_);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
if (immutable_db_options_.write_dbid_to_manifest) {
|
||||
std::string temp_db_id;
|
||||
GetDbIdentityFromIdentityFile(&temp_db_id);
|
||||
new_db.SetDBId(temp_db_id);
|
||||
}
|
||||
new_db.SetLogNumber(0);
|
||||
new_db.SetNextFile(2);
|
||||
new_db.SetLastSequence(0);
|
||||
|
||||
Status s;
|
||||
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
|
||||
const std::string manifest = DescriptorFileName(dbname_, 1);
|
||||
{
|
||||
@ -339,6 +346,9 @@ Status DBImpl::Recover(
|
||||
s = env_->FileExists(CurrentFileName(dbname_));
|
||||
if (s.IsNotFound()) {
|
||||
if (immutable_db_options_.create_if_missing) {
|
||||
// Has to be called only after Identity File creation is successful
|
||||
// because DB ID is stored in Manifest if
|
||||
// immutable_db_options_.write_dbid_to_manifest = true
|
||||
s = NewDB();
|
||||
is_new_db = true;
|
||||
if (!s.ok()) {
|
||||
@ -358,30 +368,19 @@ Status DBImpl::Recover(
|
||||
assert(s.IsIOError());
|
||||
return s;
|
||||
}
|
||||
// Check for the IDENTITY file and create it if not there
|
||||
s = env_->FileExists(IdentityFileName(dbname_));
|
||||
if (s.IsNotFound()) {
|
||||
s = SetIdentityFile(env_, dbname_);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
} else if (!s.ok()) {
|
||||
assert(s.IsIOError());
|
||||
return s;
|
||||
}
|
||||
// Verify compatibility of env_options_ and filesystem
|
||||
{
|
||||
std::unique_ptr<RandomAccessFile> idfile;
|
||||
EnvOptions customized_env(env_options_);
|
||||
customized_env.use_direct_reads |=
|
||||
immutable_db_options_.use_direct_io_for_flush_and_compaction;
|
||||
s = env_->NewRandomAccessFile(IdentityFileName(dbname_), &idfile,
|
||||
s = env_->NewRandomAccessFile(CurrentFileName(dbname_), &idfile,
|
||||
customized_env);
|
||||
if (!s.ok()) {
|
||||
std::string error_str = s.ToString();
|
||||
// Check if unsupported Direct I/O is the root cause
|
||||
customized_env.use_direct_reads = false;
|
||||
s = env_->NewRandomAccessFile(IdentityFileName(dbname_), &idfile,
|
||||
s = env_->NewRandomAccessFile(CurrentFileName(dbname_), &idfile,
|
||||
customized_env);
|
||||
if (s.ok()) {
|
||||
return Status::InvalidArgument(
|
||||
@ -393,8 +392,42 @@ Status DBImpl::Recover(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Status s = versions_->Recover(column_families, read_only);
|
||||
assert(db_id_.empty());
|
||||
Status s = versions_->Recover(column_families, read_only, &db_id_);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
// Happens when immutable_db_options_.write_dbid_to_manifest is set to true
|
||||
// the very first time.
|
||||
if (db_id_.empty()) {
|
||||
// Check for the IDENTITY file and create it if not there.
|
||||
s = env_->FileExists(IdentityFileName(dbname_));
|
||||
// Typically Identity file is created in NewDB() and for some reason if
|
||||
// it is no longer available then at this point DB ID is not in Identity
|
||||
// file or Manifest.
|
||||
if (s.IsNotFound()) {
|
||||
s = SetIdentityFile(env_, dbname_);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
} else if (!s.ok()) {
|
||||
assert(s.IsIOError());
|
||||
return s;
|
||||
}
|
||||
GetDbIdentityFromIdentityFile(&db_id_);
|
||||
if (immutable_db_options_.write_dbid_to_manifest) {
|
||||
VersionEdit edit;
|
||||
edit.SetDBId(db_id_);
|
||||
Options options;
|
||||
MutableCFOptions mutable_cf_options(options);
|
||||
versions_->db_id_ = db_id_;
|
||||
versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
|
||||
mutable_cf_options, &edit, &mutex_, nullptr,
|
||||
false);
|
||||
}
|
||||
} else {
|
||||
SetIdentityFile(env_, dbname_, db_id_);
|
||||
}
|
||||
|
||||
if (immutable_db_options_.paranoid_checks && s.ok()) {
|
||||
s = CheckConsistency();
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <string>
|
||||
|
||||
#include "db/column_family.h"
|
||||
#include "db/db_impl/db_impl.h"
|
||||
#include "db/flush_job.h"
|
||||
#include "db/version_set.h"
|
||||
#include "rocksdb/cache.h"
|
||||
@ -55,7 +56,14 @@ class FlushJobTest : public testing::Test {
|
||||
}
|
||||
|
||||
void NewDB() {
|
||||
SetIdentityFile(env_, dbname_);
|
||||
VersionEdit new_db;
|
||||
if (db_options_.write_dbid_to_manifest) {
|
||||
DBImpl* impl = new DBImpl(DBOptions(), dbname_);
|
||||
std::string db_id;
|
||||
impl->GetDbIdentityFromIdentityFile(&db_id);
|
||||
new_db.SetDBId(db_id);
|
||||
}
|
||||
new_db.SetLogNumber(0);
|
||||
new_db.SetNextFile(2);
|
||||
new_db.SetLastSequence(0);
|
||||
|
@ -18,8 +18,13 @@
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
// Mask for an identified tag from the future which can be safely ignored.
|
||||
const uint32_t kTagSafeIgnoreMask = 1 << 13;
|
||||
|
||||
// Tag numbers for serialized VersionEdit. These numbers are written to
|
||||
// disk and should not be changed.
|
||||
// disk and should not be changed. The number should be forward compatible so
|
||||
// users can down-grade RocksDB safely. A future Tag is ignored by doing '&'
|
||||
// between Tag and kTagSafeIgnoreMask field.
|
||||
enum Tag : uint32_t {
|
||||
kComparator = 1,
|
||||
kLogNumber = 2,
|
||||
@ -31,6 +36,8 @@ enum Tag : uint32_t {
|
||||
// 8 was used for large value refs
|
||||
kPrevLogNumber = 9,
|
||||
kMinLogNumberToKeep = 10,
|
||||
// Ignore-able field
|
||||
kDbId = kTagSafeIgnoreMask + 1,
|
||||
|
||||
// these are new formats divergent from open source leveldb
|
||||
kNewFile2 = 100,
|
||||
@ -44,9 +51,6 @@ enum Tag : uint32_t {
|
||||
kInAtomicGroup = 300,
|
||||
};
|
||||
|
||||
// Mask for an identified tag from the future which can be safely ignored.
|
||||
uint32_t kTagSafeIgnoreMask = 1 << 13;
|
||||
|
||||
enum CustomTag : uint32_t {
|
||||
kTerminate = 1, // The end of customized fields
|
||||
kNeedCompaction = 2,
|
||||
@ -67,6 +71,7 @@ uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
|
||||
}
|
||||
|
||||
void VersionEdit::Clear() {
|
||||
db_id_.clear();
|
||||
comparator_.clear();
|
||||
max_level_ = 0;
|
||||
log_number_ = 0;
|
||||
@ -75,6 +80,7 @@ void VersionEdit::Clear() {
|
||||
next_file_number_ = 0;
|
||||
max_column_family_ = 0;
|
||||
min_log_number_to_keep_ = 0;
|
||||
has_db_id_ = false;
|
||||
has_comparator_ = false;
|
||||
has_log_number_ = false;
|
||||
has_prev_log_number_ = false;
|
||||
@ -93,6 +99,10 @@ void VersionEdit::Clear() {
|
||||
}
|
||||
|
||||
bool VersionEdit::EncodeTo(std::string* dst) const {
|
||||
if (has_db_id_) {
|
||||
PutVarint32(dst, kDbId);
|
||||
PutLengthPrefixedSlice(dst, db_id_);
|
||||
}
|
||||
if (has_comparator_) {
|
||||
PutVarint32(dst, kComparator);
|
||||
PutLengthPrefixedSlice(dst, comparator_);
|
||||
@ -320,9 +330,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
FileMetaData f;
|
||||
Slice str;
|
||||
InternalKey key;
|
||||
|
||||
while (msg == nullptr && GetVarint32(&input, &tag)) {
|
||||
switch (tag) {
|
||||
case kDbId:
|
||||
if (GetLengthPrefixedSlice(&input, &str)) {
|
||||
db_id_ = str.ToString();
|
||||
has_db_id_ = true;
|
||||
} else {
|
||||
msg = "db id";
|
||||
}
|
||||
break;
|
||||
case kComparator:
|
||||
if (GetLengthPrefixedSlice(&input, &str)) {
|
||||
comparator_ = str.ToString();
|
||||
@ -537,6 +554,10 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
std::string VersionEdit::DebugString(bool hex_key) const {
|
||||
std::string r;
|
||||
r.append("VersionEdit {");
|
||||
if (has_db_id_) {
|
||||
r.append("\n DB ID: ");
|
||||
r.append(db_id_);
|
||||
}
|
||||
if (has_comparator_) {
|
||||
r.append("\n Comparator: ");
|
||||
r.append(comparator_);
|
||||
@ -608,6 +629,9 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
|
||||
JSONWriter jw;
|
||||
jw << "EditNumber" << edit_num;
|
||||
|
||||
if (has_db_id_) {
|
||||
jw << "DB ID" << db_id_;
|
||||
}
|
||||
if (has_comparator_) {
|
||||
jw << "Comparator" << comparator_;
|
||||
}
|
||||
|
@ -198,6 +198,11 @@ class VersionEdit {
|
||||
|
||||
void Clear();
|
||||
|
||||
void SetDBId(const std::string& db_id) {
|
||||
has_db_id_ = true;
|
||||
db_id_ = db_id;
|
||||
}
|
||||
|
||||
void SetComparatorName(const Slice& name) {
|
||||
has_comparator_ = true;
|
||||
comparator_ = name.ToString();
|
||||
@ -227,6 +232,8 @@ class VersionEdit {
|
||||
min_log_number_to_keep_ = num;
|
||||
}
|
||||
|
||||
bool has_db_id() { return has_db_id_; }
|
||||
|
||||
bool has_log_number() { return has_log_number_; }
|
||||
|
||||
uint64_t log_number() { return log_number_; }
|
||||
@ -314,6 +321,8 @@ class VersionEdit {
|
||||
std::string DebugString(bool hex_key = false) const;
|
||||
std::string DebugJSON(int edit_num, bool hex_key = false) const;
|
||||
|
||||
const std::string GetDbId() { return db_id_; }
|
||||
|
||||
private:
|
||||
friend class ReactiveVersionSet;
|
||||
friend class VersionSet;
|
||||
@ -323,6 +332,7 @@ class VersionEdit {
|
||||
bool GetLevel(Slice* input, int* level, const char** msg);
|
||||
|
||||
int max_level_;
|
||||
std::string db_id_;
|
||||
std::string comparator_;
|
||||
uint64_t log_number_;
|
||||
uint64_t prev_log_number_;
|
||||
@ -331,6 +341,7 @@ class VersionEdit {
|
||||
// The most recent WAL log number that is deleted
|
||||
uint64_t min_log_number_to_keep_;
|
||||
SequenceNumber last_sequence_;
|
||||
bool has_db_id_;
|
||||
bool has_comparator_;
|
||||
bool has_log_number_;
|
||||
bool has_prev_log_number_;
|
||||
|
@ -239,6 +239,16 @@ TEST_F(VersionEditTest, IgnorableField) {
|
||||
ASSERT_EQ(88, ve.next_file_number());
|
||||
}
|
||||
|
||||
TEST_F(VersionEditTest, DbId) {
|
||||
VersionEdit edit;
|
||||
edit.SetDBId("ab34-cd12-435f-er00");
|
||||
TestEncodeDecode(edit);
|
||||
|
||||
edit.Clear();
|
||||
edit.SetDBId("34ba-cd12-435f-er01");
|
||||
TestEncodeDecode(edit);
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -3747,7 +3747,7 @@ Status VersionSet::ProcessManifestWrites(
|
||||
nullptr, db_options_->listeners));
|
||||
descriptor_log_.reset(
|
||||
new log::Writer(std::move(file_writer), 0, false));
|
||||
s = WriteSnapshot(descriptor_log_.get());
|
||||
s = WriteCurrentStateToManifest(descriptor_log_.get());
|
||||
}
|
||||
}
|
||||
|
||||
@ -4061,10 +4061,7 @@ Status VersionSet::ApplyOneVersionEditToBuilder(
|
||||
std::unordered_map<int, std::string>& column_families_not_found,
|
||||
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
|
||||
builders,
|
||||
bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
|
||||
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
|
||||
bool* have_last_sequence, SequenceNumber* last_sequence,
|
||||
uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
|
||||
VersionEditParams* version_edit_params) {
|
||||
// Not found means that user didn't supply that column
|
||||
// family option AND we encountered column family add
|
||||
// record. Once we encounter column family drop record,
|
||||
@ -4150,61 +4147,55 @@ Status VersionSet::ApplyOneVersionEditToBuilder(
|
||||
return s;
|
||||
}
|
||||
}
|
||||
return ExtractInfoFromVersionEdit(
|
||||
cfd, edit, have_log_number, log_number, have_prev_log_number,
|
||||
previous_log_number, have_next_file, next_file, have_last_sequence,
|
||||
last_sequence, min_log_number_to_keep, max_column_family);
|
||||
return ExtractInfoFromVersionEdit(cfd, edit, version_edit_params);
|
||||
}
|
||||
|
||||
Status VersionSet::ExtractInfoFromVersionEdit(
|
||||
ColumnFamilyData* cfd, const VersionEdit& edit, bool* have_log_number,
|
||||
uint64_t* log_number, bool* have_prev_log_number,
|
||||
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
|
||||
bool* have_last_sequence, SequenceNumber* last_sequence,
|
||||
uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
|
||||
ColumnFamilyData* cfd, const VersionEdit& from_edit,
|
||||
VersionEditParams* version_edit_params) {
|
||||
if (cfd != nullptr) {
|
||||
if (edit.has_log_number_) {
|
||||
if (cfd->GetLogNumber() > edit.log_number_) {
|
||||
if (from_edit.has_db_id_) {
|
||||
version_edit_params->SetDBId(from_edit.db_id_);
|
||||
}
|
||||
if (from_edit.has_log_number_) {
|
||||
if (cfd->GetLogNumber() > from_edit.log_number_) {
|
||||
ROCKS_LOG_WARN(
|
||||
db_options_->info_log,
|
||||
"MANIFEST corruption detected, but ignored - Log numbers in "
|
||||
"records NOT monotonically increasing");
|
||||
} else {
|
||||
cfd->SetLogNumber(edit.log_number_);
|
||||
*have_log_number = true;
|
||||
*log_number = edit.log_number_;
|
||||
cfd->SetLogNumber(from_edit.log_number_);
|
||||
version_edit_params->SetLogNumber(from_edit.log_number_);
|
||||
}
|
||||
}
|
||||
if (edit.has_comparator_ &&
|
||||
edit.comparator_ != cfd->user_comparator()->Name()) {
|
||||
if (from_edit.has_comparator_ &&
|
||||
from_edit.comparator_ != cfd->user_comparator()->Name()) {
|
||||
return Status::InvalidArgument(
|
||||
cfd->user_comparator()->Name(),
|
||||
"does not match existing comparator " + edit.comparator_);
|
||||
"does not match existing comparator " + from_edit.comparator_);
|
||||
}
|
||||
}
|
||||
|
||||
if (edit.has_prev_log_number_) {
|
||||
*previous_log_number = edit.prev_log_number_;
|
||||
*have_prev_log_number = true;
|
||||
if (from_edit.has_prev_log_number_) {
|
||||
version_edit_params->SetPrevLogNumber(from_edit.prev_log_number_);
|
||||
}
|
||||
|
||||
if (edit.has_next_file_number_) {
|
||||
*next_file = edit.next_file_number_;
|
||||
*have_next_file = true;
|
||||
if (from_edit.has_next_file_number_) {
|
||||
version_edit_params->SetNextFile(from_edit.next_file_number_);
|
||||
}
|
||||
|
||||
if (edit.has_max_column_family_) {
|
||||
*max_column_family = edit.max_column_family_;
|
||||
if (from_edit.has_max_column_family_) {
|
||||
version_edit_params->SetMaxColumnFamily(from_edit.max_column_family_);
|
||||
}
|
||||
|
||||
if (edit.has_min_log_number_to_keep_) {
|
||||
*min_log_number_to_keep =
|
||||
std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_);
|
||||
if (from_edit.has_min_log_number_to_keep_) {
|
||||
version_edit_params->min_log_number_to_keep_ =
|
||||
std::max(version_edit_params->min_log_number_to_keep_,
|
||||
from_edit.min_log_number_to_keep_);
|
||||
}
|
||||
|
||||
if (edit.has_last_sequence_) {
|
||||
*last_sequence = edit.last_sequence_;
|
||||
*have_last_sequence = true;
|
||||
if (from_edit.has_last_sequence_) {
|
||||
version_edit_params->SetLastSequence(from_edit.last_sequence_);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -4245,10 +4236,7 @@ Status VersionSet::ReadAndRecover(
|
||||
std::unordered_map<int, std::string>& column_families_not_found,
|
||||
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
|
||||
builders,
|
||||
bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
|
||||
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
|
||||
bool* have_last_sequence, SequenceNumber* last_sequence,
|
||||
uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
|
||||
VersionEditParams* version_edit_params, std::string* db_id) {
|
||||
assert(reader != nullptr);
|
||||
assert(read_buffer != nullptr);
|
||||
Status s;
|
||||
@ -4261,6 +4249,12 @@ Status VersionSet::ReadAndRecover(
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
if (edit.has_db_id_) {
|
||||
db_id_ = edit.GetDbId();
|
||||
if (db_id != nullptr) {
|
||||
db_id->assign(edit.GetDbId());
|
||||
}
|
||||
}
|
||||
s = read_buffer->AddEdit(&edit);
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
@ -4270,12 +4264,9 @@ Status VersionSet::ReadAndRecover(
|
||||
// Apply edits in an atomic group when we have read all edits in the
|
||||
// group.
|
||||
for (auto& e : read_buffer->replay_buffer()) {
|
||||
s = ApplyOneVersionEditToBuilder(
|
||||
e, name_to_options, column_families_not_found, builders,
|
||||
have_log_number, log_number, have_prev_log_number,
|
||||
previous_log_number, have_next_file, next_file,
|
||||
have_last_sequence, last_sequence, min_log_number_to_keep,
|
||||
max_column_family);
|
||||
s = ApplyOneVersionEditToBuilder(e, name_to_options,
|
||||
column_families_not_found, builders,
|
||||
version_edit_params);
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
@ -4288,11 +4279,9 @@ Status VersionSet::ReadAndRecover(
|
||||
}
|
||||
} else {
|
||||
// Apply a normal edit immediately.
|
||||
s = ApplyOneVersionEditToBuilder(
|
||||
edit, name_to_options, column_families_not_found, builders,
|
||||
have_log_number, log_number, have_prev_log_number,
|
||||
previous_log_number, have_next_file, next_file, have_last_sequence,
|
||||
last_sequence, min_log_number_to_keep, max_column_family);
|
||||
s = ApplyOneVersionEditToBuilder(edit, name_to_options,
|
||||
column_families_not_found, builders,
|
||||
version_edit_params);
|
||||
if (s.ok()) {
|
||||
recovered_edits++;
|
||||
}
|
||||
@ -4308,8 +4297,8 @@ Status VersionSet::ReadAndRecover(
|
||||
}
|
||||
|
||||
Status VersionSet::Recover(
|
||||
const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||
bool read_only) {
|
||||
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
|
||||
std::string* db_id) {
|
||||
std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
|
||||
for (auto cf : column_families) {
|
||||
cf_name_to_options.insert({cf.name, cf.options});
|
||||
@ -4348,16 +4337,6 @@ Status VersionSet::Recover(
|
||||
return s;
|
||||
}
|
||||
|
||||
bool have_log_number = false;
|
||||
bool have_prev_log_number = false;
|
||||
bool have_next_file = false;
|
||||
bool have_last_sequence = false;
|
||||
uint64_t next_file = 0;
|
||||
uint64_t last_sequence = 0;
|
||||
uint64_t log_number = 0;
|
||||
uint64_t previous_log_number = 0;
|
||||
uint32_t max_column_family = 0;
|
||||
uint64_t min_log_number_to_keep = 0;
|
||||
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
|
||||
builders;
|
||||
|
||||
@ -4377,7 +4356,7 @@ Status VersionSet::Recover(
|
||||
builders.insert(
|
||||
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
|
||||
new BaseReferencedVersionBuilder(default_cfd))));
|
||||
|
||||
VersionEditParams version_edit_params;
|
||||
{
|
||||
VersionSet::LogReporter reporter;
|
||||
reporter.status = &s;
|
||||
@ -4386,33 +4365,32 @@ Status VersionSet::Recover(
|
||||
Slice record;
|
||||
std::string scratch;
|
||||
AtomicGroupReadBuffer read_buffer;
|
||||
s = ReadAndRecover(
|
||||
&reader, &read_buffer, cf_name_to_options, column_families_not_found,
|
||||
builders, &have_log_number, &log_number, &have_prev_log_number,
|
||||
&previous_log_number, &have_next_file, &next_file, &have_last_sequence,
|
||||
&last_sequence, &min_log_number_to_keep, &max_column_family);
|
||||
s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
|
||||
column_families_not_found, builders,
|
||||
&version_edit_params, db_id);
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
if (!have_next_file) {
|
||||
if (!version_edit_params.has_next_file_number_) {
|
||||
s = Status::Corruption("no meta-nextfile entry in descriptor");
|
||||
} else if (!have_log_number) {
|
||||
} else if (!version_edit_params.has_log_number_) {
|
||||
s = Status::Corruption("no meta-lognumber entry in descriptor");
|
||||
} else if (!have_last_sequence) {
|
||||
} else if (!version_edit_params.has_last_sequence_) {
|
||||
s = Status::Corruption("no last-sequence-number entry in descriptor");
|
||||
}
|
||||
|
||||
if (!have_prev_log_number) {
|
||||
previous_log_number = 0;
|
||||
if (!version_edit_params.has_prev_log_number_) {
|
||||
version_edit_params.SetPrevLogNumber(0);
|
||||
}
|
||||
|
||||
column_family_set_->UpdateMaxColumnFamily(max_column_family);
|
||||
column_family_set_->UpdateMaxColumnFamily(
|
||||
version_edit_params.max_column_family_);
|
||||
|
||||
// When reading DB generated using old release, min_log_number_to_keep=0.
|
||||
// All log files will be scanned for potential prepare entries.
|
||||
MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
|
||||
MarkFileNumberUsed(previous_log_number);
|
||||
MarkFileNumberUsed(log_number);
|
||||
MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
|
||||
MarkFileNumberUsed(version_edit_params.prev_log_number_);
|
||||
MarkFileNumberUsed(version_edit_params.log_number_);
|
||||
}
|
||||
|
||||
// there were some column families in the MANIFEST that weren't specified
|
||||
@ -4473,11 +4451,11 @@ Status VersionSet::Recover(
|
||||
}
|
||||
|
||||
manifest_file_size_ = current_manifest_file_size;
|
||||
next_file_number_.store(next_file + 1);
|
||||
last_allocated_sequence_ = last_sequence;
|
||||
last_published_sequence_ = last_sequence;
|
||||
last_sequence_ = last_sequence;
|
||||
prev_log_number_ = previous_log_number;
|
||||
next_file_number_.store(version_edit_params.next_file_number_ + 1);
|
||||
last_allocated_sequence_ = version_edit_params.last_sequence_;
|
||||
last_published_sequence_ = version_edit_params.last_sequence_;
|
||||
last_sequence_ = version_edit_params.last_sequence_;
|
||||
prev_log_number_ = version_edit_params.prev_log_number_;
|
||||
|
||||
ROCKS_LOG_INFO(
|
||||
db_options_->info_log,
|
||||
@ -4487,8 +4465,9 @@ Status VersionSet::Recover(
|
||||
",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
|
||||
",min_log_number_to_keep is %" PRIu64 "\n",
|
||||
manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
|
||||
last_sequence_.load(), log_number, prev_log_number_,
|
||||
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
|
||||
last_sequence_.load(), version_edit_params.log_number_,
|
||||
prev_log_number_, column_family_set_->GetMaxColumnFamily(),
|
||||
min_log_number_to_keep_2pc());
|
||||
|
||||
for (auto cfd : *column_family_set_) {
|
||||
if (cfd->IsDropped()) {
|
||||
@ -4633,7 +4612,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
|
||||
}
|
||||
|
||||
// we need to allocate an array with the old number of levels size to
|
||||
// avoid SIGSEGV in WriteSnapshot()
|
||||
// avoid SIGSEGV in WriteCurrentStatetoManifest()
|
||||
// however, all levels bigger or equal to new_levels will be empty
|
||||
std::vector<FileMetaData*>* new_files_list =
|
||||
new std::vector<FileMetaData*>[current_levels];
|
||||
@ -4873,7 +4852,6 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
|
||||
next_file_number_.store(number + 1, std::memory_order_relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
// Called only either from ::LogAndApply which is protected by mutex or during
|
||||
// recovery which is single-threaded.
|
||||
void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
|
||||
@ -4882,7 +4860,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
|
||||
}
|
||||
}
|
||||
|
||||
Status VersionSet::WriteSnapshot(log::Writer* log) {
|
||||
Status VersionSet::WriteCurrentStateToManifest(log::Writer* log) {
|
||||
// TODO: Break up into multiple records to reduce memory usage on recovery?
|
||||
|
||||
// WARNING: This method doesn't hold a mutex!!
|
||||
@ -4890,6 +4868,22 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
|
||||
// This is done without DB mutex lock held, but only within single-threaded
|
||||
// LogAndApply. Column family manipulations can only happen within LogAndApply
|
||||
// (the same single thread), so we're safe to iterate.
|
||||
|
||||
if (db_options_->write_dbid_to_manifest) {
|
||||
VersionEdit edit_for_db_id;
|
||||
assert(!db_id_.empty());
|
||||
edit_for_db_id.SetDBId(db_id_);
|
||||
std::string db_id_record;
|
||||
if (!edit_for_db_id.EncodeTo(&db_id_record)) {
|
||||
return Status::Corruption("Unable to Encode VersionEdit:" +
|
||||
edit_for_db_id.DebugString(true));
|
||||
}
|
||||
Status add_record = log->AddRecord(db_id_record);
|
||||
if (!add_record.ok()) {
|
||||
return add_record;
|
||||
}
|
||||
}
|
||||
|
||||
for (auto cfd : *column_family_set_) {
|
||||
if (cfd->IsDropped()) {
|
||||
continue;
|
||||
@ -4943,7 +4937,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -5467,17 +5460,6 @@ Status ReactiveVersionSet::Recover(
|
||||
// In recovery, nobody else can access it, so it's fine to set it to be
|
||||
// initialized earlier.
|
||||
default_cfd->set_initialized();
|
||||
|
||||
bool have_log_number = false;
|
||||
bool have_prev_log_number = false;
|
||||
bool have_next_file = false;
|
||||
bool have_last_sequence = false;
|
||||
uint64_t next_file = 0;
|
||||
uint64_t last_sequence = 0;
|
||||
uint64_t log_number = 0;
|
||||
uint64_t previous_log_number = 0;
|
||||
uint32_t max_column_family = 0;
|
||||
uint64_t min_log_number_to_keep = 0;
|
||||
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
|
||||
builders;
|
||||
std::unordered_map<int, std::string> column_families_not_found;
|
||||
@ -5493,17 +5475,17 @@ Status ReactiveVersionSet::Recover(
|
||||
log::Reader* reader = manifest_reader->get();
|
||||
|
||||
int retry = 0;
|
||||
VersionEdit version_edit;
|
||||
while (s.ok() && retry < 1) {
|
||||
assert(reader != nullptr);
|
||||
Slice record;
|
||||
std::string scratch;
|
||||
s = ReadAndRecover(
|
||||
reader, &read_buffer_, cf_name_to_options, column_families_not_found,
|
||||
builders, &have_log_number, &log_number, &have_prev_log_number,
|
||||
&previous_log_number, &have_next_file, &next_file, &have_last_sequence,
|
||||
&last_sequence, &min_log_number_to_keep, &max_column_family);
|
||||
s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
|
||||
column_families_not_found, builders, &version_edit);
|
||||
if (s.ok()) {
|
||||
bool enough = have_next_file && have_log_number && have_last_sequence;
|
||||
bool enough = version_edit.has_next_file_number_ &&
|
||||
version_edit.has_log_number_ &&
|
||||
version_edit.has_last_sequence_;
|
||||
if (enough) {
|
||||
for (const auto& cf : column_families) {
|
||||
auto cfd = column_family_set_->GetColumnFamily(cf.name);
|
||||
@ -5545,14 +5527,14 @@ Status ReactiveVersionSet::Recover(
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
if (!have_prev_log_number) {
|
||||
previous_log_number = 0;
|
||||
if (!version_edit.has_prev_log_number_) {
|
||||
version_edit.prev_log_number_ = 0;
|
||||
}
|
||||
column_family_set_->UpdateMaxColumnFamily(max_column_family);
|
||||
column_family_set_->UpdateMaxColumnFamily(version_edit.max_column_family_);
|
||||
|
||||
MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
|
||||
MarkFileNumberUsed(previous_log_number);
|
||||
MarkFileNumberUsed(log_number);
|
||||
MarkMinLogNumberToKeep2PC(version_edit.min_log_number_to_keep_);
|
||||
MarkFileNumberUsed(version_edit.prev_log_number_);
|
||||
MarkFileNumberUsed(version_edit.log_number_);
|
||||
|
||||
for (auto cfd : *column_family_set_) {
|
||||
assert(builders.count(cfd->GetID()) > 0);
|
||||
@ -5585,11 +5567,11 @@ Status ReactiveVersionSet::Recover(
|
||||
!(db_options_->skip_stats_update_on_db_open));
|
||||
AppendVersion(cfd, v);
|
||||
}
|
||||
next_file_number_.store(next_file + 1);
|
||||
last_allocated_sequence_ = last_sequence;
|
||||
last_published_sequence_ = last_sequence;
|
||||
last_sequence_ = last_sequence;
|
||||
prev_log_number_ = previous_log_number;
|
||||
next_file_number_.store(version_edit.next_file_number_ + 1);
|
||||
last_allocated_sequence_ = version_edit.last_sequence_;
|
||||
last_published_sequence_ = version_edit.last_sequence_;
|
||||
last_sequence_ = version_edit.last_sequence_;
|
||||
prev_log_number_ = version_edit.prev_log_number_;
|
||||
for (auto cfd : *column_family_set_) {
|
||||
if (cfd->IsDropped()) {
|
||||
continue;
|
||||
@ -5611,16 +5593,6 @@ Status ReactiveVersionSet::ReadAndApply(
|
||||
mu->AssertHeld();
|
||||
|
||||
Status s;
|
||||
bool have_log_number = false;
|
||||
bool have_prev_log_number = false;
|
||||
bool have_next_file = false;
|
||||
bool have_last_sequence = false;
|
||||
uint64_t next_file = 0;
|
||||
uint64_t last_sequence = 0;
|
||||
uint64_t log_number = 0;
|
||||
uint64_t previous_log_number = 0;
|
||||
uint32_t max_column_family = 0;
|
||||
uint64_t min_log_number_to_keep = 0;
|
||||
uint64_t applied_edits = 0;
|
||||
while (s.ok()) {
|
||||
Slice record;
|
||||
@ -5635,7 +5607,7 @@ Status ReactiveVersionSet::ReadAndApply(
|
||||
}
|
||||
|
||||
// Skip the first VersionEdits of each MANIFEST generated by
|
||||
// VersionSet::WriteSnapshot.
|
||||
// VersionSet::WriteCurrentStatetoManifest.
|
||||
if (number_of_edits_to_skip_ > 0) {
|
||||
ColumnFamilyData* cfd =
|
||||
column_family_set_->GetColumnFamily(edit.column_family_);
|
||||
@ -5649,16 +5621,13 @@ Status ReactiveVersionSet::ReadAndApply(
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
VersionEdit temp_edit;
|
||||
if (edit.is_in_atomic_group_) {
|
||||
if (read_buffer_.IsFull()) {
|
||||
// Apply edits in an atomic group when we have read all edits in the
|
||||
// group.
|
||||
for (auto& e : read_buffer_.replay_buffer()) {
|
||||
s = ApplyOneVersionEditToBuilder(
|
||||
e, cfds_changed, &have_log_number, &log_number,
|
||||
&have_prev_log_number, &previous_log_number, &have_next_file,
|
||||
&next_file, &have_last_sequence, &last_sequence,
|
||||
&min_log_number_to_keep, &max_column_family);
|
||||
s = ApplyOneVersionEditToBuilder(e, cfds_changed, &temp_edit);
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
@ -5671,11 +5640,7 @@ Status ReactiveVersionSet::ReadAndApply(
|
||||
}
|
||||
} else {
|
||||
// Apply a normal edit immediately.
|
||||
s = ApplyOneVersionEditToBuilder(
|
||||
edit, cfds_changed, &have_log_number, &log_number,
|
||||
&have_prev_log_number, &previous_log_number, &have_next_file,
|
||||
&next_file, &have_last_sequence, &last_sequence,
|
||||
&min_log_number_to_keep, &max_column_family);
|
||||
s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
|
||||
if (s.ok()) {
|
||||
applied_edits++;
|
||||
}
|
||||
@ -5700,28 +5665,32 @@ Status ReactiveVersionSet::ReadAndApply(
|
||||
break;
|
||||
} else {
|
||||
// We have switched to a new MANIFEST whose first records have been
|
||||
// generated by VersionSet::WriteSnapshot. Since the secondary instance
|
||||
// has already finished recovering upon start, there is no need for the
|
||||
// secondary to process these records. Actually, if the secondary were
|
||||
// to replay these records, the secondary may end up adding the same
|
||||
// SST files AGAIN to each column family, causing consistency checks
|
||||
// done by VersionBuilder to fail. Therefore, we record the number of
|
||||
// records to skip at the beginning of the new MANIFEST and ignore
|
||||
// them.
|
||||
// generated by VersionSet::WriteCurrentStatetoManifest. Since the
|
||||
// secondary instance has already finished recovering upon start, there
|
||||
// is no need for the secondary to process these records. Actually, if
|
||||
// the secondary were to replay these records, the secondary may end up
|
||||
// adding the same SST files AGAIN to each column family, causing
|
||||
// consistency checks done by VersionBuilder to fail. Therefore, we
|
||||
// record the number of records to skip at the beginning of the new
|
||||
// MANIFEST and ignore them.
|
||||
number_of_edits_to_skip_ = 0;
|
||||
for (auto* cfd : *column_family_set_) {
|
||||
if (cfd->IsDropped()) {
|
||||
continue;
|
||||
}
|
||||
// Increase number_of_edits_to_skip by 2 because WriteSnapshot()
|
||||
// writes 2 version edits for each column family at the beginning of
|
||||
// the newly-generated MANIFEST.
|
||||
// Increase number_of_edits_to_skip by 2 because
|
||||
// WriteCurrentStatetoManifest() writes 2 version edits for each
|
||||
// column family at the beginning of the newly-generated MANIFEST.
|
||||
// TODO(yanqin) remove hard-coded value.
|
||||
if (db_options_->write_dbid_to_manifest) {
|
||||
number_of_edits_to_skip_ += 3;
|
||||
} else {
|
||||
number_of_edits_to_skip_ += 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
for (auto cfd : *column_family_set_) {
|
||||
@ -5744,10 +5713,7 @@ Status ReactiveVersionSet::ReadAndApply(
|
||||
|
||||
Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
|
||||
VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
|
||||
bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
|
||||
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
|
||||
bool* have_last_sequence, SequenceNumber* last_sequence,
|
||||
uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
|
||||
VersionEdit* version_edit) {
|
||||
ColumnFamilyData* cfd =
|
||||
column_family_set_->GetColumnFamily(edit.column_family_);
|
||||
|
||||
@ -5794,10 +5760,7 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
|
||||
return s;
|
||||
}
|
||||
}
|
||||
Status s = ExtractInfoFromVersionEdit(
|
||||
cfd, edit, have_log_number, log_number, have_prev_log_number,
|
||||
previous_log_number, have_next_file, next_file, have_last_sequence,
|
||||
last_sequence, min_log_number_to_keep, max_column_family);
|
||||
Status s = ExtractInfoFromVersionEdit(cfd, edit, version_edit);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
@ -5830,23 +5793,23 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
|
||||
// Some other error has occurred during LoadTableHandlers.
|
||||
}
|
||||
|
||||
if (have_next_file) {
|
||||
next_file_number_.store(*next_file + 1);
|
||||
if (version_edit->has_next_file_number()) {
|
||||
next_file_number_.store(version_edit->next_file_number_ + 1);
|
||||
}
|
||||
if (have_last_sequence) {
|
||||
last_allocated_sequence_ = *last_sequence;
|
||||
last_published_sequence_ = *last_sequence;
|
||||
last_sequence_ = *last_sequence;
|
||||
if (version_edit->has_last_sequence_) {
|
||||
last_allocated_sequence_ = version_edit->last_sequence_;
|
||||
last_published_sequence_ = version_edit->last_sequence_;
|
||||
last_sequence_ = version_edit->last_sequence_;
|
||||
}
|
||||
if (have_prev_log_number) {
|
||||
prev_log_number_ = *previous_log_number;
|
||||
MarkFileNumberUsed(*previous_log_number);
|
||||
if (version_edit->has_prev_log_number_) {
|
||||
prev_log_number_ = version_edit->prev_log_number_;
|
||||
MarkFileNumberUsed(version_edit->prev_log_number_);
|
||||
}
|
||||
if (have_log_number) {
|
||||
MarkFileNumberUsed(*log_number);
|
||||
if (version_edit->has_log_number_) {
|
||||
MarkFileNumberUsed(version_edit->log_number_);
|
||||
}
|
||||
column_family_set_->UpdateMaxColumnFamily(*max_column_family);
|
||||
MarkMinLogNumberToKeep2PC(*min_log_number_to_keep);
|
||||
column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
|
||||
MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
|
||||
return s;
|
||||
}
|
||||
|
||||
@ -5890,8 +5853,8 @@ Status ReactiveVersionSet::MaybeSwitchManifest(
|
||||
// TODO (yanqin) every time we switch to a new MANIFEST, we clear the
|
||||
// active_version_builders_ map because we choose to construct the
|
||||
// versions from scratch, thanks to the first part of each MANIFEST
|
||||
// written by VersionSet::WriteSnapshot. This is not necessary, but we
|
||||
// choose this at present for the sake of simplicity.
|
||||
// written by VersionSet::WriteCurrentStatetoManifest. This is not
|
||||
// necessary, but we choose this at present for the sake of simplicity.
|
||||
active_version_builders_.clear();
|
||||
}
|
||||
} while (s.IsPathNotFound());
|
||||
|
@ -65,6 +65,14 @@ class MergeContext;
|
||||
class ColumnFamilySet;
|
||||
class MergeIteratorBuilder;
|
||||
|
||||
// VersionEdit is always supposed to be valid and it is used to point at
|
||||
// entries in Manifest. Ideally it should not be used as a container to
|
||||
// carry around few of its fields as function params because it can cause
|
||||
// readers to think it's a valid entry from Manifest. To avoid that confusion
|
||||
// introducing VersionEditParams to simply carry around multiple VersionEdit
|
||||
// params. It need not point to a valid record in Manifest.
|
||||
using VersionEditParams = VersionEdit;
|
||||
|
||||
// Return the smallest index i such that file_level.files[i]->largest >= key.
|
||||
// Return file_level.num_files if there is no such file.
|
||||
// REQUIRES: "file_level.files" contains a sorted list of
|
||||
@ -851,7 +859,7 @@ class VersionSet {
|
||||
// If read_only == true, Recover() will not complain if some column families
|
||||
// are not opened
|
||||
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||
bool read_only = false);
|
||||
bool read_only = false, std::string* db_id = nullptr);
|
||||
|
||||
// Reads a manifest file and returns a list of column families in
|
||||
// column_families.
|
||||
@ -1054,7 +1062,7 @@ class VersionSet {
|
||||
TableReaderCaller caller);
|
||||
|
||||
// Save current contents to *log
|
||||
Status WriteSnapshot(log::Writer* log);
|
||||
Status WriteCurrentStateToManifest(log::Writer* log);
|
||||
|
||||
void AppendVersion(ColumnFamilyData* column_family_data, Version* v);
|
||||
|
||||
@ -1068,10 +1076,7 @@ class VersionSet {
|
||||
std::unordered_map<int, std::string>& column_families_not_found,
|
||||
std::unordered_map<
|
||||
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
|
||||
bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
|
||||
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
|
||||
bool* have_last_sequence, SequenceNumber* last_sequence,
|
||||
uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
|
||||
VersionEditParams* version_edit, std::string* db_id = nullptr);
|
||||
|
||||
// REQUIRES db mutex
|
||||
Status ApplyOneVersionEditToBuilder(
|
||||
@ -1080,22 +1085,17 @@ class VersionSet {
|
||||
std::unordered_map<int, std::string>& column_families_not_found,
|
||||
std::unordered_map<
|
||||
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
|
||||
bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
|
||||
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
|
||||
bool* have_last_sequence, SequenceNumber* last_sequence,
|
||||
uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
|
||||
VersionEditParams* version_edit);
|
||||
|
||||
Status ExtractInfoFromVersionEdit(
|
||||
ColumnFamilyData* cfd, const VersionEdit& edit, bool* have_log_number,
|
||||
uint64_t* log_number, bool* have_prev_log_number,
|
||||
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
|
||||
bool* have_last_sequence, SequenceNumber* last_sequence,
|
||||
uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
|
||||
Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
|
||||
const VersionEdit& from_edit,
|
||||
VersionEditParams* version_edit_params);
|
||||
|
||||
std::unique_ptr<ColumnFamilySet> column_family_set_;
|
||||
|
||||
Env* const env_;
|
||||
const std::string dbname_;
|
||||
std::string db_id_;
|
||||
const ImmutableDBOptions* const db_options_;
|
||||
std::atomic<uint64_t> next_file_number_;
|
||||
// Any log number equal or lower than this should be ignored during recovery,
|
||||
@ -1195,10 +1195,7 @@ class ReactiveVersionSet : public VersionSet {
|
||||
// REQUIRES db mutex
|
||||
Status ApplyOneVersionEditToBuilder(
|
||||
VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
|
||||
bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
|
||||
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
|
||||
bool* have_last_sequence, SequenceNumber* last_sequence,
|
||||
uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
|
||||
VersionEdit* version_edit);
|
||||
|
||||
Status MaybeSwitchManifest(
|
||||
log::Reader::Reporter* reporter,
|
||||
|
@ -8,6 +8,7 @@
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "db/version_set.h"
|
||||
#include "db/db_impl/db_impl.h"
|
||||
#include "db/log_writer.h"
|
||||
#include "logging/logging.h"
|
||||
#include "table/mock_table.h"
|
||||
@ -637,6 +638,12 @@ class VersionSetTestBase {
|
||||
assert(last_seqno != nullptr);
|
||||
assert(log_writer != nullptr);
|
||||
VersionEdit new_db;
|
||||
if (db_options_.write_dbid_to_manifest) {
|
||||
DBImpl* impl = new DBImpl(DBOptions(), dbname_);
|
||||
std::string db_id;
|
||||
impl->GetDbIdentityFromIdentityFile(&db_id);
|
||||
new_db.SetDBId(db_id);
|
||||
}
|
||||
new_db.SetLogNumber(0);
|
||||
new_db.SetNextFile(2);
|
||||
new_db.SetLastSequence(0);
|
||||
@ -691,7 +698,7 @@ class VersionSetTestBase {
|
||||
std::vector<ColumnFamilyDescriptor> column_families;
|
||||
SequenceNumber last_seqno;
|
||||
std::unique_ptr<log::Writer> log_writer;
|
||||
|
||||
SetIdentityFile(env_, dbname_);
|
||||
PrepareManifest(&column_families, &last_seqno, &log_writer);
|
||||
log_writer.reset();
|
||||
// Make "CURRENT" file point to the new manifest file.
|
||||
|
@ -393,8 +393,14 @@ Status SetCurrentFile(Env* env, const std::string& dbname,
|
||||
return s;
|
||||
}
|
||||
|
||||
Status SetIdentityFile(Env* env, const std::string& dbname) {
|
||||
std::string id = env->GenerateUniqueId();
|
||||
Status SetIdentityFile(Env* env, const std::string& dbname,
|
||||
const std::string& db_id) {
|
||||
std::string id;
|
||||
if (db_id.empty()) {
|
||||
id = env->GenerateUniqueId();
|
||||
} else {
|
||||
id = db_id;
|
||||
}
|
||||
assert(!id.empty());
|
||||
// Reserve the filename dbname/000000.dbtmp for the temporary identity file
|
||||
std::string tmp = TempFileName(dbname, 0);
|
||||
|
@ -167,7 +167,8 @@ extern Status SetCurrentFile(Env* env, const std::string& dbname,
|
||||
Directory* directory_to_fsync);
|
||||
|
||||
// Make the IDENTITY file for the db
|
||||
extern Status SetIdentityFile(Env* env, const std::string& dbname);
|
||||
extern Status SetIdentityFile(Env* env, const std::string& dbname,
|
||||
const std::string& db_id = {});
|
||||
|
||||
// Sync manifest file `file`.
|
||||
extern Status SyncManifest(Env* env, const ImmutableDBOptions* db_options,
|
||||
|
@ -1090,6 +1090,17 @@ struct DBOptions {
|
||||
// ReadOptions::background_purge_on_iterator_cleanup.
|
||||
bool avoid_unnecessary_blocking_io = false;
|
||||
|
||||
// Historically DB ID has always been stored in Identity File in DB folder.
|
||||
// If this flag is true, the DB ID is written to Manifest file in addition
|
||||
// to the Identity file. By doing this 2 problems are solved
|
||||
// 1. We don't checksum the Identity file where as Manifest file is.
|
||||
// 2. Since the source of truth for DB is Manifest file DB ID will sit with
|
||||
// the source of truth. Previously the Identity file could be copied
|
||||
// independent of Manifest and that can result in wrong DB ID.
|
||||
// We recommend setting this flag to true.
|
||||
// Default: false
|
||||
bool write_dbid_to_manifest = false;
|
||||
|
||||
// The number of bytes to prefetch when reading the log. This is mostly useful
|
||||
// for reading a remotely located log, as it can save the number of
|
||||
// round-trips. If 0, then the prefetching is disabled.
|
||||
|
@ -86,6 +86,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
|
||||
atomic_flush(options.atomic_flush),
|
||||
avoid_unnecessary_blocking_io(options.avoid_unnecessary_blocking_io),
|
||||
persist_stats_to_disk(options.persist_stats_to_disk),
|
||||
write_dbid_to_manifest(options.write_dbid_to_manifest),
|
||||
log_readahead_size(options.log_readahead_size) {
|
||||
}
|
||||
|
||||
@ -226,6 +227,8 @@ void ImmutableDBOptions::Dump(Logger* log) const {
|
||||
avoid_unnecessary_blocking_io);
|
||||
ROCKS_LOG_HEADER(log, " Options.persist_stats_to_disk: %u",
|
||||
persist_stats_to_disk);
|
||||
ROCKS_LOG_HEADER(log, " Options.write_dbid_to_manifest: %d",
|
||||
write_dbid_to_manifest);
|
||||
ROCKS_LOG_HEADER(
|
||||
log, " Options.log_readahead_size: %" ROCKSDB_PRIszt,
|
||||
log_readahead_size);
|
||||
|
@ -82,6 +82,7 @@ struct ImmutableDBOptions {
|
||||
bool atomic_flush;
|
||||
bool avoid_unnecessary_blocking_io;
|
||||
bool persist_stats_to_disk;
|
||||
bool write_dbid_to_manifest;
|
||||
size_t log_readahead_size;
|
||||
};
|
||||
|
||||
|
@ -1664,6 +1664,9 @@ std::unordered_map<std::string, OptionTypeInfo>
|
||||
{offsetof(struct DBOptions, avoid_unnecessary_blocking_io),
|
||||
OptionType::kBoolean, OptionVerificationType::kNormal, false,
|
||||
offsetof(struct ImmutableDBOptions, avoid_unnecessary_blocking_io)}},
|
||||
{"write_dbid_to_manifest",
|
||||
{offsetof(struct DBOptions, write_dbid_to_manifest),
|
||||
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
|
||||
{"log_readahead_size",
|
||||
{offsetof(struct DBOptions, log_readahead_size), OptionType::kSizeT,
|
||||
OptionVerificationType::kNormal, false, 0}},
|
||||
|
@ -296,7 +296,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
|
||||
"seq_per_batch=false;"
|
||||
"atomic_flush=false;"
|
||||
"avoid_unnecessary_blocking_io=false;"
|
||||
"log_readahead_size=0",
|
||||
"log_readahead_size=0;"
|
||||
"write_dbid_to_manifest=false",
|
||||
new_options));
|
||||
|
||||
ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions),
|
||||
|
Loading…
Reference in New Issue
Block a user