From c625b8d017d43d0bceffbd7c0cf039708578fd1f Mon Sep 17 00:00:00 2001 From: Baptiste Lemaire Date: Wed, 18 Aug 2021 17:39:00 -0700 Subject: [PATCH] Add condition on NotifyOnFlushComplete that FlushJob was not mempurge. Add event listeners to mempurge tests. (#8672) Summary: Previously, when a `FlushJob` was redirected to a MemPurge, the function `DBImpl::NotifyOnFlushComplete` was called, which created a series of issues because the JobInfo was not correctly collected from the memtables. This diff aims at correcting these two issues (`FlushJobInfo` collection in `FlushJob::MemPurge` , no call to `DBImpl::NotifyOnFlushComplete` after successful mempurge). Event listeners were added to the unit tests to handle these situations. Surprisingly none of the crashtests caught this issue, I will try to add event listeners to crash tests in the future. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8672 Reviewed By: akankshamahajan15 Differential Revision: D30383109 Pulled By: bjlemaire fbshipit-source-id: 35a8d4295886923ee4049a6447f00022cb221c73 --- db/db_flush_test.cc | 110 ++++++++++++++++++++++--- db/db_impl/db_impl_compaction_flush.cc | 28 +++++-- db/flush_job.cc | 20 ++++- db/flush_job.h | 3 +- 4 files changed, 142 insertions(+), 19 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 824dc9e55..41a9a983e 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -663,6 +663,87 @@ TEST_F(DBFlushTest, StatisticsGarbageRangeDeletes) { Close(); } +#ifndef ROCKSDB_LITE +// This simple Listener can only handle one flush at a time. +class TestFlushListener : public EventListener { + public: + TestFlushListener(Env* env, DBFlushTest* test) + : slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) { + db_closed = false; + } + + ~TestFlushListener() override { + prev_fc_info_.status.PermitUncheckedError(); // Ignore the status + } + void OnTableFileCreated(const TableFileCreationInfo& info) override { + // remember the info for later checking the FlushJobInfo. + prev_fc_info_ = info; + ASSERT_GT(info.db_name.size(), 0U); + ASSERT_GT(info.cf_name.size(), 0U); + ASSERT_GT(info.file_path.size(), 0U); + ASSERT_GT(info.job_id, 0); + ASSERT_GT(info.table_properties.data_size, 0U); + ASSERT_GT(info.table_properties.raw_key_size, 0U); + ASSERT_GT(info.table_properties.raw_value_size, 0U); + ASSERT_GT(info.table_properties.num_data_blocks, 0U); + ASSERT_GT(info.table_properties.num_entries, 0U); + ASSERT_EQ(info.file_checksum, kUnknownFileChecksum); + ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName); + } + + void OnFlushCompleted(DB* db, const FlushJobInfo& info) override { + flushed_dbs_.push_back(db); + flushed_column_family_names_.push_back(info.cf_name); + if (info.triggered_writes_slowdown) { + slowdown_count++; + } + if (info.triggered_writes_stop) { + stop_count++; + } + // verify whether the previously created file matches the flushed file. + ASSERT_EQ(prev_fc_info_.db_name, db->GetName()); + ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name); + ASSERT_EQ(prev_fc_info_.job_id, info.job_id); + ASSERT_EQ(prev_fc_info_.file_path, info.file_path); + ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number); + + // Note: the following chunk relies on the notification pertaining to the + // database pointed to by DBTestBase::db_, and is thus bypassed when + // that assumption does not hold (see the test case MultiDBMultiListeners + // below). + ASSERT_TRUE(test_); + if (db == test_->db_) { + std::vector> files_by_level; + test_->dbfull()->TEST_GetFilesMetaData(db->DefaultColumnFamily(), + &files_by_level); + + ASSERT_FALSE(files_by_level.empty()); + auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(), + [&](const FileMetaData& meta) { + return meta.fd.GetNumber() == info.file_number; + }); + ASSERT_NE(it, files_by_level[0].end()); + ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number); + } + + ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id); + ASSERT_GT(info.thread_id, 0U); + } + + std::vector flushed_column_family_names_; + std::vector flushed_dbs_; + int slowdown_count; + int stop_count; + bool db_closing; + std::atomic_bool db_closed; + TableFileCreationInfo prev_fc_info_; + + protected: + Env* env_; + DBFlushTest* test_; +}; +#endif // !ROCKSDB_LITE + TEST_F(DBFlushTest, MemPurgeBasic) { Options options = CurrentOptions(); @@ -695,8 +776,11 @@ TEST_F(DBFlushTest, MemPurgeBasic) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = - 1.0; // std::numeric_limits::max(); + options.experimental_mempurge_threshold = 1.0; +#ifndef ROCKSDB_LITE + TestFlushListener* listener = new TestFlushListener(options.env, this); + options.listeners.emplace_back(listener); +#endif // !ROCKSDB_LITE ASSERT_OK(TryReopen(options)); uint32_t mempurge_count = 0; uint32_t sst_count = 0; @@ -839,12 +923,15 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { options.compression = kNoCompression; options.inplace_update_support = false; options.allow_concurrent_memtable_write = true; - +#ifndef ROCKSDB_LITE + TestFlushListener* listener = new TestFlushListener(options.env, this); + options.listeners.emplace_back(listener); +#endif // !ROCKSDB_LITE // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = - 1.0; // std::numeric_limits::max(); + options.experimental_mempurge_threshold = 1.0; + ASSERT_OK(TryReopen(options)); uint32_t mempurge_count = 0; @@ -1037,7 +1124,10 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { options.compression = kNoCompression; options.inplace_update_support = false; options.allow_concurrent_memtable_write = true; - +#ifndef ROCKSDB_LITE + TestFlushListener* listener = new TestFlushListener(options.env, this); + options.listeners.emplace_back(listener); +#endif // !ROCKSDB_LITE // Create a ConditionalUpdate compaction filter // that will update all the values of the KV pairs // where the keys are "lower" than KEY4. @@ -1047,8 +1137,8 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = - 1.0; // std::numeric_limits::max(); + options.experimental_mempurge_threshold = 1.0; + ASSERT_OK(TryReopen(options)); uint32_t mempurge_count = 0; @@ -1123,8 +1213,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) { // Enforce size of a single MemTable to 128KB. options.write_buffer_size = 128 << 10; // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = - 1.0; // std::numeric_limits::max(); + options.experimental_mempurge_threshold = 1.0; + ASSERT_OK(TryReopen(options)); const size_t KVSIZE = 10; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 7ec42c1fd..df849d519 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -7,6 +7,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include +#include #include "db/builder.h" #include "db/db_impl/db_impl.h" @@ -198,7 +199,7 @@ Status DBImpl::FlushMemTableToOutputFile( need_cancel = true; } TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables"); - + bool switched_to_mempurge = false; // Within flush_job.Run, rocksdb may call event listener to notify // file creation and deletion. // @@ -206,7 +207,8 @@ Status DBImpl::FlushMemTableToOutputFile( // and EventListener callback will be called when the db_mutex // is unlocked by the current thread. if (s.ok()) { - s = flush_job.Run(&logs_with_prep_tracker_, &file_meta); + s = flush_job.Run(&logs_with_prep_tracker_, &file_meta, + &switched_to_mempurge); need_cancel = false; } @@ -282,7 +284,9 @@ Status DBImpl::FlushMemTableToOutputFile( // from never needing it or ignoring the flush job status io_s.PermitUncheckedError(); } - if (s.ok()) { + // If flush ran smoothly and no mempurge happened + // install new SST file path. + if (s.ok() && (!switched_to_mempurge)) { #ifndef ROCKSDB_LITE // may temporarily unlock and lock the mutex. NotifyOnFlushCompleted(cfd, mutable_cf_options, @@ -411,6 +415,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } std::vector file_meta(num_cfs); + // Use of deque because vector + // is specific and doesn't allow &v[i]. + std::deque switched_to_mempurge(num_cfs, false); Status s; IOStatus log_io_s = IOStatus::OK(); assert(num_cfs == static_cast(jobs.size())); @@ -460,10 +467,13 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } if (s.ok()) { + assert(switched_to_mempurge.size() == + static_cast(num_cfs)); // TODO (yanqin): parallelize jobs with threads. for (int i = 1; i != num_cfs; ++i) { exec_status[i].second = - jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]); + jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i], + &(switched_to_mempurge.at(i))); exec_status[i].first = true; io_status[i] = jobs[i]->io_status(); } @@ -475,8 +485,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } assert(exec_status.size() > 0); assert(!file_meta.empty()); - exec_status[0].second = - jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]); + exec_status[0].second = jobs[0]->Run( + &logs_with_prep_tracker_, file_meta.data() /* &file_meta[0] */, + switched_to_mempurge.empty() ? nullptr : &(switched_to_mempurge.at(0))); exec_status[0].first = true; io_status[0] = jobs[0]->io_status(); @@ -656,6 +667,11 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( immutable_db_options_.sst_file_manager.get()); assert(all_mutable_cf_options.size() == static_cast(num_cfs)); for (int i = 0; s.ok() && i != num_cfs; ++i) { + // If mempurge happened instead of Flush, + // no NotifyOnFlushCompleted call (no SST file created). + if (switched_to_mempurge[i]) { + continue; + } if (cfds[i]->IsDropped()) { continue; } diff --git a/db/flush_job.cc b/db/flush_job.cc index 43dc87d68..d520b709e 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -196,8 +196,8 @@ void FlushJob::PickMemTable() { base_->Ref(); // it is likely that we do not need this reference } -Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, - FileMetaData* file_meta) { +Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, + bool* switched_to_mempurge) { TEST_SYNC_POINT("FlushJob::Start"); db_mutex_->AssertHeld(); assert(pick_memtable_called); @@ -244,6 +244,16 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, ROCKS_LOG_WARN(db_options_.info_log, "Mempurge process failed: %s\n", mempurge_s.ToString().c_str()); } + } else { + if (switched_to_mempurge) { + *switched_to_mempurge = true; + } else { + // The mempurge process was successful, but no switch_to_mempurge + // pointer provided so no way to propagate the state of flush job. + ROCKS_LOG_WARN(db_options_.info_log, + "Mempurge process succeeded" + "but no 'switched_to_mempurge' ptr provided.\n"); + } } } Status s; @@ -561,6 +571,12 @@ Status FlushJob::MemPurge() { // we do not call SchedulePendingFlush(). cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free); new_mem->Ref(); +#ifndef ROCKSDB_LITE + // Piggyback FlushJobInfo on the first flushed memtable. + db_mutex_->AssertHeld(); + meta_.fd.file_size = 0; + mems_[0]->SetFlushJobInfo(GetFlushJobInfo()); +#endif // !ROCKSDB_LITE db_mutex_->Unlock(); } else { s = Status::Aborted(Slice("Mempurge filled more than one memtable.")); diff --git a/db/flush_job.h b/db/flush_job.h index 81b4e86dd..49d57ef79 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -83,7 +83,8 @@ class FlushJob { // Once PickMemTable() is called, either Run() or Cancel() has to be called. void PickMemTable(); Status Run(LogsWithPrepTracker* prep_tracker = nullptr, - FileMetaData* file_meta = nullptr); + FileMetaData* file_meta = nullptr, + bool* switched_to_mempurge = nullptr); void Cancel(); const autovector& GetMemTables() const { return mems_; }