diff --git a/Makefile b/Makefile index 8cb9a46cc..258663790 100644 --- a/Makefile +++ b/Makefile @@ -869,7 +869,7 @@ arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) autovector_test: util/autovector_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) -column_family_test: db/column_family_test.o $(LIBOBJECTS) $(TESTHARNESS) +column_family_test: db/column_family_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) table_properties_collector_test: db/table_properties_collector_test.o $(LIBOBJECTS) $(TESTHARNESS) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 18150970c..7126dd561 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -2837,6 +2837,217 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) { ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); } +#ifndef ROCKSDB_LITE +TEST_F(ColumnFamilyTest, FlushCloseWALFiles) { + SpecialEnv env(Env::Default()); + db_options_.env = &env; + db_options_.max_background_flushes = 1; + column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2)); + Open(); + CreateColumnFamilies({"one"}); + ASSERT_OK(Put(1, "fodor", "mirko")); + ASSERT_OK(Put(0, "fodor", "mirko")); + ASSERT_OK(Put(1, "fodor", "mirko")); + + // Block flush jobs from running + test::SleepingBackgroundTask sleeping_task; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::HIGH); + + WriteOptions wo; + wo.sync = true; + ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko")); + + ASSERT_EQ(2, env.num_open_wal_file_.load()); + + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + WaitForFlush(1); + ASSERT_EQ(1, env.num_open_wal_file_.load()); + + Reopen(); + ASSERT_EQ("mirko", Get(0, "fodor")); + ASSERT_EQ("mirko", Get(1, "fodor")); + db_options_.env = env_; + Close(); +} +#endif // !ROCKSDB_LITE + +TEST_F(ColumnFamilyTest, IteratorCloseWALFile1) { + SpecialEnv env(Env::Default()); + db_options_.env = &env; + db_options_.max_background_flushes = 1; + column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2)); + Open(); + CreateColumnFamilies({"one"}); + ASSERT_OK(Put(1, "fodor", "mirko")); + // Create an iterator holding the current super version. + Iterator* it = db_->NewIterator(ReadOptions(), handles_[1]); + // A flush will make `it` hold the last reference of its super version. + Flush(1); + + ASSERT_OK(Put(1, "fodor", "mirko")); + ASSERT_OK(Put(0, "fodor", "mirko")); + ASSERT_OK(Put(1, "fodor", "mirko")); + + // Flush jobs will close previous WAL files after finishing. By + // block flush jobs from running, we trigger a condition where + // the iterator destructor should close the WAL files. + test::SleepingBackgroundTask sleeping_task; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::HIGH); + + WriteOptions wo; + wo.sync = true; + ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko")); + + ASSERT_EQ(2, env.num_open_wal_file_.load()); + // Deleting the iterator will clear its super version, triggering + // closing all files + delete it; + ASSERT_EQ(1, env.num_open_wal_file_.load()); + + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + WaitForFlush(1); + + Reopen(); + ASSERT_EQ("mirko", Get(0, "fodor")); + ASSERT_EQ("mirko", Get(1, "fodor")); + db_options_.env = env_; + Close(); +} + +TEST_F(ColumnFamilyTest, IteratorCloseWALFile2) { + SpecialEnv env(Env::Default()); + // Allow both of flush and purge job to schedule. + env.SetBackgroundThreads(2, Env::HIGH); + db_options_.env = &env; + db_options_.max_background_flushes = 1; + column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2)); + Open(); + CreateColumnFamilies({"one"}); + ASSERT_OK(Put(1, "fodor", "mirko")); + // Create an iterator holding the current super version. + ReadOptions ro; + ro.background_purge_on_iterator_cleanup = true; + Iterator* it = db_->NewIterator(ro, handles_[1]); + // A flush will make `it` hold the last reference of its super version. + Flush(1); + + ASSERT_OK(Put(1, "fodor", "mirko")); + ASSERT_OK(Put(0, "fodor", "mirko")); + ASSERT_OK(Put(1, "fodor", "mirko")); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"ColumnFamilyTest::IteratorCloseWALFile2:0", + "DBImpl::BGWorkPurge:start"}, + {"ColumnFamilyTest::IteratorCloseWALFile2:2", + "DBImpl::BackgroundCallFlush:start"}, + {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + WriteOptions wo; + wo.sync = true; + ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko")); + + ASSERT_EQ(2, env.num_open_wal_file_.load()); + // Deleting the iterator will clear its super version, triggering + // closing all files + delete it; + ASSERT_EQ(2, env.num_open_wal_file_.load()); + + TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0"); + TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1"); + ASSERT_EQ(1, env.num_open_wal_file_.load()); + TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2"); + WaitForFlush(1); + ASSERT_EQ(1, env.num_open_wal_file_.load()); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + Reopen(); + ASSERT_EQ("mirko", Get(0, "fodor")); + ASSERT_EQ("mirko", Get(1, "fodor")); + db_options_.env = env_; + Close(); +} + +#ifndef ROCKSDB_LITE // TEST functions are not supported in lite +TEST_F(ColumnFamilyTest, ForwardIteratorCloseWALFile) { + SpecialEnv env(Env::Default()); + // Allow both of flush and purge job to schedule. + env.SetBackgroundThreads(2, Env::HIGH); + db_options_.env = &env; + db_options_.max_background_flushes = 1; + column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(3)); + column_family_options_.level0_file_num_compaction_trigger = 2; + Open(); + CreateColumnFamilies({"one"}); + ASSERT_OK(Put(1, "fodor", "mirko")); + ASSERT_OK(Put(1, "fodar2", "mirko")); + Flush(1); + + // Create an iterator holding the current super version, as well as + // the SST file just flushed. + ReadOptions ro; + ro.tailing = true; + ro.background_purge_on_iterator_cleanup = true; + Iterator* it = db_->NewIterator(ro, handles_[1]); + // A flush will make `it` hold the last reference of its super version. + + ASSERT_OK(Put(1, "fodor", "mirko")); + ASSERT_OK(Put(1, "fodar2", "mirko")); + Flush(1); + + WaitForCompaction(); + + ASSERT_OK(Put(1, "fodor", "mirko")); + ASSERT_OK(Put(1, "fodor", "mirko")); + ASSERT_OK(Put(0, "fodor", "mirko")); + ASSERT_OK(Put(1, "fodor", "mirko")); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"ColumnFamilyTest::IteratorCloseWALFile2:0", + "DBImpl::BGWorkPurge:start"}, + {"ColumnFamilyTest::IteratorCloseWALFile2:2", + "DBImpl::BackgroundCallFlush:start"}, + {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + WriteOptions wo; + wo.sync = true; + ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko")); + + env.delete_count_.store(0); + ASSERT_EQ(2, env.num_open_wal_file_.load()); + // Deleting the iterator will clear its super version, triggering + // closing all files + it->Seek(""); + ASSERT_EQ(2, env.num_open_wal_file_.load()); + ASSERT_EQ(0, env.delete_count_.load()); + + TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0"); + TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1"); + ASSERT_EQ(1, env.num_open_wal_file_.load()); + ASSERT_EQ(1, env.delete_count_.load()); + TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2"); + WaitForFlush(1); + ASSERT_EQ(1, env.num_open_wal_file_.load()); + ASSERT_EQ(1, env.delete_count_.load()); + + delete it; + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + Reopen(); + ASSERT_EQ("mirko", Get(0, "fodor")); + ASSERT_EQ("mirko", Get(1, "fodor")); + db_options_.env = env_; + Close(); +} +#endif // !ROCKSDB_LITE + // Disable on windows because SyncWAL requires env->IsSyncThreadSafe() // to return true which is not so in unbuffered mode. #ifndef OS_WIN diff --git a/db/db_impl.cc b/db/db_impl.cc index e0f25bc75..626b9756e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -716,6 +716,16 @@ uint64_t DBImpl::FindMinLogContainingOutstandingPrep() { return min_log; } +void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) { + if (!job_context->logs_to_free.empty()) { + for (auto l : job_context->logs_to_free) { + AddToLogsToFreeQueue(l); + } + job_context->logs_to_free.clear(); + SchedulePurge(); + } +} + // * Returns the list of live files in 'sst_live' // If it's doing full scan: // * Returns the list of all files in the filesystem in @@ -2988,8 +2998,9 @@ void DBImpl::BGWorkCompaction(void* arg) { void DBImpl::BGWorkPurge(void* db) { IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); - TEST_SYNC_POINT("DBImpl::BGWorkPurge"); + TEST_SYNC_POINT("DBImpl::BGWorkPurge:start"); reinterpret_cast(db)->BackgroundCallPurge(); + TEST_SYNC_POINT("DBImpl::BGWorkPurge:end"); } void DBImpl::UnscheduleCallback(void* arg) { @@ -3004,20 +3015,32 @@ void DBImpl::UnscheduleCallback(void* arg) { void DBImpl::BackgroundCallPurge() { mutex_.Lock(); - while (!purge_queue_.empty()) { - auto purge_file = purge_queue_.begin(); - auto fname = purge_file->fname; - auto type = purge_file->type; - auto number = purge_file->number; - auto path_id = purge_file->path_id; - auto job_id = purge_file->job_id; - purge_queue_.pop_front(); + // We use one single loop to clear both queues so that after existing the loop + // both queues are empty. This is stricter than what is needed, but can make + // it easier for us to reason the correctness. + while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) { + if (!purge_queue_.empty()) { + auto purge_file = purge_queue_.begin(); + auto fname = purge_file->fname; + auto type = purge_file->type; + auto number = purge_file->number; + auto path_id = purge_file->path_id; + auto job_id = purge_file->job_id; + purge_queue_.pop_front(); - mutex_.Unlock(); - Status file_deletion_status; - DeleteObsoleteFileImpl(file_deletion_status, job_id, fname, type, number, - path_id); - mutex_.Lock(); + mutex_.Unlock(); + Status file_deletion_status; + DeleteObsoleteFileImpl(file_deletion_status, job_id, fname, type, number, + path_id); + mutex_.Lock(); + } else { + assert(!logs_to_free_queue_.empty()); + log::Writer* log_writer = *(logs_to_free_queue_.begin()); + logs_to_free_queue_.pop_front(); + mutex_.Unlock(); + delete log_writer; + mutex_.Lock(); + } } bg_purge_scheduled_--; @@ -3084,6 +3107,8 @@ void DBImpl::BackgroundCallFlush() { JobContext job_context(next_job_id_.fetch_add(1), true); assert(bg_flush_scheduled_); + TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start"); + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); { InstrumentedMutexLock l(&mutex_); @@ -3655,6 +3680,9 @@ static void CleanupIteratorState(void* arg1, void* arg2) { state->mu->Lock(); state->super_version->Cleanup(); state->db->FindObsoleteFiles(&job_context, false, true); + if (state->background_purge) { + state->db->ScheduleBgLogWriterClose(&job_context); + } state->mu->Unlock(); delete state->super_version; diff --git a/db/db_impl.h b/db/db_impl.h index 86c694b07..592cd8a86 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -365,6 +365,10 @@ class DBImpl : public DB { // compaction status. int BGCompactionsAllowed() const; + // move logs pending closing from job_context to the DB queue and + // schedule a purge + void ScheduleBgLogWriterClose(JobContext* job_context); + // Returns the list of live files in 'live' and the list // of all files in the filesystem in 'candidate_files'. // If force == false and the last call was less than @@ -493,6 +497,9 @@ class DBImpl : public DB { void MarkLogAsHavingPrepSectionFlushed(uint64_t log); void MarkLogAsContainingPrepSection(uint64_t log); + void AddToLogsToFreeQueue(log::Writer* log_writer) { + logs_to_free_queue_.push_back(log_writer); + } Status NewDB(); @@ -879,6 +886,9 @@ class DBImpl : public DB { // A queue to store filenames of the files to be purged std::deque purge_queue_; + + // A queue to store log writers to close + std::deque logs_to_free_queue_; int unscheduled_flushes_; int unscheduled_compactions_; diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 22f882ab6..c79b05cba 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -30,6 +30,8 @@ SpecialEnv::SpecialEnv(Env* base) manifest_write_error_.store(false, std::memory_order_release); log_write_error_.store(false, std::memory_order_release); random_file_open_counter_.store(0, std::memory_order_relaxed); + delete_count_.store(0, std::memory_order_relaxed); + num_open_wal_file_.store(0); log_write_slowdown_ = 0; bytes_written_ = 0; sync_counter_ = 0; diff --git a/db/db_test_util.h b/db/db_test_util.h index 297ff1fbe..237dc51e4 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -285,7 +285,10 @@ class SpecialEnv : public EnvWrapper { class WalFile : public WritableFile { public: WalFile(SpecialEnv* env, unique_ptr&& b) - : env_(env), base_(std::move(b)) {} + : env_(env), base_(std::move(b)) { + env_->num_open_wal_file_.fetch_add(1); + } + virtual ~WalFile() { env_->num_open_wal_file_.fetch_add(-1); } Status Append(const Slice& data) override { #if !(defined NDEBUG) || !defined(OS_WIN) TEST_SYNC_POINT("SpecialEnv::WalFile::Append:1"); @@ -443,6 +446,11 @@ class SpecialEnv : public EnvWrapper { addon_time_.load(); } + virtual Status DeleteFile(const std::string& fname) override { + delete_count_.fetch_add(1); + return target()->DeleteFile(fname); + } + Random rnd_; port::Mutex rnd_mutex_; // Lock to pretect rnd_ @@ -470,6 +478,9 @@ class SpecialEnv : public EnvWrapper { // Slow down every log write, in micro-seconds. std::atomic log_write_slowdown_; + // Number of WAL files that are still open for write. + std::atomic num_open_wal_file_; + bool count_random_reads_; anon::AtomicCounter random_read_counter_; std::atomic random_read_bytes_counter_; @@ -494,6 +505,8 @@ class SpecialEnv : public EnvWrapper { std::atomic addon_time_; + std::atomic delete_count_; + bool time_elapse_only_sleep_; bool no_sleep_; diff --git a/db/flush_job.h b/db/flush_job.h index 285a8c14f..5dc6a9871 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -16,12 +16,17 @@ #include #include -#include "db/dbformat.h" #include "db/column_family.h" +#include "db/dbformat.h" +#include "db/flush_scheduler.h" +#include "db/internal_stats.h" +#include "db/job_context.h" #include "db/log_writer.h" #include "db/memtable_list.h" #include "db/snapshot_impl.h" #include "db/version_edit.h" +#include "db/write_controller.h" +#include "db/write_thread.h" #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -33,11 +38,6 @@ #include "util/instrumented_mutex.h" #include "util/stop_watch.h" #include "util/thread_local.h" -#include "db/internal_stats.h" -#include "db/write_controller.h" -#include "db/flush_scheduler.h" -#include "db/write_thread.h" -#include "db/job_context.h" namespace rocksdb { diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index bbca88f9c..f7eb8ca24 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -153,10 +153,14 @@ void ForwardIterator::SVCleanup() { db_->mutex_.Lock(); sv_->Cleanup(); db_->FindObsoleteFiles(&job_context, false, true); + if (read_options_.background_purge_on_iterator_cleanup) { + db_->ScheduleBgLogWriterClose(&job_context); + } db_->mutex_.Unlock(); delete sv_; if (job_context.HaveSomethingToDelete()) { - db_->PurgeObsoleteFiles(job_context); + db_->PurgeObsoleteFiles( + job_context, read_options_.background_purge_on_iterator_cleanup); } job_context.Clean(); } diff --git a/db/job_context.h b/db/job_context.h index 7761a8312..286d522b7 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -12,7 +12,6 @@ #include #include -#include "db/column_family.h" #include "db/log_writer.h" namespace rocksdb {