Do not spin in a tight loop attempting compactions if there is a compaction error

Summary: As the title says. Ported the change from the Google Code leveldb 1.5 release.

Test Plan: run db_test

Reviewers: dhruba

Differential Revision: https://reviews.facebook.net/D4839
heyongqiang 2012-08-22 16:57:51 -07:00
parent 935fdd030b
commit 6fee5a74f5
3 changed files with 60 additions and 7 deletions
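For context, the behavioral change in the diff below is: when BackgroundCompaction() returns a non-OK status, the background thread signals any waiters, logs the error, and sleeps for one second with the mutex released before the next attempt, instead of rescheduling immediately and spinning. The following is a minimal standalone sketch of that unlock-sleep-relock back-off pattern using standard C++ primitives; DoCompactionStep() and the other names are placeholders for illustration, not the actual DBImpl interface.

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

std::mutex mu;
std::condition_variable bg_cv;
bool shutting_down = false;

// Placeholder for BackgroundCompaction(); returns false to simulate an
// environmental failure such as a full or read-only filesystem.
bool DoCompactionStep() { return false; }

// Sketch of the BackgroundCall() pattern: on failure, back off for a second
// with the mutex released so foreground work is not stalled, then let the
// scheduler try again rather than retrying in a tight loop.
void BackgroundCall() {
  std::unique_lock<std::mutex> lock(mu);
  if (!shutting_down) {
    if (!DoCompactionStep()) {
      bg_cv.notify_all();  // a waiter may be able to proceed despite the error
      std::fprintf(stderr, "waiting after background compaction error\n");
      lock.unlock();
      std::this_thread::sleep_for(std::chrono::seconds(1));
      lock.lock();
    }
  }
  bg_cv.notify_all();
}

int main() {
  std::thread bg(BackgroundCall);
  bg.join();
  return 0;
}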

db/db_impl.cc

@@ -700,8 +700,21 @@ void DBImpl::BackgroundCall() {
   MutexLock l(&mutex_);
   assert(bg_compaction_scheduled_);
   if (!shutting_down_.Acquire_Load()) {
-    BackgroundCompaction();
+    Status s = BackgroundCompaction();
+    if (!s.ok()) {
+      // Wait a little bit before retrying background compaction in
+      // case this is an environmental problem and we do not want to
+      // chew up resources for failed compactions for the duration of
+      // the problem.
+      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
+      Log(options_.info_log, "Waiting after background compaction error: %s",
+          s.ToString().c_str());
+      mutex_.Unlock();
+      env_->SleepForMicroseconds(1000000);
+      mutex_.Lock();
+    }
   }
   bg_compaction_scheduled_ = false;
   MaybeScheduleLogDBDeployStats();
@@ -712,12 +725,11 @@ void DBImpl::BackgroundCall() {
   bg_cv_.SignalAll();
 }
-void DBImpl::BackgroundCompaction() {
+Status DBImpl::BackgroundCompaction() {
   mutex_.AssertHeld();
   if (imm_ != NULL) {
-    CompactMemTable();
-    return;
+    return CompactMemTable();
   }
   Compaction* c;
@@ -792,6 +804,7 @@ void DBImpl::BackgroundCompaction() {
     }
     manual_compaction_ = NULL;
   }
+  return status;
 }
 void DBImpl::CleanupCompaction(CompactionState* compact) {
@@ -1368,6 +1381,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
     } else if (imm_ != NULL) {
       // We have filled up the current memtable, but the previous
       // one is still being compacted, so we wait.
+      Log(options_.info_log, "wait for memtable compaction...\n");
       bg_cv_.Wait();
     } else if (versions_->NumLevelFiles(0) >=
                options_.level0_stop_writes_trigger) {

db/db_impl.h

@@ -121,7 +121,7 @@ class DBImpl : public DB {
   void MaybeScheduleCompaction();
   static void BGWork(void* db);
   void BackgroundCall();
-  void BackgroundCompaction();
+  Status BackgroundCompaction();
   void CleanupCompaction(CompactionState* compact);
   Status DoCompactionWork(CompactionState* compact);

db/db_test.cc

@@ -56,12 +56,18 @@ class SpecialEnv : public EnvWrapper {
   // Simulate no-space errors while this pointer is non-NULL.
   port::AtomicPointer no_space_;
+  // Simulate non-writable file system while this pointer is non-NULL
+  port::AtomicPointer non_writable_;
   bool count_random_reads_;
   AtomicCounter random_read_counter_;
+  AtomicCounter sleep_counter_;
   explicit SpecialEnv(Env* base) : EnvWrapper(base) {
     delay_sstable_sync_.Release_Store(NULL);
     no_space_.Release_Store(NULL);
+    non_writable_.Release_Store(NULL);
     count_random_reads_ = false;
   }
@@ -95,6 +101,10 @@ class SpecialEnv : public EnvWrapper {
       }
     };
+    if (non_writable_.Acquire_Load() != NULL) {
+      return Status::IOError("simulated write error");
+    }
     Status s = target()->NewWritableFile(f, r);
     if (s.ok()) {
       if (strstr(f.c_str(), ".sst") != NULL) {
@@ -127,6 +137,11 @@ class SpecialEnv : public EnvWrapper {
     }
     return s;
   }
+  virtual void SleepForMicroseconds(int micros) {
+    sleep_counter_.Increment();
+    target()->SleepForMicroseconds(micros);
+  }
 };
 class DBTest {
@@ -1568,13 +1583,37 @@ TEST(DBTest, NoSpace) {
   Compact("a", "z");
   const int num_files = CountFiles();
   env_->no_space_.Release_Store(env_);  // Force out-of-space errors
-  for (int i = 0; i < 10; i++) {
+  for (int i = 0; i < 5; i++) {
     for (int level = 0; level < dbfull()->NumberLevels()-1; level++) {
       dbfull()->TEST_CompactRange(level, NULL, NULL);
     }
   }
   env_->no_space_.Release_Store(NULL);
-  ASSERT_LT(CountFiles(), num_files + 5);
+  ASSERT_LT(CountFiles(), num_files + 3);
+  // Check that compaction attempts slept after errors
+  ASSERT_GE(env_->sleep_counter_.Read(), 5);
+}
+TEST(DBTest, NonWritableFileSystem)
+{
+  Options options = CurrentOptions();
+  options.write_buffer_size = 1000;
+  options.env = env_;
+  Reopen(&options);
+  ASSERT_OK(Put("foo", "v1"));
+  env_->non_writable_.Release_Store(env_);  // Force errors for new files
+  std::string big(100000, 'x');
+  int errors = 0;
+  for (int i = 0; i < 20; i++) {
+    fprintf(stderr, "iter %d; errors %d\n", i, errors);
+    if (!Put("foo", big).ok()) {
+      errors++;
+      env_->SleepForMicroseconds(100000);
+    }
+  }
+  ASSERT_GT(errors, 0);
+  env_->non_writable_.Release_Store(NULL);
 }
 TEST(DBTest, FilesDeletedAfterCompaction) {
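The new tests work by wrapping the real Env in SpecialEnv and flipping atomic flags to inject failures into file creation and to count sleeps. Below is a hedged, self-contained sketch of that fault-injection wrapper pattern against a made-up FileSystem interface; FileSystem, RealFs, and FaultInjectingFs are illustrative names, not part of the leveldb/RocksDB API.

#include <atomic>
#include <cassert>
#include <string>

// Simplified stand-in for the Env/EnvWrapper interface used by the tests.
struct FileSystem {
  virtual ~FileSystem() {}
  // Returns true on success; on failure fills *err and returns false.
  virtual bool NewWritableFile(const std::string& name, std::string* err) = 0;
};

struct RealFs : FileSystem {
  bool NewWritableFile(const std::string&, std::string*) override {
    return true;  // pretend the real filesystem always succeeds
  }
};

// Decorator that forwards to the wrapped filesystem unless the fault flag is
// set, mirroring how SpecialEnv injects "simulated write error".
struct FaultInjectingFs : FileSystem {
  explicit FaultInjectingFs(FileSystem* base) : base_(base) {}

  bool NewWritableFile(const std::string& name, std::string* err) override {
    if (non_writable.load()) {
      *err = "simulated write error";
      return false;
    }
    return base_->NewWritableFile(name, err);
  }

  FileSystem* base_;
  std::atomic<bool> non_writable{false};
};

int main() {
  RealFs real;
  FaultInjectingFs fs(&real);
  std::string err;
  assert(fs.NewWritableFile("000001.sst", &err));

  fs.non_writable = true;   // analogous to non_writable_.Release_Store(env_)
  assert(!fs.NewWritableFile("000002.sst", &err));
  assert(err == "simulated write error");

  fs.non_writable = false;  // analogous to non_writable_.Release_Store(NULL)
  return 0;
}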