New Manifest format to allow customized fields in NewFile.
Summary: With this commit, we add a new format in manifest when adding a new file. Now path ID and need-compaction hint are first two customized fields. Test Plan: Add a test case in version_edit_test to verify the encoding and decoding logic. Add a unit test in db_test to verify need compaction is persistent after DB restarting. Reviewers: kradhakrishnan, anthony, IslamAbdelRahman, yhchiang, rven, igor Reviewed By: igor Subscribers: javigon, leveldb, dhruba Differential Revision: https://reviews.facebook.net/D48123
This commit is contained in:
parent
6732a5765d
commit
b77eb16aba
@ -3,6 +3,7 @@
|
||||
## Unreleased
|
||||
### Public API Changes
|
||||
* CompactionFilter::Context includes information of Column Family ID
|
||||
* The need-compaction hint given by TablePropertiesCollector::NeedCompact() will be persistent and recoverable after DB recovery. This introduces a breaking format change. If you use this experimental feature, including NewCompactOnDeletionCollectorFactory() in the new version, you may not be able to directly downgrade the DB back to version 4.0 or lower.
|
||||
|
||||
## 4.1.0 (10/8/2015)
|
||||
### New Features
|
||||
|
@ -8306,8 +8306,8 @@ TEST_F(DBTest, TablePropertiesNeedCompactTest) {
|
||||
options.soft_rate_limit = 1.1;
|
||||
options.num_levels = 8;
|
||||
|
||||
std::shared_ptr<TablePropertiesCollectorFactory> collector_factory(
|
||||
new CountingDeleteTabPropCollectorFactory);
|
||||
std::shared_ptr<TablePropertiesCollectorFactory> collector_factory =
|
||||
std::make_shared<CountingDeleteTabPropCollectorFactory>();
|
||||
options.table_properties_collector_factories.resize(1);
|
||||
options.table_properties_collector_factories[0] = collector_factory;
|
||||
|
||||
@ -8366,6 +8366,61 @@ TEST_F(DBTest, TablePropertiesNeedCompactTest) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBTest, NeedCompactHintPersistentTest) {
|
||||
Random rnd(301);
|
||||
|
||||
Options options;
|
||||
options.create_if_missing = true;
|
||||
options.max_write_buffer_number = 8;
|
||||
options.level0_file_num_compaction_trigger = 10;
|
||||
options.level0_slowdown_writes_trigger = 10;
|
||||
options.level0_stop_writes_trigger = 10;
|
||||
options.disable_auto_compactions = true;
|
||||
|
||||
std::shared_ptr<TablePropertiesCollectorFactory> collector_factory =
|
||||
std::make_shared<CountingDeleteTabPropCollectorFactory>();
|
||||
options.table_properties_collector_factories.resize(1);
|
||||
options.table_properties_collector_factories[0] = collector_factory;
|
||||
|
||||
DestroyAndReopen(options);
|
||||
|
||||
const int kMaxKey = 100;
|
||||
for (int i = 0; i < kMaxKey; i++) {
|
||||
ASSERT_OK(Put(Key(i), ""));
|
||||
}
|
||||
Flush();
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
|
||||
for (int i = 1; i < kMaxKey - 1; i++) {
|
||||
Delete(Key(i));
|
||||
}
|
||||
Flush();
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
ASSERT_EQ(NumTableFilesAtLevel(0), 2);
|
||||
|
||||
// Restart the DB. Although number of files didn't reach
|
||||
// options.level0_file_num_compaction_trigger, compaction should
|
||||
// still be triggered because of the need-compaction hint.
|
||||
options.disable_auto_compactions = false;
|
||||
Reopen(options);
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
|
||||
{
|
||||
SetPerfLevel(kEnableCount);
|
||||
perf_context.Reset();
|
||||
int c = 0;
|
||||
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
|
||||
for (iter->Seek(Key(0)); iter->Valid(); iter->Next()) {
|
||||
c++;
|
||||
}
|
||||
ASSERT_EQ(c, 2);
|
||||
ASSERT_EQ(perf_context.internal_delete_skipped_count, 0);
|
||||
// We iterate every key twice. Is it a bug?
|
||||
ASSERT_LE(perf_context.internal_key_skipped_count, 2);
|
||||
SetPerfLevel(kDisable);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBTest, SuggestCompactRangeTest) {
|
||||
class CompactionFilterFactoryGetContext : public CompactionFilterFactory {
|
||||
public:
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include "db/version_set.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/event_logger.h"
|
||||
#include "util/sync_point.h"
|
||||
#include "rocksdb/slice.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -32,12 +33,22 @@ enum Tag {
|
||||
// these are new formats divergent from open source leveldb
|
||||
kNewFile2 = 100,
|
||||
kNewFile3 = 102,
|
||||
kNewFile4 = 103, // 4th (the latest) format version of adding files
|
||||
kColumnFamily = 200, // specify column family for version edit
|
||||
kColumnFamilyAdd = 201,
|
||||
kColumnFamilyDrop = 202,
|
||||
kMaxColumnFamily = 203,
|
||||
};
|
||||
|
||||
enum CustomTag {
|
||||
kTerminate = 1, // The end of customized fields
|
||||
kNeedCompaction = 2,
|
||||
kPathId = 65,
|
||||
};
|
||||
// If this bit for the custom tag is set, opening DB should fail if
|
||||
// we don't know this field.
|
||||
uint32_t kCustomTagNonSafeIgnoreMask = 1 << 6;
|
||||
|
||||
uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
|
||||
assert(number <= kFileNumberMask);
|
||||
return number | (path_id * (kFileNumberMask + 1));
|
||||
@ -102,7 +113,11 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
|
||||
if (!f.smallest.Valid() || !f.largest.Valid()) {
|
||||
return false;
|
||||
}
|
||||
if (f.fd.GetPathId() == 0) {
|
||||
bool has_customized_fields = false;
|
||||
if (f.marked_for_compaction) {
|
||||
PutVarint32(dst, kNewFile4);
|
||||
has_customized_fields = true;
|
||||
} else if (f.fd.GetPathId() == 0) {
|
||||
// Use older format to make sure user can roll back the build if they
|
||||
// don't config multiple DB paths.
|
||||
PutVarint32(dst, kNewFile2);
|
||||
@ -111,7 +126,8 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
|
||||
}
|
||||
PutVarint32(dst, new_files_[i].first); // level
|
||||
PutVarint64(dst, f.fd.GetNumber());
|
||||
if (f.fd.GetPathId() != 0) {
|
||||
if (f.fd.GetPathId() != 0 && !has_customized_fields) {
|
||||
// kNewFile3
|
||||
PutVarint32(dst, f.fd.GetPathId());
|
||||
}
|
||||
PutVarint64(dst, f.fd.GetFileSize());
|
||||
@ -119,6 +135,48 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
|
||||
PutLengthPrefixedSlice(dst, f.largest.Encode());
|
||||
PutVarint64(dst, f.smallest_seqno);
|
||||
PutVarint64(dst, f.largest_seqno);
|
||||
if (has_customized_fields) {
|
||||
// Customized fields' format:
|
||||
// +-----------------------------+
|
||||
// | 1st field's tag (varint32) |
|
||||
// +-----------------------------+
|
||||
// | 1st field's size (varint32) |
|
||||
// +-----------------------------+
|
||||
// | bytes for 1st field |
|
||||
// | (based on size decoded) |
|
||||
// +-----------------------------+
|
||||
// | |
|
||||
// | ...... |
|
||||
// | |
|
||||
// +-----------------------------+
|
||||
// | last field's size (varint32)|
|
||||
// +-----------------------------+
|
||||
// | bytes for last field |
|
||||
// | (based on size decoded) |
|
||||
// +-----------------------------+
|
||||
// | terminating tag (varint32) |
|
||||
// +-----------------------------+
|
||||
//
|
||||
// Customized encoding for fields:
|
||||
// tag kPathId: 1 byte as path_id
|
||||
// tag kNeedCompaction:
|
||||
// now only can take one char value 1 indicating need-compaction
|
||||
//
|
||||
if (f.fd.GetPathId() != 0) {
|
||||
PutVarint32(dst, CustomTag::kPathId);
|
||||
char p = static_cast<char>(f.fd.GetPathId());
|
||||
PutLengthPrefixedSlice(dst, Slice(&p, 1));
|
||||
}
|
||||
if (f.marked_for_compaction) {
|
||||
PutVarint32(dst, CustomTag::kNeedCompaction);
|
||||
char p = static_cast<char>(1);
|
||||
PutLengthPrefixedSlice(dst, Slice(&p, 1));
|
||||
}
|
||||
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
|
||||
dst);
|
||||
|
||||
PutVarint32(dst, CustomTag::kTerminate);
|
||||
}
|
||||
}
|
||||
|
||||
// 0 is default and does not need to be explicitly written
|
||||
@ -161,6 +219,63 @@ bool VersionEdit::GetLevel(Slice* input, int* level, const char** msg) {
|
||||
}
|
||||
}
|
||||
|
||||
const char* VersionEdit::DecodeNewFile4From(Slice* input) {
|
||||
const char* msg = nullptr;
|
||||
int level;
|
||||
FileMetaData f;
|
||||
uint64_t number;
|
||||
uint32_t path_id = 0;
|
||||
uint64_t file_size;
|
||||
if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) &&
|
||||
GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
|
||||
GetInternalKey(input, &f.largest) &&
|
||||
GetVarint64(input, &f.smallest_seqno) &&
|
||||
GetVarint64(input, &f.largest_seqno)) {
|
||||
// See comments in VersionEdit::EncodeTo() for format of customized fields
|
||||
while (true) {
|
||||
uint32_t custom_tag;
|
||||
Slice field;
|
||||
if (!GetVarint32(input, &custom_tag)) {
|
||||
return "new-file4 custom field";
|
||||
}
|
||||
if (custom_tag == kTerminate) {
|
||||
break;
|
||||
}
|
||||
if (!GetLengthPrefixedSlice(input, &field)) {
|
||||
return "new-file4 custom field lenth prefixed slice error";
|
||||
}
|
||||
switch (custom_tag) {
|
||||
case kPathId:
|
||||
if (field.size() != 1) {
|
||||
return "path_id field wrong size";
|
||||
}
|
||||
path_id = field[0];
|
||||
if (path_id > 3) {
|
||||
return "path_id wrong vaue";
|
||||
}
|
||||
break;
|
||||
case kNeedCompaction:
|
||||
if (field.size() != 1) {
|
||||
return "need_compaction field wrong size";
|
||||
}
|
||||
f.marked_for_compaction = (field[0] == 1);
|
||||
break;
|
||||
default:
|
||||
if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) {
|
||||
// Should not proceed if cannot understand it
|
||||
return "new-file4 custom field not supported";
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return "new-file4 entry";
|
||||
}
|
||||
f.fd = FileDescriptor(number, path_id, file_size);
|
||||
new_files_.push_back(std::make_pair(level, f));
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
Clear();
|
||||
Slice input = src;
|
||||
@ -304,6 +419,11 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
break;
|
||||
}
|
||||
|
||||
case kNewFile4: {
|
||||
msg = DecodeNewFile4From(&input);
|
||||
break;
|
||||
}
|
||||
|
||||
case kColumnFamily:
|
||||
if (!GetVarint32(&input, &column_family_)) {
|
||||
if (!msg) {
|
||||
|
@ -237,6 +237,8 @@ class VersionEdit {
|
||||
bool EncodeTo(std::string* dst) const;
|
||||
Status DecodeFrom(const Slice& src);
|
||||
|
||||
const char* DecodeNewFile4From(Slice* input);
|
||||
|
||||
typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;
|
||||
|
||||
const DeletedFileSet& GetDeletedFiles() { return deleted_files_; }
|
||||
|
@ -8,6 +8,7 @@
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "db/version_edit.h"
|
||||
#include "util/sync_point.h"
|
||||
#include "util/testharness.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -45,6 +46,121 @@ TEST_F(VersionEditTest, EncodeDecode) {
|
||||
TestEncodeDecode(edit);
|
||||
}
|
||||
|
||||
TEST_F(VersionEditTest, EncodeDecodeNewFile4) {
|
||||
static const uint64_t kBig = 1ull << 50;
|
||||
|
||||
VersionEdit edit;
|
||||
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
|
||||
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
|
||||
kBig + 600, true);
|
||||
edit.AddFile(4, 301, 3, 100, InternalKey("foo", kBig + 501, kTypeValue),
|
||||
InternalKey("zoo", kBig + 601, kTypeDeletion), kBig + 501,
|
||||
kBig + 601, false);
|
||||
edit.AddFile(5, 302, 0, 100, InternalKey("foo", kBig + 502, kTypeValue),
|
||||
InternalKey("zoo", kBig + 602, kTypeDeletion), kBig + 502,
|
||||
kBig + 602, true);
|
||||
|
||||
edit.DeleteFile(4, 700);
|
||||
|
||||
edit.SetComparatorName("foo");
|
||||
edit.SetLogNumber(kBig + 100);
|
||||
edit.SetNextFile(kBig + 200);
|
||||
edit.SetLastSequence(kBig + 1000);
|
||||
TestEncodeDecode(edit);
|
||||
|
||||
std::string encoded, encoded2;
|
||||
edit.EncodeTo(&encoded);
|
||||
VersionEdit parsed;
|
||||
Status s = parsed.DecodeFrom(encoded);
|
||||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
auto& new_files = parsed.GetNewFiles();
|
||||
ASSERT_TRUE(new_files[0].second.marked_for_compaction);
|
||||
ASSERT_TRUE(!new_files[1].second.marked_for_compaction);
|
||||
ASSERT_TRUE(new_files[2].second.marked_for_compaction);
|
||||
ASSERT_EQ(3, new_files[0].second.fd.GetPathId());
|
||||
ASSERT_EQ(3, new_files[1].second.fd.GetPathId());
|
||||
ASSERT_EQ(0, new_files[2].second.fd.GetPathId());
|
||||
}
|
||||
|
||||
TEST_F(VersionEditTest, ForwardCompatibleNewFile4) {
|
||||
static const uint64_t kBig = 1ull << 50;
|
||||
VersionEdit edit;
|
||||
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
|
||||
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
|
||||
kBig + 600, true);
|
||||
edit.AddFile(4, 301, 3, 100, InternalKey("foo", kBig + 501, kTypeValue),
|
||||
InternalKey("zoo", kBig + 601, kTypeDeletion), kBig + 501,
|
||||
kBig + 601, false);
|
||||
edit.DeleteFile(4, 700);
|
||||
|
||||
edit.SetComparatorName("foo");
|
||||
edit.SetLogNumber(kBig + 100);
|
||||
edit.SetNextFile(kBig + 200);
|
||||
edit.SetLastSequence(kBig + 1000);
|
||||
|
||||
std::string encoded;
|
||||
|
||||
// Call back function to add extra customized builds.
|
||||
bool first = true;
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionEdit::EncodeTo:NewFile4:CustomizeFields", [&](void* arg) {
|
||||
std::string* str = reinterpret_cast<std::string*>(arg);
|
||||
PutVarint32(str, 33);
|
||||
const std::string str1 = "random_string";
|
||||
PutLengthPrefixedSlice(str, str1);
|
||||
if (first) {
|
||||
first = false;
|
||||
PutVarint32(str, 22);
|
||||
const std::string str2 = "s";
|
||||
PutLengthPrefixedSlice(str, str2);
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
edit.EncodeTo(&encoded);
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
VersionEdit parsed;
|
||||
Status s = parsed.DecodeFrom(encoded);
|
||||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
ASSERT_TRUE(!first);
|
||||
auto& new_files = parsed.GetNewFiles();
|
||||
ASSERT_TRUE(new_files[0].second.marked_for_compaction);
|
||||
ASSERT_TRUE(!new_files[1].second.marked_for_compaction);
|
||||
ASSERT_EQ(3, new_files[0].second.fd.GetPathId());
|
||||
ASSERT_EQ(3, new_files[1].second.fd.GetPathId());
|
||||
ASSERT_EQ(1u, parsed.GetDeletedFiles().size());
|
||||
}
|
||||
|
||||
TEST_F(VersionEditTest, NewFile4NotSupportedField) {
|
||||
static const uint64_t kBig = 1ull << 50;
|
||||
VersionEdit edit;
|
||||
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
|
||||
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
|
||||
kBig + 600, true);
|
||||
|
||||
edit.SetComparatorName("foo");
|
||||
edit.SetLogNumber(kBig + 100);
|
||||
edit.SetNextFile(kBig + 200);
|
||||
edit.SetLastSequence(kBig + 1000);
|
||||
|
||||
std::string encoded;
|
||||
|
||||
// Call back function to add extra customized builds.
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionEdit::EncodeTo:NewFile4:CustomizeFields", [&](void* arg) {
|
||||
std::string* str = reinterpret_cast<std::string*>(arg);
|
||||
const std::string str1 = "s";
|
||||
PutLengthPrefixedSlice(str, str1);
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
edit.EncodeTo(&encoded);
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
VersionEdit parsed;
|
||||
Status s = parsed.DecodeFrom(encoded);
|
||||
ASSERT_NOK(s);
|
||||
}
|
||||
|
||||
TEST_F(VersionEditTest, EncodeEmptyFile) {
|
||||
VersionEdit edit;
|
||||
edit.AddFile(0, 0, 0, 0, InternalKey(), InternalKey(), 0, 0, false);
|
||||
|
Loading…
Reference in New Issue
Block a user