diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 380b79f73..f1fd2d018 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -625,6 +625,7 @@ Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value, return status; } } + assert(compact_->builder != nullptr); SequenceNumber seqno = GetInternalKeySeqno(newkey); if (compact_->builder->NumEntries() == 0) { diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 781c0dd7b..185b3bff0 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -3,25 +3,29 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +#include #include #include +#include #include "db/compaction_job.h" #include "db/column_family.h" #include "db/version_set.h" #include "db/writebuffer.h" #include "rocksdb/cache.h" -#include "rocksdb/options.h" #include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "table/mock_table.h" #include "util/file_reader_writer.h" #include "util/string_util.h" #include "util/testharness.h" #include "util/testutil.h" -#include "table/mock_table.h" +#include "utilities/merge_operators.h" namespace rocksdb { namespace { + void VerifyInitializationOfCompactionJobStats( const CompactionJobStats& compaction_job_stats) { #if !defined(IOS_CROSS_COMPILE) @@ -73,12 +77,6 @@ class CompactionJobTest : public testing::Test { EXPECT_OK(env_->CreateDirIfMissing(dbname_)); db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); - NewDB(); - std::vector column_families; - cf_options_.table_factory = mock_table_factory_; - column_families.emplace_back(kDefaultColumnFamilyName, cf_options_); - - EXPECT_OK(versions_->Recover(column_families, false)); } std::string GenerateFileName(uint64_t file_number) { @@ -89,13 +87,68 @@ class CompactionJobTest : public testing::Test { return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId()); } + std::string KeyStr(const std::string& user_key, const SequenceNumber seq_num, + const ValueType t) { + return InternalKey(user_key, seq_num, t).Encode().ToString(); + } + // Corrupts key by changing the type - void CorruptKey(InternalKey* ikey) { + void CorruptKeyType(InternalKey* ikey) { std::string keystr = ikey->Encode().ToString(); keystr[keystr.size() - 8] = kTypeLogData; ikey->DecodeFrom(Slice(keystr.data(), keystr.size())); } + void AddMockFile(const mock::MockFileContents& contents, int level = 0) { + assert(contents.size() > 0); + + bool first_key = true; + std::string smallest, largest; + InternalKey smallest_key, largest_key; + SequenceNumber smallest_seqno = kMaxSequenceNumber; + SequenceNumber largest_seqno = 0; + for (auto kv : contents) { + ParsedInternalKey key; + std::string skey; + std::string value; + std::tie(skey, value) = kv; + ParseInternalKey(skey, &key); + + smallest_seqno = std::min(smallest_seqno, key.sequence); + largest_seqno = std::max(largest_seqno, key.sequence); + + if (first_key || + cfd_->user_comparator()->Compare(key.user_key, smallest) < 0) { + smallest.assign(key.user_key.data(), key.user_key.size()); + smallest_key.DecodeFrom(skey); + } + if (first_key || + cfd_->user_comparator()->Compare(key.user_key, largest) > 0) { + largest.assign(key.user_key.data(), key.user_key.size()); + largest_key.DecodeFrom(skey); + } + + first_key = false; + } + + uint64_t file_number = versions_->NewFileNumber(); + EXPECT_OK(mock_table_factory_->CreateMockTable( + env_, GenerateFileName(file_number), std::move(contents))); + + VersionEdit edit; + edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key, + smallest_seqno, largest_seqno, false); + + mutex_.Lock(); + versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), + mutable_cf_options_, &edit, &mutex_); + mutex_.Unlock(); + } + + void SetLastSequence(const SequenceNumber sequence_number) { + versions_->SetLastSequence(sequence_number + 1); + } + // returns expected result after compaction mock::MockFileContents CreateTwoFiles(bool gen_corrupted_keys) { mock::MockFileContents expected_results; @@ -110,8 +163,6 @@ class CompactionJobTest : public testing::Test { for (int i = 0; i < 2; ++i) { mock::MockFileContents contents; - SequenceNumber smallest_seqno = 0, largest_seqno = 0; - InternalKey smallest, largest; for (int k = 0; k < kKeysPerFile; ++k) { auto key = ToString(i * kMatchingKeys + k); auto value = ToString(i * kKeysPerFile + k); @@ -120,41 +171,25 @@ class CompactionJobTest : public testing::Test { // file InternalKey bottommost_internal_key(key, 0, kTypeValue); if (corrupt_id(k)) { - CorruptKey(&internal_key); - CorruptKey(&bottommost_internal_key); + CorruptKeyType(&internal_key); + CorruptKeyType(&bottommost_internal_key); } - if (k == 0) { - smallest = internal_key; - smallest_seqno = sequence_number; - } else if (k == kKeysPerFile - 1) { - largest = internal_key; - largest_seqno = sequence_number; - } - contents.insert({internal_key.Encode().ToString(), value}); + contents.insert({ internal_key.Encode().ToString(), value }); if (i == 1 || k < kMatchingKeys || corrupt_id(k - kMatchingKeys)) { expected_results.insert( - {bottommost_internal_key.Encode().ToString(), value}); + { bottommost_internal_key.Encode().ToString(), value }); } } - uint64_t file_number = versions_->NewFileNumber(); - EXPECT_OK(mock_table_factory_->CreateMockTable( - env_, GenerateFileName(file_number), std::move(contents))); - - VersionEdit edit; - edit.AddFile(0, file_number, 0, 10, smallest, largest, smallest_seqno, - largest_seqno, false); - - mutex_.Lock(); - versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), - mutable_cf_options_, &edit, &mutex_); - mutex_.Unlock(); + AddMockFile(contents); } - versions_->SetLastSequence(sequence_number + 1); + + SetLastSequence(sequence_number); + return expected_results; } - void NewDB() { + void NewDB(std::shared_ptr merge_operator = nullptr) { VersionEdit new_db; new_db.SetLogNumber(0); new_db.SetNextFile(2); @@ -166,7 +201,7 @@ class CompactionJobTest : public testing::Test { manifest, &file, env_->OptimizeForManifestWrite(env_options_)); ASSERT_OK(s); unique_ptr file_writer( - new WritableFileWriter(std::move(file), EnvOptions())); + new WritableFileWriter(std::move(file), env_options_)); { log::Writer log(std::move(file_writer)); std::string record; @@ -176,19 +211,35 @@ class CompactionJobTest : public testing::Test { ASSERT_OK(s); // Make "CURRENT" file that points to the new manifest file. s = SetCurrentFile(env_, dbname_, 1, nullptr); + + std::vector column_families; + cf_options_.table_factory = mock_table_factory_; + cf_options_.merge_operator = merge_operator; + column_families.emplace_back(kDefaultColumnFamilyName, cf_options_); + + EXPECT_OK(versions_->Recover(column_families, false)); + cfd_ = versions_->GetColumnFamilySet()->GetDefault(); } - void RunCompaction(const std::vector& files) { + void RunCompaction(const std::vector>& input_files, + const mock::MockFileContents& expected_results) { auto cfd = versions_->GetColumnFamilySet()->GetDefault(); - CompactionInputFiles compaction_input_files; - compaction_input_files.level = 0; - for (auto file : files) { - compaction_input_files.files.push_back(file); + size_t num_input_files = 0; + std::vector compaction_input_files; + for (size_t level = 0; level < input_files.size(); level++) { + auto level_files = input_files[level]; + CompactionInputFiles compaction_level; + compaction_level.level = level; + compaction_level.files.insert(compaction_level.files.end(), + level_files.begin(), level_files.end()); + compaction_input_files.push_back(compaction_level); + num_input_files += level_files.size(); } + Compaction compaction(cfd->current()->storage_info(), *cfd->GetLatestMutableCFOptions(), - {compaction_input_files}, 1, 1024 * 1024, 10, 0, + compaction_input_files, 1, 1024 * 1024, 10, 0, kNoCompression, {}); compaction.SetInputVersion(cfd->current()); @@ -204,16 +255,18 @@ class CompactionJobTest : public testing::Test { compaction_job.Prepare(); mutex_.Unlock(); - ASSERT_OK(compaction_job.Run()); - mutex_.Lock(); Status s; + s = compaction_job.Run(); + ASSERT_OK(s); + mutex_.Lock(); compaction_job.Install(&s, *cfd->GetLatestMutableCFOptions(), &mutex_); ASSERT_OK(s); mutex_.Unlock(); ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U); - ASSERT_EQ(compaction_job_stats_.num_input_files, files.size()); + ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files); ASSERT_EQ(compaction_job_stats_.num_output_files, 1U); + mock_table_factory_->AssertLatestFile(expected_results); } Env* env_; @@ -230,26 +283,165 @@ class CompactionJobTest : public testing::Test { std::atomic shutting_down_; std::shared_ptr mock_table_factory_; CompactionJobStats compaction_job_stats_; + ColumnFamilyData* cfd_; }; TEST_F(CompactionJobTest, Simple) { + NewDB(); + auto expected_results = CreateTwoFiles(false); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); auto files = cfd->current()->storage_info()->LevelFiles(0); ASSERT_EQ(2U, files.size()); - - RunCompaction(files); - mock_table_factory_->AssertLatestFile(expected_results); + RunCompaction({ files }, expected_results); } TEST_F(CompactionJobTest, SimpleCorrupted) { + NewDB(); + auto expected_results = CreateTwoFiles(true); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); auto files = cfd->current()->storage_info()->LevelFiles(0); - - RunCompaction(files); + RunCompaction({ files }, expected_results); ASSERT_EQ(compaction_job_stats_.num_corrupt_keys, 400U); - mock_table_factory_->AssertLatestFile(expected_results); +} + +TEST_F(CompactionJobTest, SimpleDeletion) { + NewDB(); + + mock::MockFileContents file1 = { + { KeyStr("c", 4U, kTypeDeletion), "" }, + { KeyStr("c", 3U, kTypeValue), "val" } + }; + AddMockFile(file1); + + mock::MockFileContents file2 = { + { KeyStr("b", 2U, kTypeValue), "val" }, + { KeyStr("b", 1U, kTypeValue), "val" } + }; + AddMockFile(file2); + + mock::MockFileContents expected_results = { + { KeyStr("b", 0U, kTypeValue), "val" } + }; + + SetLastSequence(4U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({ files }, expected_results); +} + +TEST_F(CompactionJobTest, SimpleOverwrite) { + NewDB(); + + mock::MockFileContents file1 = { + { KeyStr("a", 3U, kTypeValue), "val2" }, + { KeyStr("b", 4U, kTypeValue), "val3" }, + }; + AddMockFile(file1); + + mock::MockFileContents file2 = { + { KeyStr("a", 1U, kTypeValue), "val" }, + { KeyStr("b", 2U, kTypeValue), "val" } + }; + AddMockFile(file2); + + mock::MockFileContents expected_results = { + { KeyStr("a", 0U, kTypeValue), "val2" }, + { KeyStr("b", 0U, kTypeValue), "val3" } + }; + + SetLastSequence(4U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({ files }, expected_results); +} + +TEST_F(CompactionJobTest, SimpleNonLastLevel) { + NewDB(); + + mock::MockFileContents file1 = { + { KeyStr("a", 5U, kTypeValue), "val2" }, + { KeyStr("b", 6U, kTypeValue), "val3" }, + }; + AddMockFile(file1); + + mock::MockFileContents file2 = { + { KeyStr("a", 3U, kTypeValue), "val" }, + { KeyStr("b", 4U, kTypeValue), "val" } + }; + AddMockFile(file2, 1); + + mock::MockFileContents file3 = { + { KeyStr("a", 1U, kTypeValue), "val" }, + { KeyStr("b", 2U, kTypeValue), "val" } + }; + AddMockFile(file3, 2); + + // Because level 1 is not the last level, the sequence numbers of a and b + // cannot be set to 0 + mock::MockFileContents expected_results = { + { KeyStr("a", 5U, kTypeValue), "val2" }, + { KeyStr("b", 6U, kTypeValue), "val3" } + }; + + SetLastSequence(6U); + auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0); + auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1); + RunCompaction({ lvl0_files, lvl1_files }, expected_results); +} + +TEST_F(CompactionJobTest, SimpleMerge) { + auto merge_op = MergeOperators::CreateStringAppendOperator(); + NewDB(merge_op); + + mock::MockFileContents file1 = { + { KeyStr("a", 5U, kTypeMerge), "5" }, + { KeyStr("a", 4U, kTypeMerge), "4" }, + { KeyStr("a", 3U, kTypeValue), "3" }, + }; + AddMockFile(file1); + + mock::MockFileContents file2 = { + { KeyStr("b", 2U, kTypeMerge), "2" }, + { KeyStr("b", 1U, kTypeValue), "1" } + }; + AddMockFile(file2); + + mock::MockFileContents expected_results = { + { KeyStr("a", 0U, kTypeValue), "3,4,5" }, + { KeyStr("b", 0U, kTypeValue), "1,2" } + }; + + SetLastSequence(5U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({ files }, expected_results); +} + +TEST_F(CompactionJobTest, NonAssocMerge) { + auto merge_op = MergeOperators::CreateStringAppendTESTOperator(); + NewDB(merge_op); + + mock::MockFileContents file1 = { + { KeyStr("a", 5U, kTypeMerge), "5" }, + { KeyStr("a", 4U, kTypeMerge), "4" }, + { KeyStr("a", 3U, kTypeMerge), "3" }, + }; + AddMockFile(file1); + + mock::MockFileContents file2 = { + { KeyStr("b", 2U, kTypeMerge), "2" }, + { KeyStr("b", 1U, kTypeMerge), "1" } + }; + AddMockFile(file2); + + mock::MockFileContents expected_results = { + { KeyStr("a", 0U, kTypeValue), "3,4,5" }, + { KeyStr("b", 2U, kTypeMerge), "2" }, + { KeyStr("b", 1U, kTypeMerge), "1" } + }; + + SetLastSequence(5U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({ files }, expected_results); } } // namespace rocksdb diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 7e8fb8f51..085415d92 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -103,7 +103,7 @@ TEST_F(FlushJobTest, NonEmpty) { auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); new_mem->Ref(); - std::map inserted_keys; + mock::MockFileContents inserted_keys; for (int i = 1; i < 10000; ++i) { std::string key(ToString(i)); std::string value("value" + ToString(i)); diff --git a/db/version_set.cc b/db/version_set.cc index c42b3b728..f4a2b045c 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1130,7 +1130,6 @@ bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) { } // anonymous namespace void VersionStorageInfo::AddFile(int level, FileMetaData* f) { - assert(level < num_levels()); auto* level_files = &files_[level]; // Must not overlap assert(level <= 0 || level_files->empty() || diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h index c17dd8e71..05b66f202 100644 --- a/include/rocksdb/merge_operator.h +++ b/include/rocksdb/merge_operator.h @@ -6,9 +6,10 @@ #ifndef STORAGE_ROCKSDB_INCLUDE_MERGE_OPERATOR_H_ #define STORAGE_ROCKSDB_INCLUDE_MERGE_OPERATOR_H_ +#include #include #include -#include + #include "rocksdb/slice.h" namespace rocksdb { diff --git a/table/mock_table.cc b/table/mock_table.cc index 925b75cd7..c62a61920 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -5,11 +5,11 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -#include "rocksdb/table_properties.h" -#include "table/mock_table.h" -#include "table/get_context.h" #include "db/dbformat.h" #include "port/port.h" +#include "rocksdb/table_properties.h" +#include "table/get_context.h" +#include "table/mock_table.h" #include "util/coding.h" #include "util/file_reader_writer.h" diff --git a/table/mock_table.h b/table/mock_table.h index fd542c965..214bcef96 100644 --- a/table/mock_table.h +++ b/table/mock_table.h @@ -12,6 +12,7 @@ #include #include +#include "rocksdb/comparator.h" #include "rocksdb/table.h" #include "table/table_reader.h" #include "table/table_builder.h" @@ -23,8 +24,23 @@ namespace rocksdb { namespace mock { -typedef std::map MockFileContents; -// NOTE this currently only supports bitwise comparator +struct MockFileContentsCmp { + MockFileContentsCmp() : icmp_(BytewiseComparator()) {} + + bool operator() (const std::string& x, const std::string& y) const { + InternalKey ikey_x; + InternalKey ikey_y; + ikey_x.DecodeFrom(x); + ikey_y.DecodeFrom(y); + return icmp_.Compare(ikey_x, ikey_y) < 0; + } + + InternalKeyComparator icmp_; +}; + +// NOTE: this currently only supports the bytewise comparator +typedef std::map + MockFileContents; struct MockTableFileSystem { port::Mutex mutex;