Add condition on NotifyOnFlushComplete that FlushJob was not mempurge. Add event listeners to mempurge tests. (#8672)

Summary:
Previously, when a `FlushJob` was redirected to a MemPurge, the function `DBImpl::NotifyOnFlushComplete` was called, which created a series of issues because the JobInfo was not correctly collected from the memtables.
This diff aims at correcting these two issues (`FlushJobInfo` collection in `FlushJob::MemPurge` , no call to `DBImpl::NotifyOnFlushComplete` after successful mempurge).
Event listeners were added to the unit tests to handle these situations.
Surprisingly none of the crashtests caught this issue, I will try to add event listeners to crash tests in the future.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8672

Reviewed By: akankshamahajan15

Differential Revision: D30383109

Pulled By: bjlemaire

fbshipit-source-id: 35a8d4295886923ee4049a6447f00022cb221c73
This commit is contained in:
Baptiste Lemaire 2021-08-18 17:39:00 -07:00 committed by Facebook GitHub Bot
parent d10801e983
commit c625b8d017
4 changed files with 142 additions and 19 deletions

View File

@ -663,6 +663,87 @@ TEST_F(DBFlushTest, StatisticsGarbageRangeDeletes) {
Close();
}
#ifndef ROCKSDB_LITE
// This simple Listener can only handle one flush at a time.
class TestFlushListener : public EventListener {
public:
TestFlushListener(Env* env, DBFlushTest* test)
: slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
db_closed = false;
}
~TestFlushListener() override {
prev_fc_info_.status.PermitUncheckedError(); // Ignore the status
}
void OnTableFileCreated(const TableFileCreationInfo& info) override {
// remember the info for later checking the FlushJobInfo.
prev_fc_info_ = info;
ASSERT_GT(info.db_name.size(), 0U);
ASSERT_GT(info.cf_name.size(), 0U);
ASSERT_GT(info.file_path.size(), 0U);
ASSERT_GT(info.job_id, 0);
ASSERT_GT(info.table_properties.data_size, 0U);
ASSERT_GT(info.table_properties.raw_key_size, 0U);
ASSERT_GT(info.table_properties.raw_value_size, 0U);
ASSERT_GT(info.table_properties.num_data_blocks, 0U);
ASSERT_GT(info.table_properties.num_entries, 0U);
ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
}
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
flushed_dbs_.push_back(db);
flushed_column_family_names_.push_back(info.cf_name);
if (info.triggered_writes_slowdown) {
slowdown_count++;
}
if (info.triggered_writes_stop) {
stop_count++;
}
// verify whether the previously created file matches the flushed file.
ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number);
// Note: the following chunk relies on the notification pertaining to the
// database pointed to by DBTestBase::db_, and is thus bypassed when
// that assumption does not hold (see the test case MultiDBMultiListeners
// below).
ASSERT_TRUE(test_);
if (db == test_->db_) {
std::vector<std::vector<FileMetaData>> files_by_level;
test_->dbfull()->TEST_GetFilesMetaData(db->DefaultColumnFamily(),
&files_by_level);
ASSERT_FALSE(files_by_level.empty());
auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
[&](const FileMetaData& meta) {
return meta.fd.GetNumber() == info.file_number;
});
ASSERT_NE(it, files_by_level[0].end());
ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
}
ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
ASSERT_GT(info.thread_id, 0U);
}
std::vector<std::string> flushed_column_family_names_;
std::vector<DB*> flushed_dbs_;
int slowdown_count;
int stop_count;
bool db_closing;
std::atomic_bool db_closed;
TableFileCreationInfo prev_fc_info_;
protected:
Env* env_;
DBFlushTest* test_;
};
#endif // !ROCKSDB_LITE
TEST_F(DBFlushTest, MemPurgeBasic) {
Options options = CurrentOptions();
@ -695,8 +776,11 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
options.experimental_mempurge_threshold = 1.0;
#ifndef ROCKSDB_LITE
TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener);
#endif // !ROCKSDB_LITE
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t sst_count = 0;
@ -839,12 +923,15 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
options.compression = kNoCompression;
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;
#ifndef ROCKSDB_LITE
TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener);
#endif // !ROCKSDB_LITE
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
options.experimental_mempurge_threshold = 1.0;
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
@ -1037,7 +1124,10 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
options.compression = kNoCompression;
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;
#ifndef ROCKSDB_LITE
TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener);
#endif // !ROCKSDB_LITE
// Create a ConditionalUpdate compaction filter
// that will update all the values of the KV pairs
// where the keys are "lower" than KEY4.
@ -1047,8 +1137,8 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
options.experimental_mempurge_threshold = 1.0;
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
@ -1123,8 +1213,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
// Enforce size of a single MemTable to 128KB.
options.write_buffer_size = 128 << 10;
// Activate the MemPurge prototype.
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
options.experimental_mempurge_threshold = 1.0;
ASSERT_OK(TryReopen(options));
const size_t KVSIZE = 10;

View File

@ -7,6 +7,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cinttypes>
#include <deque>
#include "db/builder.h"
#include "db/db_impl/db_impl.h"
@ -198,7 +199,7 @@ Status DBImpl::FlushMemTableToOutputFile(
need_cancel = true;
}
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
bool switched_to_mempurge = false;
// Within flush_job.Run, rocksdb may call event listener to notify
// file creation and deletion.
//
@ -206,7 +207,8 @@ Status DBImpl::FlushMemTableToOutputFile(
// and EventListener callback will be called when the db_mutex
// is unlocked by the current thread.
if (s.ok()) {
s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
s = flush_job.Run(&logs_with_prep_tracker_, &file_meta,
&switched_to_mempurge);
need_cancel = false;
}
@ -282,7 +284,9 @@ Status DBImpl::FlushMemTableToOutputFile(
// from never needing it or ignoring the flush job status
io_s.PermitUncheckedError();
}
if (s.ok()) {
// If flush ran smoothly and no mempurge happened
// install new SST file path.
if (s.ok() && (!switched_to_mempurge)) {
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, mutable_cf_options,
@ -411,6 +415,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
}
std::vector<FileMetaData> file_meta(num_cfs);
// Use of deque<bool> because vector<bool>
// is specific and doesn't allow &v[i].
std::deque<bool> switched_to_mempurge(num_cfs, false);
Status s;
IOStatus log_io_s = IOStatus::OK();
assert(num_cfs == static_cast<int>(jobs.size()));
@ -460,10 +467,13 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
}
if (s.ok()) {
assert(switched_to_mempurge.size() ==
static_cast<long unsigned int>(num_cfs));
// TODO (yanqin): parallelize jobs with threads.
for (int i = 1; i != num_cfs; ++i) {
exec_status[i].second =
jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i],
&(switched_to_mempurge.at(i)));
exec_status[i].first = true;
io_status[i] = jobs[i]->io_status();
}
@ -475,8 +485,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
}
assert(exec_status.size() > 0);
assert(!file_meta.empty());
exec_status[0].second =
jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
exec_status[0].second = jobs[0]->Run(
&logs_with_prep_tracker_, file_meta.data() /* &file_meta[0] */,
switched_to_mempurge.empty() ? nullptr : &(switched_to_mempurge.at(0)));
exec_status[0].first = true;
io_status[0] = jobs[0]->io_status();
@ -656,6 +667,11 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
immutable_db_options_.sst_file_manager.get());
assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
for (int i = 0; s.ok() && i != num_cfs; ++i) {
// If mempurge happened instead of Flush,
// no NotifyOnFlushCompleted call (no SST file created).
if (switched_to_mempurge[i]) {
continue;
}
if (cfds[i]->IsDropped()) {
continue;
}

View File

@ -196,8 +196,8 @@ void FlushJob::PickMemTable() {
base_->Ref(); // it is likely that we do not need this reference
}
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
FileMetaData* file_meta) {
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
bool* switched_to_mempurge) {
TEST_SYNC_POINT("FlushJob::Start");
db_mutex_->AssertHeld();
assert(pick_memtable_called);
@ -244,6 +244,16 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
ROCKS_LOG_WARN(db_options_.info_log, "Mempurge process failed: %s\n",
mempurge_s.ToString().c_str());
}
} else {
if (switched_to_mempurge) {
*switched_to_mempurge = true;
} else {
// The mempurge process was successful, but no switch_to_mempurge
// pointer provided so no way to propagate the state of flush job.
ROCKS_LOG_WARN(db_options_.info_log,
"Mempurge process succeeded"
"but no 'switched_to_mempurge' ptr provided.\n");
}
}
}
Status s;
@ -561,6 +571,12 @@ Status FlushJob::MemPurge() {
// we do not call SchedulePendingFlush().
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
new_mem->Ref();
#ifndef ROCKSDB_LITE
// Piggyback FlushJobInfo on the first flushed memtable.
db_mutex_->AssertHeld();
meta_.fd.file_size = 0;
mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif // !ROCKSDB_LITE
db_mutex_->Unlock();
} else {
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));

View File

@ -83,7 +83,8 @@ class FlushJob {
// Once PickMemTable() is called, either Run() or Cancel() has to be called.
void PickMemTable();
Status Run(LogsWithPrepTracker* prep_tracker = nullptr,
FileMetaData* file_meta = nullptr);
FileMetaData* file_meta = nullptr,
bool* switched_to_mempurge = nullptr);
void Cancel();
const autovector<MemTable*>& GetMemTables() const { return mems_; }