rocksdb/db/arena_wrapped_db_iter.cc

107 lines
4.5 KiB
C++
Raw Normal View History

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/arena_wrapped_db_iter.h"
#include "memory/arena.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "util/user_comparator_wrapper.h"
namespace ROCKSDB_NAMESPACE {
Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
std::string* prop) {
if (prop_name == "rocksdb.iterator.super-version-number") {
// First try to pass the value returned from inner iterator.
if (!db_iter_->GetProperty(prop_name, prop).ok()) {
*prop = ToString(sv_number_);
}
return Status::OK();
}
return db_iter_->GetProperty(prop_name, prop);
}
void ArenaWrappedDBIter::Init(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ =
new (mem) DBIter(env, read_options, ioptions, mutable_cf_options,
ioptions.user_comparator, /* iter */ nullptr, version,
sequence, true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, expose_blob_index);
sv_number_ = version_number;
read_options_ = read_options;
allow_refresh_ = allow_refresh;
}
Status ArenaWrappedDBIter::Refresh() {
if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
return Status::NotSupported("Creating renew iterator is not allowed.");
}
assert(db_iter_ != nullptr);
// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
if (sv_number_ != cur_sv_number) {
Env* env = db_iter_->env();
db_iter_->~DBIter();
arena_.~Arena();
new (&arena_) Arena();
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
if (read_callback_) {
read_callback_->Refresh(latest_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
sv->current, latest_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, expose_blob_index_,
allow_refresh_);
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
Properly report IO errors when IndexType::kBinarySearchWithFirstKey is used (#6621) Summary: Context: Index type `kBinarySearchWithFirstKey` added the ability for sst file iterator to sometimes report a key from index without reading the corresponding data block. This is useful when sst blocks are cut at some meaningful boundaries (e.g. one block per key prefix), and many seeks land between blocks (e.g. for each prefix, the ranges of keys in different sst files are nearly disjoint, so a typical seek needs to read a data block from only one file even if all files have the prefix). But this added a new error condition, which rocksdb code was really not equipped to deal with: `InternalIterator::value()` may fail with an IO error or Status::Incomplete, but it's just a method returning a Slice, with no way to report error instead. Before this PR, this type of error wasn't handled at all (an empty slice was returned), and kBinarySearchWithFirstKey implementation was considered a prototype. Now that we (LogDevice) have experimented with kBinarySearchWithFirstKey for a while and confirmed that it's really useful, this PR is adding the missing error handling. It's a pretty inconvenient situation implementation-wise. The error needs to be reported from InternalIterator when trying to access value. But there are ~700 call sites of `InternalIterator::value()`, most of which either can't hit the error condition (because the iterator is reading from memtable or from index or something) or wouldn't benefit from the deferred loading of the value (e.g. compaction iterator that reads all values anyway). Adding error handling to all these call sites would needlessly bloat the code. So instead I made the deferred value loading optional: only the call sites that may use deferred loading have to call the new method `PrepareValue()` before calling `value()`. The feature is enabled with a new bool argument `allow_unprepared_value` to a bunch of methods that create iterators (it wouldn't make sense to put it in ReadOptions because it's completely internal to iterators, with virtually no user-visible effect). Lmk if you have better ideas. Note that the deferred value loading only happens for *internal* iterators. The user-visible iterator (DBIter) always prepares the value before returning from Seek/Next/etc. We could go further and add an API to defer that value loading too, but that's most likely not useful for LogDevice, so it doesn't seem worth the complexity for now. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6621 Test Plan: make -j5 check . Will also deploy to some logdevice test clusters and look at stats. Reviewed By: siying Differential Revision: D20786930 Pulled By: al13n321 fbshipit-source-id: 6da77d918bad3780522e918f17f4d5513d3e99ee
2020-04-15 17:37:23 -07:00
latest_seq, /* allow_unprepared_value */ true);
SetIterUnderDBIter(internal_iter);
} else {
db_iter_->set_sequence(db_impl_->GetLatestSequenceNumber());
db_iter_->set_valid(false);
}
return Status::OK();
}
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, ioptions, mutable_cf_options, version, sequence,
max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, expose_blob_index, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(db_impl, cfd, read_callback, expose_blob_index);
}
return iter;
}
} // namespace ROCKSDB_NAMESPACE