// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in // the LICENSE file in the root directory of this source tree. An // additional grant of patent rights can be found in the PATENTS file // in the same directory. // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include #include #include #include #include "db/db_impl.h" #include "db/db_test_util.h" #include "options/options_parser.h" #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "util/coding.h" #include "util/fault_injection_test_env.h" #include "util/string_util.h" #include "util/sync_point.h" #include "util/testharness.h" #include "util/testutil.h" #include "utilities/merge_operators.h" namespace rocksdb { static const int kValueSize = 1000; namespace { std::string RandomString(Random* rnd, int len) { std::string r; test::RandomString(rnd, len, &r); return r; } } // anonymous namespace // counts how many operations were performed class EnvCounter : public EnvWrapper { public: explicit EnvCounter(Env* base) : EnvWrapper(base), num_new_writable_file_(0) {} int GetNumberOfNewWritableFileCalls() { return num_new_writable_file_; } Status NewWritableFile(const std::string& f, unique_ptr* r, const EnvOptions& soptions) override { ++num_new_writable_file_; return EnvWrapper::NewWritableFile(f, r, soptions); } private: std::atomic num_new_writable_file_; }; class ColumnFamilyTest : public testing::Test { public: ColumnFamilyTest() : rnd_(139) { env_ = new EnvCounter(Env::Default()); dbname_ = test::TmpDir() + "/column_family_test"; db_options_.create_if_missing = true; db_options_.fail_if_options_file_error = true; db_options_.env = env_; DestroyDB(dbname_, Options(db_options_, column_family_options_)); } 
~ColumnFamilyTest() { Close(); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); Destroy(); delete env_; } // Return the value to associate with the specified key Slice Value(int k, std::string* storage) { if (k == 0) { // Ugh. Random seed of 0 used to produce no entropy. This code // preserves the implementation that was in place when all of the // magic values in this file were picked. *storage = std::string(kValueSize, ' '); return Slice(*storage); } else { Random r(k); return test::RandomString(&r, kValueSize, storage); } } void Build(int base, int n, int flush_every = 0) { std::string key_space, value_space; WriteBatch batch; for (int i = 0; i < n; i++) { if (flush_every != 0 && i != 0 && i % flush_every == 0) { DBImpl* dbi = reinterpret_cast(db_); dbi->TEST_FlushMemTable(); } int keyi = base + i; Slice key(DBTestBase::Key(keyi)); batch.Clear(); batch.Put(handles_[0], key, Value(keyi, &value_space)); batch.Put(handles_[1], key, Value(keyi, &value_space)); batch.Put(handles_[2], key, Value(keyi, &value_space)); ASSERT_OK(db_->Write(WriteOptions(), &batch)); } } void CheckMissed() { uint64_t next_expected = 0; uint64_t missed = 0; int bad_keys = 0; int bad_values = 0; int correct = 0; std::string value_space; for (int cf = 0; cf < 3; cf++) { next_expected = 0; Iterator* iter = db_->NewIterator(ReadOptions(false, true), handles_[cf]); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { uint64_t key; Slice in(iter->key()); in.remove_prefix(3); if (!ConsumeDecimalNumber(&in, &key) || !in.empty() || key < next_expected) { bad_keys++; continue; } missed += (key - next_expected); next_expected = key + 1; if (iter->value() != Value(static_cast(key), &value_space)) { bad_values++; } else { correct++; } } delete iter; } ASSERT_EQ(0, bad_keys); ASSERT_EQ(0, bad_values); ASSERT_EQ(0, missed); (void)correct; } void Close() { for (auto h : handles_) { if (h) { db_->DestroyColumnFamilyHandle(h); } } handles_.clear(); names_.clear(); delete db_; db_ = nullptr; } 
Status TryOpen(std::vector cf, std::vector options = {}) { std::vector column_families; names_.clear(); for (size_t i = 0; i < cf.size(); ++i) { column_families.push_back(ColumnFamilyDescriptor( cf[i], options.size() == 0 ? column_family_options_ : options[i])); names_.push_back(cf[i]); } return DB::Open(db_options_, dbname_, column_families, &handles_, &db_); } Status OpenReadOnly(std::vector cf, std::vector options = {}) { std::vector column_families; names_.clear(); for (size_t i = 0; i < cf.size(); ++i) { column_families.push_back(ColumnFamilyDescriptor( cf[i], options.size() == 0 ? column_family_options_ : options[i])); names_.push_back(cf[i]); } return DB::OpenForReadOnly(db_options_, dbname_, column_families, &handles_, &db_); } #ifndef ROCKSDB_LITE // ReadOnlyDB is not supported void AssertOpenReadOnly(std::vector cf, std::vector options = {}) { ASSERT_OK(OpenReadOnly(cf, options)); } #endif // !ROCKSDB_LITE void Open(std::vector cf, std::vector options = {}) { ASSERT_OK(TryOpen(cf, options)); } void Open() { Open({"default"}); } DBImpl* dbfull() { return reinterpret_cast(db_); } int GetProperty(int cf, std::string property) { std::string value; EXPECT_TRUE(dbfull()->GetProperty(handles_[cf], property, &value)); #ifndef CYGWIN return std::stoi(value); #else return std::strtol(value.c_str(), 0 /* off */, 10 /* base */); #endif } bool IsDbWriteStopped() { #ifndef ROCKSDB_LITE uint64_t v; EXPECT_TRUE(dbfull()->GetIntProperty("rocksdb.is-write-stopped", &v)); return (v == 1); #else return dbfull()->TEST_write_controler().IsStopped(); #endif // !ROCKSDB_LITE } uint64_t GetDbDelayedWriteRate() { #ifndef ROCKSDB_LITE uint64_t v; EXPECT_TRUE( dbfull()->GetIntProperty("rocksdb.actual-delayed-write-rate", &v)); return v; #else if (!dbfull()->TEST_write_controler().NeedsDelay()) { return 0; } return dbfull()->TEST_write_controler().delayed_write_rate(); #endif // !ROCKSDB_LITE } void Destroy() { Close(); ASSERT_OK(DestroyDB(dbname_, Options(db_options_, 
column_family_options_))); } void CreateColumnFamilies( const std::vector& cfs, const std::vector options = {}) { int cfi = static_cast(handles_.size()); handles_.resize(cfi + cfs.size()); names_.resize(cfi + cfs.size()); for (size_t i = 0; i < cfs.size(); ++i) { const auto& current_cf_opt = options.size() == 0 ? column_family_options_ : options[i]; ASSERT_OK( db_->CreateColumnFamily(current_cf_opt, cfs[i], &handles_[cfi])); names_[cfi] = cfs[i]; #ifndef ROCKSDB_LITE // RocksDBLite does not support GetDescriptor // Verify the CF options of the returned CF handle. ColumnFamilyDescriptor desc; ASSERT_OK(handles_[cfi]->GetDescriptor(&desc)); RocksDBOptionsParser::VerifyCFOptions(desc.options, current_cf_opt); #endif // !ROCKSDB_LITE cfi++; } } void Reopen(const std::vector options = {}) { std::vector names; for (auto name : names_) { if (name != "") { names.push_back(name); } } Close(); assert(options.size() == 0 || names.size() == options.size()); Open(names, options); } void CreateColumnFamiliesAndReopen(const std::vector& cfs) { CreateColumnFamilies(cfs); Reopen(); } void DropColumnFamilies(const std::vector& cfs) { for (auto cf : cfs) { ASSERT_OK(db_->DropColumnFamily(handles_[cf])); db_->DestroyColumnFamilyHandle(handles_[cf]); handles_[cf] = nullptr; names_[cf] = ""; } } void PutRandomData(int cf, int num, int key_value_size, bool save = false) { for (int i = 0; i < num; ++i) { // 10 bytes for key, rest is value if (!save) { ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 11), RandomString(&rnd_, key_value_size - 10))); } else { std::string key = test::RandomKey(&rnd_, 11); keys_.insert(key); ASSERT_OK(Put(cf, key, RandomString(&rnd_, key_value_size - 10))); } } } #ifndef ROCKSDB_LITE // TEST functions in DB are not supported in lite void WaitForFlush(int cf) { ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf])); } void WaitForCompaction() { ASSERT_OK(dbfull()->TEST_WaitForCompact()); } uint64_t MaxTotalInMemoryState() { return 
dbfull()->TEST_MaxTotalInMemoryState(); } void AssertMaxTotalInMemoryState(uint64_t value) { ASSERT_EQ(value, MaxTotalInMemoryState()); } #endif // !ROCKSDB_LITE Status Put(int cf, const std::string& key, const std::string& value) { return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value)); } Status Merge(int cf, const std::string& key, const std::string& value) { return db_->Merge(WriteOptions(), handles_[cf], Slice(key), Slice(value)); } Status Flush(int cf) { return db_->Flush(FlushOptions(), handles_[cf]); } std::string Get(int cf, const std::string& key) { ReadOptions options; options.verify_checksums = true; std::string result; Status s = db_->Get(options, handles_[cf], Slice(key), &result); if (s.IsNotFound()) { result = "NOT_FOUND"; } else if (!s.ok()) { result = s.ToString(); } return result; } void CompactAll(int cf) { ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf], nullptr, nullptr)); } void Compact(int cf, const Slice& start, const Slice& limit) { ASSERT_OK( db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit)); } int NumTableFilesAtLevel(int level, int cf) { return GetProperty(cf, "rocksdb.num-files-at-level" + ToString(level)); } #ifndef ROCKSDB_LITE // Return spread of files per level std::string FilesPerLevel(int cf) { std::string result; int last_non_zero_offset = 0; for (int level = 0; level < dbfull()->NumberLevels(handles_[cf]); level++) { int f = NumTableFilesAtLevel(level, cf); char buf[100]; snprintf(buf, sizeof(buf), "%s%d", (level ? 
"," : ""), f); result += buf; if (f > 0) { last_non_zero_offset = static_cast(result.size()); } } result.resize(last_non_zero_offset); return result; } #endif void AssertFilesPerLevel(const std::string& value, int cf) { #ifndef ROCKSDB_LITE ASSERT_EQ(value, FilesPerLevel(cf)); #endif } #ifndef ROCKSDB_LITE // GetLiveFilesMetaData is not supported int CountLiveFiles() { std::vector metadata; db_->GetLiveFilesMetaData(&metadata); return static_cast(metadata.size()); } #endif // !ROCKSDB_LITE void AssertCountLiveFiles(int expected_value) { #ifndef ROCKSDB_LITE ASSERT_EQ(expected_value, CountLiveFiles()); #endif } // Do n memtable flushes, each of which produces an sstable // covering the range [small,large]. void MakeTables(int cf, int n, const std::string& small, const std::string& large) { for (int i = 0; i < n; i++) { ASSERT_OK(Put(cf, small, "begin")); ASSERT_OK(Put(cf, large, "end")); ASSERT_OK(db_->Flush(FlushOptions(), handles_[cf])); } } #ifndef ROCKSDB_LITE // GetSortedWalFiles is not supported int CountLiveLogFiles() { int micros_wait_for_log_deletion = 20000; env_->SleepForMicroseconds(micros_wait_for_log_deletion); int ret = 0; VectorLogPtr wal_files; Status s; // GetSortedWalFiles is a flakey function -- it gets all the wal_dir // children files and then later checks for their existence. if some of the // log files doesn't exist anymore, it reports an error. it does all of this // without DB mutex held, so if a background process deletes the log file // while the function is being executed, it returns an error. 
We retry the // function 10 times to avoid the error failing the test for (int retries = 0; retries < 10; ++retries) { wal_files.clear(); s = db_->GetSortedWalFiles(wal_files); if (s.ok()) { break; } } EXPECT_OK(s); for (const auto& wal : wal_files) { if (wal->Type() == kAliveLogFile) { ++ret; } } return ret; return 0; } #endif // !ROCKSDB_LITE void AssertCountLiveLogFiles(int value) { #ifndef ROCKSDB_LITE // GetSortedWalFiles is not supported ASSERT_EQ(value, CountLiveLogFiles()); #endif // !ROCKSDB_LITE } void AssertNumberOfImmutableMemtables(std::vector num_per_cf) { assert(num_per_cf.size() == handles_.size()); #ifndef ROCKSDB_LITE // GetProperty is not supported in lite for (size_t i = 0; i < num_per_cf.size(); ++i) { ASSERT_EQ(num_per_cf[i], GetProperty(static_cast(i), "rocksdb.num-immutable-mem-table")); } #endif // !ROCKSDB_LITE } void CopyFile(const std::string& source, const std::string& destination, uint64_t size = 0) { const EnvOptions soptions; unique_ptr srcfile; ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions)); unique_ptr destfile; ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions)); if (size == 0) { // default argument means copy everything ASSERT_OK(env_->GetFileSize(source, &size)); } char buffer[4096]; Slice slice; while (size > 0) { uint64_t one = std::min(uint64_t(sizeof(buffer)), size); ASSERT_OK(srcfile->Read(one, &slice, buffer)); ASSERT_OK(destfile->Append(slice)); size -= slice.size(); } ASSERT_OK(destfile->Close()); } std::vector handles_; std::vector names_; std::set keys_; ColumnFamilyOptions column_family_options_; DBOptions db_options_; std::string dbname_; DB* db_ = nullptr; EnvCounter* env_; Random rnd_; }; TEST_F(ColumnFamilyTest, DontReuseColumnFamilyID) { for (int iter = 0; iter < 3; ++iter) { Open(); CreateColumnFamilies({"one", "two", "three"}); for (size_t i = 0; i < handles_.size(); ++i) { auto cfh = reinterpret_cast(handles_[i]); ASSERT_EQ(i, cfh->GetID()); } if (iter == 1) { Reopen(); } 
DropColumnFamilies({3}); Reopen(); if (iter == 2) { // this tests if max_column_family is correctly persisted with // WriteSnapshot() Reopen(); } CreateColumnFamilies({"three2"}); // ID 3 that was used for dropped column family "three" should not be reused auto cfh3 = reinterpret_cast(handles_[3]); ASSERT_EQ(4U, cfh3->GetID()); Close(); Destroy(); } } TEST_F(ColumnFamilyTest, CreateCFRaceWithGetAggProperty) { Open(); rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::WriteOptionsFile:1", "ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1"}, {"ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2", "DBImpl::WriteOptionsFile:2"}}); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::port::Thread thread([&] { CreateColumnFamilies({"one"}); }); TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1"); uint64_t pv; db_->GetAggregatedIntProperty(DB::Properties::kEstimateTableReadersMem, &pv); TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2"); thread.join(); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } class FlushEmptyCFTestWithParam : public ColumnFamilyTest, public testing::WithParamInterface { public: FlushEmptyCFTestWithParam() { allow_2pc_ = GetParam(); } // Required if inheriting from testing::WithParamInterface<> static void SetUpTestCase() {} static void TearDownTestCase() {} bool allow_2pc_; }; TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest) { std::unique_ptr fault_env( new FaultInjectionTestEnv(env_)); db_options_.env = fault_env.get(); db_options_.allow_2pc = allow_2pc_; Open(); CreateColumnFamilies({"one", "two"}); // Generate log file A. ASSERT_OK(Put(1, "foo", "v1")); // seqID 1 Reopen(); // Log file A is not dropped after reopening because default column family's // min log number is 0. // It flushes to SST file X ASSERT_OK(Put(1, "foo", "v1")); // seqID 2 ASSERT_OK(Put(1, "bar", "v2")); // seqID 3 // Current log file is file B now. 
While flushing, a new log file C is created // and is set to current. Boths' min log number is set to file C in memory, so // after flushing file B is deleted. At the same time, the min log number of // default CF is not written to manifest. Log file A still remains. // Flushed to SST file Y. Flush(1); Flush(0); ASSERT_OK(Put(1, "bar", "v3")); // seqID 4 ASSERT_OK(Put(1, "foo", "v4")); // seqID 5 // Preserve file system state up to here to simulate a crash condition. fault_env->SetFilesystemActive(false); std::vector names; for (auto name : names_) { if (name != "") { names.push_back(name); } } Close(); fault_env->ResetState(); // Before opening, there are four files: // Log file A contains seqID 1 // Log file C contains seqID 4, 5 // SST file X contains seqID 1 // SST file Y contains seqID 2, 3 // Min log number: // default CF: 0 // CF one, two: C // When opening the DB, all the seqID should be preserved. Open(names, {}); ASSERT_EQ("v4", Get(1, "foo")); ASSERT_EQ("v3", Get(1, "bar")); Close(); db_options_.env = env_; } TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest2) { std::unique_ptr fault_env( new FaultInjectionTestEnv(env_)); db_options_.env = fault_env.get(); db_options_.allow_2pc = allow_2pc_; Open(); CreateColumnFamilies({"one", "two"}); // Generate log file A. ASSERT_OK(Put(1, "foo", "v1")); // seqID 1 Reopen(); // Log file A is not dropped after reopening because default column family's // min log number is 0. // It flushes to SST file X ASSERT_OK(Put(1, "foo", "v1")); // seqID 2 ASSERT_OK(Put(1, "bar", "v2")); // seqID 3 // Current log file is file B now. While flushing, a new log file C is created // and is set to current. Both CFs' min log number is set to file C so after // flushing file B is deleted. Log file A still remains. // Flushed to SST file Y. Flush(1); ASSERT_OK(Put(0, "bar", "v2")); // seqID 4 ASSERT_OK(Put(2, "bar", "v2")); // seqID 5 ASSERT_OK(Put(1, "bar", "v3")); // seqID 6 // Flushing all column families. 
This forces all CFs' min log to current. This // is written to the manifest file. Log file C is cleared. Flush(0); Flush(1); Flush(2); // Write to log file D ASSERT_OK(Put(1, "bar", "v4")); // seqID 7 ASSERT_OK(Put(1, "bar", "v5")); // seqID 8 // Preserve file system state up to here to simulate a crash condition. fault_env->SetFilesystemActive(false); std::vector names; for (auto name : names_) { if (name != "") { names.push_back(name); } } Close(); fault_env->ResetState(); // Before opening, there are two logfiles: // Log file A contains seqID 1 // Log file D contains seqID 7, 8 // Min log number: // default CF: D // CF one, two: D // When opening the DB, log file D should be replayed using the seqID // specified in the file. Open(names, {}); ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v5", Get(1, "bar")); Close(); db_options_.env = env_; } INSTANTIATE_TEST_CASE_P(FlushEmptyCFTestWithParam, FlushEmptyCFTestWithParam, ::testing::Bool()); TEST_F(ColumnFamilyTest, AddDrop) { Open(); CreateColumnFamilies({"one", "two", "three"}); ASSERT_EQ("NOT_FOUND", Get(1, "fodor")); ASSERT_EQ("NOT_FOUND", Get(2, "fodor")); DropColumnFamilies({2}); ASSERT_EQ("NOT_FOUND", Get(1, "fodor")); CreateColumnFamilies({"four"}); ASSERT_EQ("NOT_FOUND", Get(3, "fodor")); ASSERT_OK(Put(1, "fodor", "mirko")); ASSERT_EQ("mirko", Get(1, "fodor")); ASSERT_EQ("NOT_FOUND", Get(3, "fodor")); Close(); ASSERT_TRUE(TryOpen({"default"}).IsInvalidArgument()); Open({"default", "one", "three", "four"}); DropColumnFamilies({1}); Reopen(); Close(); std::vector families; ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &families)); std::sort(families.begin(), families.end()); ASSERT_TRUE(families == std::vector({"default", "four", "three"})); } TEST_F(ColumnFamilyTest, BulkAddDrop) { constexpr int kNumCF = 1000; ColumnFamilyOptions cf_options; WriteOptions write_options; Open(); std::vector cf_names; std::vector cf_handles; for (int i = 1; i <= kNumCF; i++) { cf_names.push_back("cf1-" + ToString(i)); } 
ASSERT_OK(db_->CreateColumnFamilies(cf_options, cf_names, &cf_handles)); for (int i = 1; i <= kNumCF; i++) { ASSERT_OK(db_->Put(write_options, cf_handles[i - 1], "foo", "bar")); } ASSERT_OK(db_->DropColumnFamilies(cf_handles)); std::vector cf_descriptors; for (auto* handle : cf_handles) { delete handle; } cf_handles.clear(); for (int i = 1; i <= kNumCF; i++) { cf_descriptors.emplace_back("cf2-" + ToString(i), ColumnFamilyOptions()); } ASSERT_OK(db_->CreateColumnFamilies(cf_descriptors, &cf_handles)); for (int i = 1; i <= kNumCF; i++) { ASSERT_OK(db_->Put(write_options, cf_handles[i - 1], "foo", "bar")); } ASSERT_OK(db_->DropColumnFamilies(cf_handles)); for (auto* handle : cf_handles) { delete handle; } Close(); std::vector families; ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &families)); std::sort(families.begin(), families.end()); ASSERT_TRUE(families == std::vector({"default"})); } TEST_F(ColumnFamilyTest, DropTest) { // first iteration - dont reopen DB before dropping // second iteration - reopen DB before dropping for (int iter = 0; iter < 2; ++iter) { Open({"default"}); CreateColumnFamiliesAndReopen({"pikachu"}); for (int i = 0; i < 100; ++i) { ASSERT_OK(Put(1, ToString(i), "bar" + ToString(i))); } ASSERT_OK(Flush(1)); if (iter == 1) { Reopen(); } ASSERT_EQ("bar1", Get(1, "1")); AssertCountLiveFiles(1); DropColumnFamilies({1}); // make sure that all files are deleted when we drop the column family AssertCountLiveFiles(0); Destroy(); } } TEST_F(ColumnFamilyTest, WriteBatchFailure) { Open(); CreateColumnFamiliesAndReopen({"one", "two"}); WriteBatch batch; batch.Put(handles_[0], Slice("existing"), Slice("column-family")); batch.Put(handles_[1], Slice("non-existing"), Slice("column-family")); ASSERT_OK(db_->Write(WriteOptions(), &batch)); DropColumnFamilies({1}); WriteOptions woptions_ignore_missing_cf; woptions_ignore_missing_cf.ignore_missing_column_families = true; batch.Put(handles_[0], Slice("still here"), Slice("column-family")); 
ASSERT_OK(db_->Write(woptions_ignore_missing_cf, &batch)); ASSERT_EQ("column-family", Get(0, "still here")); Status s = db_->Write(WriteOptions(), &batch); ASSERT_TRUE(s.IsInvalidArgument()); Close(); } TEST_F(ColumnFamilyTest, ReadWrite) { Open(); CreateColumnFamiliesAndReopen({"one", "two"}); ASSERT_OK(Put(0, "foo", "v1")); ASSERT_OK(Put(0, "bar", "v2")); ASSERT_OK(Put(1, "mirko", "v3")); ASSERT_OK(Put(0, "foo", "v2")); ASSERT_OK(Put(2, "fodor", "v5")); for (int iter = 0; iter <= 3; ++iter) { ASSERT_EQ("v2", Get(0, "foo")); ASSERT_EQ("v2", Get(0, "bar")); ASSERT_EQ("v3", Get(1, "mirko")); ASSERT_EQ("v5", Get(2, "fodor")); ASSERT_EQ("NOT_FOUND", Get(0, "fodor")); ASSERT_EQ("NOT_FOUND", Get(1, "fodor")); ASSERT_EQ("NOT_FOUND", Get(2, "foo")); if (iter <= 1) { Reopen(); } } Close(); } TEST_F(ColumnFamilyTest, IgnoreRecoveredLog) { std::string backup_logs = dbname_ + "/backup_logs"; // delete old files in backup_logs directory ASSERT_OK(env_->CreateDirIfMissing(dbname_)); ASSERT_OK(env_->CreateDirIfMissing(backup_logs)); std::vector old_files; env_->GetChildren(backup_logs, &old_files); for (auto& file : old_files) { if (file != "." && file != "..") { env_->DeleteFile(backup_logs + "/" + file); } } column_family_options_.merge_operator = MergeOperators::CreateUInt64AddOperator(); db_options_.wal_dir = dbname_ + "/logs"; Destroy(); Open(); CreateColumnFamilies({"cf1", "cf2"}); // fill up the DB std::string one, two, three; PutFixed64(&one, 1); PutFixed64(&two, 2); PutFixed64(&three, 3); ASSERT_OK(Merge(0, "foo", one)); ASSERT_OK(Merge(1, "mirko", one)); ASSERT_OK(Merge(0, "foo", one)); ASSERT_OK(Merge(2, "bla", one)); ASSERT_OK(Merge(2, "fodor", one)); ASSERT_OK(Merge(0, "bar", one)); ASSERT_OK(Merge(2, "bla", one)); ASSERT_OK(Merge(1, "mirko", two)); ASSERT_OK(Merge(1, "franjo", one)); // copy the logs to backup std::vector logs; env_->GetChildren(db_options_.wal_dir, &logs); for (auto& log : logs) { if (log != ".." 
&& log != ".") { CopyFile(db_options_.wal_dir + "/" + log, backup_logs + "/" + log); } } // recover the DB Close(); // 1. check consistency // 2. copy the logs from backup back to WAL dir. if the recovery happens // again on the same log files, this should lead to incorrect results // due to applying merge operator twice // 3. check consistency for (int iter = 0; iter < 2; ++iter) { // assert consistency Open({"default", "cf1", "cf2"}); ASSERT_EQ(two, Get(0, "foo")); ASSERT_EQ(one, Get(0, "bar")); ASSERT_EQ(three, Get(1, "mirko")); ASSERT_EQ(one, Get(1, "franjo")); ASSERT_EQ(one, Get(2, "fodor")); ASSERT_EQ(two, Get(2, "bla")); Close(); if (iter == 0) { // copy the logs from backup back to wal dir for (auto& log : logs) { if (log != ".." && log != ".") { CopyFile(backup_logs + "/" + log, db_options_.wal_dir + "/" + log); } } } } } #ifndef ROCKSDB_LITE // TEST functions used are not supported TEST_F(ColumnFamilyTest, FlushTest) { Open(); CreateColumnFamiliesAndReopen({"one", "two"}); ASSERT_OK(Put(0, "foo", "v1")); ASSERT_OK(Put(0, "bar", "v2")); ASSERT_OK(Put(1, "mirko", "v3")); ASSERT_OK(Put(0, "foo", "v2")); ASSERT_OK(Put(2, "fodor", "v5")); for (int j = 0; j < 2; j++) { ReadOptions ro; std::vector iterators; // Hold super version. 
if (j == 0) { ASSERT_OK(db_->NewIterators(ro, handles_, &iterators)); } for (int i = 0; i < 3; ++i) { uint64_t max_total_in_memory_state = MaxTotalInMemoryState(); Flush(i); AssertMaxTotalInMemoryState(max_total_in_memory_state); } ASSERT_OK(Put(1, "foofoo", "bar")); ASSERT_OK(Put(0, "foofoo", "bar")); for (auto* it : iterators) { delete it; } } Reopen(); for (int iter = 0; iter <= 2; ++iter) { ASSERT_EQ("v2", Get(0, "foo")); ASSERT_EQ("v2", Get(0, "bar")); ASSERT_EQ("v3", Get(1, "mirko")); ASSERT_EQ("v5", Get(2, "fodor")); ASSERT_EQ("NOT_FOUND", Get(0, "fodor")); ASSERT_EQ("NOT_FOUND", Get(1, "fodor")); ASSERT_EQ("NOT_FOUND", Get(2, "foo")); if (iter <= 1) { Reopen(); } } Close(); } // Makes sure that obsolete log files get deleted TEST_F(ColumnFamilyTest, LogDeletionTest) { db_options_.max_total_wal_size = std::numeric_limits::max(); column_family_options_.arena_block_size = 4 * 1024; column_family_options_.write_buffer_size = 100000; // 100KB Open(); CreateColumnFamilies({"one", "two", "three", "four"}); // Each bracket is one log file. 
if number is in (), it means // we don't need it anymore (it's been flushed) // [] AssertCountLiveLogFiles(0); PutRandomData(0, 1, 100); // [0] PutRandomData(1, 1, 100); // [0, 1] PutRandomData(1, 1000, 100); WaitForFlush(1); // [0, (1)] [1] AssertCountLiveLogFiles(2); PutRandomData(0, 1, 100); // [0, (1)] [0, 1] AssertCountLiveLogFiles(2); PutRandomData(2, 1, 100); // [0, (1)] [0, 1, 2] PutRandomData(2, 1000, 100); WaitForFlush(2); // [0, (1)] [0, 1, (2)] [2] AssertCountLiveLogFiles(3); PutRandomData(2, 1000, 100); WaitForFlush(2); // [0, (1)] [0, 1, (2)] [(2)] [2] AssertCountLiveLogFiles(4); PutRandomData(3, 1, 100); // [0, (1)] [0, 1, (2)] [(2)] [2, 3] PutRandomData(1, 1, 100); // [0, (1)] [0, 1, (2)] [(2)] [1, 2, 3] AssertCountLiveLogFiles(4); PutRandomData(1, 1000, 100); WaitForFlush(1); // [0, (1)] [0, (1), (2)] [(2)] [(1), 2, 3] [1] AssertCountLiveLogFiles(5); PutRandomData(0, 1000, 100); WaitForFlush(0); // [(0), (1)] [(0), (1), (2)] [(2)] [(1), 2, 3] [1, (0)] [0] // delete obsolete logs --> // [(1), 2, 3] [1, (0)] [0] AssertCountLiveLogFiles(3); PutRandomData(0, 1000, 100); WaitForFlush(0); // [(1), 2, 3] [1, (0)], [(0)] [0] AssertCountLiveLogFiles(4); PutRandomData(1, 1000, 100); WaitForFlush(1); // [(1), 2, 3] [(1), (0)] [(0)] [0, (1)] [1] AssertCountLiveLogFiles(5); PutRandomData(2, 1000, 100); WaitForFlush(2); // [(1), (2), 3] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2] AssertCountLiveLogFiles(6); PutRandomData(3, 1000, 100); WaitForFlush(3); // [(1), (2), (3)] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2, (3)] [3] // delete obsolete logs --> // [0, (1)] [1, (2)], [2, (3)] [3] AssertCountLiveLogFiles(4); Close(); } #endif // !ROCKSDB_LITE TEST_F(ColumnFamilyTest, CrashAfterFlush) { std::unique_ptr fault_env( new FaultInjectionTestEnv(env_)); db_options_.env = fault_env.get(); Open(); CreateColumnFamilies({"one"}); WriteBatch batch; batch.Put(handles_[0], Slice("foo"), Slice("bar")); batch.Put(handles_[1], Slice("foo"), Slice("bar")); 
ASSERT_OK(db_->Write(WriteOptions(), &batch)); Flush(0); fault_env->SetFilesystemActive(false); std::vector names; for (auto name : names_) { if (name != "") { names.push_back(name); } } Close(); fault_env->DropUnsyncedFileData(); fault_env->ResetState(); Open(names, {}); // Write batch should be atomic. ASSERT_EQ(Get(0, "foo"), Get(1, "foo")); Close(); db_options_.env = env_; } TEST_F(ColumnFamilyTest, OpenNonexistentColumnFamily) { ASSERT_OK(TryOpen({"default"})); Close(); ASSERT_TRUE(TryOpen({"default", "dne"}).IsInvalidArgument()); } #ifndef ROCKSDB_LITE // WaitForFlush() is not supported // Makes sure that obsolete log files get deleted TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) { // disable flushing stale column families db_options_.max_total_wal_size = std::numeric_limits::max(); Open(); CreateColumnFamilies({"one", "two", "three"}); ColumnFamilyOptions default_cf, one, two, three; // setup options. all column families have max_write_buffer_number setup to 10 // "default" -> 100KB memtable, start flushing immediatelly // "one" -> 200KB memtable, start flushing with two immutable memtables // "two" -> 1MB memtable, start flushing with three immutable memtables // "three" -> 90KB memtable, start flushing with four immutable memtables default_cf.write_buffer_size = 100000; default_cf.arena_block_size = 4 * 4096; default_cf.max_write_buffer_number = 10; default_cf.min_write_buffer_number_to_merge = 1; default_cf.max_write_buffer_number_to_maintain = 0; one.write_buffer_size = 200000; one.arena_block_size = 4 * 4096; one.max_write_buffer_number = 10; one.min_write_buffer_number_to_merge = 2; one.max_write_buffer_number_to_maintain = 1; two.write_buffer_size = 1000000; two.arena_block_size = 4 * 4096; two.max_write_buffer_number = 10; two.min_write_buffer_number_to_merge = 3; two.max_write_buffer_number_to_maintain = 2; three.write_buffer_size = 4096 * 22; three.arena_block_size = 4096; three.max_write_buffer_number = 10; 
three.min_write_buffer_number_to_merge = 4; three.max_write_buffer_number_to_maintain = -1; Reopen({default_cf, one, two, three}); int micros_wait_for_flush = 10000; PutRandomData(0, 100, 1000); WaitForFlush(0); AssertNumberOfImmutableMemtables({0, 0, 0, 0}); AssertCountLiveLogFiles(1); PutRandomData(1, 200, 1000); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 0, 0}); AssertCountLiveLogFiles(2); PutRandomData(2, 1000, 1000); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 1, 0}); AssertCountLiveLogFiles(3); PutRandomData(2, 1000, 1000); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 2, 0}); AssertCountLiveLogFiles(4); PutRandomData(3, 93, 990); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 2, 1}); AssertCountLiveLogFiles(5); PutRandomData(3, 88, 990); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 2, 2}); AssertCountLiveLogFiles(6); PutRandomData(3, 88, 990); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 2, 3}); AssertCountLiveLogFiles(7); PutRandomData(0, 100, 1000); WaitForFlush(0); AssertNumberOfImmutableMemtables({0, 1, 2, 3}); AssertCountLiveLogFiles(8); PutRandomData(2, 100, 10000); WaitForFlush(2); AssertNumberOfImmutableMemtables({0, 1, 0, 3}); AssertCountLiveLogFiles(9); PutRandomData(3, 88, 990); WaitForFlush(3); AssertNumberOfImmutableMemtables({0, 1, 0, 0}); AssertCountLiveLogFiles(10); PutRandomData(3, 88, 990); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 0, 1}); AssertCountLiveLogFiles(11); PutRandomData(1, 200, 1000); WaitForFlush(1); AssertNumberOfImmutableMemtables({0, 0, 0, 1}); AssertCountLiveLogFiles(5); PutRandomData(3, 88 * 3, 990); WaitForFlush(3); PutRandomData(3, 88 * 4, 990); WaitForFlush(3); AssertNumberOfImmutableMemtables({0, 0, 
0, 0}); AssertCountLiveLogFiles(12); PutRandomData(0, 100, 1000); WaitForFlush(0); AssertNumberOfImmutableMemtables({0, 0, 0, 0}); AssertCountLiveLogFiles(12); PutRandomData(2, 3 * 1000, 1000); WaitForFlush(2); AssertNumberOfImmutableMemtables({0, 0, 0, 0}); AssertCountLiveLogFiles(12); PutRandomData(1, 2*200, 1000); WaitForFlush(1); AssertNumberOfImmutableMemtables({0, 0, 0, 0}); AssertCountLiveLogFiles(7); Close(); } #endif // !ROCKSDB_LITE #ifndef ROCKSDB_LITE // Cuckoo is not supported in lite TEST_F(ColumnFamilyTest, MemtableNotSupportSnapshot) { db_options_.allow_concurrent_memtable_write = false; Open(); auto* s1 = dbfull()->GetSnapshot(); ASSERT_TRUE(s1 != nullptr); dbfull()->ReleaseSnapshot(s1); // Add a column family that doesn't support snapshot ColumnFamilyOptions first; first.memtable_factory.reset(NewHashCuckooRepFactory(1024 * 1024)); CreateColumnFamilies({"first"}, {first}); auto* s2 = dbfull()->GetSnapshot(); ASSERT_TRUE(s2 == nullptr); // Add a column family that supports snapshot. Snapshot stays not supported. 
ColumnFamilyOptions second; CreateColumnFamilies({"second"}, {second}); auto* s3 = dbfull()->GetSnapshot(); ASSERT_TRUE(s3 == nullptr); Close(); } #endif // !ROCKSDB_LITE /* Stub comparator for the GetComparator test: Compare() always returns 0, Name() returns "Test", and the separator/successor hooks are no-ops. The test below only compares comparator pointers. */ class TestComparator : public Comparator { int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const override { return 0; } const char* Name() const override { return "Test"; } void FindShortestSeparator(std::string* start, const rocksdb::Slice& limit) const override {} void FindShortSuccessor(std::string* key) const override {} }; static TestComparator third_comparator; static TestComparator fourth_comparator; // Test that we can retrieve the comparator from a created CF /* NOTE(review): `second` is given &third_comparator and `third` is given &fourth_comparator, while `fourth` keeps its default comparator and is never asserted on. The assertions match that wiring, but the names are offset by one -- confirm this is intentional. */ TEST_F(ColumnFamilyTest, GetComparator) { Open(); // Add a column family with no comparator specified CreateColumnFamilies({"first"}); const Comparator* comp = handles_[0]->GetComparator(); ASSERT_EQ(comp, BytewiseComparator()); // Add three column families - one with no comparator and two // with comparators specified ColumnFamilyOptions second, third, fourth; second.comparator = &third_comparator; third.comparator = &fourth_comparator; CreateColumnFamilies({"second", "third", "fourth"}, {second, third, fourth}); ASSERT_EQ(handles_[1]->GetComparator(), BytewiseComparator()); ASSERT_EQ(handles_[2]->GetComparator(), &third_comparator); ASSERT_EQ(handles_[3]->GetComparator(), &fourth_comparator); Close(); } /* Checks that merge behaviour is per column family: CF 0 has no merge operator (Merge() must report NotSupported and the last Put wins), CF 1 uses the UInt64 add operator (1 merged with 2 reads back as 3), and CF 2 uses the string-append operator (reads back as one + "," + two). */ TEST_F(ColumnFamilyTest, DifferentMergeOperators) { Open(); CreateColumnFamilies({"first", "second"}); ColumnFamilyOptions default_cf, first, second; first.merge_operator = MergeOperators::CreateUInt64AddOperator(); second.merge_operator = MergeOperators::CreateStringAppendOperator(); Reopen({default_cf, first, second}); std::string one, two, three; PutFixed64(&one, 1); PutFixed64(&two, 2); PutFixed64(&three, 3); ASSERT_OK(Put(0, "foo", two)); ASSERT_OK(Put(0, "foo", one)); ASSERT_TRUE(Merge(0, "foo", two).IsNotSupported()); ASSERT_EQ(Get(0, "foo"), one); ASSERT_OK(Put(1, "foo", two)); ASSERT_OK(Put(1, "foo", one));
ASSERT_OK(Merge(1, "foo", two)); ASSERT_EQ(Get(1, "foo"), three); ASSERT_OK(Put(2, "foo", two)); ASSERT_OK(Put(2, "foo", one)); ASSERT_OK(Merge(2, "foo", two)); ASSERT_EQ(Get(2, "foo"), one + "," + two); Close(); } #ifndef ROCKSDB_LITE // WaitForFlush() is not supported /* Configures CF 1 as universal-style (L0 trigger 4) and CF 2 as level-style (L0 trigger 3). Each CF is filled to one file below its trigger, then one more batch is written to each, and after WaitForCompaction() the test expects files-per-level "1" for CF 1 and "0,1" for CF 2 (unchanged by a following CompactAll(2)). */ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) { Open(); CreateColumnFamilies({"one", "two"}); ColumnFamilyOptions default_cf, one, two; db_options_.max_open_files = 20; // only 10 files in file cache default_cf.compaction_style = kCompactionStyleLevel; default_cf.num_levels = 3; default_cf.write_buffer_size = 64 << 10; // 64KB default_cf.target_file_size_base = 30 << 10; default_cf.max_compaction_bytes = static_cast(1) << 60; BlockBasedTableOptions table_options; table_options.no_block_cache = true; default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); one.compaction_style = kCompactionStyleUniversal; one.num_levels = 1; // trigger compaction if there are >= 4 files one.level0_file_num_compaction_trigger = 4; one.write_buffer_size = 120000; two.compaction_style = kCompactionStyleLevel; two.num_levels = 4; two.level0_file_num_compaction_trigger = 3; two.write_buffer_size = 100000; Reopen({default_cf, one, two}); // SETUP column family "one" -- universal style for (int i = 0; i < one.level0_file_num_compaction_trigger - 1; ++i) { PutRandomData(1, 10, 12000); PutRandomData(1, 1, 10); WaitForFlush(1); AssertFilesPerLevel(ToString(i + 1), 1); } // SETUP column family "two" -- level style with 4 levels for (int i = 0; i < two.level0_file_num_compaction_trigger - 1; ++i) { PutRandomData(2, 10, 12000); PutRandomData(2, 1, 10); WaitForFlush(2); AssertFilesPerLevel(ToString(i + 1), 2); } // TRIGGER compaction "one" PutRandomData(1, 10, 12000); PutRandomData(1, 1, 10); // TRIGGER compaction "two" PutRandomData(2, 10, 12000); PutRandomData(2, 1, 10); // WAIT for compactions WaitForCompaction(); // VERIFY compaction "one" AssertFilesPerLevel("1", 1); // VERIFY compaction "two"
AssertFilesPerLevel("0,1", 2); CompactAll(2); AssertFilesPerLevel("0,1", 2); Close(); } #endif // !ROCKSDB_LITE #ifndef ROCKSDB_LITE // Sync points not supported in RocksDB Lite /* Runs two non-exclusive manual CompactRange() calls, one on CF 1 and one on CF 2, from separate threads ordered by the MultiManual:* sync points. Afterwards it checks files-per-level for both CFs and that every key recorded in keys_ is still readable from CF 1. NOTE(review): each LoadDependency pair {A, B} presumably makes sync point B wait until A has been reached -- confirm against util/sync_point.h. */ TEST_F(ColumnFamilyTest, MultipleManualCompactions) { Open(); CreateColumnFamilies({"one", "two"}); ColumnFamilyOptions default_cf, one, two; db_options_.max_open_files = 20; // only 10 files in file cache db_options_.max_background_compactions = 3; default_cf.compaction_style = kCompactionStyleLevel; default_cf.num_levels = 3; default_cf.write_buffer_size = 64 << 10; // 64KB default_cf.target_file_size_base = 30 << 10; default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100; BlockBasedTableOptions table_options; table_options.no_block_cache = true; default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); one.compaction_style = kCompactionStyleUniversal; one.num_levels = 1; // trigger compaction if there are >= 4 files one.level0_file_num_compaction_trigger = 4; one.write_buffer_size = 120000; two.compaction_style = kCompactionStyleLevel; two.num_levels = 4; two.level0_file_num_compaction_trigger = 3; two.write_buffer_size = 100000; Reopen({default_cf, one, two}); // SETUP column family "one" -- universal style for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); AssertFilesPerLevel(ToString(i + 1), 1); } bool cf_1_1 = true; rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"ColumnFamilyTest::MultiManual:4", "ColumnFamilyTest::MultiManual:1"}, {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:5"}, {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:3"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { /* only the first invocation of this callback passes through the :4 / :3 sync points */ if (cf_1_1) { TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:4"); cf_1_1 = false; TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:3"); } });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); std::vector threads; threads.emplace_back([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); }); // SETUP column family "two" -- level style with 4 levels for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) { PutRandomData(2, 10, 12000); PutRandomData(2, 1, 10); WaitForFlush(2); AssertFilesPerLevel(ToString(i + 1), 2); } threads.emplace_back([&] { TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:1"); CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( db_->CompactRange(compact_options, handles_[2], nullptr, nullptr)); TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:2"); }); TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:5"); for (auto& t : threads) { t.join(); } // VERIFY compaction "one" AssertFilesPerLevel("1", 1); // VERIFY compaction "two" AssertFilesPerLevel("0,1", 2); CompactAll(2); AssertFilesPerLevel("0,1", 2); // Compare against saved keys std::set::iterator key_iter = keys_.begin(); while (key_iter != keys_.end()) { ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); key_iter++; } Close(); } /* Fills CF 1 (universal, L0 trigger 4) with 4 flushed files so that an automatic compaction can be picked, then issues a non-exclusive manual CompactRange() on CF 2 from a separate thread; ordering is driven by the AutoManual:* sync points. Finally checks files-per-level for both CFs and that every key in keys_ is readable from CF 1. */ TEST_F(ColumnFamilyTest, AutomaticAndManualCompactions) { Open(); CreateColumnFamilies({"one", "two"}); ColumnFamilyOptions default_cf, one, two; db_options_.max_open_files = 20; // only 10 files in file cache db_options_.max_background_compactions = 3; default_cf.compaction_style = kCompactionStyleLevel; default_cf.num_levels = 3; default_cf.write_buffer_size = 64 << 10; // 64KB default_cf.target_file_size_base = 30 << 10; default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100; BlockBasedTableOptions table_options; table_options.no_block_cache = true; default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); one.compaction_style = kCompactionStyleUniversal; one.num_levels = 1; // trigger compaction if
there are >= 4 files one.level0_file_num_compaction_trigger = 4; one.write_buffer_size = 120000; two.compaction_style = kCompactionStyleLevel; two.num_levels = 4; two.level0_file_num_compaction_trigger = 3; two.write_buffer_size = 100000; Reopen({default_cf, one, two}); // make sure all background compaction jobs can be scheduled auto stop_token = dbfull()->TEST_write_controler().GetCompactionPressureToken(); bool cf_1_1 = true; /* NOTE(review): each {A, B} pair presumably makes sync point B wait until A has been reached -- confirm against util/sync_point.h. */ rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:1"}, {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:5"}, {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:3"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { /* NOTE(review): here the flag is cleared BEFORE :4 is signalled, whereas the sibling tests in this file clear it between :4 and :3 -- confirm the difference is deliberate. */ if (cf_1_1) { cf_1_1 = false; TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4"); TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3"); } }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); // SETUP column family "one" -- universal style for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); AssertFilesPerLevel(ToString(i + 1), 1); } TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1"); // SETUP column family "two" -- level style with 4 levels for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) { PutRandomData(2, 10, 12000); PutRandomData(2, 1, 10); WaitForFlush(2); AssertFilesPerLevel(ToString(i + 1), 2); } rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( db_->CompactRange(compact_options, handles_[2], nullptr, nullptr)); TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2"); }); TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5"); threads.join(); // WAIT for compactions WaitForCompaction(); // VERIFY compaction "one" AssertFilesPerLevel("1", 1); // VERIFY compaction "two"
AssertFilesPerLevel("0,1", 2); CompactAll(2); AssertFilesPerLevel("0,1", 2); // Compare against saved keys std::set::iterator key_iter = keys_.begin(); while (key_iter != keys_.end()) { ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); key_iter++; } } /* Starts a non-exclusive manual CompactRange() on CF 1 in a separate thread, then flushes 3 files into CF 2 (level style, L0 trigger 3) so that an automatic compaction can be picked there; ordering is driven by the ManualAuto:* sync points. Finally checks files-per-level for both CFs and that every key in keys_ is readable from CF 1. */ TEST_F(ColumnFamilyTest, ManualAndAutomaticCompactions) { Open(); CreateColumnFamilies({"one", "two"}); ColumnFamilyOptions default_cf, one, two; db_options_.max_open_files = 20; // only 10 files in file cache db_options_.max_background_compactions = 3; default_cf.compaction_style = kCompactionStyleLevel; default_cf.num_levels = 3; default_cf.write_buffer_size = 64 << 10; // 64KB default_cf.target_file_size_base = 30 << 10; default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100; BlockBasedTableOptions table_options; table_options.no_block_cache = true; default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); one.compaction_style = kCompactionStyleUniversal; one.num_levels = 1; // trigger compaction if there are >= 4 files one.level0_file_num_compaction_trigger = 4; one.write_buffer_size = 120000; two.compaction_style = kCompactionStyleLevel; two.num_levels = 4; two.level0_file_num_compaction_trigger = 3; two.write_buffer_size = 100000; Reopen({default_cf, one, two}); // make sure all background compaction jobs can be scheduled auto stop_token = dbfull()->TEST_write_controler().GetCompactionPressureToken(); // SETUP column family "one" -- universal style for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); AssertFilesPerLevel(ToString(i + 1), 1); } bool cf_1_1 = true; bool cf_1_2 = true; /* NOTE(review): each {A, B} pair presumably makes sync point B wait until A has been reached -- confirm against util/sync_point.h. */ rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:1"}, {"ColumnFamilyTest::ManualAuto:5", "ColumnFamilyTest::ManualAuto:2"}, {"ColumnFamilyTest::ManualAuto:2", "ColumnFamilyTest::ManualAuto:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { /* first invocation passes through :4 then :3; second invocation passes through :2; later invocations do nothing */ if (cf_1_1) { TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4"); cf_1_1 = false; TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3"); } else if (cf_1_2) { TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2"); cf_1_2 = false; } }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); }); TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1"); // SETUP column family "two" -- level style with 4 levels for (int i = 0; i < two.level0_file_num_compaction_trigger; ++i) { PutRandomData(2, 10, 12000); PutRandomData(2, 1, 10); WaitForFlush(2); AssertFilesPerLevel(ToString(i + 1), 2); } TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5"); threads.join(); // WAIT for compactions WaitForCompaction(); // VERIFY compaction "one" AssertFilesPerLevel("1", 1); // VERIFY compaction "two" AssertFilesPerLevel("0,1", 2); CompactAll(2); AssertFilesPerLevel("0,1", 2); // Compare against saved keys std::set::iterator key_iter = keys_.begin(); while (key_iter != keys_.end()) { ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); key_iter++; } } /* Issues two manual CompactRange() calls on the same column family (CF 1) from two threads: the first with exclusive_manual_compaction = true, the second with it set to false, with more L0 files flushed in between. Ordering is driven by the ManualManual:* sync points. Finally expects at most 2 table files at level 0 of CF 1 and every key in keys_ readable from CF 1. */ TEST_F(ColumnFamilyTest, SameCFManualManualCompactions) { Open(); CreateColumnFamilies({"one"}); ColumnFamilyOptions default_cf, one; db_options_.max_open_files = 20; // only 10 files in file cache db_options_.max_background_compactions = 3; default_cf.compaction_style = kCompactionStyleLevel; default_cf.num_levels = 3; default_cf.write_buffer_size = 64 << 10; // 64KB default_cf.target_file_size_base = 30 << 10; default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100; BlockBasedTableOptions table_options; table_options.no_block_cache = true; default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); one.compaction_style =
kCompactionStyleUniversal; one.num_levels = 1; // trigger compaction if there are >= 4 files one.level0_file_num_compaction_trigger = 4; one.write_buffer_size = 120000; Reopen({default_cf, one}); // make sure all background compaction jobs can be scheduled auto stop_token = dbfull()->TEST_write_controler().GetCompactionPressureToken(); // SETUP column family "one" -- universal style for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); AssertFilesPerLevel(ToString(i + 1), 1); } bool cf_1_1 = true; bool cf_1_2 = true; /* NOTE(review): each {A, B} pair presumably makes sync point B wait until A has been reached -- confirm against util/sync_point.h. */ rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:2"}, {"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:5"}, {"ColumnFamilyTest::ManualManual:1", "ColumnFamilyTest::ManualManual:2"}, {"ColumnFamilyTest::ManualManual:1", "ColumnFamilyTest::ManualManual:3"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { if (cf_1_1) { TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:4"); cf_1_1 = false; TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:3"); } else if (cf_1_2) { TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:2"); cf_1_2 = false; } }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); /* first manual compaction: exclusive */ rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = true; ASSERT_OK( db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); }); TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:5"); WaitForFlush(1); // Add more L0 files and force another manual compaction for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i), 1); } /* second manual compaction: non-exclusive */ rocksdb::port::Thread threads1([&] {
CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); }); TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:1"); threads.join(); threads1.join(); WaitForCompaction(); // VERIFY compaction "one" ASSERT_LE(NumTableFilesAtLevel(0, 1), 2); // Compare against saved keys std::set::iterator key_iter = keys_.begin(); while (key_iter != keys_.end()) { ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); key_iter++; } } /* Starts a non-exclusive manual CompactRange() on CF 1 (universal, L0 trigger 4) in a separate thread, then flushes 4 more files into the same CF so that an automatic compaction can be picked; ordering is driven by the ManualAuto:* sync points. Finally expects at most 2 table files at level 0 of CF 1 and every key in keys_ readable from CF 1. */ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) { Open(); CreateColumnFamilies({"one"}); ColumnFamilyOptions default_cf, one; db_options_.max_open_files = 20; // only 10 files in file cache db_options_.max_background_compactions = 3; default_cf.compaction_style = kCompactionStyleLevel; default_cf.num_levels = 3; default_cf.write_buffer_size = 64 << 10; // 64KB default_cf.target_file_size_base = 30 << 10; default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100; BlockBasedTableOptions table_options; table_options.no_block_cache = true; default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); one.compaction_style = kCompactionStyleUniversal; one.num_levels = 1; // trigger compaction if there are >= 4 files one.level0_file_num_compaction_trigger = 4; one.write_buffer_size = 120000; Reopen({default_cf, one}); // make sure all background compaction jobs can be scheduled auto stop_token = dbfull()->TEST_write_controler().GetCompactionPressureToken(); // SETUP column family "one" -- universal style for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); AssertFilesPerLevel(ToString(i + 1), 1); } bool cf_1_1 = true; bool cf_1_2 = true; rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"}, {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"},
{"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:2"}, {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { if (cf_1_1) { TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4"); cf_1_1 = false; TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3"); } else if (cf_1_2) { TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2"); cf_1_2 = false; } }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); }); TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5"); WaitForFlush(1); // Add more L0 files and force automatic compaction for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i), 1); } TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1"); threads.join(); WaitForCompaction(); // VERIFY compaction "one" ASSERT_LE(NumTableFilesAtLevel(0, 1), 2); // Compare against saved keys std::set::iterator key_iter = keys_.begin(); while (key_iter != keys_.end()) { ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); key_iter++; } } /* Level-style variant of SameCFManualAutomaticCompactions: CF 1 uses kCompactionStyleLevel with L0 trigger 3, an extra dependency on "LevelCompactionPicker::PickCompactionBySize:0" is loaded, and the final expectation is files-per-level "0,1" for CF 1. */ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) { Open(); CreateColumnFamilies({"one"}); ColumnFamilyOptions default_cf, one; db_options_.max_open_files = 20; // only 10 files in file cache db_options_.max_background_compactions = 3; default_cf.compaction_style = kCompactionStyleLevel; default_cf.num_levels = 3; default_cf.write_buffer_size = 64 << 10; // 64KB default_cf.target_file_size_base = 30 << 10; default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100; BlockBasedTableOptions table_options; table_options.no_block_cache = true;
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); one.compaction_style = kCompactionStyleLevel; /* NOTE(review): num_levels = 1 with level style, yet the test ends by expecting a file on a second level ("0,1") -- presumably the option is sanitized upwards; confirm. */ one.num_levels = 1; // trigger compaction if there are >= 3 files one.level0_file_num_compaction_trigger = 3; one.write_buffer_size = 120000; Reopen({default_cf, one}); // make sure all background compaction jobs can be scheduled auto stop_token = dbfull()->TEST_write_controler().GetCompactionPressureToken(); // SETUP column family "one" -- level style for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); AssertFilesPerLevel(ToString(i + 1), 1); } bool cf_1_1 = true; bool cf_1_2 = true; rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"}, {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"}, {"ColumnFamilyTest::ManualAuto:3", "ColumnFamilyTest::ManualAuto:2"}, {"LevelCompactionPicker::PickCompactionBySize:0", "ColumnFamilyTest::ManualAuto:3"}, {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { if (cf_1_1) { TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4"); cf_1_1 = false; TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3"); } else if (cf_1_2) { TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2"); cf_1_2 = false; } }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); }); TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5"); // Add more L0 files and force automatic compaction for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1);
AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i), 1); } TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1"); threads.join(); WaitForCompaction(); // VERIFY compaction "one" AssertFilesPerLevel("0,1", 1); // Compare against saved keys std::set::iterator key_iter = keys_.begin(); while (key_iter != keys_.end()) { ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); key_iter++; } } // This test checks that an automatic compaction hits a conflict when there is a // manual compaction which has not yet been scheduled. // The manual compaction waits in NotScheduled. // We generate more files and then trigger an automatic compaction. // This will wait because there is an unscheduled manual compaction. // Once the conflict is hit, the manual compaction starts and ends. // Then another automatic compaction will start and end. TEST_F(ColumnFamilyTest, SameCFManualAutomaticConflict) { Open(); CreateColumnFamilies({"one"}); ColumnFamilyOptions default_cf, one; db_options_.max_open_files = 20; // only 10 files in file cache db_options_.max_background_compactions = 3; default_cf.compaction_style = kCompactionStyleLevel; default_cf.num_levels = 3; default_cf.write_buffer_size = 64 << 10; // 64KB default_cf.target_file_size_base = 30 << 10; default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100; BlockBasedTableOptions table_options; table_options.no_block_cache = true; default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); one.compaction_style = kCompactionStyleUniversal; one.num_levels = 1; // trigger compaction if there are >= 4 files one.level0_file_num_compaction_trigger = 4; one.write_buffer_size = 120000; Reopen({default_cf, one}); // make sure all background compaction jobs can be scheduled auto stop_token = dbfull()->TEST_write_controler().GetCompactionPressureToken(); // SETUP column family "one" -- universal style for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true);
WaitForFlush(1); AssertFilesPerLevel(ToString(i + 1), 1); } bool cf_1_1 = true; bool cf_1_2 = true; /* NOTE(review): each {A, B} pair presumably makes sync point B wait until A has been reached -- confirm against util/sync_point.h. */ rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::BackgroundCompaction()::Conflict", "ColumnFamilyTest::ManualAutoCon:7"}, {"ColumnFamilyTest::ManualAutoCon:9", "ColumnFamilyTest::ManualAutoCon:8"}, {"ColumnFamilyTest::ManualAutoCon:2", "ColumnFamilyTest::ManualAutoCon:6"}, {"ColumnFamilyTest::ManualAutoCon:4", "ColumnFamilyTest::ManualAutoCon:5"}, {"ColumnFamilyTest::ManualAutoCon:1", "ColumnFamilyTest::ManualAutoCon:2"}, {"ColumnFamilyTest::ManualAutoCon:1", "ColumnFamilyTest::ManualAutoCon:3"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { if (cf_1_1) { TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:4"); cf_1_1 = false; TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:3"); } else if (cf_1_2) { cf_1_2 = false; TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:2"); } }); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::RunManualCompaction:NotScheduled", [&](void* arg) { /* arg is treated as an InstrumentedMutex*: it is unlocked while this callback sits on the :9 / :7 sync points and locked again before returning */ InstrumentedMutex* mutex = static_cast(arg); mutex->Unlock(); TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:9"); TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:7"); mutex->Lock(); }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:6"); }); TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:8"); WaitForFlush(1); // Add more L0 files and force automatic compaction for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i), 1); } TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:5"); // Add
more L0 files and force automatic compaction for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); } TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:1"); threads.join(); WaitForCompaction(); // VERIFY compaction "one" ASSERT_LE(NumTableFilesAtLevel(0, 1), 3); // Compare against saved keys std::set::iterator key_iter = keys_.begin(); while (key_iter != keys_.end()) { ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); key_iter++; } } // In this test, we generate enough files to trigger automatic compactions. // The automatic compaction waits in NonTrivial:AfterRun. // We generate more files and then trigger a manual compaction. // This will wait because the automatic compaction has files it needs. // Once the conflict is hit, the automatic compaction starts and ends. // Then the manual will run and end. TEST_F(ColumnFamilyTest, SameCFAutomaticManualCompactions) { Open(); CreateColumnFamilies({"one"}); ColumnFamilyOptions default_cf, one; db_options_.max_open_files = 20; // only 10 files in file cache db_options_.max_background_compactions = 3; default_cf.compaction_style = kCompactionStyleLevel; default_cf.num_levels = 3; default_cf.write_buffer_size = 64 << 10; // 64KB default_cf.target_file_size_base = 30 << 10; default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100; BlockBasedTableOptions table_options; table_options.no_block_cache = true; default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); one.compaction_style = kCompactionStyleUniversal; one.num_levels = 1; // trigger compaction if there are >= 4 files one.level0_file_num_compaction_trigger = 4; one.write_buffer_size = 120000; Reopen({default_cf, one}); // make sure all background compaction jobs can be scheduled auto stop_token = dbfull()->TEST_write_controler().GetCompactionPressureToken(); bool cf_1_1 = true; bool cf_1_2 = true; rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:2"}, {"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:5"}, {"CompactionPicker::CompactRange:Conflict", "ColumnFamilyTest::AutoManual:3"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { if (cf_1_1) { TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4"); cf_1_1 = false; TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3"); } else if (cf_1_2) { TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2"); cf_1_2 = false; } }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); // SETUP column family "one" -- universal style for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); AssertFilesPerLevel(ToString(i + 1), 1); } TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5"); // Add another L0 file and force automatic compaction for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) { PutRandomData(1, 10, 12000, true); PutRandomData(1, 1, 10, true); WaitForFlush(1); } /* the manual compaction runs on the calling thread here, not in a helper thread as in the sibling tests */ CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1"); WaitForCompaction(); // VERIFY compaction "one" AssertFilesPerLevel("1", 1); // Compare against saved keys std::set::iterator key_iter = keys_.begin(); while (key_iter != keys_.end()) { ASSERT_NE("NOT_FOUND", Get(1, *key_iter)); key_iter++; } } #endif // !ROCKSDB_LITE #ifndef ROCKSDB_LITE // Tailing iterator not supported namespace { /* Renders the iterator's current entry as "key->value", or "(invalid)" when iter->Valid() is false. */ std::string IterStatus(Iterator* iter) { std::string result; if (iter->Valid()) { result = iter->key().ToString() + "->" + iter->value().ToString(); } else { result = "(invalid)"; } return result; } } // anonymous namespace /* Creates one iterator per column family with NewIterators(), once with options.tailing = false and once with it set to true, and checks that after a Put to CF 1 only the tailing iterator for CF 1 observes the new key on Next(). */ TEST_F(ColumnFamilyTest, NewIteratorsTest) { // iter == 0 -- no tailing // iter == 1 -- tailing
for (int iter = 0; iter < 2; ++iter) { Open(); CreateColumnFamiliesAndReopen({"one", "two"}); ASSERT_OK(Put(0, "a", "b")); ASSERT_OK(Put(1, "b", "a")); ASSERT_OK(Put(2, "c", "m")); ASSERT_OK(Put(2, "v", "t")); std::vector iterators; ReadOptions options; options.tailing = (iter == 1); ASSERT_OK(db_->NewIterators(options, handles_, &iterators)); for (auto it : iterators) { it->SeekToFirst(); } ASSERT_EQ(IterStatus(iterators[0]), "a->b"); ASSERT_EQ(IterStatus(iterators[1]), "b->a"); ASSERT_EQ(IterStatus(iterators[2]), "c->m"); ASSERT_OK(Put(1, "x", "x")); for (auto it : iterators) { it->Next(); } ASSERT_EQ(IterStatus(iterators[0]), "(invalid)"); if (iter == 0) { // no tailing ASSERT_EQ(IterStatus(iterators[1]), "(invalid)"); } else { // tailing ASSERT_EQ(IterStatus(iterators[1]), "x->x"); } ASSERT_EQ(IterStatus(iterators[2]), "v->t"); for (auto it : iterators) { delete it; } Destroy(); } } #endif // !ROCKSDB_LITE #ifndef ROCKSDB_LITE // ReadOnlyDB is not supported /* Writes to five column families, drops the one at index 2 and closes the DB. It then reopens read-only with only {"default", "one", "four"} and checks Get() and NewIterators() against that subset. Finally it expects OpenReadOnly() to fail both when the dropped family is requested and when "default" is omitted. */ TEST_F(ColumnFamilyTest, ReadOnlyDBTest) { Open(); CreateColumnFamiliesAndReopen({"one", "two", "three", "four"}); ASSERT_OK(Put(0, "a", "b")); ASSERT_OK(Put(1, "foo", "bla")); ASSERT_OK(Put(2, "foo", "blabla")); ASSERT_OK(Put(3, "foo", "blablabla")); ASSERT_OK(Put(4, "foo", "blablablabla")); DropColumnFamilies({2}); Close(); // open only a subset of column families AssertOpenReadOnly({"default", "one", "four"}); ASSERT_EQ("NOT_FOUND", Get(0, "foo")); ASSERT_EQ("bla", Get(1, "foo")); ASSERT_EQ("blablablabla", Get(2, "foo")); // test newiterators { std::vector iterators; ASSERT_OK(db_->NewIterators(ReadOptions(), handles_, &iterators)); for (auto it : iterators) { it->SeekToFirst(); } ASSERT_EQ(IterStatus(iterators[0]), "a->b"); ASSERT_EQ(IterStatus(iterators[1]), "foo->bla"); ASSERT_EQ(IterStatus(iterators[2]), "foo->blablablabla"); for (auto it : iterators) { it->Next(); } ASSERT_EQ(IterStatus(iterators[0]), "(invalid)"); ASSERT_EQ(IterStatus(iterators[1]), "(invalid)"); ASSERT_EQ(IterStatus(iterators[2]),
"(invalid)"); for (auto it : iterators) { delete it; } } Close(); // can't open dropped column family Status s = OpenReadOnly({"default", "one", "two"}); ASSERT_TRUE(!s.ok()); // Can't open without specifying default column family s = OpenReadOnly({"one", "four"}); ASSERT_TRUE(!s.ok()); } #endif // !ROCKSDB_LITE #ifndef ROCKSDB_LITE // WaitForFlush() is not supported in lite /* Writes a small amount of data to every column family, flushes CFs 0..4, and asserts that the number of NewWritableFile() calls counted by env_ during the flushes equals handles_.size() + 1. */ TEST_F(ColumnFamilyTest, DontRollEmptyLogs) { Open(); CreateColumnFamiliesAndReopen({"one", "two", "three", "four"}); for (size_t i = 0; i < handles_.size(); ++i) { PutRandomData(static_cast(i), 10, 100); } int num_writable_file_start = env_->GetNumberOfNewWritableFileCalls(); // this will trigger the flushes for (int i = 0; i <= 4; ++i) { ASSERT_OK(Flush(i)); } /* NOTE(review): only CFs 0..3 are waited on, although CFs 0..4 were flushed above -- confirm whether CF 4 is intentionally left out. */ for (int i = 0; i < 4; ++i) { WaitForFlush(i); } int total_new_writable_files = env_->GetNumberOfNewWritableFileCalls() - num_writable_file_start; ASSERT_EQ(static_cast(total_new_writable_files), handles_.size() + 1); Close(); } #endif // !ROCKSDB_LITE #ifndef ROCKSDB_LITE // WaitForCompaction() is not supported in lite /* With max_total_wal_size = 210000 and auto-compactions disabled on all CFs, writes 10 bytes to CF 2 and then repeatedly fills CF 0. The test expects CF 2 to be flushed along with the third CF 0 flush while the empty CF 1 is not (4 live files), and a total log size of 0 after a final Flush(0). */ TEST_F(ColumnFamilyTest, FlushStaleColumnFamilies) { Open(); CreateColumnFamilies({"one", "two"}); ColumnFamilyOptions default_cf, one, two; default_cf.write_buffer_size = 100000; // small write buffer size default_cf.arena_block_size = 4096; default_cf.disable_auto_compactions = true; one.disable_auto_compactions = true; two.disable_auto_compactions = true; db_options_.max_total_wal_size = 210000; Reopen({default_cf, one, two}); PutRandomData(2, 1, 10); // 10 bytes for (int i = 0; i < 2; ++i) { PutRandomData(0, 100, 1000); // flush WaitForFlush(0); AssertCountLiveFiles(i + 1); } // third flush.
now, CF [two] should be detected as stale and flushed // column family 1 should not be flushed since it's empty PutRandomData(0, 100, 1000); // flush WaitForFlush(0); WaitForFlush(2); // 3 files for default column families, 1 file for column family [two], zero // files for column family [one], because it's empty AssertCountLiveFiles(4); Flush(0); ASSERT_EQ(0, dbfull()->TEST_total_log_size()); Close(); } #endif // !ROCKSDB_LITE TEST_F(ColumnFamilyTest, CreateMissingColumnFamilies) { Status s = TryOpen({"one", "two"}); ASSERT_TRUE(!s.ok()); db_options_.create_missing_column_families = true; s = TryOpen({"default", "one", "two"}); ASSERT_TRUE(s.ok()); Close(); } TEST_F(ColumnFamilyTest, SanitizeOptions) { DBOptions db_options; for (int s = kCompactionStyleLevel; s <= kCompactionStyleUniversal; ++s) { for (int l = 0; l <= 2; l++) { for (int i = 1; i <= 3; i++) { for (int j = 1; j <= 3; j++) { for (int k = 1; k <= 3; k++) { ColumnFamilyOptions original; original.compaction_style = static_cast(s); original.num_levels = l; original.level0_stop_writes_trigger = i; original.level0_slowdown_writes_trigger = j; original.level0_file_num_compaction_trigger = k; original.write_buffer_size = l * 4 * 1024 * 1024 + i * 1024 * 1024 + j * 1024 + k; ColumnFamilyOptions result = SanitizeOptions(ImmutableDBOptions(db_options), original); ASSERT_TRUE(result.level0_stop_writes_trigger >= result.level0_slowdown_writes_trigger); ASSERT_TRUE(result.level0_slowdown_writes_trigger >= result.level0_file_num_compaction_trigger); ASSERT_TRUE(result.level0_file_num_compaction_trigger == original.level0_file_num_compaction_trigger); if (s == kCompactionStyleLevel) { ASSERT_GE(result.num_levels, 2); } else { ASSERT_GE(result.num_levels, 1); if (original.num_levels >= 1) { ASSERT_EQ(result.num_levels, original.num_levels); } } // Make sure Sanitize options sets arena_block_size to 1/8 of // the write_buffer_size, rounded up to a multiple of 4k. 
size_t expected_arena_block_size = l * 4 * 1024 * 1024 / 8 + i * 1024 * 1024 / 8; if (j + k != 0) { // not a multiple of 4k, round up 4k expected_arena_block_size += 4 * 1024; } ASSERT_EQ(expected_arena_block_size, result.arena_block_size); } } } } } } TEST_F(ColumnFamilyTest, ReadDroppedColumnFamily) { // iter 0 -- drop CF, don't reopen // iter 1 -- delete CF, reopen for (int iter = 0; iter < 2; ++iter) { db_options_.create_missing_column_families = true; db_options_.max_open_files = 20; // delete obsolete files always db_options_.delete_obsolete_files_period_micros = 0; Open({"default", "one", "two"}); ColumnFamilyOptions options; options.level0_file_num_compaction_trigger = 100; options.level0_slowdown_writes_trigger = 200; options.level0_stop_writes_trigger = 200; options.write_buffer_size = 100000; // small write buffer size Reopen({options, options, options}); // 1MB should create ~10 files for each CF int kKeysNum = 10000; PutRandomData(0, kKeysNum, 100); PutRandomData(1, kKeysNum, 100); PutRandomData(2, kKeysNum, 100); { std::unique_ptr iterator( db_->NewIterator(ReadOptions(), handles_[2])); iterator->SeekToFirst(); if (iter == 0) { // Drop CF two ASSERT_OK(db_->DropColumnFamily(handles_[2])); } else { // delete CF two db_->DestroyColumnFamilyHandle(handles_[2]); handles_[2] = nullptr; } // Make sure iterator created can still be used. 
int count = 0; for (; iterator->Valid(); iterator->Next()) { ASSERT_OK(iterator->status()); ++count; } ASSERT_OK(iterator->status()); ASSERT_EQ(count, kKeysNum); } // Add bunch more data to other CFs PutRandomData(0, kKeysNum, 100); PutRandomData(1, kKeysNum, 100); if (iter == 1) { Reopen(); } // Since we didn't delete CF handle, RocksDB's contract guarantees that // we're still able to read dropped CF for (int i = 0; i < 3; ++i) { std::unique_ptr iterator( db_->NewIterator(ReadOptions(), handles_[i])); int count = 0; for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { ASSERT_OK(iterator->status()); ++count; } ASSERT_OK(iterator->status()); ASSERT_EQ(count, kKeysNum * ((i == 2) ? 1 : 2)); } Close(); Destroy(); } } TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) { db_options_.create_missing_column_families = true; Open({"default", "one"}); ColumnFamilyOptions options; options.level0_file_num_compaction_trigger = 100; options.level0_slowdown_writes_trigger = 200; options.level0_stop_writes_trigger = 200; options.max_write_buffer_number = 20; options.write_buffer_size = 100000; // small write buffer size Reopen({options, options}); rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"VersionSet::LogAndApply::ColumnFamilyDrop:0", "FlushJob::WriteLevel0Table"}, {"VersionSet::LogAndApply::ColumnFamilyDrop:1", "FlushJob::InstallResults"}, {"FlushJob::InstallResults", "VersionSet::LogAndApply::ColumnFamilyDrop:2"}}); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); test::SleepingBackgroundTask sleeping_task; env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, Env::Priority::HIGH); // 1MB should create ~10 files for each CF int kKeysNum = 10000; PutRandomData(1, kKeysNum, 100); std::vector threads; threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); }); sleeping_task.WakeUp(); sleeping_task.WaitUntilDone(); sleeping_task.Reset(); // now we sleep again. 
this is just so we're certain that flush job finished env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, Env::Priority::HIGH); sleeping_task.WakeUp(); sleeping_task.WaitUntilDone(); { // Since we didn't delete CF handle, RocksDB's contract guarantees that // we're still able to read dropped CF std::unique_ptr iterator( db_->NewIterator(ReadOptions(), handles_[1])); int count = 0; for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { ASSERT_OK(iterator->status()); ++count; } ASSERT_OK(iterator->status()); ASSERT_EQ(count, kKeysNum); } for (auto& t : threads) { t.join(); } Close(); Destroy(); } #ifndef ROCKSDB_LITE // skipped as persisting options is not supported in ROCKSDB_LITE namespace { std::atomic test_stage(0); const int kMainThreadStartPersistingOptionsFile = 1; const int kChildThreadFinishDroppingColumnFamily = 2; const int kChildThreadWaitingMainThreadPersistOptions = 3; void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id, std::vector* comparators) { while (test_stage < kMainThreadStartPersistingOptionsFile) { Env::Default()->SleepForMicroseconds(100); } cf_test->DropColumnFamilies({cf_id}); if ((*comparators)[cf_id]) { delete (*comparators)[cf_id]; (*comparators)[cf_id] = nullptr; } test_stage = kChildThreadFinishDroppingColumnFamily; } } // namespace TEST_F(ColumnFamilyTest, CreateAndDropRace) { const int kCfCount = 5; std::vector cf_opts; std::vector comparators; for (int i = 0; i < kCfCount; ++i) { cf_opts.emplace_back(); comparators.push_back(new test::SimpleSuffixReverseComparator()); cf_opts.back().comparator = comparators.back(); } db_options_.create_if_missing = true; db_options_.create_missing_column_families = true; auto main_thread_id = std::this_thread::get_id(); rocksdb::SyncPoint::GetInstance()->SetCallBack("PersistRocksDBOptions:start", [&](void* arg) { auto current_thread_id = std::this_thread::get_id(); // If it's the main thread hitting this sync-point, then it // will be blocked until 
some other thread update the test_stage. if (main_thread_id == current_thread_id) { test_stage = kMainThreadStartPersistingOptionsFile; while (test_stage < kChildThreadFinishDroppingColumnFamily) { Env::Default()->SleepForMicroseconds(100); } } }); rocksdb::SyncPoint::GetInstance()->SetCallBack( "WriteThread::EnterUnbatched:Wait", [&](void* arg) { // This means a thread doing DropColumnFamily() is waiting for // other thread to finish persisting options. // In such case, we update the test_stage to unblock the main thread. test_stage = kChildThreadWaitingMainThreadPersistOptions; // Note that based on the test setting, this must not be the // main thread. ASSERT_NE(main_thread_id, std::this_thread::get_id()); }); // Create a database with four column families Open({"default", "one", "two", "three"}, {cf_opts[0], cf_opts[1], cf_opts[2], cf_opts[3]}); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); // Start a thread that will drop the first column family // and its comparator rocksdb::port::Thread drop_cf_thread(DropSingleColumnFamily, this, 1, &comparators); DropColumnFamilies({2}); drop_cf_thread.join(); Close(); Destroy(); for (auto* comparator : comparators) { if (comparator) { delete comparator; } } } #endif // !ROCKSDB_LITE TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { const uint64_t kBaseRate = 800000u; db_options_.delayed_write_rate = kBaseRate; db_options_.max_background_compactions = 6; Open({"default"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); VersionStorageInfo* vstorage = cfd->current()->storage_info(); MutableCFOptions mutable_cf_options(column_family_options_); mutable_cf_options.level0_slowdown_writes_trigger = 20; mutable_cf_options.level0_stop_writes_trigger = 10000; mutable_cf_options.soft_pending_compaction_bytes_limit = 200; mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; mutable_cf_options.disable_auto_compactions = false; vstorage->TEST_set_estimated_compaction_needed_bytes(50); 
cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); vstorage->TEST_set_estimated_compaction_needed_bytes(201); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(400); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(500); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(450); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(205); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(202); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(201); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); 
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(198); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); vstorage->TEST_set_estimated_compaction_needed_bytes(399); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(599); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(2001); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(3001); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); vstorage->TEST_set_estimated_compaction_needed_bytes(390); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(100); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); vstorage->set_l0_delay_trigger_count(100); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); 
ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(101); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); vstorage->set_l0_delay_trigger_count(0); vstorage->TEST_set_estimated_compaction_needed_bytes(300); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); vstorage->set_l0_delay_trigger_count(101); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25 / 1.25 / 1.25, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(200); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); vstorage->set_l0_delay_trigger_count(0); vstorage->TEST_set_estimated_compaction_needed_bytes(0); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); mutable_cf_options.disable_auto_compactions = true; dbfull()->TEST_write_controler().set_delayed_write_rate(kBaseRate); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); vstorage->set_l0_delay_trigger_count(50); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(0, GetDbDelayedWriteRate()); ASSERT_EQ(kBaseRate, 
dbfull()->TEST_write_controler().delayed_write_rate()); vstorage->set_l0_delay_trigger_count(60); vstorage->TEST_set_estimated_compaction_needed_bytes(300); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(0, GetDbDelayedWriteRate()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); mutable_cf_options.disable_auto_compactions = false; vstorage->set_l0_delay_trigger_count(70); vstorage->TEST_set_estimated_compaction_needed_bytes(500); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); vstorage->set_l0_delay_trigger_count(71); vstorage->TEST_set_estimated_compaction_needed_bytes(501); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); } TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) { db_options_.max_background_compactions = 6; Open({"default"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); VersionStorageInfo* vstorage = cfd->current()->storage_info(); MutableCFOptions mutable_cf_options(column_family_options_); // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8 mutable_cf_options.level0_file_num_compaction_trigger = 4; mutable_cf_options.level0_slowdown_writes_trigger = 36; mutable_cf_options.level0_stop_writes_trigger = 50; // Speedup threshold = 200 / 4 = 50 mutable_cf_options.soft_pending_compaction_bytes_limit = 200; mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; vstorage->TEST_set_estimated_compaction_needed_bytes(40); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); 
vstorage->TEST_set_estimated_compaction_needed_bytes(50); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(300); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(45); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(7); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(9); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(6); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); // Speed up threshold = min(4 * 2, 4 + (12 - 4)/4) = 6 mutable_cf_options.level0_file_num_compaction_trigger = 4; mutable_cf_options.level0_slowdown_writes_trigger = 16; mutable_cf_options.level0_stop_writes_trigger = 30; vstorage->set_l0_delay_trigger_count(5); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(7); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(3); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); } TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) { const uint64_t kBaseRate = 810000u; db_options_.delayed_write_rate = kBaseRate; Open(); CreateColumnFamilies({"one"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); VersionStorageInfo* vstorage = cfd->current()->storage_info(); ColumnFamilyData* cfd1 = 
static_cast(handles_[1])->cfd(); VersionStorageInfo* vstorage1 = cfd1->current()->storage_info(); MutableCFOptions mutable_cf_options(column_family_options_); mutable_cf_options.level0_slowdown_writes_trigger = 20; mutable_cf_options.level0_stop_writes_trigger = 10000; mutable_cf_options.soft_pending_compaction_bytes_limit = 200; mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; MutableCFOptions mutable_cf_options1 = mutable_cf_options; mutable_cf_options1.soft_pending_compaction_bytes_limit = 500; vstorage->TEST_set_estimated_compaction_needed_bytes(50); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); vstorage1->TEST_set_estimated_compaction_needed_bytes(201); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); vstorage1->TEST_set_estimated_compaction_needed_bytes(600); cfd1->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(70); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate()); vstorage1->TEST_set_estimated_compaction_needed_bytes(800); cfd1->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(300); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); 
vstorage1->TEST_set_estimated_compaction_needed_bytes(700); cfd1->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); vstorage->TEST_set_estimated_compaction_needed_bytes(500); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate()); vstorage1->TEST_set_estimated_compaction_needed_bytes(600); cfd1->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate()); } TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) { db_options_.max_background_compactions = 6; column_family_options_.soft_pending_compaction_bytes_limit = 200; column_family_options_.hard_pending_compaction_bytes_limit = 2000; Open(); CreateColumnFamilies({"one"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); VersionStorageInfo* vstorage = cfd->current()->storage_info(); ColumnFamilyData* cfd1 = static_cast(handles_[1])->cfd(); VersionStorageInfo* vstorage1 = cfd1->current()->storage_info(); MutableCFOptions mutable_cf_options(column_family_options_); // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8 mutable_cf_options.level0_file_num_compaction_trigger = 4; mutable_cf_options.level0_slowdown_writes_trigger = 36; mutable_cf_options.level0_stop_writes_trigger = 30; // Speedup threshold = 200 / 4 = 50 mutable_cf_options.soft_pending_compaction_bytes_limit = 200; mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; MutableCFOptions mutable_cf_options1 = mutable_cf_options; mutable_cf_options1.level0_slowdown_writes_trigger = 16; vstorage->TEST_set_estimated_compaction_needed_bytes(40); 
cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(60); cfd1->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage1->TEST_set_estimated_compaction_needed_bytes(30); cfd1->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage1->TEST_set_estimated_compaction_needed_bytes(70); cfd1->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(20); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage1->TEST_set_estimated_compaction_needed_bytes(3); cfd1->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(9); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage1->set_l0_delay_trigger_count(2); cfd1->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(0); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); } #ifndef ROCKSDB_LITE TEST_F(ColumnFamilyTest, FlushCloseWALFiles) { SpecialEnv env(Env::Default()); db_options_.env = &env; db_options_.max_background_flushes = 1; column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2)); Open(); CreateColumnFamilies({"one"}); ASSERT_OK(Put(1, "fodor", "mirko")); ASSERT_OK(Put(0, "fodor", "mirko")); ASSERT_OK(Put(1, "fodor", "mirko")); rocksdb::SyncPoint::GetInstance()->LoadDependency({ 
{"DBImpl::BGWorkFlush:done", "FlushCloseWALFiles:0"}, }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); // Block flush jobs from running test::SleepingBackgroundTask sleeping_task; env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, Env::Priority::HIGH); WriteOptions wo; wo.sync = true; ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko")); ASSERT_EQ(2, env.num_open_wal_file_.load()); sleeping_task.WakeUp(); sleeping_task.WaitUntilDone(); TEST_SYNC_POINT("FlushCloseWALFiles:0"); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); ASSERT_EQ(1, env.num_open_wal_file_.load()); Reopen(); ASSERT_EQ("mirko", Get(0, "fodor")); ASSERT_EQ("mirko", Get(1, "fodor")); db_options_.env = env_; Close(); } #endif // !ROCKSDB_LITE #ifndef ROCKSDB_LITE // WaitForFlush() is not supported TEST_F(ColumnFamilyTest, IteratorCloseWALFile1) { SpecialEnv env(Env::Default()); db_options_.env = &env; db_options_.max_background_flushes = 1; column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2)); Open(); CreateColumnFamilies({"one"}); ASSERT_OK(Put(1, "fodor", "mirko")); // Create an iterator holding the current super version. Iterator* it = db_->NewIterator(ReadOptions(), handles_[1]); // A flush will make `it` hold the last reference of its super version. Flush(1); ASSERT_OK(Put(1, "fodor", "mirko")); ASSERT_OK(Put(0, "fodor", "mirko")); ASSERT_OK(Put(1, "fodor", "mirko")); // Flush jobs will close previous WAL files after finishing. By // block flush jobs from running, we trigger a condition where // the iterator destructor should close the WAL files. 
test::SleepingBackgroundTask sleeping_task; env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, Env::Priority::HIGH); WriteOptions wo; wo.sync = true; ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko")); ASSERT_EQ(2, env.num_open_wal_file_.load()); // Deleting the iterator will clear its super version, triggering // closing all files delete it; ASSERT_EQ(1, env.num_open_wal_file_.load()); sleeping_task.WakeUp(); sleeping_task.WaitUntilDone(); WaitForFlush(1); Reopen(); ASSERT_EQ("mirko", Get(0, "fodor")); ASSERT_EQ("mirko", Get(1, "fodor")); db_options_.env = env_; Close(); } TEST_F(ColumnFamilyTest, IteratorCloseWALFile2) { SpecialEnv env(Env::Default()); // Allow both of flush and purge job to schedule. env.SetBackgroundThreads(2, Env::HIGH); db_options_.env = &env; db_options_.max_background_flushes = 1; column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2)); Open(); CreateColumnFamilies({"one"}); ASSERT_OK(Put(1, "fodor", "mirko")); // Create an iterator holding the current super version. ReadOptions ro; ro.background_purge_on_iterator_cleanup = true; Iterator* it = db_->NewIterator(ro, handles_[1]); // A flush will make `it` hold the last reference of its super version. 
Flush(1); ASSERT_OK(Put(1, "fodor", "mirko")); ASSERT_OK(Put(0, "fodor", "mirko")); ASSERT_OK(Put(1, "fodor", "mirko")); rocksdb::SyncPoint::GetInstance()->LoadDependency({ {"ColumnFamilyTest::IteratorCloseWALFile2:0", "DBImpl::BGWorkPurge:start"}, {"ColumnFamilyTest::IteratorCloseWALFile2:2", "DBImpl::BackgroundCallFlush:start"}, {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"}, }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); WriteOptions wo; wo.sync = true; ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko")); ASSERT_EQ(2, env.num_open_wal_file_.load()); // Deleting the iterator will clear its super version, triggering // closing all files delete it; ASSERT_EQ(2, env.num_open_wal_file_.load()); TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0"); TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1"); ASSERT_EQ(1, env.num_open_wal_file_.load()); TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2"); WaitForFlush(1); ASSERT_EQ(1, env.num_open_wal_file_.load()); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); Reopen(); ASSERT_EQ("mirko", Get(0, "fodor")); ASSERT_EQ("mirko", Get(1, "fodor")); db_options_.env = env_; Close(); } #endif // !ROCKSDB_LITE #ifndef ROCKSDB_LITE // TEST functions are not supported in lite TEST_F(ColumnFamilyTest, ForwardIteratorCloseWALFile) { SpecialEnv env(Env::Default()); // Allow both of flush and purge job to schedule. env.SetBackgroundThreads(2, Env::HIGH); db_options_.env = &env; db_options_.max_background_flushes = 1; column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(3)); column_family_options_.level0_file_num_compaction_trigger = 2; Open(); CreateColumnFamilies({"one"}); ASSERT_OK(Put(1, "fodor", "mirko")); ASSERT_OK(Put(1, "fodar2", "mirko")); Flush(1); // Create an iterator holding the current super version, as well as // the SST file just flushed. 
ReadOptions ro; ro.tailing = true; ro.background_purge_on_iterator_cleanup = true; Iterator* it = db_->NewIterator(ro, handles_[1]); // A flush will make `it` hold the last reference of its super version. ASSERT_OK(Put(1, "fodor", "mirko")); ASSERT_OK(Put(1, "fodar2", "mirko")); Flush(1); WaitForCompaction(); ASSERT_OK(Put(1, "fodor", "mirko")); ASSERT_OK(Put(1, "fodor", "mirko")); ASSERT_OK(Put(0, "fodor", "mirko")); ASSERT_OK(Put(1, "fodor", "mirko")); rocksdb::SyncPoint::GetInstance()->LoadDependency({ {"ColumnFamilyTest::IteratorCloseWALFile2:0", "DBImpl::BGWorkPurge:start"}, {"ColumnFamilyTest::IteratorCloseWALFile2:2", "DBImpl::BackgroundCallFlush:start"}, {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"}, }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); WriteOptions wo; wo.sync = true; ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko")); env.delete_count_.store(0); ASSERT_EQ(2, env.num_open_wal_file_.load()); // Deleting the iterator will clear its super version, triggering // closing all files it->Seek(""); ASSERT_EQ(2, env.num_open_wal_file_.load()); ASSERT_EQ(0, env.delete_count_.load()); TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0"); TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1"); ASSERT_EQ(1, env.num_open_wal_file_.load()); ASSERT_EQ(1, env.delete_count_.load()); TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2"); WaitForFlush(1); ASSERT_EQ(1, env.num_open_wal_file_.load()); ASSERT_EQ(1, env.delete_count_.load()); delete it; rocksdb::SyncPoint::GetInstance()->DisableProcessing(); Reopen(); ASSERT_EQ("mirko", Get(0, "fodor")); ASSERT_EQ("mirko", Get(1, "fodor")); db_options_.env = env_; Close(); } #endif // !ROCKSDB_LITE // Disable on windows because SyncWAL requires env->IsSyncThreadSafe() // to return true which is not so in unbuffered mode. 
#ifndef OS_WIN TEST_F(ColumnFamilyTest, LogSyncConflictFlush) { Open(); CreateColumnFamiliesAndReopen({"one", "two"}); Put(0, "", ""); Put(1, "foo", "bar"); rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::SyncWAL:BeforeMarkLogsSynced:1", "ColumnFamilyTest::LogSyncConflictFlush:1"}, {"ColumnFamilyTest::LogSyncConflictFlush:2", "DBImpl::SyncWAL:BeforeMarkLogsSynced:2"}}); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::port::Thread thread([&] { db_->SyncWAL(); }); TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:1"); Flush(1); Put(1, "foo", "bar"); Flush(1); TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:2"); thread.join(); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); Close(); } #endif // this test is placed here, because the infrastructure for Column Family // test is being used to ensure a roll of wal files. // Basic idea is to test that WAL truncation is being detected and not // ignored TEST_F(ColumnFamilyTest, DISABLED_LogTruncationTest) { Open(); CreateColumnFamiliesAndReopen({"one", "two"}); Build(0, 100); // Flush the 0th column family to force a roll of the wal log Flush(0); // Add some more entries Build(100, 100); std::vector filenames; ASSERT_OK(env_->GetChildren(dbname_, &filenames)); // collect wal files std::vector logfs; for (size_t i = 0; i < filenames.size(); i++) { uint64_t number; FileType type; if (!(ParseFileName(filenames[i], &number, &type))) continue; if (type != kLogFile) continue; logfs.push_back(filenames[i]); } std::sort(logfs.begin(), logfs.end()); ASSERT_GE(logfs.size(), 2); // Take the last but one file, and truncate it std::string fpath = dbname_ + "/" + logfs[logfs.size() - 2]; std::vector names_save = names_; uint64_t fsize; ASSERT_OK(env_->GetFileSize(fpath, &fsize)); ASSERT_GT(fsize, 0); Close(); std::string backup_logs = dbname_ + "/backup_logs"; std::string t_fpath = backup_logs + "/" + logfs[logfs.size() - 2]; ASSERT_OK(env_->CreateDirIfMissing(backup_logs)); // Not sure 
how easy it is to make this data driven. // need to read back the WAL file and truncate last 10 // entries CopyFile(fpath, t_fpath, fsize - 9180); ASSERT_OK(env_->DeleteFile(fpath)); ASSERT_OK(env_->RenameFile(t_fpath, fpath)); db_options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; OpenReadOnly(names_save); CheckMissed(); Close(); Open(names_save); CheckMissed(); Close(); // cleanup env_->DeleteDir(backup_logs); } } // namespace rocksdb int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }