From 6d75319d95d10a6f27dbef85d515ca04b349bde2 Mon Sep 17 00:00:00 2001 From: Zhichao Cao Date: Fri, 10 Aug 2018 17:56:11 -0700 Subject: [PATCH] Add tracing function of Seek() and SeekForPrev() to trace_replay (#4228) Summary: In the current trace_and replay, Get an WriteBatch are traced. This pull request track down the Seek() and SeekForPrev() to the trace file. are write to the file. Replay of Iterator is not supported in the current implementation. Tested with trace_analyzer. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4228 Differential Revision: D9201381 Pulled By: zhichao-cao fbshipit-source-id: 6f9cc9cb3c20260af741bee065ec35c5c96354ab --- db/db_impl.cc | 34 +++++++++++++++++++++++----- db/db_impl.h | 3 ++- db/db_iter.cc | 47 ++++++++++++++++++++++++++++----------- db/db_iter.h | 20 ++++++++--------- db/db_iterator_test.cc | 28 +++++++++++++++++++++++ db/db_test2.cc | 6 +++++ util/trace_replay.cc | 50 ++++++++++++++++++++++++++++++++++++++++++ util/trace_replay.h | 5 +++++ 8 files changed, 163 insertions(+), 30 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 65e0c4597..3fa583869 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1618,8 +1618,8 @@ if (read_options.tailing) { result = NewDBIterator( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, cfd->user_comparator(), iter, kMaxSequenceNumber, - sv->mutable_cf_options.max_sequential_skip_in_iterations, - read_callback); + sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback, + this, cfd); #endif } else { // Note: no need to consider the special case of @@ -1686,9 +1686,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, read_callback, - ((read_options.snapshot != nullptr) ? nullptr : this), cfd, allow_blob, - allow_refresh); + sv->version_number, read_callback, this, cfd, allow_blob, + ((read_options.snapshot != nullptr) ? false : allow_refresh)); InternalIterator* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), @@ -1725,7 +1724,7 @@ Status DBImpl::NewIterators( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, cfd->user_comparator(), iter, kMaxSequenceNumber, sv->mutable_cf_options.max_sequential_skip_in_iterations, - read_callback)); + read_callback, this, cfd)); } #endif } else { @@ -3129,5 +3128,28 @@ Status DBImpl::EndTrace() { return s; } +Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) { + Status s; + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + s = tracer_->IteratorSeek(cf_id, key); + } + } + return s; +} + +Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { + Status s; + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + s = tracer_->IteratorSeekForPrev(cf_id, key); + } + } + return s; +} + #endif // ROCKSDB_LITE + } // namespace rocksdb diff --git a/db/db_impl.h b/db/db_impl.h index a5b33f119..36428f33e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -342,7 +342,8 @@ class DBImpl : public DB { using DB::EndTrace; virtual Status EndTrace() override; - + Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key); + Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key); #endif // ROCKSDB_LITE // Similar to GetSnapshot(), but also lets the db know that this snapshot diff --git a/db/db_iter.cc b/db/db_iter.cc index 2c6c96580..701d0d8fc 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -27,6 +27,7 @@ #include "util/logging.h" #include "util/mutexlock.h" #include "util/string_util.h" +#include "util/trace_replay.h" namespace rocksdb { @@ -114,7 +115,8 @@ class DBIter final: public Iterator { const MutableCFOptions& mutable_cf_options, const Comparator* cmp, InternalIterator* iter, SequenceNumber s, bool arena_mode, uint64_t max_sequential_skip_in_iterations, - ReadCallback* read_callback, bool allow_blob) + ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, + bool allow_blob) : arena_mode_(arena_mode), env_(_env), logger_(cf_options.info_log), @@ -135,6 +137,8 @@ class DBIter final: public Iterator { range_del_agg_(cf_options.internal_comparator, s, true /* collapse_deletions */), read_callback_(read_callback), + db_impl_(db_impl), + cfd_(cfd), allow_blob_(allow_blob), is_blob_(false), start_seqnum_(read_options.iter_start_seqnum) { @@ -344,6 +348,8 @@ class DBIter final: public Iterator { LocalStatistics local_stats_; PinnedIteratorsManager pinned_iters_mgr_; ReadCallback* read_callback_; + DBImpl* db_impl_; + ColumnFamilyData* cfd_; bool allow_blob_; bool is_blob_; // for diff snapshots we want the lower bound on the seqnum; @@ -1267,6 +1273,12 @@ void DBIter::Seek(const Slice& target) { saved_key_.Clear(); saved_key_.SetInternalKey(target, seq); +#ifndef ROCKSDB_LITE + if (db_impl_ != nullptr && cfd_ != nullptr) { + db_impl_->TraceIteratorSeek(cfd_->GetID(), target); + } +#endif // ROCKSDB_LITE + if (iterate_lower_bound_ != nullptr && user_comparator_->Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) < 0) { @@ -1331,6 +1343,12 @@ void DBIter::SeekForPrev(const Slice& target) { range_del_agg_.InvalidateRangeDelMapPositions(); } +#ifndef ROCKSDB_LITE + if (db_impl_ != nullptr && cfd_ != nullptr) { + db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target); + } +#endif // ROCKSDB_LITE + RecordTick(statistics_, NUMBER_DB_SEEK); if (iter_->Valid()) { if (prefix_extractor_ && prefix_same_as_start_) { @@ -1453,11 +1471,12 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, InternalIterator* internal_iter, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - ReadCallback* read_callback, bool allow_blob) { - DBIter* db_iter = - new DBIter(env, read_options, cf_options, mutable_cf_options, - user_key_comparator, internal_iter, sequence, false, - max_sequential_skip_in_iterations, read_callback, allow_blob); + ReadCallback* read_callback, DBImpl* db_impl, + ColumnFamilyData* cfd, bool allow_blob) { + DBIter* db_iter = new DBIter( + env, read_options, cf_options, mutable_cf_options, user_key_comparator, + internal_iter, sequence, false, max_sequential_skip_in_iterations, + read_callback, db_impl, cfd, allow_blob); return db_iter; } @@ -1504,13 +1523,14 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration, uint64_t version_number, - ReadCallback* read_callback, bool allow_blob, + ReadCallback* read_callback, DBImpl* db_impl, + ColumnFamilyData* cfd, bool allow_blob, bool allow_refresh) { auto mem = arena_.AllocateAligned(sizeof(DBIter)); - db_iter_ = new (mem) - DBIter(env, read_options, cf_options, mutable_cf_options, - cf_options.user_comparator, nullptr, sequence, true, - max_sequential_skip_in_iteration, read_callback, allow_blob); + db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options, + cf_options.user_comparator, nullptr, sequence, + true, max_sequential_skip_in_iteration, + read_callback, db_impl, cfd, allow_blob); sv_number_ = version_number; allow_refresh_ = allow_refresh; } @@ -1534,7 +1554,8 @@ Status ArenaWrappedDBIter::Refresh() { SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options, latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, - cur_sv_number, read_callback_, allow_blob_, allow_refresh_); + cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_, + allow_refresh_); InternalIterator* internal_iter = db_impl_->NewInternalIterator( read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator()); @@ -1556,7 +1577,7 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator( ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); iter->Init(env, read_options, cf_options, mutable_cf_options, sequence, max_sequential_skip_in_iterations, version_number, read_callback, - allow_blob, allow_refresh); + db_impl, cfd, allow_blob, allow_refresh); if (db_impl != nullptr && cfd != nullptr && allow_refresh) { iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback, allow_blob); diff --git a/db/db_iter.h b/db/db_iter.h index 56b6a42db..8e18f03fc 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -27,15 +27,14 @@ class DBIter; // Return a new iterator that converts internal keys (yielded by // "*internal_iter") that were live at the specified "sequence" number // into appropriate user keys. -extern Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, - const ImmutableCFOptions& cf_options, - const MutableCFOptions& mutable_cf_options, - const Comparator* user_key_comparator, - InternalIterator* internal_iter, - const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations, - ReadCallback* read_callback, - bool allow_blob = false); +extern Iterator* NewDBIterator( + Env* env, const ReadOptions& read_options, + const ImmutableCFOptions& cf_options, + const MutableCFOptions& mutable_cf_options, + const Comparator* user_key_comparator, InternalIterator* internal_iter, + const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, + ReadCallback* read_callback, DBImpl* db_impl = nullptr, + ColumnFamilyData* cfd = nullptr, bool allow_blob = false); // A wrapper iterator which wraps DB Iterator and the arena, with which the DB // iterator is supposed be allocated. This class is used as an entry point of @@ -74,7 +73,8 @@ class ArenaWrappedDBIter : public Iterator { const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - ReadCallback* read_callback, bool allow_blob, bool allow_refresh); + ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, + bool allow_blob, bool allow_refresh); void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, ColumnFamilyData* cfd, ReadCallback* read_callback, diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 9df7bfbbc..eb7b52c0c 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -2093,6 +2093,34 @@ TEST_P(DBIteratorTest, Refresh) { iter.reset(); } +TEST_P(DBIteratorTest, RefreshWithSnapshot) { + ASSERT_OK(Put("x", "y")); + const Snapshot* snapshot = db_->GetSnapshot(); + ReadOptions options; + options.snapshot = snapshot; + Iterator* iter = NewIterator(options); + + iter->Seek(Slice("a")); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Slice("x")), 0); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + + ASSERT_OK(Put("c", "d")); + + iter->Seek(Slice("a")); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Slice("x")), 0); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + + Status s; + s = iter->Refresh(); + ASSERT_TRUE(s.IsNotSupported()); + db_->ReleaseSnapshot(snapshot); + delete iter; +} + TEST_P(DBIteratorTest, CreationFailure) { SyncPoint::GetInstance()->SetCallBack( "DBImpl::NewInternalIterator:StatusCallback", [](void* arg) { diff --git a/db/db_test2.cc b/db/db_test2.cc index 784386ac8..519bf3371 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2509,6 +2509,7 @@ TEST_F(DBTest2, TraceAndReplay) { EnvOptions env_opts; CreateAndReopenWithCF({"pikachu"}, options); Random rnd(301); + Iterator* single_iter = nullptr; std::string trace_filename = dbname_ + "/rocksdb.trace"; std::unique_ptr trace_writer; @@ -2529,6 +2530,11 @@ TEST_F(DBTest2, TraceAndReplay) { ASSERT_OK(batch.DeleteRange("j", "k")); ASSERT_OK(db_->Write(wo, &batch)); + single_iter = db_->NewIterator(ro); + single_iter->Seek("f"); + single_iter->SeekForPrev("g"); + delete single_iter; + ASSERT_EQ("1", Get(0, "a")); ASSERT_EQ("12", Get(0, "g")); diff --git a/util/trace_replay.cc b/util/trace_replay.cc index ca02dccbc..cd2e3ee95 100644 --- a/util/trace_replay.cc +++ b/util/trace_replay.cc @@ -52,6 +52,22 @@ Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { return WriteTrace(trace); } +Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) { + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = kTraceIteratorSeek; + EncodeCFAndKey(&trace.payload, cf_id, key); + return WriteTrace(trace); +} + +Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = kTraceIteratorSeekForPrev; + EncodeCFAndKey(&trace.payload, cf_id, key); + return WriteTrace(trace); +} + Status Tracer::WriteHeader() { std::ostringstream s; s << kTraceMagic << "\t" @@ -112,6 +128,7 @@ Status Replayer::Replay() { ReadOptions roptions; Trace trace; uint64_t ops = 0; + Iterator* single_iter = nullptr; while (s.ok()) { trace.reset(); s = ReadTrace(&trace); @@ -140,6 +157,39 @@ Status Replayer::Replay() { db_->Get(roptions, cf_map_[cf_id], key, &value); } ops++; + } else if (trace.type == kTraceIteratorSeek) { + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKey(trace.payload, &cf_id, &key); + if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + + if (cf_id == 0) { + single_iter = db_->NewIterator(roptions); + } else { + single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); + } + single_iter->Seek(key); + ops++; + delete single_iter; + } else if (trace.type == kTraceIteratorSeekForPrev) { + // Currently, only support to call the Seek() + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKey(trace.payload, &cf_id, &key); + if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + + if (cf_id == 0) { + single_iter = db_->NewIterator(roptions); + } else { + single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); + } + single_iter->SeekForPrev(key); + ops++; + delete single_iter; } else if (trace.type == kTraceEnd) { // Do nothing for now. // TODO: Add some validations later. diff --git a/util/trace_replay.h b/util/trace_replay.h index 84a164014..b324696f0 100644 --- a/util/trace_replay.h +++ b/util/trace_replay.h @@ -15,6 +15,7 @@ namespace rocksdb { class ColumnFamilyHandle; +class ColumnFamilyData; class DB; class DBImpl; class Slice; @@ -32,6 +33,8 @@ enum TraceType : char { kTraceEnd = 2, kTraceWrite = 3, kTraceGet = 4, + kTraceIteratorSeek = 5, + kTraceIteratorSeekForPrev = 6, kTraceMax, }; @@ -57,6 +60,8 @@ class Tracer { Status Write(WriteBatch* write_batch); Status Get(ColumnFamilyHandle* cfname, const Slice& key); + Status IteratorSeek(const uint32_t& cf_id, const Slice& key); + Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key); Status Close();