Fix a bug in WAL tracking (#10087)
Summary: Closes https://github.com/facebook/rocksdb/issues/10080. When `SyncWAL()` calls `MarkLogsSynced()`, the sync event should be added to the MANIFEST even if there is only one active WAL file. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10087. Test Plan: make check. Reviewed By: ajkr. Differential Revision: D36797580. Pulled By: riversand963. fbshipit-source-id: 24184c9dd606b3939a454ed41de6e868d1519999
This commit is contained in:
parent
c8bae6e29c
commit
8244f13448
@ -1,4 +1,8 @@
|
||||
# Rocksdb Change Log
|
||||
## 7.3.1 (06/08/2022)
|
||||
### Bug Fixes
|
||||
* Fixed a bug in WAL tracking. Before this PR (#10087), calling `SyncWAL()` on the only WAL file of the DB did not log the event in the MANIFEST, so a subsequent `DB::Open` could succeed even if the WAL file was missing or corrupted.
|
||||
|
||||
## 7.3.0 (05/20/2022)
|
||||
### Bug Fixes
|
||||
* Fixed a bug where manual flush would block forever even though flush options had wait=false.
|
||||
|
@ -4096,6 +4096,27 @@ TEST_F(DBBasicTest, VerifyFileChecksums) {
|
||||
Reopen(options);
|
||||
ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());
|
||||
}
|
||||
|
||||
// Regression test for WAL tracking with a manual SyncWAL() call: with
// track_and_verify_wals_in_manifest enabled, syncing the single live WAL must
// leave a non-empty WAL set in the VersionSet, and reopening the DB after that
// WAL file has been deleted must fail with a Corruption status.
TEST_F(DBBasicTest, ManualWalSync) {
|
||||
Options options = CurrentOptions();
|
||||
// Turn on WAL tracking/verification via the MANIFEST for this DB.
options.track_and_verify_wals_in_manifest = true;
|
||||
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
// Write one entry so there is data to sync.
ASSERT_OK(Put("x", "y"));
|
||||
// This does not create a new WAL.
|
||||
ASSERT_OK(db_->SyncWAL());
|
||||
// After the sync, the WAL set held by the VersionSet must not be empty.
EXPECT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());
|
||||
|
||||
// Capture the current WAL before closing so its log number can be used to
// locate the file on disk afterwards.
std::unique_ptr<LogFile> wal;
|
||||
Status s = db_->GetCurrentWalFile(&wal);
|
||||
ASSERT_OK(s);
|
||||
Close();
|
||||
|
||||
// Remove the WAL file from the closed DB to simulate a missing WAL.
EXPECT_OK(env_->DeleteFile(LogFileName(dbname_, wal->LogNumber())));
|
||||
|
||||
// Reopening is expected to report Corruption now that the WAL file is gone.
ASSERT_TRUE(TryReopen(options).IsCorruption());
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
// A test class for intercepting random reads and injecting artificial
|
||||
|
@ -1441,12 +1441,13 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
|
||||
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
|
||||
auto& wal = *it;
|
||||
assert(wal.getting_synced);
|
||||
if (logs_.size() > 1) {
|
||||
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
|
||||
wal.writer->file()->GetFileSize() > 0) {
|
||||
synced_wals.AddWal(wal.number,
|
||||
WalMetadata(wal.writer->file()->GetFileSize()));
|
||||
}
|
||||
|
||||
if (logs_.size() > 1) {
|
||||
logs_to_free_.push_back(wal.ReleaseWriter());
|
||||
// To modify logs_ both mutex_ and log_write_mutex_ must be held
|
||||
InstrumentedMutexLock l(&log_write_mutex_);
|
||||
|
@ -163,7 +163,8 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
|
||||
|
||||
TEST_KILL_RANDOM("WritableFileWriter::Append:1");
|
||||
if (s.ok()) {
|
||||
filesize_ += data.size();
|
||||
uint64_t cur_size = filesize_.load(std::memory_order_acquire);
|
||||
filesize_.store(cur_size + data.size(), std::memory_order_release);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
@ -191,7 +192,8 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
|
||||
cap = buf_.Capacity() - buf_.CurrentSize();
|
||||
}
|
||||
pending_sync_ = true;
|
||||
filesize_ += pad_bytes;
|
||||
uint64_t cur_size = filesize_.load(std::memory_order_acquire);
|
||||
filesize_.store(cur_size + pad_bytes, std::memory_order_release);
|
||||
if (perform_data_verification_) {
|
||||
buffered_data_crc32c_checksum_ =
|
||||
crc32c::Extend(buffered_data_crc32c_checksum_,
|
||||
@ -227,14 +229,15 @@ IOStatus WritableFileWriter::Close() {
|
||||
start_ts = FileOperationInfo::StartNow();
|
||||
}
|
||||
#endif
|
||||
interim = writable_file_->Truncate(filesize_, io_options, nullptr);
|
||||
uint64_t filesz = filesize_.load(std::memory_order_acquire);
|
||||
interim = writable_file_->Truncate(filesz, io_options, nullptr);
|
||||
#ifndef ROCKSDB_LITE
|
||||
if (ShouldNotifyListeners()) {
|
||||
auto finish_ts = FileOperationInfo::FinishNow();
|
||||
NotifyOnFileTruncateFinish(start_ts, finish_ts, s);
|
||||
if (!interim.ok()) {
|
||||
NotifyOnIOError(interim, FileOperationType::kTruncate, file_name(),
|
||||
filesize_);
|
||||
filesz);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
@ -372,8 +375,9 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
|
||||
const uint64_t kBytesNotSyncRange =
|
||||
1024 * 1024; // recent 1MB is not synced.
|
||||
const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB.
|
||||
if (filesize_ > kBytesNotSyncRange) {
|
||||
uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
|
||||
uint64_t cur_size = filesize_.load(std::memory_order_acquire);
|
||||
if (cur_size > kBytesNotSyncRange) {
|
||||
uint64_t offset_sync_to = cur_size - kBytesNotSyncRange;
|
||||
offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
|
||||
assert(offset_sync_to >= last_sync_size_);
|
||||
if (offset_sync_to > 0 &&
|
||||
|
@ -142,7 +142,7 @@ class WritableFileWriter {
|
||||
size_t max_buffer_size_;
|
||||
// Actually written data size can be used for truncate
|
||||
// not counting padding data
|
||||
uint64_t filesize_;
|
||||
std::atomic<uint64_t> filesize_;
|
||||
#ifndef ROCKSDB_LITE
|
||||
// This is necessary when we use unbuffered access
|
||||
// and writes must happen on aligned offsets
|
||||
@ -255,7 +255,9 @@ class WritableFileWriter {
|
||||
// returns NotSupported status.
|
||||
IOStatus SyncWithoutFlush(bool use_fsync);
|
||||
|
||||
uint64_t GetFileSize() const { return filesize_; }
|
||||
uint64_t GetFileSize() const {
|
||||
return filesize_.load(std::memory_order_acquire);
|
||||
}
|
||||
|
||||
IOStatus InvalidateCache(size_t offset, size_t length) {
|
||||
return writable_file_->InvalidateCache(offset, length);
|
||||
|
Loading…
Reference in New Issue
Block a user