Fix data loss after DB recovery by not allowing flush/compaction to be scheduled until DB opened

Summary:
Previous run may leave some SST files with higher file numbers than manifest indicates.
Compaction or flush may start to run while DB::Open() is still going on. SST file garbage collection may happen interleaving with compaction or flush, and overwrite files generated by compaction of flushes after they are generated. This might cause data loss. This possibility of interleaving is recently introduced.
Fix it by not allowing compaction or flush to be scheduled before DB::Open() finishes.

Test Plan: Add a unit test. This verification will have a chance to fail without the fix but doesn't fix without the fix.

Reviewers: kradhakrishnan, anthony, yhchiang, IslamAbdelRahman, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D42399
This commit is contained in:
sdong 2015-07-15 19:58:28 -07:00
parent e4af3bfb27
commit 6c0c8dee7b
2 changed files with 96 additions and 18 deletions

View File

@ -2059,6 +2059,10 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
void DBImpl::MaybeScheduleFlushOrCompaction() {
mutex_.AssertHeld();
if (!opened_successfully_) {
// Compaction may introduce data race to DB open
return;
}
if (bg_work_gate_closed_) {
// gate closed for background work
return;
@ -2609,6 +2613,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
mutex_.Unlock();
status = compaction_job.Run();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
mutex_.Lock();
compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
@ -4317,11 +4322,14 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}
}
}
TEST_SYNC_POINT("DBImpl::Open:Opened");
if (s.ok()) {
impl->opened_successfully_ = true;
impl->MaybeScheduleFlushOrCompaction();
}
impl->mutex_.Unlock();
if (s.ok()) {
impl->opened_successfully_ = true;
Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p",
impl);
*dbptr = impl;

View File

@ -25,6 +25,7 @@
#include "util/logging.h"
#include "util/mock_env.h"
#include "util/mutexlock.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
@ -186,6 +187,13 @@ class FaultInjectionTestEnv : public EnvWrapper {
Status NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override {
if (!IsFilesystemActive()) {
return Status::Corruption("Not Active");
}
// Not allow overwriting files
if (target()->FileExists(fname)) {
return Status::Corruption("File already exists.");
}
Status s = target()->NewWritableFile(fname, result, soptions);
if (s.ok()) {
result->reset(new TestWritableFile(fname, std::move(*result), this));
@ -202,6 +210,9 @@ class FaultInjectionTestEnv : public EnvWrapper {
}
virtual Status DeleteFile(const std::string& f) override {
if (!IsFilesystemActive()) {
return Status::Corruption("Not Active");
}
Status s = EnvWrapper::DeleteFile(f);
if (!s.ok()) {
fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(),
@ -216,6 +227,9 @@ class FaultInjectionTestEnv : public EnvWrapper {
virtual Status RenameFile(const std::string& s,
const std::string& t) override {
if (!IsFilesystemActive()) {
return Status::Corruption("Not Active");
}
Status ret = EnvWrapper::RenameFile(s, t);
if (ret.ok()) {
@ -374,8 +388,11 @@ TestWritableFile::~TestWritableFile() {
}
Status TestWritableFile::Append(const Slice& data) {
if (!env_->IsFilesystemActive()) {
return Status::Corruption("Not Active");
}
Status s = target_->Append(data);
if (s.ok() && env_->IsFilesystemActive()) {
if (s.ok()) {
state_.pos_ += data.size();
}
return s;
@ -545,33 +562,34 @@ class FaultInjectionTest : public testing::Test {
ASSERT_OK(s);
}
void Build(const WriteOptions& write_options, int start_idx, int num_vals) {
void Build(const WriteOptions& write_options, int start_idx, int num_vals,
bool sequential = true) {
std::string key_space, value_space;
WriteBatch batch;
for (int i = start_idx; i < start_idx + num_vals; i++) {
Slice key = Key(i, &key_space);
Slice key = Key(sequential, i, &key_space);
batch.Clear();
batch.Put(key, Value(i, &value_space));
ASSERT_OK(db_->Write(write_options, &batch));
}
}
Status ReadValue(int i, std::string* val) const {
Status ReadValue(int i, std::string* val, bool sequential) const {
std::string key_space, value_space;
Slice key = Key(i, &key_space);
Slice key = Key(sequential, i, &key_space);
Value(i, &value_space);
ReadOptions options;
return db_->Get(options, key, val);
}
Status Verify(int start_idx, int num_vals,
ExpectedVerifResult expected) const {
Status Verify(int start_idx, int num_vals, ExpectedVerifResult expected,
bool seqeuntial = true) const {
std::string val;
std::string value_space;
Status s;
for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) {
Value(i, &value_space);
s = ReadValue(i, &val);
s = ReadValue(i, &val, seqeuntial);
if (s.ok()) {
EXPECT_EQ(value_space, val);
}
@ -591,9 +609,16 @@ class FaultInjectionTest : public testing::Test {
}
// Return the ith key
Slice Key(int i, std::string* storage) const {
Slice Key(bool seqeuntial, int i, std::string* storage) const {
int num = i;
if (!seqeuntial) {
// random transfer
const int m = 0x5bd1e995;
num *= m;
num ^= num << 24;
}
char buf[100];
snprintf(buf, sizeof(buf), "%016d", i);
snprintf(buf, sizeof(buf), "%016d", num);
storage->assign(buf, strlen(buf));
return Slice(*storage);
}
@ -773,14 +798,14 @@ TEST_F(FaultInjectionTest, DISABLED_WriteOptionSyncTest) {
write_options.sync = false;
std::string key_space, value_space;
ASSERT_OK(
db_->Put(write_options, Key(1, &key_space), Value(1, &value_space)));
ASSERT_OK(db_->Put(write_options, Key(true, 1, &key_space),
Value(1, &value_space)));
FlushOptions flush_options;
flush_options.wait = false;
ASSERT_OK(db_->Flush(flush_options));
write_options.sync = true;
ASSERT_OK(
db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
ASSERT_OK(db_->Put(write_options, Key(true, 2, &key_space),
Value(2, &value_space)));
env_->SetFilesystemActive(false);
NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
@ -789,14 +814,59 @@ TEST_F(FaultInjectionTest, DISABLED_WriteOptionSyncTest) {
ASSERT_OK(OpenDB());
std::string val;
Value(2, &value_space);
ASSERT_OK(ReadValue(2, &val));
ASSERT_OK(ReadValue(2, &val, true));
ASSERT_EQ(value_space, val);
Value(1, &value_space);
ASSERT_OK(ReadValue(1, &val));
ASSERT_OK(ReadValue(1, &val, true));
ASSERT_EQ(value_space, val);
}
TEST_F(FaultInjectionTest, UninstalledCompaction) {
options_.target_file_size_base = 32 * 1024;
options_.write_buffer_size = 100 << 10; // 100KB
options_.level0_file_num_compaction_trigger = 6;
options_.level0_stop_writes_trigger = 1 << 10;
options_.level0_slowdown_writes_trigger = 1 << 10;
options_.max_background_compactions = 1;
OpenDB();
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"FaultInjectionTest::FaultTest:0", "DBImpl::BGWorkCompaction"},
{"CompactionJob::Run():End", "FaultInjectionTest::FaultTest:1"},
{"FaultInjectionTest::FaultTest:2",
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
int kNumKeys = 1000;
Build(WriteOptions(), 0, kNumKeys, false);
FlushOptions flush_options;
flush_options.wait = true;
db_->Flush(flush_options);
ASSERT_OK(db_->Put(WriteOptions(), "", ""));
TEST_SYNC_POINT("FaultInjectionTest::FaultTest:0");
TEST_SYNC_POINT("FaultInjectionTest::FaultTest:1");
env_->SetFilesystemActive(false);
TEST_SYNC_POINT("FaultInjectionTest::FaultTest:2");
CloseDB();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
ResetDBState(kResetDropUnsyncedData);
std::atomic<bool> opened(false);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::Open:Opened", [&](void* arg) { opened.store(true); });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BGWorkCompaction",
[&](void* arg) { ASSERT_TRUE(opened.load()); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(OpenDB());
static_cast<DBImpl*>(db_)->TEST_WaitForCompact();
ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound, false));
ASSERT_OK(db_->Put(WriteOptions(), "", ""));
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace rocksdb
int main(int argc, char** argv) {