diff --git a/db/db_impl.h b/db/db_impl.h
index ca1072442..dceffedff 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -464,6 +464,7 @@ class DBImpl : public DB {
 
   int TEST_BGCompactionsAllowed() const;
   int TEST_BGFlushesAllowed() const;
+  size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
 
 #endif  // NDEBUG
 
@@ -934,6 +935,12 @@ class DBImpl : public DB {
 
   Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                      MemTable* mem, VersionEdit* edit);
+  // Restore alive_log_files_ and total_log_size_ after recovery.
+  // It needs to run only when there is no flush during recovery
+  // (e.g. avoid_flush_during_recovery=true). It may also trigger a flush
+  // if total_log_size > max_total_wal_size.
+  Status RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers);
+
   // num_bytes: for slowdown case, delay time is calculated based on
   // `num_bytes` going through.
   Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);
diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc
index 6ed358d3b..3cf8778ad 100644
--- a/db/db_impl_debug.cc
+++ b/db/db_impl_debug.cc
@@ -237,5 +237,11 @@ SequenceNumber DBImpl::TEST_GetLastVisibleSequence() const {
   }
 }
 
+size_t DBImpl::TEST_GetWalPreallocateBlockSize(
+    uint64_t write_buffer_size) const {
+  InstrumentedMutexLock l(&mutex_);
+  return GetWalPreallocateBlockSize(write_buffer_size);
+}
+
 }  // namespace rocksdb
 #endif  // NDEBUG
diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc
index 8371b0720..4852d41de 100644
--- a/db/db_impl_open.cc
+++ b/db/db_impl_open.cc
@@ -396,6 +396,16 @@ Status DBImpl::Recover(
       }
     }
   }
+
+  // Initialize max_total_in_memory_state_ before recovering the WAL logs.
+  // Log recovery may check this value to decide whether to flush.
+  max_total_in_memory_state_ = 0;
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
+    max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
+                                  mutable_cf_options->max_write_buffer_number;
+  }
+
   if (s.ok()) {
     SequenceNumber next_sequence(kMaxSequenceNumber);
     default_cf_handle_ = new ColumnFamilyHandleImpl(
@@ -468,14 +478,6 @@ Status DBImpl::Recover(
     }
   }
 
-  // Initial value
-  max_total_in_memory_state_ = 0;
-  for (auto cfd : *versions_->GetColumnFamilySet()) {
-    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
-    max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
-                                  mutable_cf_options->max_write_buffer_number;
-  }
-
   return s;
 }
 
@@ -885,18 +887,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     }
   }
 
-  if (data_seen && !flushed) {
-    // Mark these as alive so they'll be considered for deletion later by
-    // FindObsoleteFiles()
-    if (two_write_queues_) {
-      log_write_mutex_.Lock();
-    }
-    for (auto log_number : log_numbers) {
-      alive_log_files_.push_back(LogFileNumberSize(log_number));
-    }
-    if (two_write_queues_) {
-      log_write_mutex_.Unlock();
-    }
+  if (status.ok() && data_seen && !flushed) {
+    status = RestoreAliveLogFiles(log_numbers);
   }
 
   event_logger_.Log() << "job" << job_id << "event"
@@ -905,6 +897,60 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
   }
   return status;
 }
+Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers) {
+  if (log_numbers.empty()) {
+    return Status::OK();
+  }
+  Status s;
+  mutex_.AssertHeld();
+  assert(immutable_db_options_.avoid_flush_during_recovery);
+  if (two_write_queues_) {
+    log_write_mutex_.Lock();
+  }
+  // Mark these as alive so they'll be considered for deletion later by
+  // FindObsoleteFiles()
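+  // Rebuild the accounting from the on-disk state: start total_log_size_
+  // at zero and accumulate each recovered WAL's actual file size below.
+  // log_empty_ is cleared since the logs we just replayed contain data.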
+  total_log_size_ = 0;
+  log_empty_ = false;
+  for (auto log_number : log_numbers) {
+    LogFileNumberSize log(log_number);
+    std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
+    // This gets the apparent size of the logs, excluding preallocated space.
+    s = env_->GetFileSize(fname, &log.size);
+    if (!s.ok()) {
+      break;
+    }
+    total_log_size_ += log.size;
+    alive_log_files_.push_back(log);
+    // We preallocate space for logs, but after a crash and restart that
+    // preallocated space is no longer needed. It is likely that only the last
+    // log has such preallocated space, so we only truncate the last log.
+    if (log_number == log_numbers.back()) {
+      std::unique_ptr<WritableFile> last_log;
+      Status truncate_status = env_->ReopenWritableFile(
+          fname, &last_log,
+          env_->OptimizeForLogWrite(
+              env_options_,
+              BuildDBOptions(immutable_db_options_, mutable_db_options_)));
+      if (truncate_status.ok()) {
+        truncate_status = last_log->Truncate(log.size);
+      }
+      if (truncate_status.ok()) {
+        truncate_status = last_log->Close();
+      }
+      // Not a critical error if we fail to truncate.
+      if (!truncate_status.ok()) {
+        ROCKS_LOG_WARN(immutable_db_options_.info_log,
+                       "Failed to truncate log #%" PRIu64 ": %s", log_number,
+                       truncate_status.ToString().c_str());
+      }
+    }
+  }
+  if (two_write_queues_) {
+    log_write_mutex_.Unlock();
+  }
+  return s;
+}
+
 Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                            MemTable* mem, VersionEdit* edit) {
   mutex_.AssertHeld();
diff --git a/db/db_test_util.h b/db/db_test_util.h
index 9a41ffd56..596baf896 100644
--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -315,6 +315,9 @@ class SpecialEnv : public EnvWrapper {
       }
     }
     uint64_t GetFileSize() override { return base_->GetFileSize(); }
+    Status Allocate(uint64_t offset, uint64_t len) override {
+      return base_->Allocate(offset, len);
+    }
 
    private:
     SpecialEnv* env_;
@@ -368,6 +371,9 @@ class SpecialEnv : public EnvWrapper {
     bool IsSyncThreadSafe() const override {
      return env_->is_wal_sync_thread_safe_.load();
     }
+    Status Allocate(uint64_t offset, uint64_t len) override {
+      return base_->Allocate(offset, len);
+    }
 
    private:
     SpecialEnv* env_;
diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc
index 1c5bfb6d5..9b123d921 100644
--- a/db/db_wal_test.cc
+++ b/db/db_wal_test.cc
@@ -18,6 +18,15 @@ namespace rocksdb {
 class DBWALTest : public DBTestBase {
  public:
   DBWALTest() : DBTestBase("/db_wal_test") {}
+
+#if defined(ROCKSDB_PLATFORM_POSIX)
+  uint64_t GetAllocatedFileSize(std::string file_name) {
+    struct stat sbuf;
+    int err = stat(file_name.c_str(), &sbuf);
+    assert(err == 0);
+    return sbuf.st_blocks * 512;
+  }
+#endif
 };
 
 // A SpecialEnv enriched to give more insight about deleted files
@@ -1329,6 +1338,99 @@ TEST_F(DBWALTest, RecoverFromCorruptedWALWithoutFlush) {
   }
 }
 
+// Tests that the total log size is recovered if we set
+// avoid_flush_during_recovery=true.
+// A flush should trigger once max_total_wal_size is reached.
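+// Outline: ~900KB of writes land in one WAL, the DB is reopened without
+// flushing, then one more ~300KB write pushes the recovered total WAL size
+// past max_total_wal_size (1MB), which should flush both column families.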
+TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
+  class TestFlushListener : public EventListener {
+   public:
+    std::atomic<int> count{0};
+
+    TestFlushListener() = default;
+
+    void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
+      count++;
+      assert(FlushReason::kWriteBufferManager == flush_job_info.flush_reason);
+    }
+  };
+  std::shared_ptr<TestFlushListener> test_listener =
+      std::make_shared<TestFlushListener>();
+
+  constexpr size_t kKB = 1024;
+  constexpr size_t kMB = 1024 * 1024;
+  Options options = CurrentOptions();
+  options.avoid_flush_during_recovery = true;
+  options.max_total_wal_size = 1 * kMB;
+  options.listeners.push_back(test_listener);
+  // Have to open DB in multi-CF mode to trigger flush when
+  // max_total_wal_size is reached.
+  CreateAndReopenWithCF({"one"}, options);
+  // Write some keys so that we end up with one log file which is slightly
+  // smaller than 1MB.
+  std::string value_100k(100 * kKB, 'v');
+  std::string value_300k(300 * kKB, 'v');
+  ASSERT_OK(Put(0, "foo", "v1"));
+  for (int i = 0; i < 9; i++) {
+    ASSERT_OK(Put(1, "key" + ToString(i), value_100k));
+  }
+  // Get log files before reopen.
+  VectorLogPtr log_files_before;
+  ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
+  ASSERT_EQ(1, log_files_before.size());
+  uint64_t log_size_before = log_files_before[0]->SizeFileBytes();
+  ASSERT_GT(log_size_before, 900 * kKB);
+  ASSERT_LT(log_size_before, 1 * kMB);
+  ReopenWithColumnFamilies({"default", "one"}, options);
+  // Write one more value to make the log larger than 1MB.
+  ASSERT_OK(Put(1, "bar", value_300k));
+  // Get log files again. A new log file will be opened.
+  VectorLogPtr log_files_after_reopen;
+  ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen));
+  ASSERT_EQ(2, log_files_after_reopen.size());
+  ASSERT_EQ(log_files_before[0]->LogNumber(),
+            log_files_after_reopen[0]->LogNumber());
+  ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() +
+                log_files_after_reopen[1]->SizeFileBytes(),
+            1 * kMB);
+  // Write one more key to trigger flush.
+  ASSERT_OK(Put(0, "foo", "v2"));
+  dbfull()->TEST_WaitForFlushMemTable();
+  // Flushed two column families.
+  ASSERT_EQ(2, test_listener->count.load());
+}
+
+#if defined(ROCKSDB_PLATFORM_POSIX)
+#if defined(ROCKSDB_FALLOCATE_PRESENT)
+// Tests that we truncate the preallocated space of the last log from the
+// previous run.
+TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
+  constexpr size_t kKB = 1024;
+  Options options = CurrentOptions();
+  options.avoid_flush_during_recovery = true;
+  DestroyAndReopen(options);
+  size_t preallocated_size =
+      dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
+  ASSERT_OK(Put("foo", "v1"));
+  VectorLogPtr log_files_before;
+  ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
+  ASSERT_EQ(1, log_files_before.size());
+  auto& file_before = log_files_before[0];
+  ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
+  // The log file has preallocated space.
+  ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
+            preallocated_size);
+  Reopen(options);
+  VectorLogPtr log_files_after;
+  ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
+  ASSERT_EQ(1, log_files_after.size());
+  ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
+  // The preallocated space should have been truncated.
+  ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
+            preallocated_size);
+}
+#endif  // ROCKSDB_FALLOCATE_PRESENT
+#endif  // ROCKSDB_PLATFORM_POSIX
+
 #endif  // ROCKSDB_LITE
 
 TEST_F(DBWALTest, WalTermTest) {