diff --git a/HISTORY.md b/HISTORY.md index fcc031bcf..815ac9c80 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ * Introduced `MemoryAllocator`, which lets the user specify a custom memory allocator for the block cache. * Introduced `PerfContextByLevel` as part of `PerfContext`, which allows storing perf context at each level. Also replaced `__thread` with the `thread_local` keyword for perf_context. * With level_compaction_dynamic_level_bytes = true, the level multiplier may be adjusted automatically when Level 0 to Level 1 compaction lags behind. +* Introduced DB option `atomic_flush`. If true, RocksDB supports flushing multiple column families and committing their results atomically to the MANIFEST. Useful when the WAL is disabled. ### Bug Fixes * Fix a corner case where a write group leader blocked by a write stall in turn blocks other queued writers that have WriteOptions::no_slowdown set. diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 3601f8e94..3725ff954 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -87,19 +87,28 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret, if (flush_memtable) { // flush all dirty data to disk. Status status; - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->IsDropped()) { - continue; - } - cfd->Ref(); + if (atomic_flush_) { + autovector<ColumnFamilyData*> cfds; + SelectColumnFamiliesForAtomicFlush(&cfds); mutex_.Unlock(); - status = FlushMemTable(cfd, FlushOptions(), FlushReason::kGetLiveFiles); - TEST_SYNC_POINT("DBImpl::GetLiveFiles:1"); - TEST_SYNC_POINT("DBImpl::GetLiveFiles:2"); + status = AtomicFlushMemTables(cfds, FlushOptions(), + FlushReason::kGetLiveFiles); mutex_.Lock(); - cfd->Unref(); - if (!status.ok()) { - break; + } else { + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + cfd->Ref(); + mutex_.Unlock(); + status = FlushMemTable(cfd, FlushOptions(), FlushReason::kGetLiveFiles); + TEST_SYNC_POINT("DBImpl::GetLiveFiles:1"); + TEST_SYNC_POINT("DBImpl::GetLiveFiles:2"); + mutex_.Lock(); + cfd->Unref(); + if (!status.ok()) { + break; + } } } versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index deae09c3c..6f6cbe196 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -25,6 +25,12 @@ class DBFlushDirectIOTest : public DBFlushTest, DBFlushDirectIOTest() : DBFlushTest() {} }; +class DBAtomicFlushTest : public DBFlushTest, + public ::testing::WithParamInterface<bool> { + public: + DBAtomicFlushTest() : DBFlushTest() {} +}; + // We had an issue where two background threads tried to flush at the same time, // and only one of them got committed. The test verifies the issue is fixed.
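For context, here is a minimal sketch of how an application could enable the new option and use the multi-column-family Flush() overload introduced by this change. The database path and column family names are illustrative, and error handling is reduced to asserts.

#include <cassert>
#include <string>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

void AtomicFlushExample() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.create_missing_column_families = true;
  options.atomic_flush = true;  // new DB option introduced in this change

  std::vector<rocksdb::ColumnFamilyDescriptor> cf_descs;
  cf_descs.emplace_back(rocksdb::kDefaultColumnFamilyName,
                        rocksdb::ColumnFamilyOptions());
  cf_descs.emplace_back("pikachu", rocksdb::ColumnFamilyOptions());

  std::vector<rocksdb::ColumnFamilyHandle*> handles;
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/atomic_flush_example",
                                        cf_descs, &handles, &db);
  assert(s.ok());

  // Atomic flush matters most when the WAL is disabled.
  rocksdb::WriteOptions wopts;
  wopts.disableWAL = true;
  s = db->Put(wopts, handles[0], "key", "v0");
  assert(s.ok());
  s = db->Put(wopts, handles[1], "key", "v1");
  assert(s.ok());

  // Flush both column families and commit the results to the MANIFEST
  // atomically, using the new DB::Flush overload.
  s = db->Flush(rocksdb::FlushOptions(), handles);
  assert(s.ok());

  for (auto* handle : handles) {
    delete handle;
  }
  delete db;
}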
TEST_F(DBFlushTest, FlushWhileWritingManifest) { @@ -214,9 +220,82 @@ TEST_F(DBFlushTest, FlushError) { ASSERT_NE(s, Status::OK()); } +TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.atomic_flush = GetParam(); + options.write_buffer_size = (static_cast<uint64_t>(64) << 20); + + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + size_t num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + WriteOptions wopts; + wopts.disableWAL = true; + for (size_t i = 0; i != num_cfs; ++i) { + ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts)); + } + std::vector<int> cf_ids; + for (size_t i = 0; i != num_cfs; ++i) { + cf_ids.emplace_back(static_cast<int>(i)); + } + ASSERT_OK(Flush(cf_ids)); + for (size_t i = 0; i != num_cfs; ++i) { + auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]); + ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed()); + ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty()); + } +} + +TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.atomic_flush = GetParam(); + // 4KB so that we can easily trigger auto flush. + options.write_buffer_size = 4096; + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BackgroundCallFlush:FlushFinish:0", + "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + size_t num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + WriteOptions wopts; + wopts.disableWAL = true; + for (size_t i = 0; i != num_cfs; ++i) { + ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts)); + } + // Keep writing to one of the column families to trigger auto flush. + for (int i = 0; i != 4000; ++i) { + ASSERT_OK(Put(static_cast<int>(num_cfs) - 1 /*cf*/, + "key" + std::to_string(i), "value" + std::to_string(i), + wopts)); + } + + TEST_SYNC_POINT( + "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"); + if (options.atomic_flush) { + for (size_t i = 0; i != num_cfs - 1; ++i) { + auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]); + ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed()); + ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty()); + } + } else { + for (size_t i = 0; i != num_cfs - 1; ++i) { + auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]); + ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed()); + ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty()); + } + } + SyncPoint::GetInstance()->DisableProcessing(); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); +INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool()); + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 9c2975589..cf4e84675 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -220,6 +220,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, preserve_deletes_(options.preserve_deletes), closed_(false), error_handler_(this, immutable_db_options_, &mutex_), + atomic_flush_(options.atomic_flush), atomic_flush_commit_in_progress_(false) { // !batch_per_trx_ implies seq_per_batch_ because it is only unset for // WriteUnprepared, which should use seq_per_batch_. @@ -305,7 +306,30 @@ Status DBImpl::ResumeImpl() { // We cannot guarantee consistency of the WAL.
So force flush Memtables of // all the column families if (s.ok()) { - s = FlushAllCFs(FlushReason::kErrorRecovery); + FlushOptions flush_opts; + // We allow flush to stall write since we are trying to resume from error. + flush_opts.allow_write_stall = true; + if (atomic_flush_) { + autovector cfds; + SelectColumnFamiliesForAtomicFlush(&cfds); + mutex_.Unlock(); + s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery); + mutex_.Lock(); + } else { + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + cfd->Ref(); + mutex_.Unlock(); + s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery); + mutex_.Lock(); + cfd->Unref(); + if (!s.ok()) { + break; + } + } + } if (!s.ok()) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB resume requested but failed due to Flush failure [%s]", @@ -377,13 +401,21 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { if (!shutting_down_.load(std::memory_order_acquire) && has_unpersisted_data_.load(std::memory_order_relaxed) && !mutable_db_options_.avoid_flush_during_shutdown) { - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { - cfd->Ref(); - mutex_.Unlock(); - FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown); - mutex_.Lock(); - cfd->Unref(); + if (atomic_flush_) { + autovector cfds; + SelectColumnFamiliesForAtomicFlush(&cfds); + mutex_.Unlock(); + AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown); + mutex_.Lock(); + } else { + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { + cfd->Ref(); + mutex_.Unlock(); + FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown); + mutex_.Lock(); + cfd->Unref(); + } } } versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); @@ -3099,11 +3131,21 @@ Status DBImpl::IngestExternalFile( TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush", &need_flush); if (status.ok() && need_flush) { - mutex_.Unlock(); - status = FlushMemTable(cfd, FlushOptions(), - FlushReason::kExternalFileIngestion, - true /* writes_stopped */); - mutex_.Lock(); + if (atomic_flush_) { + mutex_.Unlock(); + autovector cfds; + SelectColumnFamiliesForAtomicFlush(&cfds); + status = AtomicFlushMemTables(cfds, FlushOptions(), + FlushReason::kExternalFileIngestion, + true /* writes_stopped */); + mutex_.Lock(); + } else { + mutex_.Unlock(); + status = FlushMemTable(cfd, FlushOptions(), + FlushReason::kExternalFileIngestion, + true /* writes_stopped */); + mutex_.Lock(); + } } } diff --git a/db/db_impl.h b/db/db_impl.h index 6ada36d53..255d02101 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -228,6 +228,9 @@ class DBImpl : public DB { using DB::Flush; virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) override; + virtual Status Flush( + const FlushOptions& options, + const std::vector& column_families) override; virtual Status FlushWAL(bool sync) override; bool TEST_WALBufferIsEmpty(); virtual Status SyncWAL() override; @@ -965,10 +968,17 @@ class DBImpl : public DB { Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); + void SelectColumnFamiliesForAtomicFlush(autovector* cfds); + // Force current memtable contents to be flushed. 
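ResumeImpl above sets FlushOptions::allow_write_stall because a flush issued while recovering from a background error is allowed to stall foreground writes. A caller issuing a manual flush that must not introduce a stall would typically leave that flag at its default; a small sketch, where db and handles are assumed to come from DB::Open:

#include <vector>
#include "rocksdb/db.h"

// Flush several column families, waiting for completion, but only after the
// flush can proceed without stalling writes.
rocksdb::Status FlushWithoutStall(
    rocksdb::DB* db,
    const std::vector<rocksdb::ColumnFamilyHandle*>& handles) {
  rocksdb::FlushOptions fo;
  fo.wait = true;                // block until the flush finishes
  fo.allow_write_stall = false;  // default: avoid stalling writes
  return db->Flush(fo, handles);
}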
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, FlushReason flush_reason, bool writes_stopped = false); + Status AtomicFlushMemTables( + const autovector& column_family_datas, + const FlushOptions& options, FlushReason flush_reason, + bool writes_stopped = false); + // Wait until flushing this column family won't stall writes Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd, bool* flush_needed); @@ -977,14 +987,22 @@ class DBImpl : public DB { // If flush_memtable_id is non-null, wait until the memtable with the ID // gets flush. Otherwise, wait until the column family don't have any // memtable pending flush. + // resuming_from_bg_err indicates whether the caller is attempting to resume + // from background error. Status WaitForFlushMemTable(ColumnFamilyData* cfd, - const uint64_t* flush_memtable_id = nullptr) { - return WaitForFlushMemTables({cfd}, {flush_memtable_id}); + const uint64_t* flush_memtable_id = nullptr, + bool resuming_from_bg_err = false) { + return WaitForFlushMemTables({cfd}, {flush_memtable_id}, + resuming_from_bg_err); } // Wait for memtables to be flushed for multiple column families. Status WaitForFlushMemTables( const autovector& cfds, - const autovector& flush_memtable_ids); + const autovector& flush_memtable_ids, + bool resuming_from_bg_err); + + // REQUIRES: mutex locked and in write thread. + void AssignAtomicFlushSeq(const autovector& cfds); // REQUIRES: mutex locked Status SwitchWAL(WriteContext* write_context); @@ -1049,6 +1067,9 @@ class DBImpl : public DB { // column families in this request, this flush is considered complete. typedef std::vector> FlushRequest; + void GenerateFlushRequest(const autovector& cfds, + FlushRequest* req); + void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason); void SchedulePendingCompaction(ColumnFamilyData* cfd); @@ -1109,8 +1130,6 @@ class DBImpl : public DB { Status CloseHelper(); - Status FlushAllCFs(FlushReason flush_reason); - void WaitForBackgroundWork(); // table_cache_ provides its own synchronization @@ -1584,6 +1603,9 @@ class DBImpl : public DB { ErrorHandler error_handler_; + // True if DB enables atomic flush. + bool atomic_flush_; + // True if the DB is committing atomic flush. // TODO (yanqin) the current impl assumes that the entire DB belongs to // a single atomic flush group. In the future we need to add a new class diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index b436987e9..49a23b48e 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -211,6 +211,10 @@ Status DBImpl::FlushMemTableToOutputFile( Status DBImpl::FlushMemTablesToOutputFiles( const autovector& bg_flush_args, bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) { + if (atomic_flush_) { + return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress, + job_context, log_buffer); + } Status s; for (auto& arg : bg_flush_args) { ColumnFamilyData* cfd = arg.cfd_; @@ -318,11 +322,23 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( s = SyncClosedLogs(job_context); } + // exec_status stores the execution status of flush_jobs as + // + autovector> exec_status; + for (int i = 0; i != num_cfs; ++i) { + // Initially all jobs are not executed, with status OK. + std::pair elem(false, Status::OK()); + exec_status.emplace_back(elem); + } + if (s.ok()) { // TODO (yanqin): parallelize jobs with threads. 
for (int i = 0; i != num_cfs; ++i) { - s = jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]); - if (!s.ok()) { + exec_status[i].second = + jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]); + exec_status[i].first = true; + if (!exec_status[i].second.ok()) { + s = exec_status[i].second; break; } } @@ -401,12 +417,19 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } if (!s.ok()) { + // Have to cancel the flush jobs that have NOT executed because we need to + // unref the versions. for (int i = 0; i != num_cfs; ++i) { - auto& mems = jobs[i].GetMemTables(); - cfds[i]->imm()->RollbackMemtableFlush(mems, file_meta[i].fd.GetNumber()); - jobs[i].Cancel(); + if (!exec_status[i].first) { + jobs[i].Cancel(); + } } if (!s.IsShutdownInProgress()) { + for (int i = 0; i != num_cfs; ++i) { + auto& mems = jobs[i].GetMemTables(); + cfds[i]->imm()->RollbackMemtableFlush(mems, + file_meta[i].fd.GetNumber()); + } Status new_bg_error = s; error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); } @@ -543,8 +566,15 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, if (flush_needed) { FlushOptions fo; fo.allow_write_stall = options.allow_write_stall; - s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction, - false /* writes_stopped*/); + if (atomic_flush_) { + autovector cfds; + SelectColumnFamiliesForAtomicFlush(&cfds); + s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction, + false /* writes_stopped */); + } else { + s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction, + false /* writes_stopped*/); + } if (!s.ok()) { LogFlush(immutable_db_options_.info_log); return s; @@ -1165,72 +1195,59 @@ Status DBImpl::Flush(const FlushOptions& flush_options, auto cfh = reinterpret_cast(column_family); ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.", cfh->GetName().c_str()); - Status s = - FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush); + Status s; + if (atomic_flush_) { + s = AtomicFlushMemTables({cfh->cfd()}, flush_options, + FlushReason::kManualFlush); + } else { + s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush); + } + ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush finished, status: %s\n", cfh->GetName().c_str(), s.ToString().c_str()); return s; } -Status DBImpl::FlushAllCFs(FlushReason flush_reason) { +Status DBImpl::Flush(const FlushOptions& flush_options, + const std::vector& column_families) { Status s; - WriteContext context; - WriteThread::Writer w; - - mutex_.AssertHeld(); - write_thread_.EnterUnbatched(&w, &mutex_); - - FlushRequest flush_req; - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() && - cached_recoverable_state_empty_.load()) { - // Nothing to flush - continue; - } - - // SwitchMemtable() will release and reacquire mutex during execution - s = SwitchMemtable(cfd, &context); - if (!s.ok()) { - break; - } - - cfd->imm()->FlushRequested(); - - flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID()); - } - - // schedule flush - if (s.ok() && !flush_req.empty()) { - SchedulePendingFlush(flush_req, flush_reason); - MaybeScheduleFlushOrCompaction(); - } - - write_thread_.ExitUnbatched(&w); - - if (s.ok()) { - for (auto& flush : flush_req) { - auto cfd = flush.first; - auto flush_memtable_id = flush.second; - while (cfd->imm()->NumNotFlushed() > 0 && - cfd->imm()->GetEarliestMemTableID() <= flush_memtable_id) { - if (!error_handler_.GetRecoveryError().ok()) { - break; - } - if 
(cfd->IsDropped()) { - // FlushJob cannot flush a dropped CF, if we did not break here - // we will loop forever since cfd->imm()->NumNotFlushed() will never - // drop to zero - continue; - } - cfd->Ref(); - bg_cv_.Wait(); - cfd->Unref(); + if (!atomic_flush_) { + for (auto cfh : column_families) { + s = Flush(flush_options, cfh); + if (!s.ok()) { + break; } } + } else { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Manual atomic flush start.\n" + "=====Column families:====="); + for (auto cfh : column_families) { + auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh); + ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", + cfhi->GetName().c_str()); + } + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "=====End of column families list====="); + autovector<ColumnFamilyData*> cfds; + std::for_each(column_families.begin(), column_families.end(), + [&cfds](ColumnFamilyHandle* elem) { + auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem); + cfds.emplace_back(cfh->cfd()); + }); + s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Manual atomic flush finished, status: %s\n" + "=====Column families:=====", s.ToString().c_str()); + for (auto cfh : column_families) { + auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh); + ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", + cfhi->GetName().c_str()); + } + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "=====End of column families list====="); } - - flush_req.clear(); return s; } @@ -1364,6 +1381,19 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, return manual.status; } +void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds, + FlushRequest* req) { + assert(req != nullptr); + for (const auto cfd : cfds) { + if (nullptr == cfd) { + // cfd may be null, see DBImpl::ScheduleFlushes + continue; + } + uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID(); + req->emplace_back(cfd, max_memtable_id); + } +} + Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& flush_options, FlushReason flush_reason, bool writes_stopped) { @@ -1415,12 +1445,89 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, cfds.push_back(iter.first); flush_memtable_ids.push_back(&(iter.second)); } - s = WaitForFlushMemTables(cfds, flush_memtable_ids); + s = WaitForFlushMemTables(cfds, flush_memtable_ids, + (flush_reason == FlushReason::kErrorRecovery)); } TEST_SYNC_POINT("FlushMemTableFinished"); return s; } +// Flush all elements in 'column_family_datas' +// and atomically record the result to the MANIFEST.
+Status DBImpl::AtomicFlushMemTables( + const autovector& column_family_datas, + const FlushOptions& flush_options, FlushReason flush_reason, + bool writes_stopped) { + Status s; + if (!flush_options.allow_write_stall) { + int num_cfs_to_flush = 0; + for (auto cfd : column_family_datas) { + bool flush_needed = true; + s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed); + if (!s.ok()) { + return s; + } else if (flush_needed) { + ++num_cfs_to_flush; + } + } + if (0 == num_cfs_to_flush) { + return s; + } + } + FlushRequest flush_req; + autovector cfds; + { + WriteContext context; + InstrumentedMutexLock guard_lock(&mutex_); + + WriteThread::Writer w; + if (!writes_stopped) { + write_thread_.EnterUnbatched(&w, &mutex_); + } + + for (auto cfd : column_family_datas) { + if (cfd->IsDropped()) { + continue; + } + if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() || + !cached_recoverable_state_empty_.load()) { + cfds.emplace_back(cfd); + } + } + for (auto cfd : cfds) { + cfd->Ref(); + s = SwitchMemtable(cfd, &context); + cfd->Unref(); + if (!s.ok()) { + break; + } + } + if (s.ok()) { + AssignAtomicFlushSeq(cfds); + for (auto cfd : cfds) { + cfd->imm()->FlushRequested(); + } + GenerateFlushRequest(cfds, &flush_req); + SchedulePendingFlush(flush_req, flush_reason); + MaybeScheduleFlushOrCompaction(); + } + + if (!writes_stopped) { + write_thread_.ExitUnbatched(&w); + } + } + + if (s.ok() && flush_options.wait) { + autovector flush_memtable_ids; + for (auto& iter : flush_req) { + flush_memtable_ids.push_back(&(iter.second)); + } + s = WaitForFlushMemTables(cfds, flush_memtable_ids, + (flush_reason == FlushReason::kErrorRecovery)); + } + return s; +} + // Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can // cause write stall, for example if one memtable is being flushed already. // This method tries to avoid write stall (similar to CompactRange() behavior) @@ -1492,16 +1599,25 @@ Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd, // 2) if flush_memtable_ids[i] is null, then all memtables in THIS column // family have to be flushed. // Finish waiting when ALL column families finish flushing memtables. +// resuming_from_bg_err indicates whether the caller is trying to resume from +// background error or in normal processing. Status DBImpl::WaitForFlushMemTables( const autovector& cfds, - const autovector& flush_memtable_ids) { + const autovector& flush_memtable_ids, + bool resuming_from_bg_err) { int num = static_cast(cfds.size()); // Wait until the compaction completes InstrumentedMutexLock l(&mutex_); - while (!error_handler_.IsDBStopped()) { + // If the caller is trying to resume from bg error, then + // error_handler_.IsDBStopped() is true. + while (resuming_from_bg_err || !error_handler_.IsDBStopped()) { if (shutting_down_.load(std::memory_order_acquire)) { return Status::ShutdownInProgress(); } + // If an error has occurred during resumption, then no need to wait. + if (!error_handler_.GetRecoveryError().ok()) { + break; + } // Number of column families that have been dropped. int num_dropped = 0; // Number of column families that have finished flush. @@ -1527,7 +1643,9 @@ Status DBImpl::WaitForFlushMemTables( bg_cv_.Wait(); } Status s; - if (error_handler_.IsDBStopped()) { + // If not resuming from bg error, and an error has caused the DB to stop, + // then report the bg error to caller. 
+ if (!resuming_from_bg_err && error_handler_.IsDBStopped()) { s = error_handler_.GetBGError(); } return s; @@ -1867,6 +1985,7 @@ void DBImpl::BackgroundCallFlush() { mutex_.Lock(); } + TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0"); ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); // If flush failed, we want to delete all temporary files that we might have diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 11f5e1548..bbdd5df37 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -123,7 +123,7 @@ Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) { auto cfh = reinterpret_cast(column_family); cfd = cfh->cfd(); } - return WaitForFlushMemTable(cfd); + return WaitForFlushMemTable(cfd, nullptr, false); } Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) { diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 8910ab08e..4efb1d00e 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -1014,6 +1014,28 @@ Status DBImpl::WriteRecoverableState() { return Status::OK(); } +void DBImpl::SelectColumnFamiliesForAtomicFlush( + autovector* cfds) { + for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() || + !cached_recoverable_state_empty_.load()) { + cfds->push_back(cfd); + } + } +} + +// Assign sequence number for atomic flush. +void DBImpl::AssignAtomicFlushSeq(const autovector& cfds) { + assert(atomic_flush_); + auto seq = versions_->LastSequence(); + for (auto cfd : cfds) { + cfd->imm()->AssignAtomicFlushSeq(seq); + } +} + Status DBImpl::SwitchWAL(WriteContext* write_context) { mutex_.AssertHeld(); assert(write_context != nullptr); @@ -1062,21 +1084,36 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize()); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread - FlushRequest flush_req; - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->IsDropped()) { - continue; - } - if (cfd->OldestLogToKeep() <= oldest_alive_log) { - status = SwitchMemtable(cfd, write_context); - if (!status.ok()) { - break; + autovector cfds; + if (atomic_flush_) { + SelectColumnFamiliesForAtomicFlush(&cfds); + } else { + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; } - flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID()); - cfd->imm()->FlushRequested(); + if (cfd->OldestLogToKeep() <= oldest_alive_log) { + cfds.push_back(cfd); + } + } + } + for (const auto cfd : cfds) { + cfd->Ref(); + status = SwitchMemtable(cfd, write_context); + cfd->Unref(); + if (!status.ok()) { + break; } } if (status.ok()) { + if (atomic_flush_) { + AssignAtomicFlushSeq(cfds); + } + for (auto cfd : cfds) { + cfd->imm()->FlushRequested(); + } + FlushRequest flush_req; + GenerateFlushRequest(cfds, &flush_req); SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager); MaybeScheduleFlushOrCompaction(); } @@ -1101,29 +1138,32 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { write_buffer_manager_->buffer_size()); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread - ColumnFamilyData* cfd_picked = nullptr; - SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber; + autovector cfds; + if (atomic_flush_) { + SelectColumnFamiliesForAtomicFlush(&cfds); + } else 
{ + ColumnFamilyData* cfd_picked = nullptr; + SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber; - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->IsDropped()) { - continue; - } - if (!cfd->mem()->IsEmpty()) { - // We only consider active mem table, hoping immutable memtable is - // already in the process of flushing. - uint64_t seq = cfd->mem()->GetCreationSeq(); - if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) { - cfd_picked = cfd; - seq_num_for_cf_picked = seq; + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + if (!cfd->mem()->IsEmpty()) { + // We only consider active mem table, hoping immutable memtable is + // already in the process of flushing. + uint64_t seq = cfd->mem()->GetCreationSeq(); + if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) { + cfd_picked = cfd; + seq_num_for_cf_picked = seq; + } } } + if (cfd_picked != nullptr) { + cfds.push_back(cfd_picked); + } } - autovector cfds; - if (cfd_picked != nullptr) { - cfds.push_back(cfd_picked); - } - FlushRequest flush_req; for (const auto cfd : cfds) { cfd->Ref(); status = SwitchMemtable(cfd, write_context); @@ -1131,11 +1171,16 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { if (!status.ok()) { break; } - uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID(); - cfd->imm()->FlushRequested(); - flush_req.emplace_back(cfd, flush_memtable_id); } if (status.ok()) { + if (atomic_flush_) { + AssignAtomicFlushSeq(cfds); + } + for (const auto cfd : cfds) { + cfd->imm()->FlushRequested(); + } + FlushRequest flush_req; + GenerateFlushRequest(cfds, &flush_req); SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); MaybeScheduleFlushOrCompaction(); } @@ -1258,25 +1303,36 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, } Status DBImpl::ScheduleFlushes(WriteContext* context) { - ColumnFamilyData* cfd; - FlushRequest flush_req; + autovector cfds; + if (atomic_flush_) { + SelectColumnFamiliesForAtomicFlush(&cfds); + for (auto cfd : cfds) { + cfd->Ref(); + } + flush_scheduler_.Clear(); + } else { + ColumnFamilyData* tmp_cfd; + while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { + cfds.push_back(tmp_cfd); + } + } Status status; - while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { + for (auto& cfd : cfds) { status = SwitchMemtable(cfd, context); - bool should_schedule = true; if (cfd->Unref()) { delete cfd; - should_schedule = false; + cfd = nullptr; } if (!status.ok()) { break; } - if (should_schedule) { - uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID(); - flush_req.emplace_back(cfd, flush_memtable_id); - } } if (status.ok()) { + if (atomic_flush_) { + AssignAtomicFlushSeq(cfds); + } + FlushRequest flush_req; + GenerateFlushRequest(cfds, &flush_req); SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); MaybeScheduleFlushOrCompaction(); } diff --git a/db/db_test.cc b/db/db_test.cc index 036735200..7ee23780f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2659,6 +2659,11 @@ class ModelDB : public DB { Status ret; return ret; } + virtual Status Flush( + const rocksdb::FlushOptions& /*options*/, + const std::vector& /*column_families*/) override { + return Status::OK(); + } virtual Status SyncWAL() override { return Status::OK(); } diff --git a/db/db_test_util.cc b/db/db_test_util.cc index a4d506b4a..7daea9559 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -646,6 +646,13 @@ Status DBTestBase::Flush(int cf) { } 
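The DBTestBase helper that follows mirrors the existing single-column-family Flush(int cf) helper; tests added earlier in this patch, such as ManualAtomicFlush, drive it with a vector of column family indices. A condensed fragment, assuming the usual DBTestBase fixture:

  Options options = CurrentOptions();
  options.atomic_flush = true;
  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  ASSERT_OK(Flush({0, 1, 2}));  // flush default, pikachu and eevee together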
} +Status DBTestBase::Flush(const std::vector<int>& cf_ids) { + std::vector<ColumnFamilyHandle*> cfhs; + std::for_each(cf_ids.begin(), cf_ids.end(), + [&cfhs, this](int id) { cfhs.emplace_back(handles_[id]); }); + return db_->Flush(FlushOptions(), cfhs); +} + Status DBTestBase::Put(const Slice& k, const Slice& v, WriteOptions wo) { if (kMergePut == option_config_) { return db_->Merge(wo, k, v); diff --git a/db/db_test_util.h b/db/db_test_util.h index 7f26e708a..9081ee358 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -799,6 +799,8 @@ class DBTestBase : public testing::Test { Status Flush(int cf = 0); + Status Flush(const std::vector<int>& cf_ids); + Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()); Status Put(int cf, const Slice& k, const Slice& v, diff --git a/db/memtable_list.h b/db/memtable_list.h index 75f5cd56c..8f23def13 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -283,6 +283,21 @@ class MemTableList { return memlist.front()->GetID(); } + void AssignAtomicFlushSeq(const SequenceNumber& seq) { + const auto& memlist = current_->memlist_; + // Scan the memtable list from newest to oldest. + for (auto it = memlist.begin(); it != memlist.end(); ++it) { + MemTable* mem = *it; + if (mem->atomic_flush_seqno_ == kMaxSequenceNumber) { + mem->atomic_flush_seqno_ = seq; + } else { + // Earlier memtables must already have been assigned an atomic flush + // seq, so there is no need to continue scanning. + break; + } + } + } + private: // DB mutex held void InstallNewVersion(); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index f1430bce8..f7517b91f 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -900,11 +900,22 @@ class DB { virtual DBOptions GetDBOptions() const = 0; // Flush all mem-table data. + // Flush a single column family, even when atomic flush is enabled. To flush + // multiple column families, use Flush(options, column_families). virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) = 0; virtual Status Flush(const FlushOptions& options) { return Flush(options, DefaultColumnFamily()); } + // Flushes multiple column families. + // If atomic flush is not enabled, Flush(options, column_families) is + // equivalent to calling Flush(options, column_family) multiple times. + // If atomic flush is enabled, Flush(options, column_families) will flush all + // column families specified in 'column_families' up to the latest sequence + // number at the time the flush is requested. + virtual Status Flush( + const FlushOptions& options, + const std::vector<ColumnFamilyHandle*>& column_families) = 0; // Flush the WAL memory buffer to the file. If sync is true, it calls SyncWAL // afterwards. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index c4597c489..63a65fa16 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -943,6 +943,20 @@ struct DBOptions { // relies on manual invocation of FlushWAL to write the WAL buffer to its // file. bool manual_wal_flush = false; + + // If true, RocksDB supports flushing multiple column families and committing + // their results atomically to the MANIFEST. Note that it is not + // necessary to set atomic_flush to true if the WAL is always enabled, since + // the WAL allows the database to be restored to the last persistent state. + // This option is useful when there are column families with writes NOT + // protected by the WAL. + // For a manual flush, the application has to specify which column families to + // flush atomically in DB::Flush.
+ // For auto-triggered flush, RocksDB atomically flushes ALL column families. + // + // Currently, any WAL-enabled writes after atomic flush may be replayed + // independently if the process crashes later and tries to recover. + bool atomic_flush = false; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 721203f7c..88426b3f5 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -278,6 +278,11 @@ class StackableDB : public DB { ColumnFamilyHandle* column_family) override { return db_->Flush(fopts, column_family); } + virtual Status Flush( + const FlushOptions& fopts, + const std::vector& column_families) override { + return db_->Flush(fopts, column_families); + } virtual Status SyncWAL() override { return db_->SyncWAL(); diff --git a/options/db_options.h b/options/db_options.h index 107d35c87..2cd83b55d 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -78,6 +78,7 @@ struct ImmutableDBOptions { bool preserve_deletes; bool two_write_queues; bool manual_wal_flush; + bool atomic_flush; }; struct MutableDBOptions { diff --git a/options/options_helper.cc b/options/options_helper.cc index f4c59ff06..fe0cbb558 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -1554,7 +1554,11 @@ std::unordered_map offsetof(struct ImmutableDBOptions, manual_wal_flush)}}, {"seq_per_batch", {0, OptionType::kBoolean, OptionVerificationType::kDeprecated, false, - 0}}}; + 0}}, + {"atomic_flush", + {offsetof(struct DBOptions, atomic_flush), OptionType::kBoolean, + OptionVerificationType::kNormal, false, + offsetof(struct ImmutableDBOptions, atomic_flush)}}}; std::unordered_map OptionsHelper::block_base_table_index_type_string_map = { diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index ded152ba9..cad1af3d7 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -291,7 +291,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "concurrent_prepare=false;" "two_write_queues=false;" "manual_wal_flush=false;" - "seq_per_batch=false;", + "seq_per_batch=false;" + "atomic_flush=false", new_options)); ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 45a7c9a0d..3740138c9 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -133,6 +133,8 @@ DEFINE_bool(test_batches_snapshots, false, "\t(b) No long validation at the end (more speed up)\n" "\t(c) Test snapshot and atomicity of batch writes"); +DEFINE_bool(atomic_flush, false, "If true, the test enables atomic flush\n"); + DEFINE_int32(threads, 32, "Number of concurrent threads to run."); DEFINE_int32(ttl, -1, @@ -2218,6 +2220,8 @@ class StressTest { fprintf(stdout, "Format version : %d\n", FLAGS_format_version); fprintf(stdout, "TransactionDB : %s\n", FLAGS_use_txn ? "true" : "false"); + fprintf(stdout, "Atomic flush : %s\n", + FLAGS_atomic_flush ? 
"true" : "false"); fprintf(stdout, "Column families : %d\n", FLAGS_column_families); if (!FLAGS_test_batches_snapshots) { fprintf(stdout, "Clear CFs one in : %d\n", @@ -2363,6 +2367,7 @@ class StressTest { FLAGS_universal_max_merge_width; options_.compaction_options_universal.max_size_amplification_percent = FLAGS_universal_max_size_amplification_percent; + options_.atomic_flush = FLAGS_atomic_flush; } else { #ifdef ROCKSDB_LITE fprintf(stderr, "--options_file not supported in lite mode\n"); @@ -3327,6 +3332,252 @@ class BatchedOpsStressTest : public StressTest { virtual void VerifyDb(ThreadState* /* thread */) const {} }; +class AtomicFlushStressTest : public StressTest { + public: + AtomicFlushStressTest() : batch_id_(0) {} + + virtual ~AtomicFlushStressTest() {} + + virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts, + const ReadOptions& /* read_opts */, + const std::vector& rand_column_families, + const std::vector& rand_keys, + char (&value)[100], + std::unique_ptr& /* lock */) { + std::string key_str = Key(rand_keys[0]); + Slice key = key_str; + uint64_t value_base = batch_id_.fetch_add(1); + size_t sz = + GenerateValue(static_cast(value_base), value, sizeof(value)); + Slice v(value, sz); + WriteBatch batch; + for (auto cf : rand_column_families) { + ColumnFamilyHandle* cfh = column_families_[cf]; + if (FLAGS_use_merge) { + batch.Merge(cfh, key, v); + } else { /* !FLAGS_use_merge */ + batch.Put(cfh, key, v); + } + } + Status s = db_->Write(write_opts, &batch); + if (!s.ok()) { + fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); + } else { + size_t num = rand_column_families.size(); + thread->stats.AddBytesForWrites(num, (sz + 1) * num); + } + + return s; + } + + virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys, + std::unique_ptr& /* lock */) { + std::string key_str = Key(rand_keys[0]); + Slice key = key_str; + WriteBatch batch; + for (auto cf : rand_column_families) { + ColumnFamilyHandle* cfh = column_families_[cf]; + batch.Delete(cfh, key); + } + Status s = db_->Write(write_opts, &batch); + if (!s.ok()) { + fprintf(stderr, "multidel error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); + } else { + thread->stats.AddDeletes(rand_column_families.size()); + } + return s; + } + + virtual Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys, + std::unique_ptr& /* lock */) { + int64_t rand_key = rand_keys[0]; + auto shared = thread->shared; + int64_t max_key = shared->GetMaxKey(); + if (rand_key > max_key - FLAGS_range_deletion_width) { + rand_key = + thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1); + } + std::string key_str = Key(rand_key); + Slice key = key_str; + std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width); + Slice end_key = end_key_str; + WriteBatch batch; + for (auto cf : rand_column_families) { + ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]]; + batch.DeleteRange(cfh, key, end_key); + } + Status s = db_->Write(write_opts, &batch); + if (!s.ok()) { + fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); + } else { + thread->stats.AddRangeDeletions(rand_column_families.size()); + } + return s; + } + + virtual void TestIngestExternalFile( + ThreadState* /* thread */, + const std::vector& /* 
rand_column_families */, + const std::vector& /* rand_keys */, + std::unique_ptr& /* lock */) { + assert(false); + fprintf(stderr, + "AtomicFlushStressTest does not support TestIngestExternalFile " + "because it's not possible to verify the result\n"); + std::terminate(); + } + + virtual Status TestGet(ThreadState* thread, const ReadOptions& readoptions, + const std::vector& rand_column_families, + const std::vector& rand_keys) { + std::string key_str = Key(rand_keys[0]); + Slice key = key_str; + auto cfh = + column_families_[rand_column_families[thread->rand.Next() % + rand_column_families.size()]]; + std::string from_db; + Status s = db_->Get(readoptions, cfh, key, &from_db); + if (s.ok()) { + thread->stats.AddGets(1, 1); + } else if (s.IsNotFound()) { + thread->stats.AddGets(1, 0); + } else { + thread->stats.AddErrors(1); + } + return s; + } + + virtual Status TestPrefixScan(ThreadState* thread, + const ReadOptions& readoptions, + const std::vector& rand_column_families, + const std::vector& rand_keys) { + std::string key_str = Key(rand_keys[0]); + Slice key = key_str; + Slice prefix = Slice(key.data(), FLAGS_prefix_size); + + std::string upper_bound; + Slice ub_slice; + ReadOptions ro_copy = readoptions; + if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) { + ub_slice = Slice(upper_bound); + ro_copy.iterate_upper_bound = &ub_slice; + } + auto cfh = + column_families_[rand_column_families[thread->rand.Next() % + rand_column_families.size()]]; + Iterator* iter = db_->NewIterator(ro_copy, cfh); + int64_t count = 0; + for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); + iter->Next()) { + ++count; + } + assert(count <= (static_cast(1) << ((8 - FLAGS_prefix_size) * 8))); + Status s = iter->status(); + if (s.ok()) { + thread->stats.AddPrefixes(1, static_cast(count)); + } else { + thread->stats.AddErrors(1); + } + delete iter; + return s; + } + + virtual void VerifyDb(ThreadState* thread) const { + ReadOptions options(FLAGS_verify_checksum, true); + // We must set total_order_seek to true because we are doing a SeekToFirst + // on a column family whose memtables may support (by default) prefix-based + // iterator. In this case, NewIterator with options.total_order_seek being + // false returns a prefix-based iterator. Calling SeekToFirst using this + // iterator causes the iterator to become invalid. That means we cannot + // iterate the memtable using this iterator any more, although the memtable + // contains the most up-to-date key-values. 
+ options.total_order_seek = true; + assert(thread != nullptr); + auto shared = thread->shared; + std::vector > iters(column_families_.size()); + for (size_t i = 0; i != column_families_.size(); ++i) { + iters[i].reset(db_->NewIterator(options, column_families_[i])); + } + for (auto& iter : iters) { + iter->SeekToFirst(); + } + size_t num = column_families_.size(); + assert(num == iters.size()); + do { + size_t valid_cnt = 0; + for (auto& iter : iters) { + if (iter->Valid()) { + ++valid_cnt; + } + } + if (valid_cnt == 0) { + break; + } else if (valid_cnt != iters.size()) { + fprintf(stderr, "Finished iterating the following column families:\n"); + for (size_t i = 0; i != num; ++i) { + if (!iters[i]->Valid()) { + fprintf(stderr, "%s ", column_families_[i]->GetName().c_str()); + } + } + fprintf(stderr, + "\nThe following column families have data that have not been " + "scanned:\n"); + for (size_t i = 0; i != num; ++i) { + if (iters[i]->Valid()) { + fprintf(stderr, "%s ", column_families_[i]->GetName().c_str()); + } + } + fprintf(stderr, "\n"); + } + // If the program reaches here, then all column families' iterators are + // still valid. + Slice key; + Slice value; + for (size_t i = 0; i != num; ++i) { + if (i == 0) { + key = iters[i]->key(); + value = iters[i]->value(); + } else { + if (key.compare(iters[i]->key()) != 0) { + fprintf(stderr, "Verification failed\n"); + fprintf(stderr, "cf%s: %s => %s\n", + column_families_[0]->GetName().c_str(), + key.ToString(true /* hex */).c_str(), + value.ToString(/* hex */).c_str()); + fprintf(stderr, "cf%s: %s => %s\n", + column_families_[i]->GetName().c_str(), + iters[i]->key().ToString(true /* hex */).c_str(), + iters[i]->value().ToString(true /* hex */).c_str()); + shared->SetVerificationFailure(); + } + } + } + for (auto& iter : iters) { + iter->Next(); + } + } while (true); + } + + virtual std::vector GenerateColumnFamilies( + const int /* num_column_families */, int /* rand_column_family */) const { + std::vector ret; + int num = static_cast(column_families_.size()); + int k = 0; + std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; }); + return ret; + } + + private: + std::atomic batch_id_; +}; + } // namespace rocksdb int main(int argc, char** argv) { @@ -3428,7 +3679,9 @@ int main(int argc, char** argv) { rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist); std::unique_ptr stress; - if (FLAGS_test_batches_snapshots) { + if (FLAGS_atomic_flush) { + stress.reset(new rocksdb::AtomicFlushStressTest()); + } else if (FLAGS_test_batches_snapshots) { stress.reset(new rocksdb::BatchedOpsStressTest()); } else { stress.reset(new rocksdb::NonBatchedOpsStressTest());
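Finally, a self-contained sketch of the invariant that AtomicFlushStressTest::VerifyDb checks: because every operation applies the same key and value to all column families through a single WriteBatch, each key must have identical presence and contents in every column family after a crash and recovery. The helper name below is illustrative; the real test walks all column families in lockstep with iterators rather than point lookups.

#include <string>
#include <vector>
#include "rocksdb/db.h"

// Returns true if 'key' has the same presence and value in every column
// family, which is the consistency property atomic flush is meant to
// preserve for WAL-disabled writes.
bool KeyConsistentAcrossColumnFamilies(
    rocksdb::DB* db,
    const std::vector<rocksdb::ColumnFamilyHandle*>& handles,
    const rocksdb::Slice& key) {
  std::string expected_value;
  bool expected_found = false;
  for (size_t i = 0; i < handles.size(); ++i) {
    std::string value;
    rocksdb::Status s =
        db->Get(rocksdb::ReadOptions(), handles[i], key, &value);
    if (!s.ok() && !s.IsNotFound()) {
      return false;  // treat read errors as a verification failure
    }
    const bool found = s.ok();
    if (i == 0) {
      expected_found = found;
      expected_value = value;
    } else if (found != expected_found ||
               (found && value != expected_value)) {
      return false;
    }
  }
  return true;
}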