From 8244f13448ddf6490fb4245780008d3ce862c2ca Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 3 Jun 2022 16:33:00 -0700 Subject: [PATCH] Fix a bug in WAL tracking (#10087) Summary: Closing https://github.com/facebook/rocksdb/issues/10080 When `SyncWAL()` calls `MarkLogsSynced()`, even if there is only one active WAL file, this event should still be added to the MANIFEST. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10087 Test Plan: make check Reviewed By: ajkr Differential Revision: D36797580 Pulled By: riversand963 fbshipit-source-id: 24184c9dd606b3939a454ed41de6e868d1519999 --- HISTORY.md | 4 ++++ db/db_basic_test.cc | 21 +++++++++++++++++++++ db/db_impl/db_impl.cc | 11 ++++++----- file/writable_file_writer.cc | 16 ++++++++++------ file/writable_file_writer.h | 6 ++++-- 5 files changed, 45 insertions(+), 13 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index af44d06a0..0d208f25f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,8 @@ # Rocksdb Change Log +## 7.3.1 (06/08/2022) +### Bug Fixes +* Fix a bug in WAL tracking. Before this PR (#10087), calling `SyncWAL()` on the only WAL file of the db will not log the event in MANIFEST, thus allowing a subsequent `DB::Open` even if the WAL file is missing or corrupted. + ## 7.3.0 (05/20/2022) ### Bug Fixes * Fixed a bug where manual flush would block forever even though flush options had wait=false. diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index a6bcea43b..28ce0a1df 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -4096,6 +4096,27 @@ TEST_F(DBBasicTest, VerifyFileChecksums) { Reopen(options); ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument()); } + +TEST_F(DBBasicTest, ManualWalSync) { + Options options = CurrentOptions(); + options.track_and_verify_wals_in_manifest = true; + options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency; + DestroyAndReopen(options); + + ASSERT_OK(Put("x", "y")); + // This does not create a new WAL. 
+ ASSERT_OK(db_->SyncWAL()); + EXPECT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty()); + + std::unique_ptr<LogFile> wal; + Status s = db_->GetCurrentWalFile(&wal); + ASSERT_OK(s); + Close(); + + EXPECT_OK(env_->DeleteFile(LogFileName(dbname_, wal->LogNumber()))); + + ASSERT_TRUE(TryReopen(options).IsCorruption()); +} #endif // !ROCKSDB_LITE // A test class for intercepting random reads and injecting artificial diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index e0140707d..5c9778e2e 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1441,12 +1441,13 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) { for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) { auto& wal = *it; assert(wal.getting_synced); + if (immutable_db_options_.track_and_verify_wals_in_manifest && + wal.writer->file()->GetFileSize() > 0) { + synced_wals.AddWal(wal.number, + WalMetadata(wal.writer->file()->GetFileSize())); + } + if (logs_.size() > 1) { - if (immutable_db_options_.track_and_verify_wals_in_manifest && - wal.writer->file()->GetFileSize() > 0) { - synced_wals.AddWal(wal.number, - WalMetadata(wal.writer->file()->GetFileSize())); - } logs_to_free_.push_back(wal.ReleaseWriter()); // To modify logs_ both mutex_ and log_write_mutex_ must be held InstrumentedMutexLock l(&log_write_mutex_); diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index e56742f7f..ce1b7bc3d 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -163,7 +163,8 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, TEST_KILL_RANDOM("WritableFileWriter::Append:1"); if (s.ok()) { - filesize_ += data.size(); + uint64_t cur_size = filesize_.load(std::memory_order_acquire); + filesize_.store(cur_size + data.size(), std::memory_order_release); } return s; } @@ -191,7 +192,8 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, cap = buf_.Capacity() - buf_.CurrentSize(); } 
pending_sync_ = true; - filesize_ += pad_bytes; + uint64_t cur_size = filesize_.load(std::memory_order_acquire); + filesize_.store(cur_size + pad_bytes, std::memory_order_release); if (perform_data_verification_) { buffered_data_crc32c_checksum_ = crc32c::Extend(buffered_data_crc32c_checksum_, @@ -227,14 +229,15 @@ IOStatus WritableFileWriter::Close() { start_ts = FileOperationInfo::StartNow(); } #endif - interim = writable_file_->Truncate(filesize_, io_options, nullptr); + uint64_t filesz = filesize_.load(std::memory_order_acquire); + interim = writable_file_->Truncate(filesz, io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); NotifyOnFileTruncateFinish(start_ts, finish_ts, s); if (!interim.ok()) { NotifyOnIOError(interim, FileOperationType::kTruncate, file_name(), - filesize_); + filesz); } } #endif @@ -372,8 +375,9 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { const uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced. const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB. 
- if (filesize_ > kBytesNotSyncRange) { - uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange; + uint64_t cur_size = filesize_.load(std::memory_order_acquire); + if (cur_size > kBytesNotSyncRange) { + uint64_t offset_sync_to = cur_size - kBytesNotSyncRange; offset_sync_to -= offset_sync_to % kBytesAlignWhenSync; assert(offset_sync_to >= last_sync_size_); if (offset_sync_to > 0 && diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 000159faa..2cbc715b3 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -142,7 +142,7 @@ class WritableFileWriter { size_t max_buffer_size_; // Actually written data size can be used for truncate // not counting padding data - uint64_t filesize_; + std::atomic<uint64_t> filesize_; #ifndef ROCKSDB_LITE // This is necessary when we use unbuffered access // and writes must happen on aligned offsets @@ -255,7 +255,9 @@ class WritableFileWriter { // returns NotSupported status. IOStatus SyncWithoutFlush(bool use_fsync); - uint64_t GetFileSize() const { return filesize_; } + uint64_t GetFileSize() const { + return filesize_.load(std::memory_order_acquire); + } IOStatus InvalidateCache(size_t offset, size_t length) { return writable_file_->InvalidateCache(offset, length);