diff --git a/db/column_family_test.cc b/db/column_family_test.cc index dc2f99499..7621f3974 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -303,6 +303,7 @@ class ColumnFamilyTest : public testing::Test { ASSERT_OK(Put(cf, key, RandomString(&rnd_, key_value_size - 10))); } } + db_->FlushWAL(false); } #ifndef ROCKSDB_LITE // TEST functions in DB are not supported in lite @@ -580,6 +581,7 @@ TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest) { Flush(0); ASSERT_OK(Put(1, "bar", "v3")); // seqID 4 ASSERT_OK(Put(1, "foo", "v4")); // seqID 5 + db_->FlushWAL(false); // Preserve file system state up to here to simulate a crash condition. fault_env->SetFilesystemActive(false); @@ -642,6 +644,7 @@ TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest2) { // Write to log file D ASSERT_OK(Put(1, "bar", "v4")); // seqID 7 ASSERT_OK(Put(1, "bar", "v5")); // seqID 8 + db_->FlushWAL(false); // Preserve file system state up to here to simulate a crash condition. fault_env->SetFilesystemActive(false); std::vector names; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 05ad80df6..967ac72b5 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -144,6 +144,7 @@ class CompactionJobTest : public testing::Test { } void SetLastSequence(const SequenceNumber sequence_number) { + versions_->SetLastToBeWrittenSequence(sequence_number + 1); versions_->SetLastSequence(sequence_number + 1); } diff --git a/db/db_impl.cc b/db/db_impl.cc index afec15935..06b5e09ef 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -160,6 +160,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) is_snapshot_supported_(true), write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), write_thread_(immutable_db_options_), + nonmem_write_thread_(immutable_db_options_), write_controller_(mutable_db_options_.delayed_write_rate), // Use delayed_write_rate as a base line to determine the initial // low pri write rate limit. It may be adjusted later. @@ -189,7 +190,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) bg_work_paused_(0), bg_compaction_paused_(0), refitting_level_(false), - opened_successfully_(false) { + opened_successfully_(false), + concurrent_prepare_(options.concurrent_prepare), + manual_wal_flush_(options.manual_wal_flush) { env_->GetAbsolutePath(dbname, &db_absolute_path_); // Reserve ten files or so for other uses and give the rest to TableCache. @@ -612,6 +615,26 @@ int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, return minimum_level; } +Status DBImpl::FlushWAL(bool sync) { + { + // We need to lock log_write_mutex_ since logs_ might change concurrently + InstrumentedMutexLock wl(&log_write_mutex_); + log::Writer* cur_log_writer = logs_.back().writer; + auto s = cur_log_writer->WriteBuffer(); + if (!s.ok()) { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", + s.ToString().c_str()); + } + if (!sync) { + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false"); + return s; + } + } + // sync = true + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true"); + return SyncWAL(); +} + Status DBImpl::SyncWAL() { autovector logs_to_sync; bool need_log_dir_sync; @@ -650,6 +673,7 @@ Status DBImpl::SyncWAL() { need_log_dir_sync = !log_dir_synced_; } + TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); RecordTick(stats_, WAL_FILE_SYNCED); Status status; for (log::Writer* log : logs_to_sync) { @@ -661,6 +685,7 @@ Status DBImpl::SyncWAL() { if (status.ok() && need_log_dir_sync) { status = directories_.GetWalDir()->Fsync(); } + TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); { @@ -2634,9 +2659,13 @@ Status DBImpl::IngestExternalFile( InstrumentedMutexLock l(&mutex_); TEST_SYNC_POINT("DBImpl::AddFile:MutexLock"); - // Stop writes to the DB + // Stop writes to the DB by entering both write threads WriteThread::Writer w; write_thread_.EnterUnbatched(&w, &mutex_); + WriteThread::Writer nonmem_w; + if (concurrent_prepare_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } num_running_ingest_file_++; @@ -2677,6 +2706,9 @@ Status DBImpl::IngestExternalFile( } // Resume writes to the DB + if (concurrent_prepare_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } write_thread_.ExitUnbatched(&w); // Update stats diff --git a/db/db_impl.h b/db/db_impl.h index 03aac360f..285a7c861 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -199,6 +199,7 @@ class DBImpl : public DB { using DB::Flush; virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) override; + virtual Status FlushWAL(bool sync) override; virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; @@ -621,6 +622,10 @@ class DBImpl : public DB { uint64_t* log_used = nullptr, uint64_t log_ref = 0, bool disable_memtable = false); + Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates, + WriteCallback* callback = nullptr, + uint64_t* log_used = nullptr, uint64_t log_ref = 0); + uint64_t FindMinLogContainingOutstandingPrep(); uint64_t FindMinPrepLogReferencedByMemTable(); @@ -746,9 +751,20 @@ class DBImpl : public DB { Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync, WriteContext* write_context); + WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group, + WriteBatch* tmp_batch, size_t* write_with_wal); + + Status WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, + uint64_t* log_used, uint64_t* log_size); + Status WriteToWAL(const WriteThread::WriteGroup& write_group, - log::Writer* log_writer, bool need_log_sync, - bool need_log_dir_sync, SequenceNumber sequence); + log::Writer* log_writer, uint64_t* log_used, + bool need_log_sync, bool need_log_dir_sync, + SequenceNumber sequence); + + Status ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, + uint64_t* log_used, SequenceNumber* last_sequence, + int total_count); // Used by WriteImpl to update bg_error_ if paranoid check is enabled. void WriteCallbackStatusCheck(const Status& status); @@ -827,10 +843,12 @@ class DBImpl : public DB { // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; - // The mutex for options file related operations. - // NOTE: should never acquire options_file_mutex_ and mutex_ at the - // same time. - InstrumentedMutex options_files_mutex_; + // It is used to concurrently update stats in the write threads + InstrumentedMutex stat_mutex_; + // It protects the back() of logs_ and alive_log_files_. Any push_back to + // these must be under log_write_mutex_ and any access that requires the + // back() to remain the same must also lock log_write_mutex_. + InstrumentedMutex log_write_mutex_; // State below is protected by mutex_ mutable InstrumentedMutex mutex_; @@ -891,6 +909,10 @@ class DBImpl : public DB { // - back() and items with getting_synced=true are not popped, // - it follows that write thread with unlocked mutex_ can safely access // back() and items with getting_synced=true. + // -- Update: apparently this was a mistake. back() should be called under + // mute_: https://github.com/facebook/rocksdb/pull/1774 + // - When concurrent write threads is enabled, back(), push_back(), and + // pop_front() must be called within log_write_mutex_ std::deque logs_; // Signaled when getting_synced becomes false for some of the logs_. InstrumentedCondVar log_sync_cv_; @@ -939,8 +961,10 @@ class DBImpl : public DB { WriteBufferManager* write_buffer_manager_; WriteThread write_thread_; - WriteBatch tmp_batch_; + // The write thread when the writers have no memtable write. This will be used + // in 2PC to batch the prepares separately from the serial commit. + WriteThread nonmem_write_thread_; WriteController write_controller_; @@ -948,6 +972,8 @@ class DBImpl : public DB { // Size of the last batch group. In slowdown mode, next write needs to // sleep if it uses up the quota. + // Note: This is to protect memtable and compaction. If the batch only writes + // to the WAL its size need not to be included in this. uint64_t last_batch_group_size_; FlushScheduler flush_scheduler_; @@ -1190,6 +1216,11 @@ class DBImpl : public DB { bool MCOverlap(ManualCompaction* m, ManualCompaction* m1); size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; + + // When set, we use a seprate queue for writes that dont write to memtable. In + // 2PC these are the writes at Prepare phase. + const bool concurrent_prepare_; + const bool manual_wal_flush_; }; extern Options SanitizeOptions(const std::string& db, diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 607b8ff08..d0a22321a 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -265,7 +265,10 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, continue; } logs_to_free_.push_back(log.ReleaseWriter()); - logs_.pop_front(); + { + InstrumentedMutexLock wl(&log_write_mutex_); + logs_.pop_front(); + } } // Current log cannot be obsolete. assert(!logs_.empty()); diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 995b329bf..2baa7e070 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -725,6 +725,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, auto last_sequence = *next_sequence - 1; if ((*next_sequence != kMaxSequenceNumber) && (versions_->LastSequence() <= last_sequence)) { + versions_->SetLastToBeWrittenSequence(last_sequence); versions_->SetLastSequence(last_sequence); } } diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 7039ba29b..cc209a912 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -5,7 +5,6 @@ // This source code is also licensed under the GPLv2 license found in the // COPYING file in the root directory of this source tree. - #include "db/db_impl_readonly.h" #include "db/compacted_db_impl.h" diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 058f8ef28..fea326ca0 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -66,6 +66,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } + if (concurrent_prepare_ && immutable_db_options_.enable_pipelined_write) { + return Status::NotSupported( + "pipelined_writes is not compatible with concurrent prepares"); + } Status status; if (write_options.low_pri) { @@ -75,6 +79,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } } + if (concurrent_prepare_ && disable_memtable) { + return WriteImplWALOnly(write_options, my_batch, callback, log_used, + log_ref); + } + if (immutable_db_options_.enable_pipelined_write) { return PipelinedWriteImpl(write_options, my_batch, callback, log_used, log_ref, disable_memtable); @@ -133,14 +142,22 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteContext write_context; WriteThread::WriteGroup write_group; bool in_parallel_group = false; - uint64_t last_sequence = versions_->LastSequence(); + uint64_t last_sequence; + if (!concurrent_prepare_) { + last_sequence = versions_->LastSequence(); + } mutex_.Lock(); bool need_log_sync = !write_options.disableWAL && write_options.sync; bool need_log_dir_sync = need_log_sync && !log_dir_synced_; - status = PreprocessWrite(write_options, &need_log_sync, &write_context); - log::Writer* cur_log_writer = logs_.back().writer; + if (!concurrent_prepare_ || !disable_memtable) { + // With concurrent writes we do preprocess only in the write thread that + // also does write to memtable to avoid sync issue on shared data structure + // with the other thread + status = PreprocessWrite(write_options, &need_log_sync, &write_context); + } + log::Writer* log_writer = logs_.back().writer; mutex_.Unlock(); @@ -180,9 +197,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } } - const SequenceNumber current_sequence = last_sequence + 1; - last_sequence += total_count; - + if (concurrent_prepare_) { + stat_mutex_.Lock(); + } // Update stats while we are an exclusive group leader, so we know // that nobody else can be writing to these particular stats. // We're optimistic, updating the stats before we successfully @@ -201,6 +218,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); } MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); + if (concurrent_prepare_) { + stat_mutex_.Unlock(); + } if (write_options.disableWAL) { has_unpersisted_data_.store(true, std::memory_order_relaxed); @@ -208,14 +228,27 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_STOP(write_pre_and_post_process_time); - if (status.ok() && !write_options.disableWAL) { - PERF_TIMER_GUARD(write_wal_time); - status = WriteToWAL(write_group, cur_log_writer, need_log_sync, - need_log_dir_sync, current_sequence); - if (log_used != nullptr) { - *log_used = logfile_number_; + if (!concurrent_prepare_) { + if (status.ok() && !write_options.disableWAL) { + PERF_TIMER_GUARD(write_wal_time); + status = WriteToWAL(write_group, log_writer, log_used, need_log_sync, + need_log_dir_sync, last_sequence + 1); + } + } else { + assert(!need_log_sync && !need_log_dir_sync); + if (status.ok() && !write_options.disableWAL) { + PERF_TIMER_GUARD(write_wal_time); + // LastToBeWrittenSequence is increased inside WriteToWAL under + // wal_write_mutex_ to ensure ordered events in WAL + status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, + total_count); + } else { + // Otherwise we inc seq number for memtable writes + last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count); } } + const SequenceNumber current_sequence = last_sequence + 1; + last_sequence += total_count; if (status.ok()) { PERF_TIMER_GUARD(write_memtable_time); @@ -263,6 +296,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, mutex_.Lock(); MarkLogsSynced(logfile_number_, need_log_dir_sync, status); mutex_.Unlock(); + // Requesting sync with concurrent_prepare_ is expected to be very rare. We + // hance provide a simple implementation that is not necessarily efficient. + if (concurrent_prepare_) { + if (manual_wal_flush_) { + status = FlushWAL(true); + } else { + status = SyncWAL(); + } + } } bool should_exit_batch_group = true; @@ -272,7 +314,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w); } if (should_exit_batch_group) { - versions_->SetLastSequence(last_sequence); + if (status.ok()) { + versions_->SetLastSequence(last_sequence); + } MemTableInsertStatusCheck(w.status); write_thread_.ExitAsBatchGroupLeader(write_group, w.status); } @@ -304,7 +348,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, bool need_log_sync = !write_options.disableWAL && write_options.sync; bool need_log_dir_sync = need_log_sync && !log_dir_synced_; w.status = PreprocessWrite(write_options, &need_log_sync, &write_context); - log::Writer* cur_log_writer = logs_.back().writer; + log::Writer* log_writer = logs_.back().writer; mutex_.Unlock(); // This can set non-OK status if callback fail. @@ -352,8 +396,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, wal_write_group.size - 1); RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); } - w.status = WriteToWAL(wal_write_group, cur_log_writer, need_log_sync, - need_log_dir_sync, current_sequence); + w.status = WriteToWAL(wal_write_group, log_writer, log_used, + need_log_sync, need_log_dir_sync, current_sequence); } if (!w.CallbackFailed()) { @@ -403,11 +447,91 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, } assert(w.state == WriteThread::STATE_COMPLETED); - if (log_used != nullptr) { - *log_used = w.log_used; + return w.FinalStatus(); +} + +Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, + WriteBatch* my_batch, WriteCallback* callback, + uint64_t* log_used, uint64_t log_ref) { + Status status; + PERF_TIMER_GUARD(write_pre_and_post_process_time); + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + true /* disable_memtable */); + if (write_options.disableWAL) { + return status; + } + RecordTick(stats_, WRITE_WITH_WAL); + StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); + + nonmem_write_thread_.JoinBatchGroup(&w); + assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER); + if (w.state == WriteThread::STATE_COMPLETED) { + if (log_used != nullptr) { + *log_used = w.log_used; + } + return w.FinalStatus(); + } + // else we are the leader of the write batch group + assert(w.state == WriteThread::STATE_GROUP_LEADER); + WriteContext write_context; + WriteThread::WriteGroup write_group; + uint64_t last_sequence; + nonmem_write_thread_.EnterAsBatchGroupLeader(&w, &write_group); + // Note: no need to update last_batch_group_size_ here since the batch writes + // to WAL only + + uint64_t total_byte_size = 0; + for (auto* writer : write_group) { + if (writer->CheckCallback(this)) { + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); + } } - return w.FinalStatus(); + stat_mutex_.Lock(); + // Update stats while we are an exclusive group leader, so we know + // that nobody else can be writing to these particular stats. + // We're optimistic, updating the stats before we successfully + // commit. That lets us release our leader status early. + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); + RecordTick(stats_, WRITE_DONE_BY_SELF); + auto write_done_by_other = write_group.size - 1; + if (write_done_by_other > 0) { + stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other); + RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); + } + MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); + stat_mutex_.Unlock(); + + PERF_TIMER_STOP(write_pre_and_post_process_time); + + PERF_TIMER_GUARD(write_wal_time); + // LastToBeWrittenSequence is increased inside WriteToWAL under + // wal_write_mutex_ to ensure ordered events in WAL + status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, + 0 /*total_count*/); + if (status.ok() && write_options.sync) { + // Requesting sync with concurrent_prepare_ is expected to be very rare. We + // hance provide a simple implementation that is not necessarily efficient. + if (manual_wal_flush_) { + status = FlushWAL(true); + } else { + status = SyncWAL(); + } + } + PERF_TIMER_START(write_pre_and_post_process_time); + + if (!w.CallbackFailed()) { + ParanoidCheck(status); + } + nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, w.status); + if (status.ok()) { + status = w.FinalStatus(); + } + return status; } void DBImpl::WriteCallbackStatusCheck(const Status& status) { @@ -519,13 +643,12 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, return status; } -Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, - log::Writer* log_writer, bool need_log_sync, - bool need_log_dir_sync, SequenceNumber sequence) { - Status status; - +WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, + WriteBatch* tmp_batch, size_t* write_with_wal) { + assert(write_with_wal != nullptr); + assert(tmp_batch != nullptr); WriteBatch* merged_batch = nullptr; - size_t write_with_wal = 0; + *write_with_wal = 0; auto* leader = write_group.leader; if (write_group.size == 1 && leader->ShouldWriteToWAL() && leader->batch->GetWalTerminationPoint().is_cleared()) { @@ -534,30 +657,54 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, // and the batch is not wanting to be truncated merged_batch = leader->batch; leader->log_used = logfile_number_; - write_with_wal = 1; + *write_with_wal = 1; } else { // WAL needs all of the batches flattened into a single batch. // We could avoid copying here with an iov-like AddRecord // interface - merged_batch = &tmp_batch_; + merged_batch = tmp_batch; for (auto writer : write_group) { if (writer->ShouldWriteToWAL()) { WriteBatchInternal::Append(merged_batch, writer->batch, /*WAL_only*/ true); - write_with_wal++; + (*write_with_wal)++; } writer->log_used = logfile_number_; } } + return merged_batch; +} - WriteBatchInternal::SetSequence(merged_batch, sequence); - - Slice log_entry = WriteBatchInternal::Contents(merged_batch); - status = log_writer->AddRecord(log_entry); +Status DBImpl::WriteToWAL(const WriteBatch& merged_batch, + log::Writer* log_writer, uint64_t* log_used, + uint64_t* log_size) { + assert(log_size != nullptr); + Slice log_entry = WriteBatchInternal::Contents(&merged_batch); + *log_size = log_entry.size(); + Status status = log_writer->AddRecord(log_entry); + if (log_used != nullptr) { + *log_used = logfile_number_; + } total_log_size_ += log_entry.size(); alive_log_files_.back().AddSize(log_entry.size()); log_empty_ = false; - uint64_t log_size = log_entry.size(); + return status; +} + +Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, + log::Writer* log_writer, uint64_t* log_used, + bool need_log_sync, bool need_log_dir_sync, + SequenceNumber sequence) { + Status status; + + size_t write_with_wal = 0; + WriteBatch* merged_batch = + MergeBatch(write_group, &tmp_batch_, &write_with_wal); + + WriteBatchInternal::SetSequence(merged_batch, sequence); + + uint64_t log_size; + status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); if (status.ok() && need_log_sync) { StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); @@ -599,6 +746,41 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, return status; } +Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, + uint64_t* log_used, + SequenceNumber* last_sequence, + int total_count) { + Status status; + + WriteBatch tmp_batch; + size_t write_with_wal = 0; + WriteBatch* merged_batch = + MergeBatch(write_group, &tmp_batch, &write_with_wal); + + // We need to lock log_write_mutex_ since logs_ and alive_log_files might be + // pushed back concurrently + log_write_mutex_.Lock(); + *last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count); + auto sequence = *last_sequence + 1; + WriteBatchInternal::SetSequence(merged_batch, sequence); + + log::Writer* log_writer = logs_.back().writer; + uint64_t log_size; + status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + log_write_mutex_.Unlock(); + + if (status.ok()) { + stat_mutex_.Lock(); + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size); + RecordTick(stats_, WAL_FILE_BYTES, log_size); + stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal); + RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); + stat_mutex_.Unlock(); + } + return status; +} + Status DBImpl::HandleWALFull(WriteContext* write_context) { mutex_.AssertHeld(); assert(write_context != nullptr); @@ -895,9 +1077,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { lfile->SetPreallocationBlockSize(preallocate_block_size); unique_ptr file_writer( new WritableFileWriter(std::move(lfile), opt_env_opt)); - new_log = - new log::Writer(std::move(file_writer), new_log_number, - immutable_db_options_.recycle_log_file_num > 0); + new_log = new log::Writer( + std::move(file_writer), new_log_number, + immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_); } } @@ -931,8 +1113,15 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { assert(new_log != nullptr); log_empty_ = true; log_dir_synced_ = false; + log_write_mutex_.Lock(); + if (!logs_.empty()) { + // Alway flush the buffer of the last log before switching to a new one + log::Writer* cur_log_writer = logs_.back().writer; + cur_log_writer->WriteBuffer(); + } logs_.emplace_back(logfile_number_, new_log); alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); + log_write_mutex_.Unlock(); } for (auto loop_cfd : *versions_->GetColumnFamilySet()) { // all this is just optimization to delete logs that diff --git a/db/db_log_iter_test.cc b/db/db_log_iter_test.cc index 2f56348d2..84c8776a7 100644 --- a/db/db_log_iter_test.cc +++ b/db/db_log_iter_test.cc @@ -118,6 +118,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) { dbfull()->Flush(FlushOptions()); Put("key4", DummyString(1024)); ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U); + dbfull()->FlushWAL(false); { auto iter = OpenTransactionLogIter(0); @@ -134,6 +135,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) { // "key5" would be written in a new memtable and log Put("key5", DummyString(1024)); + dbfull()->FlushWAL(false); { // this iter would miss "key4" if not fixed auto iter = OpenTransactionLogIter(0); @@ -183,6 +185,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) { Put("key"+ToString(i), DummyString(10)); } dbfull()->Flush(FlushOptions()); + dbfull()->FlushWAL(false); // Corrupt this log to create a gap rocksdb::VectorLogPtr wal_files; ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files)); @@ -196,6 +199,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) { // Insert a new entry to a new log file Put("key1025", DummyString(10)); + dbfull()->FlushWAL(false); // Try to read from the beginning. Should stop before the gap and read less // than 1025 entries auto iter = OpenTransactionLogIter(0); diff --git a/db/db_test_util.cc b/db/db_test_util.cc index d932b5542..e199ac526 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -469,6 +469,12 @@ Options DBTestBase::GetOptions( options.enable_pipelined_write = true; break; } + case kConcurrentWALWrites: { + // This options optimize 2PC commit path + options.concurrent_prepare = true; + options.manual_wal_flush = true; + break; + } default: break; diff --git a/db/db_test_util.h b/db/db_test_util.h index 372baed57..755b6929f 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -641,13 +641,14 @@ class DBTestBase : public testing::Test { kRecycleLogFiles = 28, kConcurrentSkipList = 29, kPipelinedWrite = 30, - kEnd = 31, - kDirectIO = 32, - kLevelSubcompactions = 33, - kUniversalSubcompactions = 34, - kBlockBasedTableWithIndexRestartInterval = 35, - kBlockBasedTableWithPartitionedIndex = 36, - kPartitionedFilterWithNewTableReaderForCompactions = 37, + kConcurrentWALWrites = 31, + kEnd = 32, + kDirectIO = 33, + kLevelSubcompactions = 34, + kUniversalSubcompactions = 35, + kBlockBasedTableWithIndexRestartInterval = 36, + kBlockBasedTableWithPartitionedIndex = 37, + kPartitionedFilterWithNewTableReaderForCompactions = 38, }; public: diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 443c99453..bb211cb34 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -122,9 +122,11 @@ TEST_F(DBWALTest, SyncWALNotWaitWrite) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::port::Thread thread([&]() { ASSERT_OK(Put("foo2", "bar2")); }); - TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); + // Moving this to SyncWAL before the actual fsync + // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); ASSERT_OK(db_->SyncWAL()); - TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); + // Moving this to SyncWAL after actual fsync + // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); thread.join(); @@ -660,6 +662,7 @@ TEST_F(DBWALTest, PartOfWritesWithWALDisabled) { ASSERT_OK(Flush(0)); ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5 ASSERT_EQ("v5", Get(0, "key")); + dbfull()->FlushWAL(false); // Simulate a crash. fault_env->SetFilesystemActive(false); Close(); @@ -729,6 +732,7 @@ class RecoveryTestHelper { batch.Put(key, value); WriteBatchInternal::SetSequence(&batch, seq); current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch)); + versions->SetLastToBeWrittenSequence(seq); versions->SetLastSequence(seq); } } @@ -1113,6 +1117,7 @@ TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) { ASSERT_EQ(3, countWalFiles()); Flush(1); ASSERT_OK(Put(2, "key7", kLargeValue)); + dbfull()->FlushWAL(false); ASSERT_EQ(4, countWalFiles()); // Reopen twice and validate. diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 66ce4668d..424709900 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -71,6 +71,7 @@ TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) { INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, testing::Values(DBTestBase::kDefault, + DBTestBase::kConcurrentWALWrites, DBTestBase::kPipelinedWrite)); } // namespace rocksdb diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 081eacc00..2c94d07d3 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -146,6 +146,8 @@ Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed) { return status; } +// REQUIRES: we have become the only writer by entering both write_thread_ and +// nonmem_write_thread_ Status ExternalSstFileIngestionJob::Run() { Status status; #ifndef NDEBUG @@ -164,6 +166,8 @@ Status ExternalSstFileIngestionJob::Run() { // if the dont overlap with any ranges since we have snapshots force_global_seqno = true; } + // It is safe to use this instead of LastToBeWrittenSequence since we are + // the only active writer, and hence they are equal const SequenceNumber last_seqno = versions_->LastSequence(); SuperVersion* super_version = cfd_->GetSuperVersion(); edit_.SetColumnFamily(cfd_->GetID()); @@ -197,6 +201,7 @@ Status ExternalSstFileIngestionJob::Run() { } if (consumed_seqno) { + versions_->SetLastToBeWrittenSequence(last_seqno + 1); versions_->SetLastSequence(last_seqno + 1); } diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index 98b9b9b97..5aa1bfdc2 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -412,6 +412,7 @@ TEST_P(FaultInjectionTest, WriteOptionSyncTest) { write_options.sync = true; ASSERT_OK( db_->Put(write_options, Key(2, &key_space), Value(2, &value_space))); + db_->FlushWAL(false); env_->SetFilesystemActive(false); NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); @@ -496,7 +497,7 @@ TEST_P(FaultInjectionTest, ManualLogSyncTest) { ASSERT_OK(db_->Flush(flush_options)); ASSERT_OK( db_->Put(write_options, Key(2, &key_space), Value(2, &value_space))); - ASSERT_OK(db_->SyncWAL()); + ASSERT_OK(db_->FlushWAL(true)); env_->SetFilesystemActive(false); NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); diff --git a/db/log_writer.cc b/db/log_writer.cc index 4081a3b13..36480c403 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -20,20 +20,22 @@ namespace rocksdb { namespace log { -Writer::Writer(unique_ptr&& dest, - uint64_t log_number, bool recycle_log_files) +Writer::Writer(unique_ptr&& dest, uint64_t log_number, + bool recycle_log_files, bool manual_flush) : dest_(std::move(dest)), block_offset_(0), log_number_(log_number), - recycle_log_files_(recycle_log_files) { + recycle_log_files_(recycle_log_files), + manual_flush_(manual_flush) { for (int i = 0; i <= kMaxRecordType; i++) { char t = static_cast(i); type_crc_[i] = crc32c::Value(&t, 1); } } -Writer::~Writer() { -} +Writer::~Writer() { WriteBuffer(); } + +Status Writer::WriteBuffer() { return dest_->Flush(); } Status Writer::AddRecord(const Slice& slice) { const char* ptr = slice.data(); @@ -129,7 +131,9 @@ Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { if (s.ok()) { s = dest_->Append(Slice(ptr, n)); if (s.ok()) { - s = dest_->Flush(); + if (!manual_flush_) { + s = dest_->Flush(); + } } } block_offset_ += header_size + n; diff --git a/db/log_writer.h b/db/log_writer.h index c88234402..c6cb12233 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -74,8 +74,8 @@ class Writer { // Create a writer that will append data to "*dest". // "*dest" must be initially empty. // "*dest" must remain live while this Writer is in use. - explicit Writer(unique_ptr&& dest, - uint64_t log_number, bool recycle_log_files); + explicit Writer(unique_ptr&& dest, uint64_t log_number, + bool recycle_log_files, bool manual_flush = false); ~Writer(); Status AddRecord(const Slice& slice); @@ -85,6 +85,8 @@ class Writer { uint64_t get_log_number() const { return log_number_; } + Status WriteBuffer(); + private: unique_ptr dest_; size_t block_offset_; // Current offset in block @@ -98,6 +100,10 @@ class Writer { Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length); + // If true, it does not flush after each write. Instead it relies on the upper + // layer to manually does the flush by calling ::WriteBuffer() + bool manual_flush_; + // No copying allowed Writer(const Writer&); void operator=(const Writer&); diff --git a/db/repair.cc b/db/repair.cc index 1f9e344e1..615ad454f 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -524,6 +524,7 @@ class Repairer { max_sequence = tables_[i].max_sequence; } } + vset_.SetLastToBeWrittenSequence(max_sequence); vset_.SetLastSequence(max_sequence); for (const auto& cf_id_and_tables : cf_id_to_tables) { diff --git a/db/version_set.cc b/db/version_set.cc index 6c220b5ef..6387e2df2 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2286,6 +2286,7 @@ VersionSet::VersionSet(const std::string& dbname, manifest_file_number_(0), // Filled by Recover() pending_manifest_file_number_(0), last_sequence_(0), + last_to_be_written_sequence_(0), prev_log_number_(0), current_version_number_(0), manifest_file_size_(0), @@ -2922,6 +2923,7 @@ Status VersionSet::Recover( manifest_file_size_ = current_manifest_file_size; next_file_number_.store(next_file + 1); + last_to_be_written_sequence_ = last_sequence; last_sequence_ = last_sequence; prev_log_number_ = previous_log_number; @@ -3291,6 +3293,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, } next_file_number_.store(next_file + 1); + last_to_be_written_sequence_ = last_sequence; last_sequence_ = last_sequence; prev_log_number_ = previous_log_number; diff --git a/db/version_set.h b/db/version_set.h index f1f0dcb64..17627a4b1 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -699,12 +699,31 @@ class VersionSet { return last_sequence_.load(std::memory_order_acquire); } + // Note: memory_order_acquire must be sufficient. + uint64_t LastToBeWrittenSequence() const { + return last_to_be_written_sequence_.load(std::memory_order_seq_cst); + } + // Set the last sequence number to s. void SetLastSequence(uint64_t s) { assert(s >= last_sequence_); + // Last visible seqeunce must always be less than last written seq + assert(!db_options_->concurrent_prepare || + s <= last_to_be_written_sequence_); last_sequence_.store(s, std::memory_order_release); } + // Note: memory_order_release must be sufficient + void SetLastToBeWrittenSequence(uint64_t s) { + assert(s >= last_to_be_written_sequence_); + last_to_be_written_sequence_.store(s, std::memory_order_seq_cst); + } + + // Note: memory_order_release must be sufficient + uint64_t FetchAddLastToBeWrittenSequence(uint64_t s) { + return last_to_be_written_sequence_.fetch_add(s, std::memory_order_seq_cst); + } + // Mark the specified file number as used. // REQUIRED: this is only called during single-threaded recovery void MarkFileNumberUsedDuringRecovery(uint64_t number); @@ -804,7 +823,10 @@ class VersionSet { uint64_t manifest_file_number_; uint64_t options_file_number_; uint64_t pending_manifest_file_number_; + // The last seq visible to reads std::atomic last_sequence_; + // The last seq with which a writer has written/will write. + std::atomic last_to_be_written_sequence_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted // Opened lazily diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index fb79601e3..f38131106 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -69,6 +69,7 @@ class WalManagerTest : public testing::Test { batch.Put(key, value); WriteBatchInternal::SetSequence(&batch, seq); current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch)); + versions_->SetLastToBeWrittenSequence(seq); versions_->SetLastSequence(seq); } diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index 03a82174a..18727ee6a 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -125,176 +125,180 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { {false, false, true, false, true}, }; - for (auto& allow_parallel : {true, false}) { - for (auto& allow_batching : {true, false}) { - for (auto& enable_WAL : {true, false}) { - for (auto& enable_pipelined_write : {true, false}) { - for (auto& write_group : write_scenarios) { - Options options; - options.create_if_missing = true; - options.allow_concurrent_memtable_write = allow_parallel; - options.enable_pipelined_write = enable_pipelined_write; + for (auto& two_queues : {true, false}) { + for (auto& allow_parallel : {true, false}) { + for (auto& allow_batching : {true, false}) { + for (auto& enable_WAL : {true, false}) { + for (auto& enable_pipelined_write : {true, false}) { + for (auto& write_group : write_scenarios) { + Options options; + options.create_if_missing = true; + options.allow_concurrent_memtable_write = allow_parallel; + options.enable_pipelined_write = enable_pipelined_write; + options.concurrent_prepare = two_queues; - ReadOptions read_options; - DB* db; - DBImpl* db_impl; + ReadOptions read_options; + DB* db; + DBImpl* db_impl; - DestroyDB(dbname, options); - ASSERT_OK(DB::Open(options, dbname, &db)); + DestroyDB(dbname, options); + ASSERT_OK(DB::Open(options, dbname, &db)); - db_impl = dynamic_cast(db); - ASSERT_TRUE(db_impl); + db_impl = dynamic_cast(db); + ASSERT_TRUE(db_impl); - std::atomic threads_waiting(0); - std::atomic seq(db_impl->GetLatestSequenceNumber()); - ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); + std::atomic threads_waiting(0); + std::atomic seq(db_impl->GetLatestSequenceNumber()); + ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { - uint64_t cur_threads_waiting = 0; - bool is_leader = false; - bool is_last = false; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { + uint64_t cur_threads_waiting = 0; + bool is_leader = false; + bool is_last = false; - // who am i + // who am i + do { + cur_threads_waiting = threads_waiting.load(); + is_leader = (cur_threads_waiting == 0); + is_last = (cur_threads_waiting == write_group.size() - 1); + } while (!threads_waiting.compare_exchange_strong( + cur_threads_waiting, cur_threads_waiting + 1)); + + // check my state + auto* writer = reinterpret_cast(arg); + + if (is_leader) { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_INIT); + } + + // (meta test) the first WriteOP should indeed be the first + // and the last should be the last (all others can be out of + // order) + if (is_leader) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.front().callback_.should_fail_); + } else if (is_last) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.back().callback_.should_fail_); + } + + // wait for friends + while (threads_waiting.load() < write_group.size()) { + } + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { + // check my state + auto* writer = reinterpret_cast(arg); + + if (!allow_batching) { + // no batching so everyone should be a leader + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else if (!allow_parallel) { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_COMPLETED || + (enable_pipelined_write && + writer->state == + WriteThread::State:: + STATE_MEMTABLE_WRITER_LEADER)); + } + }); + + std::atomic thread_num(0); + std::atomic dummy_key(0); + std::function write_with_callback_func = [&]() { + uint32_t i = thread_num.fetch_add(1); + Random rnd(i); + + // leaders gotta lead + while (i > 0 && threads_waiting.load() < 1) { + } + + // loser has to lose + while (i == write_group.size() - 1 && + threads_waiting.load() < write_group.size() - 1) { + } + + auto& write_op = write_group.at(i); + write_op.Clear(); + write_op.callback_.allow_batching_ = allow_batching; + + // insert some keys + for (uint32_t j = 0; j < rnd.Next() % 50; j++) { + // grab unique key + char my_key = 0; do { - cur_threads_waiting = threads_waiting.load(); - is_leader = (cur_threads_waiting == 0); - is_last = (cur_threads_waiting == write_group.size() - 1); - } while (!threads_waiting.compare_exchange_strong( - cur_threads_waiting, cur_threads_waiting + 1)); + my_key = dummy_key.load(); + } while ( + !dummy_key.compare_exchange_strong(my_key, my_key + 1)); - // check my state - auto* writer = reinterpret_cast(arg); + string skey(5, my_key); + string sval(10, my_key); + write_op.Put(skey, sval); - if (is_leader) { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_INIT); + if (!write_op.callback_.should_fail_) { + seq.fetch_add(1); } - - // (meta test) the first WriteOP should indeed be the first - // and the last should be the last (all others can be out of - // order) - if (is_leader) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.front().callback_.should_fail_); - } else if (is_last) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.back().callback_.should_fail_); - } - - // wait for friends - while (threads_waiting.load() < write_group.size()) { - } - }); - - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { - // check my state - auto* writer = reinterpret_cast(arg); - - if (!allow_batching) { - // no batching so everyone should be a leader - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else if (!allow_parallel) { - ASSERT_TRUE( - writer->state == WriteThread::State::STATE_COMPLETED || - (enable_pipelined_write && - writer->state == - WriteThread::State::STATE_MEMTABLE_WRITER_LEADER)); - } - }); - - std::atomic thread_num(0); - std::atomic dummy_key(0); - std::function write_with_callback_func = [&]() { - uint32_t i = thread_num.fetch_add(1); - Random rnd(i); - - // leaders gotta lead - while (i > 0 && threads_waiting.load() < 1) { - } - - // loser has to lose - while (i == write_group.size() - 1 && - threads_waiting.load() < write_group.size() - 1) { - } - - auto& write_op = write_group.at(i); - write_op.Clear(); - write_op.callback_.allow_batching_ = allow_batching; - - // insert some keys - for (uint32_t j = 0; j < rnd.Next() % 50; j++) { - // grab unique key - char my_key = 0; - do { - my_key = dummy_key.load(); - } while ( - !dummy_key.compare_exchange_strong(my_key, my_key + 1)); - - string skey(5, my_key); - string sval(10, my_key); - write_op.Put(skey, sval); - - if (!write_op.callback_.should_fail_) { - seq.fetch_add(1); } - } - WriteOptions woptions; - woptions.disableWAL = !enable_WAL; - woptions.sync = enable_WAL; - Status s = db_impl->WriteWithCallback( - woptions, &write_op.write_batch_, &write_op.callback_); + WriteOptions woptions; + woptions.disableWAL = !enable_WAL; + woptions.sync = enable_WAL; + Status s = db_impl->WriteWithCallback( + woptions, &write_op.write_batch_, &write_op.callback_); - if (write_op.callback_.should_fail_) { - ASSERT_TRUE(s.IsBusy()); - } else { - ASSERT_OK(s); - } - }; - - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - - // do all the writes - std::vector threads; - for (uint32_t i = 0; i < write_group.size(); i++) { - threads.emplace_back(write_with_callback_func); - } - for (auto& t : threads) { - t.join(); - } - - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - - // check for keys - string value; - for (auto& w : write_group) { - ASSERT_TRUE(w.callback_.was_called_.load()); - for (auto& kvp : w.kvs_) { - if (w.callback_.should_fail_) { - ASSERT_TRUE( - db->Get(read_options, kvp.first, &value).IsNotFound()); + if (write_op.callback_.should_fail_) { + ASSERT_TRUE(s.IsBusy()); } else { - ASSERT_OK(db->Get(read_options, kvp.first, &value)); - ASSERT_EQ(value, kvp.second); + ASSERT_OK(s); + } + }; + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // do all the writes + std::vector threads; + for (uint32_t i = 0; i < write_group.size(); i++) { + threads.emplace_back(write_with_callback_func); + } + for (auto& t : threads) { + t.join(); + } + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + // check for keys + string value; + for (auto& w : write_group) { + ASSERT_TRUE(w.callback_.was_called_.load()); + for (auto& kvp : w.kvs_) { + if (w.callback_.should_fail_) { + ASSERT_TRUE( + db->Get(read_options, kvp.first, &value).IsNotFound()); + } else { + ASSERT_OK(db->Get(read_options, kvp.first, &value)); + ASSERT_EQ(value, kvp.second); + } } } + + ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber()); + + delete db; + DestroyDB(dbname, options); } - - ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber()); - - delete db; - DestroyDB(dbname, options); } } } } - } +} } TEST_F(WriteCallbackTest, WriteCallBackTest) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index c25bcf771..67d4aac43 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -853,6 +853,11 @@ class DB { return Flush(options, DefaultColumnFamily()); } + // Flush the WAL memory buffer to the file. If sync is true, it calls SyncWAL + // afterwards. + virtual Status FlushWAL(bool sync) { + return Status::NotSupported("FlushWAL not implemented"); + } // Sync the wal. Note that Write() followed by SyncWAL() is not exactly the // same as Write() with sync=true: in the latter case the changes won't be // visible until the sync is done. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 283085a53..1c90a68be 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -887,6 +887,18 @@ struct DBOptions { // DEFAULT: false // Immutable. bool allow_ingest_behind = false; + + // If enabled it uses two queues for writes, one for the ones with + // disable_memtable and one for the ones that also write to memtable. This + // allows the memtable writes not to lag behind other writes. It can be used + // to optimize MySQL 2PC in which only the commits, which are serial, write to + // memtable. + bool concurrent_prepare = false; + + // If true WAL is not flushed automatically after each write. Instead it + // relies on manual invocation of FlushWAL to write the WAL buffer to its + // file. + bool manual_wal_flush = false; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 7ae8c9e4a..db5068b1d 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -268,6 +268,8 @@ class StackableDB : public DB { return db_->SyncWAL(); } + virtual Status FlushWAL(bool sync) override { return db_->FlushWAL(sync); } + #ifndef ROCKSDB_LITE virtual Status DisableFileDeletions() override { diff --git a/options/db_options.cc b/options/db_options.cc index d990ca81a..030a70b9c 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -86,7 +86,9 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) fail_if_options_file_error(options.fail_if_options_file_error), dump_malloc_stats(options.dump_malloc_stats), avoid_flush_during_recovery(options.avoid_flush_during_recovery), - allow_ingest_behind(options.allow_ingest_behind) { + allow_ingest_behind(options.allow_ingest_behind), + concurrent_prepare(options.concurrent_prepare), + manual_wal_flush(options.manual_wal_flush) { } void ImmutableDBOptions::Dump(Logger* log) const { @@ -219,6 +221,10 @@ void ImmutableDBOptions::Dump(Logger* log) const { avoid_flush_during_recovery); ROCKS_LOG_HEADER(log, " Options.allow_ingest_behind: %d", allow_ingest_behind); + ROCKS_LOG_HEADER(log, " Options.concurrent_prepare: %d", + concurrent_prepare); + ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d", + manual_wal_flush); } MutableDBOptions::MutableDBOptions() diff --git a/options/db_options.h b/options/db_options.h index c174aeb08..f8c291b3b 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -79,6 +79,8 @@ struct ImmutableDBOptions { bool dump_malloc_stats; bool avoid_flush_during_recovery; bool allow_ingest_behind; + bool concurrent_prepare; + bool manual_wal_flush; }; struct MutableDBOptions { diff --git a/options/options_helper.h b/options/options_helper.h index a82cc9e47..399164514 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -349,7 +349,15 @@ static std::unordered_map db_options_type_info = { {"allow_ingest_behind", {offsetof(struct DBOptions, allow_ingest_behind), OptionType::kBoolean, OptionVerificationType::kNormal, false, - offsetof(struct ImmutableDBOptions, allow_ingest_behind)}}}; + offsetof(struct ImmutableDBOptions, allow_ingest_behind)}}, + {"concurrent_prepare", + {offsetof(struct DBOptions, concurrent_prepare), OptionType::kBoolean, + OptionVerificationType::kNormal, false, + offsetof(struct ImmutableDBOptions, concurrent_prepare)}}, + {"manual_wal_flush", + {offsetof(struct DBOptions, manual_wal_flush), OptionType::kBoolean, + OptionVerificationType::kNormal, false, + offsetof(struct ImmutableDBOptions, manual_wal_flush)}}}; // offset_of is used to get the offset of a class data member // ex: offset_of(&ColumnFamilyOptions::num_levels) diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index bfe080828..8345ec182 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -13,22 +13,11 @@ #define __STDC_FORMAT_MACROS #endif -#include -#include #include -#include -#include "options/options_helper.h" #include "options/options_parser.h" -#include "options/options_sanity_check.h" -#include "rocksdb/cache.h" #include "rocksdb/convenience.h" -#include "rocksdb/memtablerep.h" -#include "rocksdb/utilities/leveldb_options.h" -#include "util/random.h" -#include "util/stderr_logger.h" #include "util/testharness.h" -#include "util/testutil.h" #ifndef GFLAGS bool FLAGS_enable_print = false; @@ -294,7 +283,9 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "allow_2pc=false;" "avoid_flush_during_recovery=false;" "avoid_flush_during_shutdown=false;" - "allow_ingest_behind=false;", + "allow_ingest_behind=false;" + "concurrent_prepare=false;" + "manual_wal_flush=false;", new_options)); ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 4cd0ba51c..1db900e35 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -9,7 +9,6 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. - #include "table/block_based_table_factory.h" #include diff --git a/util/murmurhash.cc b/util/murmurhash.cc index 309166bce..376b644b4 100644 --- a/util/murmurhash.cc +++ b/util/murmurhash.cc @@ -8,8 +8,8 @@ /* Murmurhash from http://sites.google.com/site/murmurhash/ - All code is released to the public domain. For business purposes, Murmurhash is - under the MIT license. + All code is released to the public domain. For business purposes, Murmurhash + is under the MIT license. */ #include "murmurhash.h" diff --git a/util/murmurhash.h b/util/murmurhash.h index b368313fa..403b67c45 100644 --- a/util/murmurhash.h +++ b/util/murmurhash.h @@ -8,8 +8,8 @@ /* Murmurhash from http://sites.google.com/site/murmurhash/ - All code is released to the public domain. For business purposes, Murmurhash is - under the MIT license. + All code is released to the public domain. For business purposes, Murmurhash + is under the MIT license. */ #pragma once #include diff --git a/util/transaction_test_util.cc b/util/transaction_test_util.cc index 3d1764b90..223395991 100644 --- a/util/transaction_test_util.cc +++ b/util/transaction_test_util.cc @@ -12,6 +12,7 @@ #include #include +#include #include "rocksdb/db.h" #include "rocksdb/utilities/optimistic_transaction_db.h" @@ -135,6 +136,13 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, if (s.ok()) { if (txn != nullptr) { + std::hash hasher; + char name[64]; + snprintf(name, 64, "txn%zu-%d", hasher(std::this_thread::get_id()), + txn_id_++); + assert(strlen(name) < 64 - 1); + txn->SetName(name); + s = txn->Prepare(); s = txn->Commit(); if (!s.ok()) { diff --git a/util/transaction_test_util.h b/util/transaction_test_util.h index 97c62841f..4e192bac8 100644 --- a/util/transaction_test_util.h +++ b/util/transaction_test_util.h @@ -104,6 +104,8 @@ class RandomTransactionInserter { Transaction* txn_ = nullptr; Transaction* optimistic_txn_ = nullptr; + std::atomic txn_id_; + bool DoInsert(DB* db, Transaction* txn, bool is_optimistic); }; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index f2a97d1d8..3a761c3f0 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -136,6 +136,9 @@ class DummyDB : public StackableDB { return Status::OK(); } + // To avoid FlushWAL called on stacked db which is nullptr + virtual Status FlushWAL(bool sync) override { return Status::OK(); } + std::vector live_files_; // pair std::vector> wal_files_; diff --git a/utilities/checkpoint/checkpoint_impl.cc b/utilities/checkpoint/checkpoint_impl.cc index 19543aad4..3a9606af4 100644 --- a/utilities/checkpoint/checkpoint_impl.cc +++ b/utilities/checkpoint/checkpoint_impl.cc @@ -209,6 +209,7 @@ Status CheckpointImpl::CreateCustomCheckpoint( TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1"); TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"); + db_->FlushWAL(false /* sync */); } // if we have more than one column family, we need to also get WAL files if (s.ok()) { diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 731e138c8..b848b1be2 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -274,6 +274,8 @@ Status TransactionImpl::Commit() { s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, log_number_); if (!s.ok()) { + ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, + "Commit write failed"); return s; } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index c93daddcc..5abe23cdb 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -35,7 +35,8 @@ using std::string; namespace rocksdb { -class TransactionTest : public ::testing::TestWithParam { +class TransactionTest + : public ::testing::TestWithParam> { public: TransactionDB* db; FaultInjectionTestEnv* env; @@ -52,13 +53,14 @@ class TransactionTest : public ::testing::TestWithParam { options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); env = new FaultInjectionTestEnv(Env::Default()); options.env = env; + options.concurrent_prepare = std::get<1>(GetParam()); dbname = test::TmpDir() + "/transaction_testdb"; DestroyDB(dbname, options); txn_db_options.transaction_lock_timeout = 0; txn_db_options.default_lock_timeout = 0; Status s; - if (GetParam() == false) { + if (std::get<0>(GetParam()) == false) { s = TransactionDB::Open(options, txn_db_options, dbname, &db); } else { s = OpenWithStackableDB(); @@ -79,7 +81,7 @@ class TransactionTest : public ::testing::TestWithParam { env->DropUnsyncedFileData(); env->ResetState(); Status s; - if (GetParam() == false) { + if (std::get<0>(GetParam()) == false) { s = TransactionDB::Open(options, txn_db_options, dbname, &db); } else { s = OpenWithStackableDB(); @@ -91,7 +93,7 @@ class TransactionTest : public ::testing::TestWithParam { delete db; DestroyDB(dbname, options); Status s; - if (GetParam() == false) { + if (std::get<0>(GetParam()) == false) { s = TransactionDB::Open(options, txn_db_options, dbname, &db); } else { s = OpenWithStackableDB(); @@ -122,9 +124,17 @@ class TransactionTest : public ::testing::TestWithParam { } }; -INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest, ::testing::Values(false)); +class MySQLStyleTransactionTest : public TransactionTest {}; + +INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest, + ::testing::Values(std::make_tuple(false, false))); INSTANTIATE_TEST_CASE_P(StackableDBAsBaseDB, TransactionTest, - ::testing::Values(true)); + ::testing::Values(std::make_tuple(true, false))); +INSTANTIATE_TEST_CASE_P(MySQLStyleTransactionTest, MySQLStyleTransactionTest, + ::testing::Values(std::make_tuple(false, false), + std::make_tuple(false, true), + std::make_tuple(true, false), + std::make_tuple(true, true))); TEST_P(TransactionTest, DoubleEmptyWrite) { WriteOptions write_options; @@ -957,6 +967,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { s = db->Get(read_options, Slice("foo"), &value); ASSERT_TRUE(s.IsNotFound()); + db->FlushWAL(false); delete txn; // kill and reopen s = ReOpenNoDelete(); @@ -1021,7 +1032,8 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); } -TEST_P(TransactionTest, TwoPhaseMultiThreadTest) { +// TODO this test needs to be updated with serial commits +TEST_P(TransactionTest, DISABLED_TwoPhaseMultiThreadTest) { // mix transaction writes and regular writes const uint32_t NUM_TXN_THREADS = 50; std::atomic txn_thread_num(0); @@ -1546,6 +1558,8 @@ TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) { s = db->Put(wal_on, "cats", "dogs4"); ASSERT_OK(s); + db->FlushWAL(false); + // kill and reopen env->SetFilesystemActive(false); ReOpenNoDelete(); @@ -4487,7 +4501,7 @@ Status TransactionStressTestInserter(TransactionDB* db, } } // namespace -TEST_P(TransactionTest, TransactionStressTest) { +TEST_P(MySQLStyleTransactionTest, TransactionStressTest) { const size_t num_threads = 4; const size_t num_transactions_per_thread = 10000; const size_t num_sets = 3;