diff --git a/HISTORY.md b/HISTORY.md
index 30f6c55aa..2f20ecbe3 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -43,6 +43,7 @@
 * DB::Open() will fail if the compression specified in Options is not linked with the binary. If you see this failure, recompile RocksDB with compression libraries present on your system. Also, previously our default compression was snappy. This behavior is now changed. Now, the default compression is snappy only if it's available on the system. If it isn't we change the default to kNoCompression.
 * We changed how we account for memory used in block cache. Previously, we only counted the sum of block sizes currently present in block cache. Now, we count the actual memory usage of the blocks. For example, a block of size 4.5KB will use 8KB memory with jemalloc. This might decrease your memory usage and possibly decrease performance. Increase block cache size if you see this happening after an upgrade.
 * Add BackupEngineImpl.options_.max_background_operations to specify the maximum number of operations that may be performed in parallel. Add support for parallelized backup and restore.
+* Add DB::SyncWAL() that does a WAL sync without blocking writers.
 
 ## 3.11.0 (5/19/2015)
 ### New Features
diff --git a/db/db_impl.cc b/db/db_impl.cc
index f6d0b8efc..10f1d2675 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -612,11 +612,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   }
 }
 
-Status DBImpl::SyncLog(log::Writer* log) {
-  assert(log);
-  return log->file()->Sync(db_options_.use_fsync);
-}
-
 namespace {
 bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
                           const JobContext::CandidateFileInfo& second) {
@@ -1951,6 +1946,85 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
   return FlushMemTable(cfh->cfd(), flush_options);
 }
 
+Status DBImpl::SyncWAL() {
+  autovector<log::Writer*> logs_to_sync;
+  bool need_log_dir_sync;
+  uint64_t current_log_number;
+
+  {
+    InstrumentedMutexLock l(&mutex_);
+    assert(!logs_.empty());
+
+    // This SyncWAL() call only cares about logs up to this number.
+    current_log_number = logfile_number_;
+
+    while (logs_.front().number <= current_log_number &&
+           logs_.front().getting_synced) {
+      log_sync_cv_.Wait();
+    }
+    // First check that logs are safe to sync in background.
+    for (auto it = logs_.begin();
+         it != logs_.end() && it->number <= current_log_number; ++it) {
+      if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
+        return Status::NotSupported(
+            "SyncWAL() is not supported for this implementation of WAL file",
+            db_options_.allow_mmap_writes
"try setting Options::allow_mmap_writes to false" + : Slice()); + } + } + for (auto it = logs_.begin(); + it != logs_.end() && it->number <= current_log_number; ++it) { + auto& log = *it; + assert(!log.getting_synced); + log.getting_synced = true; + logs_to_sync.push_back(log.writer.get()); + } + + need_log_dir_sync = !log_dir_synced_; + } + + Status status; + for (log::Writer* log : logs_to_sync) { + status = log->file()->SyncWithoutFlush(db_options_.use_fsync); + if (!status.ok()) { + break; + } + } + if (status.ok() && need_log_dir_sync) { + status = directories_.GetWalDir()->Fsync(); + } + + { + InstrumentedMutexLock l(&mutex_); + MarkLogsSynced(current_log_number, need_log_dir_sync, status); + } + + return status; +} + +void DBImpl::MarkLogsSynced( + uint64_t up_to, bool synced_dir, const Status& status) { + mutex_.AssertHeld(); + if (synced_dir && + logfile_number_ == up_to && + status.ok()) { + log_dir_synced_ = true; + } + for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) { + auto& log = *it; + assert(log.getting_synced); + if (status.ok() && logs_.size() > 1) { + logs_to_free_.push_back(log.writer.release()); + logs_.erase(it++); + } else { + log.getting_synced = false; + ++it; + } + } + log_sync_cv_.SignalAll(); +} + SequenceNumber DBImpl::GetLatestSequenceNumber() const { return versions_->LastSequence(); } @@ -3475,13 +3549,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, uint64_t last_sequence = versions_->LastSequence(); WriteThread::Writer* last_writer = &w; autovector write_batch_group; - bool need_wal_sync = !write_options.disableWAL && write_options.sync; + bool need_log_sync = !write_options.disableWAL && write_options.sync; + bool need_log_dir_sync = need_log_sync && !log_dir_synced_; if (status.ok()) { last_batch_group_size_ = write_thread_.BuildBatchGroup(&last_writer, &write_batch_group); - if (need_wal_sync) { + if (need_log_sync) { while (logs_.front().getting_synced) { log_sync_cv_.Wait(); } @@ -3543,7 +3618,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, log_empty_ = false; log_size = log_entry.size(); RecordTick(stats_, WAL_FILE_BYTES, log_size); - if (status.ok() && need_wal_sync) { + if (status.ok() && need_log_sync) { RecordTick(stats_, WAL_FILE_SYNCED); StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); // It's safe to access logs_ with unlocked mutex_ here because: @@ -3554,18 +3629,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // - as long as other threads don't modify it, it's safe to read // from std::deque from multiple threads concurrently. for (auto& log : logs_) { - status = SyncLog(log.writer.get()); + status = log.writer->file()->Sync(db_options_.use_fsync); if (!status.ok()) { break; } } - if (status.ok() && !log_dir_synced_) { + if (status.ok() && need_log_dir_sync) { // We only sync WAL directory the first time WAL syncing is // requested, so that in case users never turn on WAL sync, // we can avoid the disk I/O in the write code path. 
         status = directories_.GetWalDir()->Fsync();
        }
-       log_dir_synced_ = true;
       }
     }
     if (status.ok()) {
@@ -3616,16 +3690,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
 
   mutex_.AssertHeld();
 
-  if (need_wal_sync) {
-    while (logs_.size() > 1) {
-      auto& log = logs_.front();
-      assert(log.getting_synced);
-      logs_to_free_.push_back(log.writer.release());
-      logs_.pop_front();
-    }
-    assert(logs_.back().getting_synced);
-    logs_.back().getting_synced = false;
-    log_sync_cv_.SignalAll();
+  if (need_log_sync) {
+    MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
   }
 
   write_thread_.ExitWriteThread(&w, last_writer, status);
@@ -3714,7 +3780,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
       unique_ptr<WritableFileWriter> file_writer(
           new WritableFileWriter(std::move(lfile), opt_env_opt));
       new_log = new log::Writer(std::move(file_writer));
-      log_dir_synced_ = false;
     }
   }
 
@@ -3739,6 +3804,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
     logfile_number_ = new_log_number;
     assert(new_log != nullptr);
     log_empty_ = true;
+    log_dir_synced_ = false;
     logs_.emplace_back(logfile_number_, std::unique_ptr<log::Writer>(new_log));
     alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
     for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
diff --git a/db/db_impl.h b/db/db_impl.h
index 4d63f6bb3..bdec031f7 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -158,6 +158,7 @@ class DBImpl : public DB {
   using DB::Flush;
   virtual Status Flush(const FlushOptions& options,
                        ColumnFamilyHandle* column_family) override;
+  virtual Status SyncWAL() override;
 
   virtual SequenceNumber GetLatestSequenceNumber() const override;
 
@@ -308,8 +309,6 @@ class DBImpl : public DB {
   // It is not necessary to hold the mutex when invoking this method.
   void PurgeObsoleteFiles(const JobContext& background_contet);
 
-  Status SyncLog(log::Writer* log);
-
   ColumnFamilyHandle* DefaultColumnFamily() const override;
 
   const SnapshotList& snapshots() const { return snapshots_; }
@@ -497,6 +496,9 @@ class DBImpl : public DB {
   void AddToFlushQueue(ColumnFamilyData* cfd);
   ColumnFamilyData* PopFirstFromFlushQueue();
 
+  // Helper function to call after some of the logs_ were synced.
+  void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
+
   // table_cache_ provides its own synchronization
   std::shared_ptr<TableCache> table_cache_;
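The `DBImpl::SyncWAL()` added above follows a claim-then-release pattern: mark logs as `getting_synced` while holding `mutex_`, run the slow syncs with the mutex released so concurrent writers are not blocked, then retake the mutex to publish the result (`MarkLogsSynced()`) and wake waiters. A minimal standalone sketch of that pattern, using standard-library primitives with hypothetical `LogFile`/`WalSyncer` stand-ins rather than RocksDB's real types:

```cpp
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>

// Hypothetical stand-in for DBImpl's per-log state.
struct LogFile {
  uint64_t number = 0;
  bool getting_synced = false;  // claimed by an in-flight sync
};

class WalSyncer {
 public:
  // Sync all logs with number <= up_to, without holding the mutex
  // during the actual I/O.
  void SyncUpTo(uint64_t up_to) {
    std::deque<LogFile*> to_sync;
    {
      std::unique_lock<std::mutex> l(mu_);
      // Wait out any earlier sync that already claimed these logs.
      cv_.wait(l, [&] {
        return logs_.empty() || logs_.front().number > up_to ||
               !logs_.front().getting_synced;
      });
      // Claim the logs under the mutex so nobody frees them...
      for (auto& log : logs_) {
        if (log.number > up_to) break;
        log.getting_synced = true;
        to_sync.push_back(&log);
      }
    }
    // ...then do the slow fsync() calls with the mutex released,
    // so writers appending to the newest log keep making progress.
    for (LogFile* log : to_sync) {
      FsyncLog(log);  // stands in for SyncWithoutFlush()
    }
    {
      std::lock_guard<std::mutex> l(mu_);
      for (LogFile* log : to_sync) {
        log->getting_synced = false;  // publish completion
      }
    }
    cv_.notify_all();  // wake writers and other SyncUpTo() calls
  }

 private:
  void FsyncLog(LogFile*) { /* platform fsync elided */ }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<LogFile> logs_;
};
```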
diff --git a/db/db_test.cc b/db/db_test.cc
index 9734bcbe7..b691d892a 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -5172,6 +5172,10 @@ class ModelDB: public DB {
     return ret;
   }
 
+  virtual Status SyncWAL() override {
+    return Status::OK();
+  }
+
   virtual Status DisableFileDeletions() override { return Status::OK(); }
   virtual Status EnableFileDeletions(bool force) override {
     return Status::OK();
@@ -8418,8 +8422,16 @@ TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) {
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
 }
 
+TEST_F(DBTest, UnsupportedManualSync) {
+  DestroyAndReopen(CurrentOptions());
+  env_->is_wal_sync_thread_safe_.store(false);
+  Status s = db_->SyncWAL();
+  ASSERT_TRUE(s.IsNotSupported());
+}
+
 INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
                         ::testing::Values(1, 4));
+
 }  // namespace rocksdb
 
 #endif
diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc
index 9ddad2dd5..2d7ccccac 100644
--- a/db/fault_injection_test.cc
+++ b/db/fault_injection_test.cc
@@ -142,6 +142,7 @@ class TestWritableFile : public WritableFile {
   virtual Status Close() override;
   virtual Status Flush() override;
   virtual Status Sync() override;
+  virtual bool IsSyncThreadSafe() const override { return true; }
 
  private:
   FileState state_;
@@ -891,6 +892,41 @@ TEST_P(FaultInjectionTest, UninstalledCompaction) {
   rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
+TEST_P(FaultInjectionTest, ManualLogSyncTest) {
+  SleepingBackgroundTask sleeping_task_low;
+  env_->SetBackgroundThreads(1, Env::HIGH);
+  // Block the job queue to prevent flush job from running.
+  env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::HIGH);
+
+  WriteOptions write_options;
+  write_options.sync = false;
+
+  std::string key_space, value_space;
+  ASSERT_OK(
+      db_->Put(write_options, Key(1, &key_space), Value(1, &value_space)));
+  FlushOptions flush_options;
+  flush_options.wait = false;
+  ASSERT_OK(db_->Flush(flush_options));
+  ASSERT_OK(
+      db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
+  ASSERT_OK(db_->SyncWAL());
+
+  env_->SetFilesystemActive(false);
+  NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
+  sleeping_task_low.WakeUp();
+
+  ASSERT_OK(OpenDB());
+  std::string val;
+  Value(2, &value_space);
+  ASSERT_OK(ReadValue(2, &val));
+  ASSERT_EQ(value_space, val);
+
+  Value(1, &value_space);
+  ASSERT_OK(ReadValue(1, &val));
+  ASSERT_EQ(value_space, val);
+}
+
 INSTANTIATE_TEST_CASE_P(FaultTest, FaultInjectionTest, ::testing::Bool());
 
 }  // namespace rocksdb
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index bcd170796..b3f028e6d 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -545,6 +545,12 @@ class DB {
     return Flush(options, DefaultColumnFamily());
   }
 
+  // Sync the WAL. Note that Write() followed by SyncWAL() is not exactly the
+  // same as Write() with sync=true: in the latter case the changes won't be
+  // visible until the sync is done.
+  // Currently only works if allow_mmap_writes = false in Options.
+  virtual Status SyncWAL() = 0;
+
   // The sequence number of the most recent transaction.
   virtual SequenceNumber GetLatestSequenceNumber() const = 0;
 
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 2be261e42..b9c4db045 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -461,6 +461,12 @@ class WritableFile {
     return Sync();
   }
 
+  // true if Sync() and Fsync() are safe to call concurrently with Append()
+  // and Flush().
+  virtual bool IsSyncThreadSafe() const {
+    return false;
+  }
+
   /*
    * Change the priority in rate limiter if rate limiting is enabled.
    * If rate limiting is not enabled, this call has no effect.
@@ -616,7 +622,7 @@ class RandomRWFile {
 class Directory {
  public:
   virtual ~Directory() {}
-  // Fsync directory
+  // Fsync directory. Can be called concurrently from multiple threads.
   virtual Status Fsync() = 0;
 };
 
@@ -894,6 +900,7 @@ class WritableFileWrapper : public WritableFile {
   Status Flush() override { return target_->Flush(); }
   Status Sync() override { return target_->Sync(); }
   Status Fsync() override { return target_->Fsync(); }
+  bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
   void SetIOPriority(Env::IOPriority pri) override {
     target_->SetIOPriority(pri);
   }
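The `WritableFile::IsSyncThreadSafe()` contract introduced above is what custom `Env` implementations opt into: return `true` only if `Sync()`/`Fsync()` may race with `Append()`/`Flush()`, as `PosixWritableFile` now does. A sketch of how a user-supplied file might participate, assuming a hypothetical `CountingFile` built on the `WritableFileWrapper` helper this change extends:

```cpp
#include <atomic>
#include <cstdint>

#include "rocksdb/env.h"

// Hypothetical wrapper that counts syncs while forwarding everything else.
// WritableFileWrapper (extended above) already forwards IsSyncThreadSafe()
// to the wrapped file, so this wrapper stays eligible for DB::SyncWAL() as
// long as its own additions (an atomic counter here) remain thread-safe.
class CountingFile : public rocksdb::WritableFileWrapper {
 public:
  explicit CountingFile(rocksdb::WritableFile* base)
      : rocksdb::WritableFileWrapper(base) {}

  rocksdb::Status Sync() override {
    // An atomic increment keeps Sync() safe to call concurrently with
    // Append()/Flush(), so the wrapped file's answer still holds.
    sync_count_.fetch_add(1, std::memory_order_relaxed);
    return rocksdb::WritableFileWrapper::Sync();
  }

  uint64_t sync_count() const { return sync_count_.load(); }

 private:
  std::atomic<uint64_t> sync_count_{0};
};
```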
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 20fb557b9..72202753b 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -975,7 +975,9 @@ struct DBOptions {
   // Allow the OS to mmap file for reading sst tables. Default: false
   bool allow_mmap_reads;
 
-  // Allow the OS to mmap file for writing. Default: false
+  // Allow the OS to mmap file for writing.
+  // DB::SyncWAL() only works if this is set to false.
+  // Default: false
   bool allow_mmap_writes;
 
   // Disable child process inherit open files. Default: true
diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h
index 6771d3c3c..63e4aaa15 100644
--- a/include/rocksdb/utilities/stackable_db.h
+++ b/include/rocksdb/utilities/stackable_db.h
@@ -192,6 +192,10 @@ class StackableDB : public DB {
     return db_->Flush(fopts, column_family);
   }
 
+  virtual Status SyncWAL() override {
+    return db_->SyncWAL();
+  }
+
 #ifndef ROCKSDB_LITE
 
   virtual Status DisableFileDeletions() override {
diff --git a/util/db_test_util.h b/util/db_test_util.h
index 819fa7c0a..7830b2d75 100644
--- a/util/db_test_util.h
+++ b/util/db_test_util.h
@@ -223,6 +223,9 @@ class SpecialEnv : public EnvWrapper {
       ++env_->sync_counter_;
       return base_->Sync();
     }
+    bool IsSyncThreadSafe() const override {
+      return env_->is_wal_sync_thread_safe_.load();
+    }
 
    private:
     SpecialEnv* env_;
@@ -389,6 +392,8 @@ class SpecialEnv : public EnvWrapper {
 
   std::atomic<int64_t> addon_time_;
   bool no_sleep_;
+
+  std::atomic<bool> is_wal_sync_thread_safe_{true};
 };
 
 class DBTestBase : public testing::Test {
diff --git a/util/env_posix.cc b/util/env_posix.cc
index 47ff7284c..13ce1d35d 100644
--- a/util/env_posix.cc
+++ b/util/env_posix.cc
@@ -679,6 +679,10 @@ class PosixWritableFile : public WritableFile {
     return Status::OK();
   }
 
+  virtual bool IsSyncThreadSafe() const override {
+    return true;
+  }
+
   virtual uint64_t GetFileSize() override { return filesize_; }
 
   virtual Status InvalidateCache(size_t offset, size_t length) override {
diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc
index a12a4b930..5f8bd134f 100644
--- a/util/file_reader_writer.cc
+++ b/util/file_reader_writer.cc
@@ -154,11 +154,7 @@ Status WritableFileWriter::Sync(bool use_fsync) {
   }
   TEST_KILL_RANDOM(rocksdb_kill_odds);
   if (pending_sync_) {
-    if (use_fsync) {
-      s = writable_file_->Fsync();
-    } else {
-      s = writable_file_->Sync();
-    }
+    s = SyncInternal(use_fsync);
     if (!s.ok()) {
       return s;
     }
@@ -171,6 +167,25 @@ Status WritableFileWriter::Sync(bool use_fsync) {
   return Status::OK();
 }
 
+Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
+  if (!writable_file_->IsSyncThreadSafe()) {
+    return Status::NotSupported(
+        "Can't WritableFileWriter::SyncWithoutFlush() because "
+        "WritableFile::IsSyncThreadSafe() is false");
+  }
+  return SyncInternal(use_fsync);
+}
+
+Status WritableFileWriter::SyncInternal(bool use_fsync) {
+  Status s;
+  if (use_fsync) {
+    s = writable_file_->Fsync();
+  } else {
+    s = writable_file_->Sync();
+  }
+  return s;
+}
+
 Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) {
   IOSTATS_TIMER_GUARD(range_sync_nanos);
   return writable_file_->RangeSync(offset, nbytes);
diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h
index 396ad57ef..c089a5022 100644
--- a/util/file_reader_writer.h
+++ b/util/file_reader_writer.h
@@ -74,6 +74,11 @@ class WritableFileWriter {
 
   Status Sync(bool use_fsync);
 
+  // Sync only the data that was already Flush()ed. Safe to call concurrently
+  // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
+  // returns NotSupported status.
+  Status SyncWithoutFlush(bool use_fsync);
+
   uint64_t GetFileSize() { return filesize_; }
 
   Status InvalidateCache(size_t offset, size_t length) {
@@ -85,6 +90,7 @@ class WritableFileWriter {
  private:
   Status RangeSync(off_t offset, off_t nbytes);
   size_t RequestToken(size_t bytes);
+  Status SyncInternal(bool use_fsync);
 };
 
 class RandomRWFileAccessor {
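Taken together, the patch enables a write pattern where individual writes skip the per-write sync and a single `SyncWAL()` call makes them all durable. A minimal usage sketch (hypothetical DB path, error handling reduced to asserts):

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // SyncWAL() returns NotSupported with mmap'd WAL writes (see options.h).
  options.allow_mmap_writes = false;

  rocksdb::DB* db;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/syncwal_demo", &db);
  assert(s.ok());

  // Fast, unsynced writes: visible to readers immediately, but not yet
  // guaranteed durable across a machine crash.
  rocksdb::WriteOptions wo;
  wo.sync = false;
  db->Put(wo, "k1", "v1");
  db->Put(wo, "k2", "v2");

  // One WAL sync makes both writes durable, without blocking concurrent
  // writers the way per-write sync=true would.
  s = db->SyncWAL();
  assert(s.ok());

  delete db;
  return 0;
}
```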