diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 4198cf048..a90261fa8 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1199,7 +1199,7 @@ class DBImpl : public DB { friend class StatsHistoryTest_PersistentStatsCreateColumnFamilies_Test; #ifndef NDEBUG friend class DBTest2_ReadCallbackTest_Test; - friend class WriteCallbackTest_WriteWithCallbackTest_Test; + friend class WriteCallbackPTest_WriteWithCallbackTest_Test; friend class XFTransactionWriteHandler; friend class DBBlobIndexTest; friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index df7d673aa..4bfc4e911 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -84,7 +84,28 @@ class MockWriteCallback : public WriteCallback { bool AllowWriteBatching() override { return allow_batching_; } }; -TEST_F(WriteCallbackTest, WriteWithCallbackTest) { +class WriteCallbackPTest + : public WriteCallbackTest, + public ::testing::WithParamInterface< + std::tuple> { + public: + WriteCallbackPTest() { + std::tie(unordered_write_, seq_per_batch_, two_queues_, allow_parallel_, + allow_batching_, enable_WAL_, enable_pipelined_write_) = + GetParam(); + } + + protected: + bool unordered_write_; + bool seq_per_batch_; + bool two_queues_; + bool allow_parallel_; + bool allow_batching_; + bool enable_WAL_; + bool enable_pipelined_write_; +}; + +TEST_P(WriteCallbackPTest, WriteWithCallbackTest) { struct WriteOP { WriteOP(bool should_fail = false) { callback_.should_fail_ = should_fail; } @@ -124,254 +145,238 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { {false, false, true, false, true}, }; - for (auto& unordered_write : {true, false}) { - for (auto& seq_per_batch : {true, false}) { - for (auto& two_queues : {true, false}) { - for (auto& allow_parallel : {true, false}) { - for (auto& allow_batching : {true, false}) { - for (auto& enable_WAL : {true, false}) { - for (auto& enable_pipelined_write : {true, false}) { - for (auto& write_group : write_scenarios) { - Options options; - options.create_if_missing = true; - options.unordered_write = unordered_write; - options.allow_concurrent_memtable_write = allow_parallel; - options.enable_pipelined_write = enable_pipelined_write; - options.two_write_queues = two_queues; - // Skip unsupported combinations - if (options.enable_pipelined_write && seq_per_batch) { - continue; - } - if (options.enable_pipelined_write && options.two_write_queues) { - continue; - } - if (options.unordered_write && - !options.allow_concurrent_memtable_write) { - continue; - } - if (options.unordered_write && options.enable_pipelined_write) { - continue; - } + for (auto& write_group : write_scenarios) { + Options options; + options.create_if_missing = true; + options.unordered_write = unordered_write_; + options.allow_concurrent_memtable_write = allow_parallel_; + options.enable_pipelined_write = enable_pipelined_write_; + options.two_write_queues = two_queues_; + // Skip unsupported combinations + if (options.enable_pipelined_write && seq_per_batch_) { + continue; + } + if (options.enable_pipelined_write && options.two_write_queues) { + continue; + } + if (options.unordered_write && !options.allow_concurrent_memtable_write) { + continue; + } + if (options.unordered_write && options.enable_pipelined_write) { + continue; + } - ReadOptions read_options; - DB* db; - DBImpl* db_impl; + ReadOptions read_options; + DB* db; + DBImpl* db_impl; - DestroyDB(dbname, options); + DestroyDB(dbname, options); - DBOptions db_options(options); - ColumnFamilyOptions cf_options(options); - std::vector column_families; - column_families.push_back( - ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); - std::vector handles; - auto open_s = - DBImpl::Open(db_options, dbname, column_families, &handles, - &db, seq_per_batch, true /* batch_per_txn */); - ASSERT_OK(open_s); - assert(handles.size() == 1); - delete handles[0]; + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); + std::vector handles; + auto open_s = DBImpl::Open(db_options, dbname, column_families, &handles, + &db, seq_per_batch_, true /* batch_per_txn */); + ASSERT_OK(open_s); + assert(handles.size() == 1); + delete handles[0]; - db_impl = dynamic_cast(db); - ASSERT_TRUE(db_impl); + db_impl = dynamic_cast(db); + ASSERT_TRUE(db_impl); - // Writers that have called JoinBatchGroup. - std::atomic threads_joining(0); - // Writers that have linked to the queue - std::atomic threads_linked(0); - // Writers that pass WriteThread::JoinBatchGroup:Wait sync-point. - std::atomic threads_verified(0); + // Writers that have called JoinBatchGroup. + std::atomic threads_joining(0); + // Writers that have linked to the queue + std::atomic threads_linked(0); + // Writers that pass WriteThread::JoinBatchGroup:Wait sync-point. + std::atomic threads_verified(0); - std::atomic seq(db_impl->GetLatestSequenceNumber()); - ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); + std::atomic seq(db_impl->GetLatestSequenceNumber()); + ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:Start", [&](void*) { - uint64_t cur_threads_joining = threads_joining.fetch_add(1); - // Wait for the last joined writer to link to the queue. - // In this way the writers link to the queue one by one. - // This allows us to confidently detect the first writer - // who increases threads_linked as the leader. - while (threads_linked.load() < cur_threads_joining) { - } - }); - - // Verification once writers call JoinBatchGroup. - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { - uint64_t cur_threads_linked = threads_linked.fetch_add(1); - bool is_leader = false; - bool is_last = false; - - // who am i - is_leader = (cur_threads_linked == 0); - is_last = (cur_threads_linked == write_group.size() - 1); - - // check my state - auto* writer = reinterpret_cast(arg); - - if (is_leader) { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_INIT); - } - - // (meta test) the first WriteOP should indeed be the first - // and the last should be the last (all others can be out of - // order) - if (is_leader) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.front().callback_.should_fail_); - } else if (is_last) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.back().callback_.should_fail_); - } - - threads_verified.fetch_add(1); - // Wait here until all verification in this sync-point - // callback finish for all writers. - while (threads_verified.load() < write_group.size()) { - } - }); - - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { - // check my state - auto* writer = reinterpret_cast(arg); - - if (!allow_batching) { - // no batching so everyone should be a leader - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else if (!allow_parallel) { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_COMPLETED || - (enable_pipelined_write && - writer->state == - WriteThread::State:: - STATE_MEMTABLE_WRITER_LEADER)); - } - }); - - std::atomic thread_num(0); - std::atomic dummy_key(0); - - // Each write thread create a random write batch and write to DB - // with a write callback. - std::function write_with_callback_func = [&]() { - uint32_t i = thread_num.fetch_add(1); - Random rnd(i); - - // leaders gotta lead - while (i > 0 && threads_verified.load() < 1) { - } - - // loser has to lose - while (i == write_group.size() - 1 && - threads_verified.load() < write_group.size() - 1) { - } - - auto& write_op = write_group.at(i); - write_op.Clear(); - write_op.callback_.allow_batching_ = allow_batching; - - // insert some keys - for (uint32_t j = 0; j < rnd.Next() % 50; j++) { - // grab unique key - char my_key = dummy_key.fetch_add(1); - - string skey(5, my_key); - string sval(10, my_key); - write_op.Put(skey, sval); - - if (!write_op.callback_.should_fail_ && !seq_per_batch) { - seq.fetch_add(1); - } - } - if (!write_op.callback_.should_fail_ && seq_per_batch) { - seq.fetch_add(1); - } - - WriteOptions woptions; - woptions.disableWAL = !enable_WAL; - woptions.sync = enable_WAL; - Status s; - if (seq_per_batch) { - class PublishSeqCallback : public PreReleaseCallback { - public: - PublishSeqCallback(DBImpl* db_impl_in) - : db_impl_(db_impl_in) {} - Status Callback(SequenceNumber last_seq, bool /*not used*/, - uint64_t, size_t /*index*/, - size_t /*total*/) override { - db_impl_->SetLastPublishedSequence(last_seq); - return Status::OK(); - } - DBImpl* db_impl_; - } publish_seq_callback(db_impl); - // seq_per_batch requires a natural batch separator or Noop - WriteBatchInternal::InsertNoop(&write_op.write_batch_); - const size_t ONE_BATCH = 1; - s = db_impl->WriteImpl( - woptions, &write_op.write_batch_, &write_op.callback_, - nullptr, 0, false, nullptr, ONE_BATCH, - two_queues ? &publish_seq_callback : nullptr); - } else { - s = db_impl->WriteWithCallback( - woptions, &write_op.write_batch_, &write_op.callback_); - } - - if (write_op.callback_.should_fail_) { - ASSERT_TRUE(s.IsBusy()); - } else { - ASSERT_OK(s); - } - }; - - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); - - // do all the writes - std::vector threads; - for (uint32_t i = 0; i < write_group.size(); i++) { - threads.emplace_back(write_with_callback_func); - } - for (auto& t : threads) { - t.join(); - } - - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); - - // check for keys - string value; - for (auto& w : write_group) { - ASSERT_TRUE(w.callback_.was_called_.load()); - for (auto& kvp : w.kvs_) { - if (w.callback_.should_fail_) { - ASSERT_TRUE( - db->Get(read_options, kvp.first, &value).IsNotFound()); - } else { - ASSERT_OK(db->Get(read_options, kvp.first, &value)); - ASSERT_EQ(value, kvp.second); - } - } - } - - ASSERT_EQ(seq.load(), db_impl->TEST_GetLastVisibleSequence()); - - delete db; - DestroyDB(dbname, options); - } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Start", [&](void*) { + uint64_t cur_threads_joining = threads_joining.fetch_add(1); + // Wait for the last joined writer to link to the queue. + // In this way the writers link to the queue one by one. + // This allows us to confidently detect the first writer + // who increases threads_linked as the leader. + while (threads_linked.load() < cur_threads_joining) { } + }); + + // Verification once writers call JoinBatchGroup. + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { + uint64_t cur_threads_linked = threads_linked.fetch_add(1); + bool is_leader = false; + bool is_last = false; + + // who am i + is_leader = (cur_threads_linked == 0); + is_last = (cur_threads_linked == write_group.size() - 1); + + // check my state + auto* writer = reinterpret_cast(arg); + + if (is_leader) { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else { + ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT); + } + + // (meta test) the first WriteOP should indeed be the first + // and the last should be the last (all others can be out of + // order) + if (is_leader) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.front().callback_.should_fail_); + } else if (is_last) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.back().callback_.should_fail_); + } + + threads_verified.fetch_add(1); + // Wait here until all verification in this sync-point + // callback finish for all writers. + while (threads_verified.load() < write_group.size()) { + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { + // check my state + auto* writer = reinterpret_cast(arg); + + if (!allow_batching_) { + // no batching so everyone should be a leader + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else if (!allow_parallel_) { + ASSERT_TRUE(writer->state == WriteThread::State::STATE_COMPLETED || + (enable_pipelined_write_ && + writer->state == + WriteThread::State::STATE_MEMTABLE_WRITER_LEADER)); + } + }); + + std::atomic thread_num(0); + std::atomic dummy_key(0); + + // Each write thread create a random write batch and write to DB + // with a write callback. + std::function write_with_callback_func = [&]() { + uint32_t i = thread_num.fetch_add(1); + Random rnd(i); + + // leaders gotta lead + while (i > 0 && threads_verified.load() < 1) { + } + + // loser has to lose + while (i == write_group.size() - 1 && + threads_verified.load() < write_group.size() - 1) { + } + + auto& write_op = write_group.at(i); + write_op.Clear(); + write_op.callback_.allow_batching_ = allow_batching_; + + // insert some keys + for (uint32_t j = 0; j < rnd.Next() % 50; j++) { + // grab unique key + char my_key = dummy_key.fetch_add(1); + + string skey(5, my_key); + string sval(10, my_key); + write_op.Put(skey, sval); + + if (!write_op.callback_.should_fail_ && !seq_per_batch_) { + seq.fetch_add(1); + } + } + if (!write_op.callback_.should_fail_ && seq_per_batch_) { + seq.fetch_add(1); + } + + WriteOptions woptions; + woptions.disableWAL = !enable_WAL_; + woptions.sync = enable_WAL_; + Status s; + if (seq_per_batch_) { + class PublishSeqCallback : public PreReleaseCallback { + public: + PublishSeqCallback(DBImpl* db_impl_in) : db_impl_(db_impl_in) {} + Status Callback(SequenceNumber last_seq, bool /*not used*/, uint64_t, + size_t /*index*/, size_t /*total*/) override { + db_impl_->SetLastPublishedSequence(last_seq); + return Status::OK(); + } + DBImpl* db_impl_; + } publish_seq_callback(db_impl); + // seq_per_batch_ requires a natural batch separator or Noop + WriteBatchInternal::InsertNoop(&write_op.write_batch_); + const size_t ONE_BATCH = 1; + s = db_impl->WriteImpl(woptions, &write_op.write_batch_, + &write_op.callback_, nullptr, 0, false, nullptr, + ONE_BATCH, + two_queues_ ? &publish_seq_callback : nullptr); + } else { + s = db_impl->WriteWithCallback(woptions, &write_op.write_batch_, + &write_op.callback_); + } + + if (write_op.callback_.should_fail_) { + ASSERT_TRUE(s.IsBusy()); + } else { + ASSERT_OK(s); + } + }; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // do all the writes + std::vector threads; + for (uint32_t i = 0; i < write_group.size(); i++) { + threads.emplace_back(write_with_callback_func); + } + for (auto& t : threads) { + t.join(); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + // check for keys + string value; + for (auto& w : write_group) { + ASSERT_TRUE(w.callback_.was_called_.load()); + for (auto& kvp : w.kvs_) { + if (w.callback_.should_fail_) { + ASSERT_TRUE(db->Get(read_options, kvp.first, &value).IsNotFound()); + } else { + ASSERT_OK(db->Get(read_options, kvp.first, &value)); + ASSERT_EQ(value, kvp.second); } } } - } - } + + ASSERT_EQ(seq.load(), db_impl->TEST_GetLastVisibleSequence()); + + delete db; + DestroyDB(dbname, options); } } +INSTANTIATE_TEST_CASE_P(WriteCallbackPTest, WriteCallbackPTest, + ::testing::Combine(::testing::Bool(), ::testing::Bool(), + ::testing::Bool(), ::testing::Bool(), + ::testing::Bool(), ::testing::Bool(), + ::testing::Bool())); + TEST_F(WriteCallbackTest, WriteCallBackTest) { Options options; WriteOptions write_options;