LogAndApply() should fail if the column family has been dropped
Summary: This patch finally fixes the ColumnFamilyTest.ReadDroppedColumnFamily test. The test has been failing very sporadically and it was hard to repro. However, I managed to write a new tests that reproes the failure deterministically. Here's what happens: 1. We start the flush for the column family 2. We check if the column family was dropped here:a3fc49bfdd/db/flush_job.cc (L149)
3. This check goes through, ends up in InstallMemtableFlushResults() and it goes into LogAndApply() 4. At about this time, we start dropping the column family. Dropping the column family process gets to LogAndApply() at about the same time as LogAndApply() from flush process 5. Drop column family goes through LogAndApply() first, marking the column family as dropped. 6. Flush process gets woken up and gets a chance to write to the MANIFEST. However, this is where it gets stuck:a3fc49bfdd/db/version_set.cc (L1975)
7. We see that the column family was dropped, so there is no need to write to the MANIFEST. We return OK. 8. Flush gets OK back from LogAndApply() and it deletes the memtable, thinking that the data is now safely persisted to sst file. The fix is pretty simple. Instead of OK, we return ShutdownInProgress. This is not really true, but we have been using this status code to also mean "this operation was canceled because the column family has been dropped". The fix is only one LOC. All other code is related to tests. I added a new test that reproes the failure. I also moved SleepingBackgroundTask to util/testutil.h (because I needed it in column_family_test for my new test). There's plenty of other places where we reimplement SleepingBackgroundTask, but I'll address that in a separate commit. Test Plan: 1. new test 2. make check 3. Make sure the ColumnFamilyTest.ReadDroppedColumnFamily doesn't fail on Travis: https://travis-ci.org/facebook/rocksdb/jobs/79952386 Reviewers: yhchiang, anthony, IslamAbdelRahman, kradhakrishnan, rven, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D46773
This commit is contained in:
parent
664acfa087
commit
0833b75104
@ -10,6 +10,7 @@
|
||||
#include <algorithm>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include "db/db_impl.h"
|
||||
#include "rocksdb/db.h"
|
||||
@ -19,6 +20,7 @@
|
||||
#include "util/testharness.h"
|
||||
#include "util/testutil.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/sync_point.h"
|
||||
#include "utilities/merge_operators.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -1196,6 +1198,67 @@ TEST_F(ColumnFamilyTest, ReadDroppedColumnFamily) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) {
|
||||
db_options_.create_missing_column_families = true;
|
||||
Open({"default", "one"});
|
||||
ColumnFamilyOptions options;
|
||||
options.level0_file_num_compaction_trigger = 100;
|
||||
options.level0_slowdown_writes_trigger = 200;
|
||||
options.level0_stop_writes_trigger = 200;
|
||||
options.max_write_buffer_number = 20;
|
||||
options.write_buffer_size = 100000; // small write buffer size
|
||||
Reopen({options, options});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"VersionSet::LogAndApply::ColumnFamilyDrop:1"
|
||||
"FlushJob::InstallResults"},
|
||||
{"FlushJob::InstallResults",
|
||||
"VersionSet::LogAndApply::ColumnFamilyDrop:2", }});
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
test::SleepingBackgroundTask sleeping_task;
|
||||
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
|
||||
Env::Priority::HIGH);
|
||||
|
||||
// 1MB should create ~10 files for each CF
|
||||
int kKeysNum = 10000;
|
||||
PutRandomData(1, kKeysNum, 100);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); });
|
||||
|
||||
sleeping_task.WakeUp();
|
||||
sleeping_task.WaitUntilDone();
|
||||
sleeping_task.Reset();
|
||||
// now we sleep again. this is just so we're certain that flush job finished
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
|
||||
Env::Priority::HIGH);
|
||||
sleeping_task.WakeUp();
|
||||
sleeping_task.WaitUntilDone();
|
||||
|
||||
{
|
||||
// Since we didn't delete CF handle, RocksDB's contract guarantees that
|
||||
// we're still able to read dropped CF
|
||||
std::unique_ptr<Iterator> iterator(
|
||||
db_->NewIterator(ReadOptions(), handles_[1]));
|
||||
int count = 0;
|
||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
||||
ASSERT_OK(iterator->status());
|
||||
++count;
|
||||
}
|
||||
ASSERT_OK(iterator->status());
|
||||
ASSERT_EQ(count, kKeysNum);
|
||||
}
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
Close();
|
||||
Destroy();
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
100
db/db_test.cc
100
db/db_test.cc
@ -2219,61 +2219,16 @@ TEST_F(DBTest, NumImmutableMemTable) {
|
||||
} while (ChangeCompactOptions());
|
||||
}
|
||||
|
||||
class SleepingBackgroundTask {
|
||||
public:
|
||||
SleepingBackgroundTask()
|
||||
: bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {}
|
||||
void DoSleep() {
|
||||
MutexLock l(&mutex_);
|
||||
while (should_sleep_) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
done_with_sleep_ = true;
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
void WakeUp() {
|
||||
MutexLock l(&mutex_);
|
||||
should_sleep_ = false;
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
void WaitUntilDone() {
|
||||
MutexLock l(&mutex_);
|
||||
while (!done_with_sleep_) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
}
|
||||
bool WokenUp() {
|
||||
MutexLock l(&mutex_);
|
||||
return should_sleep_ == false;
|
||||
}
|
||||
|
||||
void Reset() {
|
||||
MutexLock l(&mutex_);
|
||||
should_sleep_ = true;
|
||||
done_with_sleep_ = false;
|
||||
}
|
||||
|
||||
static void DoSleepTask(void* arg) {
|
||||
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
|
||||
}
|
||||
|
||||
private:
|
||||
port::Mutex mutex_;
|
||||
port::CondVar bg_cv_; // Signalled when background work finishes
|
||||
bool should_sleep_;
|
||||
bool done_with_sleep_;
|
||||
};
|
||||
|
||||
TEST_F(DBTest, FlushEmptyColumnFamily) {
|
||||
// Block flush thread and disable compaction thread
|
||||
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high,
|
||||
Env::Priority::HIGH);
|
||||
test::SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
|
||||
&sleeping_task_high, Env::Priority::HIGH);
|
||||
|
||||
Options options = CurrentOptions();
|
||||
// disable compaction
|
||||
@ -2312,12 +2267,12 @@ TEST_F(DBTest, GetProperty) {
|
||||
// Set sizes to both background thread pool to be 1 and block them.
|
||||
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high,
|
||||
Env::Priority::HIGH);
|
||||
test::SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
|
||||
&sleeping_task_high, Env::Priority::HIGH);
|
||||
|
||||
Options options = CurrentOptions();
|
||||
WriteOptions writeOpt = WriteOptions();
|
||||
@ -2566,8 +2521,8 @@ TEST_F(DBTest, EstimatePendingCompBytes) {
|
||||
// Set sizes to both background thread pool to be 1 and block them.
|
||||
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
|
||||
Options options = CurrentOptions();
|
||||
@ -6179,7 +6134,7 @@ TEST_F(DBTest, TableOptionsSanitizeTest) {
|
||||
TEST_F(DBTest, SanitizeNumThreads) {
|
||||
for (int attempt = 0; attempt < 2; attempt++) {
|
||||
const size_t kTotalTasks = 8;
|
||||
SleepingBackgroundTask sleeping_tasks[kTotalTasks];
|
||||
test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];
|
||||
|
||||
Options options = CurrentOptions();
|
||||
if (attempt == 0) {
|
||||
@ -6191,7 +6146,8 @@ TEST_F(DBTest, SanitizeNumThreads) {
|
||||
|
||||
for (size_t i = 0; i < kTotalTasks; i++) {
|
||||
// Insert 5 tasks to low priority queue and 5 tasks to high priority queue
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_tasks[i],
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
|
||||
&sleeping_tasks[i],
|
||||
(i < 4) ? Env::Priority::LOW : Env::Priority::HIGH);
|
||||
}
|
||||
|
||||
@ -6483,8 +6439,8 @@ TEST_F(DBTest, DynamicMemtableOptions) {
|
||||
// max_background_flushes == 0, so flushes are getting executed by the
|
||||
// compaction thread
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
// Start from scratch and disable compaction/flush. Flush can only happen
|
||||
// during compaction but trigger is pretty high
|
||||
@ -6519,7 +6475,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
|
||||
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
|
||||
|
||||
sleeping_task_low.Reset();
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
count = 0;
|
||||
while (!sleeping_task_low.WokenUp() && count < 1024) {
|
||||
@ -6542,7 +6498,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
|
||||
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
|
||||
|
||||
sleeping_task_low.Reset();
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
|
||||
count = 0;
|
||||
@ -7349,8 +7305,8 @@ TEST_F(DBTest, DynamicCompactionOptions) {
|
||||
// since level0_stop_writes_trigger = 8
|
||||
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
|
||||
// Block compaction
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
@ -7386,7 +7342,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
|
||||
|
||||
// Block compaction again
|
||||
sleeping_task_low.Reset();
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
count = 0;
|
||||
while (count < 64) {
|
||||
@ -7824,7 +7780,7 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) {
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ("0,0,1", FilesPerLevel(0));
|
||||
|
||||
SleepingBackgroundTask blocking_thread;
|
||||
test::SleepingBackgroundTask blocking_thread;
|
||||
port::Mutex mutex_;
|
||||
bool already_blocked(false);
|
||||
|
||||
@ -7891,12 +7847,12 @@ TEST_F(DBTest, CloseSpeedup) {
|
||||
// Block background threads
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high,
|
||||
Env::Priority::HIGH);
|
||||
test::SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
|
||||
&sleeping_task_high, Env::Priority::HIGH);
|
||||
|
||||
std::vector<std::string> filenames;
|
||||
env_->GetChildren(dbname_, &filenames);
|
||||
|
@ -154,6 +154,7 @@ Status FlushJob::Run(FileMetaData* file_meta) {
|
||||
if (!s.ok()) {
|
||||
cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber());
|
||||
} else {
|
||||
TEST_SYNC_POINT("FlushJob::InstallResults");
|
||||
// Replace immutable memtable with the generated Table
|
||||
s = cfd_->imm()->InstallMemtableFlushResults(
|
||||
cfd_, mutable_cf_options_, mems, versions_, db_mutex_,
|
||||
|
@ -1979,7 +1979,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
|
||||
if (!manifest_writers_.empty()) {
|
||||
manifest_writers_.front()->cv.Signal();
|
||||
}
|
||||
return Status::OK();
|
||||
// we steal this code to also inform about cf-drop
|
||||
return Status::ShutdownInProgress();
|
||||
}
|
||||
|
||||
std::vector<VersionEdit*> batch_edits;
|
||||
@ -2141,6 +2142,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
|
||||
new_manifest_file_size = descriptor_log_->file()->GetFileSize();
|
||||
}
|
||||
|
||||
if (edit->is_column_family_drop_) {
|
||||
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
|
||||
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
|
||||
}
|
||||
|
||||
LogFlush(db_options_->info_log);
|
||||
mu->Lock();
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/iterator.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "util/mutexlock.h"
|
||||
#include "util/random.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -272,5 +273,50 @@ class NullLogger : public Logger {
|
||||
// Corrupts key by changing the type
|
||||
extern void CorruptKeyType(InternalKey* ikey);
|
||||
|
||||
class SleepingBackgroundTask {
|
||||
public:
|
||||
SleepingBackgroundTask()
|
||||
: bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {}
|
||||
void DoSleep() {
|
||||
MutexLock l(&mutex_);
|
||||
while (should_sleep_) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
done_with_sleep_ = true;
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
void WakeUp() {
|
||||
MutexLock l(&mutex_);
|
||||
should_sleep_ = false;
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
void WaitUntilDone() {
|
||||
MutexLock l(&mutex_);
|
||||
while (!done_with_sleep_) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
}
|
||||
bool WokenUp() {
|
||||
MutexLock l(&mutex_);
|
||||
return should_sleep_ == false;
|
||||
}
|
||||
|
||||
void Reset() {
|
||||
MutexLock l(&mutex_);
|
||||
should_sleep_ = true;
|
||||
done_with_sleep_ = false;
|
||||
}
|
||||
|
||||
static void DoSleepTask(void* arg) {
|
||||
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
|
||||
}
|
||||
|
||||
private:
|
||||
port::Mutex mutex_;
|
||||
port::CondVar bg_cv_; // Signalled when background work finishes
|
||||
bool should_sleep_;
|
||||
bool done_with_sleep_;
|
||||
};
|
||||
|
||||
} // namespace test
|
||||
} // namespace rocksdb
|
||||
|
Loading…
Reference in New Issue
Block a user