diff --git a/HISTORY.md b/HISTORY.md
index 89ff62f5e..c67620383 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -5,6 +5,10 @@
 * Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info.
 * Added a new way to report QPS from db_bench (check out --report_file and --report_interval_seconds)
 * Added a cache for individual rows. See DBOptions::row_cache for more info.
+* Several new features on EventListener (see include/rocksdb/listener.h):
+  - OnCompactionCompleted() now returns per-compaction job statistics, defined in include/rocksdb/compaction_job_stats.h.
+  - Added OnTableFileCreated() and OnTableFileDeleted().
+* Added compaction_options_universal.allow_trivial_move. When set to true, trivial move is allowed while performing universal compaction. A trivial move happens only when all the input files are non-overlapping.

 ### Public API changes
 * EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters.
diff --git a/db/column_family.cc b/db/column_family.cc
index 4a600f611..b39518e08 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -59,6 +59,7 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
     if (job_context.HaveSomethingToDelete()) {
       db_->PurgeObsoleteFiles(job_context);
     }
+    job_context.Clean();
   }
 }
diff --git a/db/compaction.cc b/db/compaction.cc
index 192cbfb49..93a4658cc 100644
--- a/db/compaction.cc
+++ b/db/compaction.cc
@@ -167,8 +167,14 @@ bool Compaction::IsTrivialMove() const {
     return false;
   }

+  // Used in universal compaction, where a trivial move can be done if the
+  // input files are non-overlapping
+  if (cfd_->ioptions()->compaction_options_universal.allow_trivial_move) {
+    return is_trivial_move_;
+  }
+
   return (start_level_ != output_level_ && num_input_levels() == 1 &&
-          input(0, 0)->fd.GetPathId() == GetOutputPathId() &&
+          input(0, 0)->fd.GetPathId() == output_path_id() &&
           InputCompressionMatchesOutput() &&
           TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
 }
diff --git a/db/compaction.h b/db/compaction.h
index beddf2363..64cd3565a 100644
--- a/db/compaction.h
+++ b/db/compaction.h
@@ -109,20 +109,20 @@ class Compaction {
   }

   // Maximum size of files to build during this compaction.
-  uint64_t MaxOutputFileSize() const { return max_output_file_size_; }
+  uint64_t max_output_file_size() const { return max_output_file_size_; }

   // What compression for output
-  CompressionType OutputCompressionType() const { return output_compression_; }
+  CompressionType output_compression() const { return output_compression_; }

   // Whether need to write output file to second DB path.
-  uint32_t GetOutputPathId() const { return output_path_id_; }
+  uint32_t output_path_id() const { return output_path_id_; }

   // Is this a trivial compaction that can be implemented by just
   // moving a single input file to the next level (no merging or splitting)
   bool IsTrivialMove() const;

   // If true, then the compaction can be done by simply deleting input files.
-  bool IsDeletionCompaction() const {
+  bool deletion_compaction() const {
     return deletion_compaction_;
   }

@@ -150,13 +150,26 @@ class Compaction {
   double score() const { return score_; }

   // Is this compaction creating a file in the bottom most level?
-  bool BottomMostLevel() { return bottommost_level_; }
+  bool bottommost_level() { return bottommost_level_; }

   // Does this compaction include all sst files?
-  bool IsFullCompaction() { return is_full_compaction_; }
+  bool is_full_compaction() { return is_full_compaction_; }

   // Was this compaction triggered manually by the client?
-  bool IsManualCompaction() { return is_manual_compaction_; }
+  bool is_manual_compaction() { return is_manual_compaction_; }
+
+  // Used when the allow_trivial_move option is set in
+  // universal compaction. If all the input files are
+  // non-overlapping, is_trivial_move_ is set to true;
+  // otherwise it is set to false.
+  void set_is_trivial_move(bool trivial_move) {
+    is_trivial_move_ = trivial_move;
+  }
+
+  // Used when the allow_trivial_move option is set in
+  // universal compaction. Returns true if the input files
+  // are non-overlapping and can be trivially moved.
+  bool is_trivial_move() { return is_trivial_move_; }

   // Return the MutableCFOptions that should be used throughout the compaction
   // procedure
@@ -238,6 +251,11 @@ class Compaction {
   // Is this compaction requested by the client?
   const bool is_manual_compaction_;

+  // True if we can do a trivial move in universal multi-level
+  // compaction
+
+  bool is_trivial_move_;
+
   // "level_ptrs_" holds indices into "input_version_->levels_", where each
   // index remembers which file of an associated level we are currently used
   // to check KeyNotExistsBeyondOutputLevel() for deletion operation.
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 9a4b7bc91..ac07851f2 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -238,12 +238,12 @@ void CompactionJob::ReportStartedCompaction(
   // In the current design, a CompactionJob is always created
   // for non-trivial compaction.
   assert(compaction->IsTrivialMove() == false ||
-         compaction->IsManualCompaction() == true);
+         compaction->is_manual_compaction() == true);

   ThreadStatusUtil::SetThreadOperationProperty(
       ThreadStatus::COMPACTION_PROP_FLAGS,
-      compaction->IsManualCompaction() +
-          (compaction->IsDeletionCompaction() << 1));
+      compaction->is_manual_compaction() +
+          (compaction->deletion_compaction() << 1));

   ThreadStatusUtil::SetThreadOperationProperty(
       ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
@@ -263,7 +263,7 @@ void CompactionJob::ReportStartedCompaction(

   if (compaction_job_stats_) {
     compaction_job_stats_->is_manual_compaction =
-        compaction->IsManualCompaction();
+        compaction->is_manual_compaction();
   }
 }

@@ -298,7 +298,7 @@ void CompactionJob::Prepare() {
   }

   // Is this compaction producing files at the bottommost level?
-  bottommost_level_ = compact_->compaction->BottomMostLevel();
+  bottommost_level_ = compact_->compaction->bottommost_level();
 }

 Status CompactionJob::Run() {
@@ -864,7 +864,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,

     // Close output file if it is big enough
     if (compact_->builder->FileSize() >=
-        compact_->compaction->MaxOutputFileSize()) {
+        compact_->compaction->max_output_file_size()) {
       status = FinishCompactionOutputFile(input);
       if (!status.ok()) {
         break;
@@ -1160,7 +1160,7 @@ Status CompactionJob::OpenCompactionOutputFile() {
   uint64_t file_number = versions_->NewFileNumber();
   // Make the output file
   std::string fname = TableFileName(db_options_.db_paths, file_number,
-                                    compact_->compaction->GetOutputPathId());
+                                    compact_->compaction->output_path_id());
   Status s = env_->NewWritableFile(fname, &compact_->outfile, env_options_);

   if (!s.ok()) {
@@ -1174,7 +1174,7 @@ Status CompactionJob::OpenCompactionOutputFile() {
   }
   CompactionState::Output out;
   out.number = file_number;
-  out.path_id = compact_->compaction->GetOutputPathId();
+  out.path_id = compact_->compaction->output_path_id();
   out.smallest.Clear();
   out.largest.Clear();
   out.smallest_seqno = out.largest_seqno = 0;
@@ -1198,7 +1198,7 @@ Status CompactionJob::OpenCompactionOutputFile() {
   compact_->builder.reset(NewTableBuilder(
       *cfd->ioptions(), cfd->internal_comparator(),
       cfd->int_tbl_prop_collector_factories(), compact_->outfile.get(),
-      compact_->compaction->OutputCompressionType(),
+      compact_->compaction->output_compression(),
       cfd->ioptions()->compression_opts, skip_filters));
   LogFlush(db_options_.info_log);
   return s;
diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc
index c901c704d..63507d94a 100644
--- a/db/compaction_job_test.cc
+++ b/db/compaction_job_test.cc
@@ -76,11 +76,10 @@ class CompactionJobTest : public testing::Test {
         largest = internal_key;
         largest_seqno = sequence_number;
       }
-      std::pair<std::string, std::string> key_value(
-          {bottommost_internal_key.Encode().ToString(), value});
-      contents.insert(key_value);
+      contents.insert({internal_key.Encode().ToString(), value});
       if (i == 1 || k < kKeysPerFile / 2) {
-        expected_results.insert(key_value);
+        expected_results.insert(
+            {bottommost_internal_key.Encode().ToString(), value});
       }
     }

@@ -97,7 +96,7 @@ class CompactionJobTest : public testing::Test {
                                 mutable_cf_options_, &edit, &mutex_);
       mutex_.Unlock();
     }
-    versions_->SetLastSequence(sequence_number);
+    versions_->SetLastSequence(sequence_number + 1);
     return expected_results;
   }

@@ -169,9 +168,8 @@ void VerifyInitializationOfCompactionJobStats(
 void VerifyCompactionJobStats(
     const CompactionJobStats& compaction_job_stats,
    const std::vector<FileMetaData*>& files,
-    size_t num_output_files,
-    uint64_t min_elapsed_time) {
-  ASSERT_GE(compaction_job_stats.elapsed_micros, min_elapsed_time);
+    size_t num_output_files) {
+  ASSERT_GE(compaction_job_stats.elapsed_micros, 0U);
   ASSERT_EQ(compaction_job_stats.num_input_files, files.size());
   ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files);
 }
@@ -210,7 +208,6 @@ TEST_F(CompactionJobTest, Simple) {
       std::move(yield_callback), &event_logger, false, db_name,
       &compaction_job_stats);

-  auto start_micros = Env::Default()->NowMicros();
   VerifyInitializationOfCompactionJobStats(compaction_job_stats);

   compaction_job.Prepare();
@@ -224,7 +221,7 @@ TEST_F(CompactionJobTest, Simple) {

   VerifyCompactionJobStats(
       compaction_job_stats,
-      files, 1, (Env::Default()->NowMicros() - start_micros) / 2);
+      files, 1);

   mock_table_factory_->AssertLatestFile(expected_results);
  ASSERT_EQ(yield_callback_called, 20000);
diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index effaff444..8d271d1d0 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -15,6 +15,7 @@
 #include <inttypes.h>
 #include <limits>
+#include <queue>
 #include <string>
 #include <utility>
@@ -37,6 +38,64 @@ uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
   return sum;
 }

+// Used in universal compaction when trivial move is enabled.
+// This structure is used for the construction of the min heap
+// that contains the file metadata, the level of the file,
+// and the index of the file in that level.
+
+struct InputFileInfo {
+  FileMetaData* f;
+  size_t level;
+  size_t index;
+};
+
+// Used in universal compaction when trivial move is enabled.
+// This comparator is used for the construction of the min heap
+// based on the smallest key of the file.
+struct UserKeyComparator {
+  explicit UserKeyComparator(const Comparator* ucmp) { ucmp_ = ucmp; }
+
+  bool operator()(InputFileInfo i1, InputFileInfo i2) const {
+    return (ucmp_->Compare(i1.f->smallest.user_key(),
+                           i2.f->smallest.user_key()) > 0);
+  }
+
+ private:
+  const Comparator* ucmp_;
+};
+
+typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
+                            UserKeyComparator> SmallestKeyHeap;
+
+// This function creates the heap that is used to find if the files are
+// overlapping during universal compaction when the allow_trivial_move
+// option is set.
+SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
+  SmallestKeyHeap smallest_key_priority_q =
+      SmallestKeyHeap(UserKeyComparator(ucmp));
+
+  InputFileInfo input_file;
+
+  for (size_t l = 0; l < c->num_input_levels(); l++) {
+    if (c->num_input_files(l) != 0) {
+      if (l == 0 && c->start_level() == 0) {
+        for (size_t i = 0; i < c->num_input_files(0); i++) {
+          input_file.f = c->input(0, i);
+          input_file.level = 0;
+          input_file.index = i;
+          smallest_key_priority_q.push(std::move(input_file));
+        }
+      } else {
+        input_file.f = c->input(l, 0);
+        input_file.level = l;
+        input_file.index = 0;
+        smallest_key_priority_q.push(std::move(input_file));
+      }
+    }
+  }
+  return smallest_key_priority_q;
+}
 }  // anonymous namespace

 // Determine compression type, based on user options, level of the output
@@ -1106,6 +1165,50 @@ void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
 }  // namespace
 #endif

+// Checks whether there are any overlapping
+// files in the input.
+bool CompactionPicker::IsInputNonOverlapping(Compaction* c) {
+  auto comparator = icmp_->user_comparator();
+  int first_iter = 1;
+
+  InputFileInfo prev, curr, next;
+
+  SmallestKeyHeap smallest_key_priority_q =
+      create_level_heap(c, icmp_->user_comparator());
+
+  while (!smallest_key_priority_q.empty()) {
+    curr = smallest_key_priority_q.top();
+    smallest_key_priority_q.pop();
+
+    if (first_iter) {
+      prev = curr;
+      first_iter = 0;
+    } else {
+      if (comparator->Compare(prev.f->largest.user_key(),
+                              curr.f->smallest.user_key()) >= 0) {
+        // found overlapping files, return false
+        return false;
+      }
+      assert(comparator->Compare(curr.f->largest.user_key(),
+                                 prev.f->largest.user_key()) > 0);
+      prev = curr;
+    }
+
+    next.f = nullptr;
+
+    if (curr.level != 0 && curr.index < c->num_input_files(curr.level) - 1) {
+      next.f = c->input(curr.level, curr.index + 1);
+      next.level = curr.level;
+      next.index = curr.index + 1;
+    }
+
+    if (next.f) {
+      smallest_key_priority_q.push(std::move(next));
+    }
+  }
+  return true;
+}
+
 // Universal style of compaction. Pick files that are contiguous in
 // time-range to compact.
 //
@@ -1168,6 +1271,10 @@ Compaction* UniversalCompactionPicker::PickCompaction(
     return nullptr;
   }

+  if (ioptions_.compaction_options_universal.allow_trivial_move == true) {
+    c->set_is_trivial_move(IsInputNonOverlapping(c));
+  }
+
   // validate that all the chosen files of L0 are non overlapping in time
 #ifndef NDEBUG
   SequenceNumber prev_smallest_seqno = 0U;
diff --git a/db/compaction_picker.h b/db/compaction_picker.h
index 403410196..65ca73abf 100644
--- a/db/compaction_picker.h
+++ b/db/compaction_picker.h
@@ -105,6 +105,12 @@ class CompactionPicker {
                                 const VersionStorageInfo* vstorage,
                                 const CompactionOptions& compact_options) const;

+  // Used in universal compaction when the allow_trivial_move
+  // option is set. Checks whether there are any overlapping files
+  // in the input. Returns true if the input files are
+  // non-overlapping.
+  bool IsInputNonOverlapping(Compaction* c);
+
 protected:
   int NumberLevels() const { return ioptions_.num_levels; }

diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc
index 149931cce..e6b31fbfa 100644
--- a/db/compaction_picker_test.cc
+++ b/db/compaction_picker_test.cc
@@ -77,6 +77,8 @@ class CompactionPickerTest : public testing::Test {
     f->fd = FileDescriptor(file_number, path_id, file_size);
     f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
     f->largest = InternalKey(largest, largest_seq, kTypeValue);
+    f->smallest_seqno = smallest_seq;
+    f->largest_seqno = largest_seq;
     f->compensated_file_size = file_size;
     f->refs = 0;
     vstorage_->AddFile(level, f);
@@ -365,6 +367,64 @@ TEST_F(CompactionPickerTest, NeedsCompactionUniversal) {
                   vstorage_->CompactionScore(0) >= 1);
   }
 }
+
+// Tests whether the files can be trivially moved in multi-level
+// universal compaction when the allow_trivial_move option is set.
+// In this test, as the input files overlap, they cannot
+// be trivially moved.
+TEST_F(CompactionPickerTest, CannotTrivialMoveUniversal) {
+  const uint64_t kFileSize = 100000;
+
+  ioptions_.compaction_options_universal.allow_trivial_move = true;
+  NewVersionStorage(1, kCompactionStyleUniversal);
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+  // must return false when there are no files.
+  ASSERT_EQ(universal_compaction_picker.NeedsCompaction(vstorage_.get()),
+            false);
+
+  NewVersionStorage(3, kCompactionStyleUniversal);
+
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(0, 2U, "201", "250", kFileSize, 0, 401, 450);
+  Add(0, 4U, "260", "300", kFileSize, 0, 260, 300);
+  Add(1, 5U, "100", "151", kFileSize, 0, 200, 251);
+  Add(1, 3U, "301", "350", kFileSize, 0, 101, 150);
+  Add(2, 6U, "120", "200", kFileSize, 0, 20, 100);
+
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+
+  ASSERT_TRUE(!compaction->is_trivial_move());
+}
+
+// Tests whether the files can be trivially moved in multi-level
+// universal compaction when the allow_trivial_move option is set.
+// In this test, as the input files don't overlap, they should
+// be trivially moved.
+TEST_F(CompactionPickerTest, AllowsTrivialMoveUniversal) {
+  const uint64_t kFileSize = 100000;
+
+  ioptions_.compaction_options_universal.allow_trivial_move = true;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  NewVersionStorage(3, kCompactionStyleUniversal);
+
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(0, 2U, "201", "250", kFileSize, 0, 401, 450);
+  Add(0, 4U, "260", "300", kFileSize, 0, 260, 300);
+  Add(1, 5U, "010", "080", kFileSize, 0, 200, 251);
+  Add(2, 3U, "301", "350", kFileSize, 0, 101, 150);
+
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+
+  ASSERT_TRUE(compaction->is_trivial_move());
+}

 TEST_F(CompactionPickerTest, NeedsCompactionFIFO) {
   NewVersionStorage(1, kCompactionStyleFIFO);
@@ -419,6 +479,76 @@ TEST_F(CompactionPickerTest, ParentIndexResetBug) {
       cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
 }

+// This test checks ExpandWhileOverlapping() by having overlapping user key
+// ranges (with different sequence numbers) in the input files.
+TEST_F(CompactionPickerTest, OverlappingUserKeys) {
+  NewVersionStorage(6, kCompactionStyleLevel);
+  Add(1, 1U, "100", "150", 1U);
+  // Overlapping user keys
+  Add(1, 2U, "200", "400", 1U);
+  Add(1, 3U, "400", "500", 1000000000U, 0, 0);
+  Add(2, 4U, "600", "700", 1U);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(1U, compaction->num_input_levels());
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+  ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(3U, compaction->input(0, 1)->fd.GetNumber());
+}
+
+TEST_F(CompactionPickerTest, OverlappingUserKeys2) {
+  NewVersionStorage(6, kCompactionStyleLevel);
+  // Overlapping user keys on same level and output level
+  Add(1, 1U, "200", "400", 1000000000U);
+  Add(1, 2U, "400", "500", 1U, 0, 0);
+  Add(2, 3U, "400", "600", 1U);
+  // The following file is not in the compaction despite overlapping user keys
+  Add(2, 4U, "600", "700", 1U, 0, 0);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(2U, compaction->num_input_levels());
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+  ASSERT_EQ(1U, compaction->num_input_files(1));
+  ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
+  ASSERT_EQ(3U, compaction->input(1, 0)->fd.GetNumber());
+}
+
+TEST_F(CompactionPickerTest, OverlappingUserKeys3) {
+  NewVersionStorage(6, kCompactionStyleLevel);
+  // Chain of overlapping user key ranges (forces ExpandWhileOverlapping() to
+  // expand multiple times)
+  Add(1, 1U, "100", "150", 1U);
+  Add(1, 2U, "150", "200", 1U, 0, 0);
+  Add(1, 3U, "200", "250", 1000000000U, 0, 0);
+  Add(1, 4U, "250", "300", 1U, 0, 0);
+  Add(1, 5U, "300", "350", 1U, 0, 0);
+  // Output level overlaps with the beginning and the end of the chain
+  Add(2, 6U, "050", "100", 1U);
+  Add(2, 7U, "350", "400", 1U);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
+      cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(2U, compaction->num_input_levels());
+  ASSERT_EQ(5U, compaction->num_input_files(0));
+  ASSERT_EQ(2U, compaction->num_input_files(1));
+  ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
+  ASSERT_EQ(3U, compaction->input(0, 2)->fd.GetNumber());
+  ASSERT_EQ(4U, compaction->input(0, 3)->fd.GetNumber());
+  ASSERT_EQ(5U, compaction->input(0, 4)->fd.GetNumber());
+  ASSERT_EQ(6U, compaction->input(1, 0)->fd.GetNumber());
+  ASSERT_EQ(7U, compaction->input(1, 1)->fd.GetNumber());
+}
+
 }  // namespace rocksdb

 int main(int argc, char** argv) {
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 260b1baef..1c7f734d8 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -527,6 +527,25 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   versions_->GetObsoleteFiles(&job_context->sst_delete_files,
                               job_context->min_pending_output);

+  uint64_t min_log_number = versions_->MinLogNumber();
+  if (!alive_log_files_.empty()) {
+    // find newly obsoleted log files
+    while (alive_log_files_.begin()->number < min_log_number) {
+      auto& earliest = *alive_log_files_.begin();
+      job_context->log_delete_files.push_back(earliest.number);
+      total_log_size_ -= earliest.size;
+      alive_log_files_.pop_front();
+      // Current log should always stay alive since it can't have
+      // number < MinLogNumber().
+      assert(alive_log_files_.size());
+    }
+  }
+
+  // We're just cleaning up for DB::Write().
+  assert(job_context->logs_to_free.empty());
+  job_context->logs_to_free = logs_to_free_;
+  logs_to_free_.clear();
+
   // store the current filenum, lognum, etc
   job_context->manifest_file_number = versions_->manifest_file_number();
   job_context->pending_manifest_file_number =
@@ -1309,17 +1328,6 @@ Status DBImpl::FlushMemTableToOutputFile(
     VersionStorageInfo::LevelSummaryStorage tmp;
     LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
                 cfd->current()->storage_info()->LevelSummary(&tmp));
-
-    if (disable_delete_obsolete_files_ == 0) {
-      // add to deletion state
-      while (alive_log_files_.size() &&
-             alive_log_files_.begin()->number < versions_->MinLogNumber()) {
-        const auto& earliest = *alive_log_files_.begin();
-        job_context->log_delete_files.push_back(earliest.number);
-        total_log_size_ -= earliest.size;
-        alive_log_files_.pop_front();
-      }
-    }
   }

   if (!s.ok() && !s.IsShutdownInProgress() && db_options_.paranoid_checks &&
@@ -1609,7 +1617,7 @@ Status DBImpl::CompactFilesImpl(
   assert(c);
   c->SetInputVersion(version);
   // deletion compaction currently not allowed in CompactFiles.
-  assert(!c->IsDeletionCompaction());
+  assert(!c->deletion_compaction());

   auto yield_callback = [&]() {
     return CallFlushDuringCompaction(
@@ -1620,7 +1628,7 @@ Status DBImpl::CompactFilesImpl(
   CompactionJob compaction_job(
       job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
       &shutting_down_, log_buffer, directories_.GetDbDir(),
-      directories_.GetDataDir(c->GetOutputPathId()), stats_,
+      directories_.GetDataDir(c->output_path_id()), stats_,
       snapshots_.GetAll(), table_cache_, std::move(yield_callback),
       &event_logger_, c->mutable_cf_options()->paranoid_file_checks, dbname_,
       nullptr);  // Here we pass a nullptr for CompactionJobStats because
@@ -2145,7 +2153,9 @@ void DBImpl::RecordFlushIOStats() {

 void DBImpl::BGWorkFlush(void* db) {
   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
+  TEST_SYNC_POINT("DBImpl::BGWorkFlush");
   reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
+  TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
 }

 void DBImpl::BGWorkCompaction(void* db) {
@@ -2238,10 +2248,6 @@ void DBImpl::BackgroundCallFlush() {

     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

-    // We're just cleaning up for DB::Write()
-    job_context.logs_to_free = logs_to_free_;
-    logs_to_free_.clear();
-
     // If flush failed, we want to delete all temporary files that we might have
     // created. Thus, we force full scan in FindObsoleteFiles()
     FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
@@ -2308,10 +2314,6 @@ void DBImpl::BackgroundCallCompaction() {

     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

-    // We're just cleaning up for DB::Write()
-    job_context.logs_to_free = logs_to_free_;
-    logs_to_free_.clear();
-
     // If compaction failed, we want to delete all temporary files that we might
     // have created (they might not be all recorded in job_context in case of a
     // failure). Thus, we force full scan in FindObsoleteFiles()
@@ -2502,7 +2504,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
   if (!c) {
     // Nothing to do
     LogToBuffer(log_buffer, "Compaction nothing to do");
-  } else if (c->IsDeletionCompaction()) {
+  } else if (c->deletion_compaction()) {
    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
    // file if there is alive snapshot pointing to it
    assert(c->num_input_files(1) == 0);
@@ -2536,21 +2538,27 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     // Move files to next level
     int32_t moved_files = 0;
     int64_t moved_bytes = 0;
-    for (size_t i = 0; i < c->num_input_files(0); i++) {
-      FileMetaData* f = c->input(0, i);
-      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
-      c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
-                         f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
-                         f->largest, f->smallest_seqno, f->largest_seqno,
-                         f->marked_for_compaction);
+    for (unsigned int l = 0; l < c->num_input_levels(); l++) {
+      if (l == static_cast<unsigned int>(c->output_level())) {
+        continue;
+      }
+      for (size_t i = 0; i < c->num_input_files(l); i++) {
+        FileMetaData* f = c->input(l, i);
+        c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
+        c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
+                           f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
+                           f->largest, f->smallest_seqno, f->largest_seqno,
+                           f->marked_for_compaction);

-      LogToBuffer(log_buffer,
-                  "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
-                  c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
-                  c->output_level(), f->fd.GetFileSize());
-      ++moved_files;
-      moved_bytes += f->fd.GetFileSize();
+        LogToBuffer(log_buffer,
+                    "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
+                    c->column_family_data()->GetName().c_str(),
+                    f->fd.GetNumber(), c->output_level(), f->fd.GetFileSize());
+        ++moved_files;
+        moved_bytes += f->fd.GetFileSize();
+      }
     }
+
     status = versions_->LogAndApply(c->column_family_data(),
                                     *c->mutable_cf_options(), c->edit(),
                                     &mutex_, directories_.GetDbDir());
@@ -2589,7 +2597,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     CompactionJob compaction_job(
         job_context->job_id, c.get(), db_options_, env_options_,
         versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
-        directories_.GetDataDir(c->GetOutputPathId()), stats_,
+        directories_.GetDataDir(c->output_path_id()), stats_,
        snapshots_.GetAll(), table_cache_, std::move(yield_callback),
        &event_logger_, c->mutable_cf_options()->paranoid_file_checks, dbname_,
        &compaction_job_stats);
diff --git a/db/db_impl.h b/db/db_impl.h
index d06134aa9..f065a70f9 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -290,6 +290,8 @@ class DBImpl : public DB {

   size_t TEST_LogsToFreeSize();

+  uint64_t TEST_LogfileNumber();
+
 #endif  // ROCKSDB_LITE

   // Returns the list of live files in 'live' and the list
diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc
index 35703cf1a..66177ed7a 100644
--- a/db/db_impl_debug.cc
+++ b/db/db_impl_debug.cc
@@ -148,5 +148,10 @@ size_t DBImpl::TEST_LogsToFreeSize() {
   return logs_to_free_.size();
 }

+uint64_t DBImpl::TEST_LogfileNumber() {
+  InstrumentedMutexLock l(&mutex_);
+  return logfile_number_;
+}
+
 }  // namespace rocksdb
 #endif  // ROCKSDB_LITE
diff --git a/db/db_iter.cc b/db/db_iter.cc
index 7ed00365e..6bee64635 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -350,9 +350,6 @@ void DBIter::MergeValuesNewToOld() {
 void DBIter::Prev() {
   assert(valid_);
   if (direction_ == kForward) {
-    if (!iter_->Valid()) {
-      iter_->SeekToLast();
-    }
     FindPrevUserKey();
    direction_ = kReverse;
  }
@@ -556,7 +553,7 @@ void DBIter::FindNextUserKey() {
   ParsedInternalKey ikey;
   FindParseableKey(&ikey, kForward);
   while (iter_->Valid() &&
-         user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
+         user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) {
     iter_->Next();
     FindParseableKey(&ikey, kForward);
   }
@@ -571,7 +568,7 @@ void DBIter::FindPrevUserKey() {
   ParsedInternalKey ikey;
   FindParseableKey(&ikey, kReverse);
   while (iter_->Valid() &&
-         user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) >= 0) {
+         user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) {
     if (num_skipped >= max_skip_) {
       num_skipped = 0;
       IterKey last_key;
diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc
index e5c58e4d9..2cb81aab1 100644
--- a/db/db_iter_test.cc
+++ b/db/db_iter_test.cc
@@ -1668,7 +1668,9 @@ TEST_F(DBIteratorTest, DBIterator8) {
   ASSERT_EQ(db_iter->value().ToString(), "0");
 }

-TEST_F(DBIteratorTest, DBIterator9) {
+// TODO(3.13): fix the issue of Seek() then Prev() which might not necessarily
+// return the biggest element smaller than the seek key.
+TEST_F(DBIteratorTest, DISABLED_DBIterator9) {
   Options options;
   options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
   {
@@ -1716,7 +1718,9 @@ TEST_F(DBIteratorTest, DBIterator9) {
   }
 }

-TEST_F(DBIteratorTest, DBIterator10) {
+// TODO(3.13): fix the issue of Seek() then Prev() which might not necessarily
+// return the biggest element smaller than the seek key.
+TEST_F(DBIteratorTest, DISABLED_DBIterator10) {
   Options options;

   TestIterator* internal_iter = new TestIterator(BytewiseComparator());
diff --git a/db/db_test.cc b/db/db_test.cc
index e0f3996be..631f446a5 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -4534,7 +4534,50 @@ TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionMultiLevels) {
     ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
   }
 }

+// Tests universal compaction with trivial move enabled
+TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) {
+  int32_t trivial_move = 0;
+  int32_t non_trivial_move = 0;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:TrivialMove",
+      [&](void* arg) { trivial_move++; });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:NonTrivial",
+      [&](void* arg) { non_trivial_move++; });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  Options options;
+  options.compaction_style = kCompactionStyleUniversal;
+  options.compaction_options_universal.allow_trivial_move = true;
+  options.num_levels = 3;
+  options.write_buffer_size = 100 << 10;  // 100KB
+  options.level0_file_num_compaction_trigger = 3;
+  options.max_background_compactions = 1;
+  options.target_file_size_base = 32 * 1024;
+  options = CurrentOptions(options);
+  DestroyAndReopen(options);
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  // Trigger compaction if size amplification exceeds 110%
+  options.compaction_options_universal.max_size_amplification_percent = 110;
+  options = CurrentOptions(options);
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
+
+  Random rnd(301);
+  int num_keys = 15000;
+  for (int i = 0; i < num_keys; i++) {
+    ASSERT_OK(Put(1, Key(i), Key(i)));
+  }
+  std::vector<std::string> values;
+
+  ASSERT_OK(Flush(1));
+  dbfull()->TEST_WaitForCompact();
+
+  ASSERT_GT(trivial_move, 0);
+  ASSERT_EQ(non_trivial_move, 0);
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels,
                         DBTestUniversalCompactionMultiLevels,
                         ::testing::Values(3, 20));
@@ -8669,28 +8712,78 @@ TEST_F(DBTest, TransactionLogIteratorCorruptedLog) {

 //
 // Test WAL recovery for the various modes available
-// TODO krad:
-// 1. Add tests when there are more than one log file
 //
 class RecoveryTestHelper {
  public:
-  // Recreate and fill the store with some data
-  static size_t FillData(DBTest* test, const Options& options) {
-    size_t count = 0;
-    test->DestroyAndReopen(options);
+  // Number of WAL files to generate
+  static const int kWALFilesCount = 10;
+  // Starting number for the WAL file name like 00010.log
+  static const int kWALFileOffset = 10;
+  // Keys to be written per WAL file
+  static const int kKeysPerWALFile = 1024;
+  // Size of the value
+  static const int kValueSize = 10;

-    for (int i = 0; i < 1024; i++) {
-      test->Put("key" + ToString(i), test->DummyString(10));
-      ++count;
+  // Create WAL files with values filled in
+  static void FillData(DBTest* test, Options& options,
+                       const size_t wal_count, size_t& count) {
+    DBOptions& db_options = options;
+
+    count = 0;
+
+    shared_ptr<Cache> table_cache = NewLRUCache(50000, 16);
+    EnvOptions env_options;
+    WriteBuffer write_buffer(db_options.db_write_buffer_size);
+
+    unique_ptr<VersionSet> versions;
+    unique_ptr<WalManager> wal_manager;
+    WriteController write_controller;
+
+    versions.reset(new VersionSet(test->dbname_, &db_options, env_options,
+                                  table_cache.get(), &write_buffer,
+                                  &write_controller));
+
+    wal_manager.reset(new WalManager(db_options, env_options));
+
+    std::unique_ptr<log::Writer> current_log_writer;
+
+    for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
+      uint64_t current_log_number = j;
+      std::string fname = LogFileName(test->dbname_, current_log_number);
+      unique_ptr<WritableFile> file;
+      ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
+      current_log_writer.reset(new log::Writer(std::move(file)));
+
+      for (int i = 0; i < kKeysPerWALFile; i++) {
+        std::string key = "key" + ToString(count++);
+        std::string value = test->DummyString(kValueSize);
+        assert(current_log_writer.get() != nullptr);
+        uint64_t seq = versions->LastSequence() + 1;
+        WriteBatch batch;
+        batch.Put(key, value);
+        WriteBatchInternal::SetSequence(&batch, seq);
+        current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
+        versions->SetLastSequence(seq);
+      }
     }
+  }
+
+  // Recreate and fill the store with some data
+  static size_t FillData(DBTest* test, Options& options) {
+    options.create_if_missing = true;
+    test->DestroyAndReopen(options);
+    test->Close();
+
+    size_t count = 0;
+    FillData(test, options, kWALFilesCount, count);
     return count;
   }

   // Read back all the keys we wrote and return the number of keys found
   static size_t GetData(DBTest* test) {
     size_t count = 0;
-    for (size_t i = 0; i < 1024; i++) {
+    for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
       if (test->Get("key" + ToString(i)) != "NOT_FOUND") {
         ++count;
       }
@@ -8698,6 +8791,30 @@ class RecoveryTestHelper {
     return count;
   }

+  // Manually corrupt the specified WAL
+  static void CorruptWAL(DBTest* test, Options& options,
+                         const double off, const double len,
+                         const int wal_file_id, const bool trunc = false) {
+    Env* env = options.env;
+    std::string fname = LogFileName(test->dbname_, wal_file_id);
+    uint64_t size;
+    ASSERT_OK(env->GetFileSize(fname, &size));
+    ASSERT_GT(size, 0);
+#ifdef OS_WIN
+    // Windows disk cache behaves differently. When we truncate,
+    // the original content is still in the cache because the original
+    // handle is still open. Generally, in Windows, one prohibits
+    // shared access to files and it is not needed for WAL but we allow
+    // it to induce corruption at various tests.
+    test->Close();
+#endif
+    if (trunc) {
+      ASSERT_EQ(0, truncate(fname.c_str(), size * off));
+    } else {
+      InduceCorruption(fname, size * off, size * len);
+    }
+  }
+
   // Overwrite data with 'a' from offset for length len
   static void InduceCorruption(const std::string& filename, uint32_t offset,
                                uint32_t len) {
@@ -8714,32 +8831,6 @@ class RecoveryTestHelper {

     close(fd);
   }
-
-  // Corrupt the last WAL file from (filesize * off) for length (filesize * len)
-  static void CorruptWAL(DBTest* test, const double off, const double len,
-                         const bool trunc = false) {
-    rocksdb::VectorLogPtr wal_files;
-    ASSERT_OK(test->dbfull()->GetSortedWalFiles(wal_files));
-    ASSERT_EQ(wal_files.size(), 1);
-    const auto logfile_path =
-        test->dbname_ + "/" + wal_files.front()->PathName();
-    auto size = wal_files.front()->SizeFileBytes();
-
-#ifdef OS_WIN
-    // Windows disk cache behaves differently. When we truncate
-    // the original content is still in the cache due to the original
-    // handle is still open. Generally, in Windows, one prohibits
-    // shared access to files and it is not needed for WAL but we allow
-    // it to induce corruption at various tests.
-    test->Close();
-#endif
-
-    if (trunc) {
-      ASSERT_EQ(0, truncate(logfile_path.c_str(), size * off));
-    } else {
-      InduceCorruption(logfile_path, size * off, size * len);
-    }
-  }
 };

 // Test scope:
 // - We expect to open the data store when there is incomplete trailing writes
 // at the end of any of the logs
 // - We do not expect to open the data store for corruption
 TEST_F(DBTest, kTolerateCorruptedTailRecords) {
-  for (auto trunc : {true, false}) {
-    for (int i = 0; i < 4; i++) {
-      // Fill data for testing
-      Options options = CurrentOptions();
-      const size_t row_count = RecoveryTestHelper::FillData(this, options);
+  const int jstart = RecoveryTestHelper::kWALFileOffset;
+  const int jend = jstart + RecoveryTestHelper::kWALFilesCount;

-      // test checksum failure or parsing
-      RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
+  for (auto trunc : {true, false}) { /* Corruption style */
+    for (int i = 0; i < 4; i++) { /* Corruption offset position */
+      for (int j = jstart; j < jend; j++) { /* WAL file */
+        // Fill data for testing
+        Options options = CurrentOptions();
+        const size_t row_count = RecoveryTestHelper::FillData(this, options);

-      if (trunc) {
-        options.wal_recovery_mode =
-            WALRecoveryMode::kTolerateCorruptedTailRecords;
-        ASSERT_OK(TryReopen(options));
-        const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
-        ASSERT_TRUE(i == 0 || recovered_row_count > 0);
-        ASSERT_LT(recovered_row_count, row_count);
-      } else {
-        options.wal_recovery_mode =
-            WALRecoveryMode::kTolerateCorruptedTailRecords;
-        ASSERT_NOK(TryReopen(options));
+        // test checksum failure or parsing
+        RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
+                                       /*len%=*/ .1, /*wal=*/ j, trunc);
+
+        if (trunc) {
+          options.wal_recovery_mode =
+              WALRecoveryMode::kTolerateCorruptedTailRecords;
+          options.create_if_missing = false;
+          ASSERT_OK(TryReopen(options));
+          const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
+          ASSERT_TRUE(i == 0 || recovered_row_count > 0);
+          ASSERT_LT(recovered_row_count, row_count);
+        } else {
+          options.wal_recovery_mode =
+              WALRecoveryMode::kTolerateCorruptedTailRecords;
+          ASSERT_NOK(TryReopen(options));
+        }
       }
     }
   }
@@ -8776,23 +8873,34 @@
 // We don't expect the data store to be opened if there is any corruption
 // (leading, middle or trailing -- incomplete writes or corruption)
 TEST_F(DBTest, kAbsoluteConsistency) {
+  const int jstart = RecoveryTestHelper::kWALFileOffset;
+  const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
+
+  // Verify clean slate behavior
   Options options = CurrentOptions();
   const size_t row_count = RecoveryTestHelper::FillData(this, options);
   options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
+  options.create_if_missing = false;
   ASSERT_OK(TryReopen(options));
   ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);

-  for (auto trunc : {true, false}) {
-    for (int i = 0; i < 4; i++) {
+  for (auto trunc : {true, false}) { /* Corruption style */
+    for (int i = 0; i < 4; i++) { /* Corruption offset position */
       if (trunc && i == 0) {
         continue;
       }
-      options = CurrentOptions();
-      RecoveryTestHelper::FillData(this, options);
-      RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
-      options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
-      ASSERT_NOK(TryReopen(options));
+      for (int j = jstart; j < jend; j++) { /* wal files */
+        // fill with new data
+        RecoveryTestHelper::FillData(this, options);
+        // corrupt the wal
+        RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
+                                       /*len%=*/.1, j, trunc);
+        // verify
+        options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
+        options.create_if_missing = false;
+        ASSERT_NOK(TryReopen(options));
+      }
     }
   }
 }
@@ -8801,34 +8909,49 @@ TEST_F(DBTest, kAbsoluteConsistency) {
 // - We expect to open data store under all circumstances
 // - We expect only data upto the point where the first error was encountered
 TEST_F(DBTest, kPointInTimeRecovery) {
+  const int jstart = RecoveryTestHelper::kWALFileOffset;
+  const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
+  const int maxkeys = RecoveryTestHelper::kWALFilesCount *
+                      RecoveryTestHelper::kKeysPerWALFile;

-  for (auto trunc : {true, false}) {
-    for (int i = 0; i < 4; i++) {
-      // Fill data for testing
-      Options options = CurrentOptions();
-      const size_t row_count = RecoveryTestHelper::FillData(this, options);
+  for (auto trunc : {true, false}) { /* Corruption style */
+    for (int i = 0; i < 4; i++) { /* Offset of corruption */
+      for (int j = jstart; j < jend; j++) { /* WAL file */
+        // Fill data for testing
+        Options options = CurrentOptions();
+        const size_t row_count = RecoveryTestHelper::FillData(this, options);

-      // test checksum failure or parsing
-      RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
+        // Corrupt the wal
+        RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
+                                       /*len%=*/.1, j, trunc);

-      options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+        // Verify
+        options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+        options.create_if_missing = false;
+        ASSERT_OK(TryReopen(options));

-      ASSERT_OK(TryReopen(options));
+        // Probe data for invariants
+        size_t recovered_row_count = RecoveryTestHelper::GetData(this);
+        ASSERT_LT(recovered_row_count, row_count);

-      size_t recovered_row_count = RecoveryTestHelper::GetData(this);
-      ASSERT_LT(recovered_row_count, row_count);
-
-      // verify that the keys are sequential and there is no break
-      bool expect_data = true;
-      for (size_t j = 0; j < 1024; ++j) {
-        bool found = Get("key" + ToString(i)) != "NOT_FOUND";
-        if (expect_data && !found) {
-          expect_data = false;
+        bool expect_data = true;
+        for (size_t k = 0; k < maxkeys; ++k) {
+          bool found = Get("key" + ToString(k)) != "NOT_FOUND";
+          if (expect_data && !found) {
+            expect_data = false;
+          }
+          ASSERT_EQ(found, expect_data);
         }
-        ASSERT_EQ(found, expect_data);
-      }
-      ASSERT_TRUE(i != 0 || recovered_row_count == 0);
-      ASSERT_TRUE(i != 1 || recovered_row_count < row_count / 2);
+        const size_t min = RecoveryTestHelper::kKeysPerWALFile *
+                           (j - RecoveryTestHelper::kWALFileOffset);
+        ASSERT_GE(recovered_row_count, min);
+        if (!trunc && i != 0) {
+          const size_t max = RecoveryTestHelper::kKeysPerWALFile *
+                             (j - RecoveryTestHelper::kWALFileOffset + 1);
+          ASSERT_LE(recovered_row_count, max);
+        }
+      }
     }
   }
 }
@@ -8837,22 +8960,32 @@ TEST_F(DBTest, kPointInTimeRecovery) {
 // - We expect to open the data store under all scenarios
 // - We expect to have recovered records past the corruption zone
 TEST_F(DBTest, kSkipAnyCorruptedRecords) {
-  for (auto trunc : {true, false}) {
-    for (int i = 0; i < 4; i++) {
-      // Fill data for testing
-      Options options = CurrentOptions();
-      const size_t row_count = RecoveryTestHelper::FillData(this, options);
+  const int jstart = RecoveryTestHelper::kWALFileOffset;
+  const int jend = jstart + RecoveryTestHelper::kWALFilesCount;

-      // induce leading corruption
-      RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
+  for (auto trunc : {true, false}) { /* Corruption style */
+    for (int i = 0; i < 4; i++) { /* Corruption offset */
+      for (int j = jstart; j < jend; j++) { /* wal files */
+        // Fill data for testing
+        Options options = CurrentOptions();
+        const size_t row_count = RecoveryTestHelper::FillData(this, options);

-      options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
-      ASSERT_OK(TryReopen(options));
-      size_t recovered_row_count = RecoveryTestHelper::GetData(this);
-      ASSERT_LT(recovered_row_count, row_count);
+        // Corrupt the WAL
+        RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
+                                       /*len%=*/.1, j, trunc);

-      if (!trunc) {
-        ASSERT_TRUE(i != 0 || recovered_row_count > 0);
+        // Verify behavior
+        options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
+        options.create_if_missing = false;
+        ASSERT_OK(TryReopen(options));
+
+        // Probe data for invariants
+        size_t recovered_row_count = RecoveryTestHelper::GetData(this);
+        ASSERT_LT(recovered_row_count, row_count);
+
+        if (!trunc) {
+          ASSERT_TRUE(i != 0 || recovered_row_count > 0);
+        }
       }
     }
   }
@@ -11274,10 +11407,10 @@ TEST_F(DBTest, ThreadStatusSingleCompaction) {
       {"CompactionJob::Run():Start", "DBTest::ThreadStatusSingleCompaction:1"},
      {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run():End"},
  });
-  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
-
   for (int tests = 0; tests < 2; ++tests) {
     DestroyAndReopen(options);
+    rocksdb::SyncPoint::GetInstance()->ClearTrace();
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();

     Random rnd(301);
     // The Put Phase.
@@ -11309,8 +11442,8 @@ TEST_F(DBTest, ThreadStatusSingleCompaction) {

     // repeat the test with disabling thread tracking.
     options.enable_thread_tracking = false;
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
   }
-  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
 }

 TEST_F(DBTest, PreShutdownManualCompaction) {
@@ -12183,7 +12316,7 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
         if (compaction->output_level() == 4) {
-          ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression);
+          ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
           num_lz4.fetch_add(1);
         }
       });
@@ -12218,10 +12351,10 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
         if (compaction->output_level() == 4 && compaction->start_level() == 3) {
-          ASSERT_TRUE(compaction->OutputCompressionType() == kZlibCompression);
+          ASSERT_TRUE(compaction->output_compression() == kZlibCompression);
           num_zlib.fetch_add(1);
         } else {
-          ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression);
+          ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
           num_lz4.fetch_add(1);
         }
       });
@@ -12696,6 +12829,7 @@ TEST_F(DBTest, DontDeletePendingOutputs) {
     dbfull()->FindObsoleteFiles(&job_context, true /*force*/);
     dbfull()->TEST_UnlockMutex();
    dbfull()->PurgeObsoleteFiles(job_context);
+    job_context.Clean();
  };
   env_->table_write_callback_ = &purge_obsolete_files_function;
@@ -14137,7 +14271,9 @@ TEST_F(DBTest, RowCache) {
   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
 }

-TEST_F(DBTest, PrevAfterMerge) {
+// TODO(3.13): fix the issue of Seek() + Prev() which might not necessarily
+// return the biggest key which is smaller than the seek key.
+TEST_F(DBTest, DISABLED_PrevAfterMerge) {
   Options options;
   options.create_if_missing = true;
   options.merge_operator = MergeOperators::CreatePutOperator();
@@ -14160,6 +14296,40 @@ TEST_F(DBTest, PrevAfterMerge) {
   ASSERT_EQ("1", it->key().ToString());
 }

+TEST_F(DBTest, DeletingOldWalAfterDrop) {
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      { { "Test:AllowFlushes", "DBImpl::BGWorkFlush" },
+        { "DBImpl::BGWorkFlush:done", "Test:WaitForFlush"} });
+  rocksdb::SyncPoint::GetInstance()->ClearTrace();
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  Options options = CurrentOptions();
+  options.max_total_wal_size = 8192;
+  options.compression = kNoCompression;
+  options.write_buffer_size = 1 << 20;
+  options.level0_file_num_compaction_trigger = (1<<30);
+  options.level0_slowdown_writes_trigger = (1<<30);
+  options.level0_stop_writes_trigger = (1<<30);
+  options.disable_auto_compactions = true;
+  DestroyAndReopen(options);
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  CreateColumnFamilies({"cf1", "cf2"}, options);
+  ASSERT_OK(Put(0, "key1", DummyString(8192)));
+  ASSERT_OK(Put(0, "key2", DummyString(8192)));
+  // the oldest wal should now be getting_flushed
+  ASSERT_OK(db_->DropColumnFamily(handles_[0]));
+  // all flushes should now do nothing because their CF is dropped
+  TEST_SYNC_POINT("Test:AllowFlushes");
+  TEST_SYNC_POINT("Test:WaitForFlush");
+  uint64_t lognum1 = dbfull()->TEST_LogfileNumber();
+  ASSERT_OK(Put(1, "key3", DummyString(8192)));
+  ASSERT_OK(Put(1, "key4", DummyString(8192)));
+  // new wal should have been created
+  uint64_t lognum2 = dbfull()->TEST_LogfileNumber();
+  EXPECT_GT(lognum2, lognum1);
+}
+
 }  // namespace rocksdb
 #endif
diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc
index b4410199e..5ed125930 100644
--- a/db/forward_iterator.cc
+++ b/db/forward_iterator.cc
@@ -169,6 +169,7 @@ void ForwardIterator::Cleanup(bool release_sv) {
       if (job_context.HaveSomethingToDelete()) {
         db_->PurgeObsoleteFiles(job_context);
       }
+      job_context.Clean();
     }
   }
 }
diff --git a/db/job_context.h b/db/job_context.h
index d0281443e..5a54e2d85 100644
--- a/db/job_context.h
+++ b/db/job_context.h
@@ -83,6 +83,10 @@ struct JobContext {
     new_superversion = create_superversion ? new SuperVersion() : nullptr;
   }

+  // For a non-empty JobContext, Clean() has to be called at least once
+  // before destruction (see asserts in ~JobContext()). Should be called with
+  // unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally
+  // doing potentially slow Clean() with locked DB mutex.
   void Clean() {
     // free pending memtables
     for (auto m : memtables_to_free) {
@@ -109,6 +113,7 @@ struct JobContext {
     assert(memtables_to_free.size() == 0);
     assert(superversions_to_free.size() == 0);
     assert(new_superversion == nullptr);
+    assert(logs_to_free.size() == 0);
   }
 };
diff --git a/db/version_set.h b/db/version_set.h
index 9ee6aeaa9..778e537f5 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -612,7 +612,9 @@ class VersionSet {
   uint64_t MinLogNumber() const {
     uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
     for (auto cfd : *column_family_set_) {
-      if (min_log_num > cfd->GetLogNumber()) {
+      // It's safe to ignore dropped column families here:
+      // cfd->IsDropped() becomes true after the drop is persisted in MANIFEST.
+      if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
         min_log_num = cfd->GetLogNumber();
       }
     }
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 6a62111a2..bcbb5a3a8 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -622,6 +622,7 @@ enum InfoLogLevel : unsigned char {
   WARN_LEVEL,
   ERROR_LEVEL,
   FATAL_LEVEL,
+  HEADER_LEVEL,
   NUM_INFO_LOG_LEVELS,
 };
diff --git a/include/rocksdb/universal_compaction.h b/include/rocksdb/universal_compaction.h
index 229e50b25..e0f9f830f 100644
--- a/include/rocksdb/universal_compaction.h
+++ b/include/rocksdb/universal_compaction.h
@@ -69,6 +69,11 @@ class CompactionOptionsUniversal {
   // Default: kCompactionStopStyleTotalSize
   CompactionStopStyle stop_style;

+  // Option to optimize multi-level universal compaction by enabling
+  // trivial move for non-overlapping files.
+  // Default: false
+  bool allow_trivial_move;
+
   // Default set of parameters
   CompactionOptionsUniversal()
       : size_ratio(1),
@@ -76,7 +81,8 @@ class CompactionOptionsUniversal {
         max_merge_width(UINT_MAX),
         max_size_amplification_percent(200),
         compression_size_percent(-1),
-        stop_style(kCompactionStopStyleTotalSize) {}
+        stop_style(kCompactionStopStyleTotalSize),
+        allow_trivial_move(false) {}
 };

 }  // namespace rocksdb
diff --git a/table/merger.cc b/table/merger.cc
index b418b88a4..32220571c 100644
--- a/table/merger.cc
+++ b/table/merger.cc
@@ -9,8 +9,8 @@

 #include "table/merger.h"

-#include <vector>
 #include <queue>
+#include <vector>

 #include "rocksdb/comparator.h"
 #include "rocksdb/iterator.h"
@@ -215,6 +215,12 @@ class MergingIterator : public Iterator {
       }
     }
     direction_ = kReverse;
+    // Note that we don't do assert(current_ == CurrentReverse()) here
+    // because it is possible to have some keys larger than the seek-key
+    // inserted between Seek() and SeekToLast(), which makes current_ not
+    // equal to CurrentReverse().
+    //
+    // assert(current_ == CurrentReverse());
   }
   current_->Prev();
diff --git a/util/auto_roll_logger_test.cc b/util/auto_roll_logger_test.cc
index 746d426d6..cd116958f 100644
--- a/util/auto_roll_logger_test.cc
+++ b/util/auto_roll_logger_test.cc
@@ -261,28 +261,29 @@ TEST_F(AutoRollLoggerTest, InfoLogLevel) {
   // becomes out of scope.
   {
     AutoRollLogger logger(Env::Default(), kTestDir, "", log_size, 0);
-    for (int log_level = InfoLogLevel::FATAL_LEVEL;
+    for (int log_level = InfoLogLevel::HEADER_LEVEL;
          log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) {
       logger.SetInfoLogLevel((InfoLogLevel)log_level);
       for (int log_type = InfoLogLevel::DEBUG_LEVEL;
-           log_type <= InfoLogLevel::FATAL_LEVEL; log_type++) {
+           log_type <= InfoLogLevel::HEADER_LEVEL; log_type++) {
         // log messages with log level smaller than log_level will not be
         // logged.
         LogMessage((InfoLogLevel)log_type, &logger, kSampleMessage.c_str());
       }
-      log_lines += InfoLogLevel::FATAL_LEVEL - log_level + 1;
+      log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1;
     }
-    for (int log_level = InfoLogLevel::FATAL_LEVEL;
+    for (int log_level = InfoLogLevel::HEADER_LEVEL;
          log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) {
       logger.SetInfoLogLevel((InfoLogLevel)log_level);

       // again, messages with level smaller than log_level will not be logged.
+      Log(InfoLogLevel::HEADER_LEVEL, &logger, "%s", kSampleMessage.c_str());
       Debug(&logger, "%s", kSampleMessage.c_str());
       Info(&logger, "%s", kSampleMessage.c_str());
       Warn(&logger, "%s", kSampleMessage.c_str());
       Error(&logger, "%s", kSampleMessage.c_str());
       Fatal(&logger, "%s", kSampleMessage.c_str());
-      log_lines += InfoLogLevel::FATAL_LEVEL - log_level + 1;
+      log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1;
     }
   }
   std::ifstream inFile(AutoRollLoggerTest::kLogFile.c_str());
@@ -336,41 +337,54 @@ TEST_F(AutoRollLoggerTest, LogHeaderTest) {
   static const size_t LOG_MAX_SIZE = 1024 * 5;
   static const std::string HEADER_STR = "Log header line";

-  InitTestDb();
+  // test_num == 0 -> standard call to Header()
+  // test_num == 1 -> call to Log() with InfoLogLevel::HEADER_LEVEL
+  for (int test_num = 0; test_num < 2; test_num++) {

-  AutoRollLogger logger(Env::Default(), kTestDir, /*db_log_dir=*/ "",
-                        LOG_MAX_SIZE, /*log_file_time_to_roll=*/ 0);
+    InitTestDb();

-  // log some headers
-  for (size_t i = 0; i < MAX_HEADERS; i++) {
-    Header(&logger, "%s %d", HEADER_STR.c_str(), i);
-  }
+    AutoRollLogger logger(Env::Default(), kTestDir, /*db_log_dir=*/ "",
+                          LOG_MAX_SIZE, /*log_file_time_to_roll=*/ 0);

-  const string& newfname = logger.TEST_log_fname().c_str();
-
-  // log enough data to cause a roll over
-  int i = 0;
-  for (size_t iter = 0; iter < 2; iter++) {
-    while (logger.GetLogFileSize() < LOG_MAX_SIZE) {
-      Info(&logger, (kSampleMessage + ":LogHeaderTest line %d").c_str(), i);
-      ++i;
+    if (test_num == 0) {
+      // Log some headers explicitly using Header()
+      for (size_t i = 0; i < MAX_HEADERS; i++) {
+        Header(&logger, "%s %d", HEADER_STR.c_str(), i);
+      }
+    } else if (test_num == 1) {
+      // HEADER_LEVEL should make this behave like calling Header()
+      for (size_t i = 0; i < MAX_HEADERS; i++) {
+        Log(InfoLogLevel::HEADER_LEVEL, &logger, "%s %d",
+            HEADER_STR.c_str(), i);
+      }
     }

-    Info(&logger, "Rollover");
-  }
+    const string& newfname = logger.TEST_log_fname().c_str();

-  // Flus the log for the latest file
-  LogFlush(&logger);
+    // Log enough data to cause a roll over
+    int i = 0;
+    for (size_t iter = 0; iter < 2; iter++) {
+      while (logger.GetLogFileSize() < LOG_MAX_SIZE) {
+        Info(&logger, (kSampleMessage + ":LogHeaderTest line %d").c_str(), i);
":LogHeaderTest line %d").c_str(), i); + ++i; + } - const list oldfiles = GetOldFileNames(newfname); + Info(&logger, "Rollover"); + } - ASSERT_EQ(oldfiles.size(), (size_t) 2); + // Flush the log for the latest file + LogFlush(&logger); - for (auto oldfname : oldfiles) { - // verify that the files rolled over - ASSERT_NE(oldfname, newfname); - // verify that the old log contains all the header logs - ASSERT_EQ(GetLinesCount(oldfname, HEADER_STR), MAX_HEADERS); + const list oldfiles = GetOldFileNames(newfname); + + ASSERT_EQ(oldfiles.size(), (size_t) 2); + + for (auto oldfname : oldfiles) { + // verify that the files rolled over + ASSERT_NE(oldfname, newfname); + // verify that the old log contains all the header logs + ASSERT_EQ(GetLinesCount(oldfname, HEADER_STR), MAX_HEADERS); + } } } diff --git a/util/env.cc b/util/env.cc index d98d926d8..9da6fe1b1 100644 --- a/util/env.cc +++ b/util/env.cc @@ -61,7 +61,13 @@ void Log(const InfoLogLevel log_level, Logger* info_log, const char* format, if (info_log && info_log->GetInfoLogLevel() <= log_level) { va_list ap; va_start(ap, format); - info_log->Logv(log_level, format, ap); + + if (log_level == InfoLogLevel::HEADER_LEVEL) { + info_log->LogHeader(format, ap); + } else { + info_log->Logv(log_level, format, ap); + } + va_end(ap); } } diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 6b61b56e4..5343efc28 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -487,26 +487,6 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env, copy_file_buffer_size_(kDefaultCopyFileBufferSize), read_only_(read_only) { - // set up threads perform copies from files_to_copy_ in the background - for (int t = 0; t < options_.max_background_operations; t++) { - threads_.emplace_back([&]() { - CopyWorkItem work_item; - while (files_to_copy_.read(work_item)) { - CopyResult result; - result.status = CopyFile(work_item.src_path, - work_item.dst_path, - work_item.src_env, - work_item.dst_env, - work_item.sync, - work_item.rate_limiter, - &result.size, - &result.checksum_value, - work_item.size_limit); - work_item.result.set_value(std::move(result)); - } - }); - } - if (read_only_) { Log(options_.info_log, "Starting read_only backup engine"); } @@ -626,6 +606,27 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env, if (!read_only_) { PutLatestBackupFileContents(latest_backup_id_); // Ignore errors } + + // set up threads perform copies from files_to_copy_ in the background + for (int t = 0; t < options_.max_background_operations; t++) { + threads_.emplace_back([&]() { + CopyWorkItem work_item; + while (files_to_copy_.read(work_item)) { + CopyResult result; + result.status = CopyFile(work_item.src_path, + work_item.dst_path, + work_item.src_env, + work_item.dst_env, + work_item.sync, + work_item.rate_limiter, + &result.size, + &result.checksum_value, + work_item.size_limit); + work_item.result.set_value(std::move(result)); + } + }); + } + Log(options_.info_log, "Initialized BackupEngine"); }