diff --git a/db/db_impl.cc b/db/db_impl.cc index 65e0c4597..3fa583869 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1618,8 +1618,8 @@ if (read_options.tailing) { result = NewDBIterator( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, cfd->user_comparator(), iter, kMaxSequenceNumber, - sv->mutable_cf_options.max_sequential_skip_in_iterations, - read_callback); + sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback, + this, cfd); #endif } else { // Note: no need to consider the special case of @@ -1686,9 +1686,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, read_callback, - ((read_options.snapshot != nullptr) ? nullptr : this), cfd, allow_blob, - allow_refresh); + sv->version_number, read_callback, this, cfd, allow_blob, + ((read_options.snapshot != nullptr) ? false : allow_refresh)); InternalIterator* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), @@ -1725,7 +1724,7 @@ Status DBImpl::NewIterators( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, cfd->user_comparator(), iter, kMaxSequenceNumber, sv->mutable_cf_options.max_sequential_skip_in_iterations, - read_callback)); + read_callback, this, cfd)); } #endif } else { @@ -3129,5 +3128,28 @@ Status DBImpl::EndTrace() { return s; } +Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) { + Status s; + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + s = tracer_->IteratorSeek(cf_id, key); + } + } + return s; +} + +Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { + Status s; + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + s = tracer_->IteratorSeekForPrev(cf_id, key); + } + } + return s; +} + #endif // ROCKSDB_LITE + } // namespace rocksdb diff --git a/db/db_impl.h b/db/db_impl.h index a5b33f119..36428f33e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -342,7 +342,8 @@ class DBImpl : public DB { using DB::EndTrace; virtual Status EndTrace() override; - + Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key); + Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key); #endif // ROCKSDB_LITE // Similar to GetSnapshot(), but also lets the db know that this snapshot diff --git a/db/db_iter.cc b/db/db_iter.cc index 2c6c96580..701d0d8fc 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -27,6 +27,7 @@ #include "util/logging.h" #include "util/mutexlock.h" #include "util/string_util.h" +#include "util/trace_replay.h" namespace rocksdb { @@ -114,7 +115,8 @@ class DBIter final: public Iterator { const MutableCFOptions& mutable_cf_options, const Comparator* cmp, InternalIterator* iter, SequenceNumber s, bool arena_mode, uint64_t max_sequential_skip_in_iterations, - ReadCallback* read_callback, bool allow_blob) + ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, + bool allow_blob) : arena_mode_(arena_mode), env_(_env), logger_(cf_options.info_log), @@ -135,6 +137,8 @@ class DBIter final: public Iterator { range_del_agg_(cf_options.internal_comparator, s, true /* collapse_deletions */), read_callback_(read_callback), + db_impl_(db_impl), + cfd_(cfd), allow_blob_(allow_blob), is_blob_(false), start_seqnum_(read_options.iter_start_seqnum) { @@ -344,6 +348,8 @@ class DBIter final: public Iterator { LocalStatistics local_stats_; PinnedIteratorsManager pinned_iters_mgr_; ReadCallback* read_callback_; + DBImpl* db_impl_; + ColumnFamilyData* cfd_; bool allow_blob_; bool is_blob_; // for diff snapshots we want the lower bound on the seqnum; @@ -1267,6 +1273,12 @@ void DBIter::Seek(const Slice& target) { saved_key_.Clear(); saved_key_.SetInternalKey(target, seq); +#ifndef ROCKSDB_LITE + if (db_impl_ != nullptr && cfd_ != nullptr) { + db_impl_->TraceIteratorSeek(cfd_->GetID(), target); + } +#endif // ROCKSDB_LITE + if (iterate_lower_bound_ != nullptr && user_comparator_->Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) < 0) { @@ -1331,6 +1343,12 @@ void DBIter::SeekForPrev(const Slice& target) { range_del_agg_.InvalidateRangeDelMapPositions(); } +#ifndef ROCKSDB_LITE + if (db_impl_ != nullptr && cfd_ != nullptr) { + db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target); + } +#endif // ROCKSDB_LITE + RecordTick(statistics_, NUMBER_DB_SEEK); if (iter_->Valid()) { if (prefix_extractor_ && prefix_same_as_start_) { @@ -1453,11 +1471,12 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, InternalIterator* internal_iter, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - ReadCallback* read_callback, bool allow_blob) { - DBIter* db_iter = - new DBIter(env, read_options, cf_options, mutable_cf_options, - user_key_comparator, internal_iter, sequence, false, - max_sequential_skip_in_iterations, read_callback, allow_blob); + ReadCallback* read_callback, DBImpl* db_impl, + ColumnFamilyData* cfd, bool allow_blob) { + DBIter* db_iter = new DBIter( + env, read_options, cf_options, mutable_cf_options, user_key_comparator, + internal_iter, sequence, false, max_sequential_skip_in_iterations, + read_callback, db_impl, cfd, allow_blob); return db_iter; } @@ -1504,13 +1523,14 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration, uint64_t version_number, - ReadCallback* read_callback, bool allow_blob, + ReadCallback* read_callback, DBImpl* db_impl, + ColumnFamilyData* cfd, bool allow_blob, bool allow_refresh) { auto mem = arena_.AllocateAligned(sizeof(DBIter)); - db_iter_ = new (mem) - DBIter(env, read_options, cf_options, mutable_cf_options, - cf_options.user_comparator, nullptr, sequence, true, - max_sequential_skip_in_iteration, read_callback, allow_blob); + db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options, + cf_options.user_comparator, nullptr, sequence, + true, max_sequential_skip_in_iteration, + read_callback, db_impl, cfd, allow_blob); sv_number_ = version_number; allow_refresh_ = allow_refresh; } @@ -1534,7 +1554,8 @@ Status ArenaWrappedDBIter::Refresh() { SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options, latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, - cur_sv_number, read_callback_, allow_blob_, allow_refresh_); + cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_, + allow_refresh_); InternalIterator* internal_iter = db_impl_->NewInternalIterator( read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator()); @@ -1556,7 +1577,7 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator( ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); iter->Init(env, read_options, cf_options, mutable_cf_options, sequence, max_sequential_skip_in_iterations, version_number, read_callback, - allow_blob, allow_refresh); + db_impl, cfd, allow_blob, allow_refresh); if (db_impl != nullptr && cfd != nullptr && allow_refresh) { iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback, allow_blob); diff --git a/db/db_iter.h b/db/db_iter.h index 56b6a42db..8e18f03fc 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -27,15 +27,14 @@ class DBIter; // Return a new iterator that converts internal keys (yielded by // "*internal_iter") that were live at the specified "sequence" number // into appropriate user keys. -extern Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, - const ImmutableCFOptions& cf_options, - const MutableCFOptions& mutable_cf_options, - const Comparator* user_key_comparator, - InternalIterator* internal_iter, - const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations, - ReadCallback* read_callback, - bool allow_blob = false); +extern Iterator* NewDBIterator( + Env* env, const ReadOptions& read_options, + const ImmutableCFOptions& cf_options, + const MutableCFOptions& mutable_cf_options, + const Comparator* user_key_comparator, InternalIterator* internal_iter, + const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, + ReadCallback* read_callback, DBImpl* db_impl = nullptr, + ColumnFamilyData* cfd = nullptr, bool allow_blob = false); // A wrapper iterator which wraps DB Iterator and the arena, with which the DB // iterator is supposed be allocated. This class is used as an entry point of @@ -74,7 +73,8 @@ class ArenaWrappedDBIter : public Iterator { const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - ReadCallback* read_callback, bool allow_blob, bool allow_refresh); + ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, + bool allow_blob, bool allow_refresh); void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, ColumnFamilyData* cfd, ReadCallback* read_callback, diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 9df7bfbbc..eb7b52c0c 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -2093,6 +2093,34 @@ TEST_P(DBIteratorTest, Refresh) { iter.reset(); } +TEST_P(DBIteratorTest, RefreshWithSnapshot) { + ASSERT_OK(Put("x", "y")); + const Snapshot* snapshot = db_->GetSnapshot(); + ReadOptions options; + options.snapshot = snapshot; + Iterator* iter = NewIterator(options); + + iter->Seek(Slice("a")); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Slice("x")), 0); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + + ASSERT_OK(Put("c", "d")); + + iter->Seek(Slice("a")); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Slice("x")), 0); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + + Status s; + s = iter->Refresh(); + ASSERT_TRUE(s.IsNotSupported()); + db_->ReleaseSnapshot(snapshot); + delete iter; +} + TEST_P(DBIteratorTest, CreationFailure) { SyncPoint::GetInstance()->SetCallBack( "DBImpl::NewInternalIterator:StatusCallback", [](void* arg) { diff --git a/db/db_test2.cc b/db/db_test2.cc index 784386ac8..519bf3371 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2509,6 +2509,7 @@ TEST_F(DBTest2, TraceAndReplay) { EnvOptions env_opts; CreateAndReopenWithCF({"pikachu"}, options); Random rnd(301); + Iterator* single_iter = nullptr; std::string trace_filename = dbname_ + "/rocksdb.trace"; std::unique_ptr trace_writer; @@ -2529,6 +2530,11 @@ TEST_F(DBTest2, TraceAndReplay) { ASSERT_OK(batch.DeleteRange("j", "k")); ASSERT_OK(db_->Write(wo, &batch)); + single_iter = db_->NewIterator(ro); + single_iter->Seek("f"); + single_iter->SeekForPrev("g"); + delete single_iter; + ASSERT_EQ("1", Get(0, "a")); ASSERT_EQ("12", Get(0, "g")); diff --git a/util/trace_replay.cc b/util/trace_replay.cc index ca02dccbc..cd2e3ee95 100644 --- a/util/trace_replay.cc +++ b/util/trace_replay.cc @@ -52,6 +52,22 @@ Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { return WriteTrace(trace); } +Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) { + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = kTraceIteratorSeek; + EncodeCFAndKey(&trace.payload, cf_id, key); + return WriteTrace(trace); +} + +Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = kTraceIteratorSeekForPrev; + EncodeCFAndKey(&trace.payload, cf_id, key); + return WriteTrace(trace); +} + Status Tracer::WriteHeader() { std::ostringstream s; s << kTraceMagic << "\t" @@ -112,6 +128,7 @@ Status Replayer::Replay() { ReadOptions roptions; Trace trace; uint64_t ops = 0; + Iterator* single_iter = nullptr; while (s.ok()) { trace.reset(); s = ReadTrace(&trace); @@ -140,6 +157,39 @@ Status Replayer::Replay() { db_->Get(roptions, cf_map_[cf_id], key, &value); } ops++; + } else if (trace.type == kTraceIteratorSeek) { + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKey(trace.payload, &cf_id, &key); + if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + + if (cf_id == 0) { + single_iter = db_->NewIterator(roptions); + } else { + single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); + } + single_iter->Seek(key); + ops++; + delete single_iter; + } else if (trace.type == kTraceIteratorSeekForPrev) { + // Currently, only support to call the Seek() + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKey(trace.payload, &cf_id, &key); + if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + + if (cf_id == 0) { + single_iter = db_->NewIterator(roptions); + } else { + single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); + } + single_iter->SeekForPrev(key); + ops++; + delete single_iter; } else if (trace.type == kTraceEnd) { // Do nothing for now. // TODO: Add some validations later. diff --git a/util/trace_replay.h b/util/trace_replay.h index 84a164014..b324696f0 100644 --- a/util/trace_replay.h +++ b/util/trace_replay.h @@ -15,6 +15,7 @@ namespace rocksdb { class ColumnFamilyHandle; +class ColumnFamilyData; class DB; class DBImpl; class Slice; @@ -32,6 +33,8 @@ enum TraceType : char { kTraceEnd = 2, kTraceWrite = 3, kTraceGet = 4, + kTraceIteratorSeek = 5, + kTraceIteratorSeekForPrev = 6, kTraceMax, }; @@ -57,6 +60,8 @@ class Tracer { Status Write(WriteBatch* write_batch); Status Get(ColumnFamilyHandle* cfname, const Slice& key); + Status IteratorSeek(const uint32_t& cf_id, const Slice& key); + Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key); Status Close();