904a60ff63
Summary: Added new Get() methods that return a timestamp. A dummy implementation is given so that classes derived from DB don't need to be touched to provide their own implementation. MultiGet is not included. A ReadRandom perf test (10 minutes) on the same development machine's RAM drive with the same DB data shows no regression (within margin of error). The test is adapted from https://github.com/facebook/rocksdb/wiki/RocksDB-In-Memory-Workload-Performance-Benchmarks. Baseline (commit 72ee067b9): 101.712 micros/op 314602 ops/sec; 36.0 MB/s (5658999 of 5658999 found). This PR: 100.288 micros/op 319071 ops/sec; 36.5 MB/s (5674999 of 5674999 found). Command: ./db_bench --db=r:\rocksdb.github --num_levels=6 --key_size=20 --prefix_size=20 --keys_per_prefix=0 --value_size=100 --cache_size=2147483648 --cache_numshardbits=6 --compression_type=none --compression_ratio=1 --min_level_to_compress=-1 --disable_seek_compaction=1 --hard_rate_limit=2 --write_buffer_size=134217728 --max_write_buffer_number=2 --level0_file_num_compaction_trigger=8 --target_file_size_base=134217728 --max_bytes_for_level_base=1073741824 --disable_wal=0 --wal_dir=r:\rocksdb.github\WAL_LOG --sync=0 --verify_checksum=1 --delete_obsolete_files_period_micros=314572800 --max_background_compactions=4 --max_background_flushes=0 --level0_slowdown_writes_trigger=16 --level0_stop_writes_trigger=24 --statistics=0 --stats_per_interval=0 --stats_interval=1048576 --histogram=0 --use_plain_table=1 --open_files=-1 --mmap_read=1 --mmap_write=0 --memtablerep=prefix_hash --bloom_bits=10 --bloom_locality=1 --duration=600 --benchmarks=readrandom --use_existing_db=1 --num=25000000 --threads=32 Pull Request resolved: https://github.com/facebook/rocksdb/pull/6409 Differential Revision: D20200086 Pulled By: riversand963 fbshipit-source-id: 490edd74d924f62bd8ae9c29c2a6bbbb8410ca50
224 lines
7.9 KiB
C++
224 lines
7.9 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/db_impl/db_impl_readonly.h"
|
|
#include "db/arena_wrapped_db_iter.h"
|
|
|
|
#include "db/compacted_db_impl.h"
|
|
#include "db/db_impl/db_impl.h"
|
|
#include "db/db_iter.h"
|
|
#include "db/merge_context.h"
|
|
#include "monitoring/perf_context_imp.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
// Builds the read-only DB on top of DBImpl, then writes a note to the info
// log that the DB is being opened in read-only mode and flushes that log.
DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
                               const std::string& dbname)
    : DBImpl(db_options, dbname) {
  const auto& info_logger = immutable_db_options_.info_log;
  ROCKS_LOG_INFO(info_logger, "Opening the db in read only mode");
  LogFlush(info_logger);
}
|
|
|
|
// Nothing to release here beyond what the base class and members clean up.
DBImplReadOnly::~DBImplReadOnly() = default;
|
|
|
|
// Implementations of the DB interface
|
|
// Point lookup of `key` in `column_family`.
//
// The lookup key is built with versions_->LastSequence(). The memtable of the
// column family's SuperVersion is consulted first; on a miss the lookup falls
// through to the SuperVersion's current Version. The result is stored in
// `pinnable_val` (must not be nullptr) and the Status produced by the lookup
// is returned. Read tickers/histograms are recorded on every call.
Status DBImplReadOnly::Get(const ReadOptions& read_options,
                           ColumnFamilyHandle* column_family, const Slice& key,
                           PinnableSlice* pinnable_val) {
  assert(pinnable_val != nullptr);
  // TODO: stopwatch DB_GET needed?, perf timer needed?
  PERF_TIMER_GUARD(get_snapshot_time);
  Status status;
  const SequenceNumber read_seq = versions_->LastSequence();
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  // Cheap unlocked check first; re-check under trace_mutex_ before using the
  // tracer.
  if (tracer_) {
    InstrumentedMutexLock trace_lock(&trace_mutex_);
    if (tracer_) {
      tracer_->Get(column_family, key);
    }
  }

  SuperVersion* sv = cfd->GetSuperVersion();
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;
  LookupKey lookup_key(key, read_seq);
  PERF_TIMER_STOP(get_snapshot_time);

  const bool memtable_hit = sv->mem->Get(
      lookup_key, pinnable_val->GetSelf(), /*timestamp=*/nullptr, &status,
      &merge_context, &max_covering_tombstone_seq, read_options);
  if (!memtable_hit) {
    // Not resolved by the memtable: search the current Version instead.
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(read_options, lookup_key, pinnable_val,
                     /*timestamp=*/nullptr, &status, &merge_context,
                     &max_covering_tombstone_seq);
    RecordTick(stats_, MEMTABLE_MISS);
  } else {
    // The value was written into the slice's own buffer; pin it.
    pinnable_val->PinSelf();
    RecordTick(stats_, MEMTABLE_HIT);
  }

  RecordTick(stats_, NUMBER_KEYS_READ);
  const size_t bytes_read = pinnable_val->size();
  RecordTick(stats_, BYTES_READ, bytes_read);
  RecordInHistogram(stats_, BYTES_PER_READ, bytes_read);
  PERF_COUNTER_ADD(get_read_bytes, bytes_read);
  return status;
}
|
|
|
|
// Creates an iterator over `column_family`.
//
// The iterator reads at the sequence number of read_options.snapshot when a
// snapshot is supplied, and at versions_->LastSequence() otherwise. A
// reference is taken on the column family's SuperVersion and passed to
// NewInternalIterator(). The returned iterator is heap-allocated by
// NewArenaWrappedDbIterator().
Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
                                      ColumnFamilyHandle* column_family) {
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  SuperVersion* sv = cfd->GetSuperVersion()->Ref();

  // Default to the latest sequence number; an explicit snapshot overrides it.
  SequenceNumber read_seq = versions_->LastSequence();
  if (read_options.snapshot != nullptr) {
    read_seq =
        reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
  }

  ReadCallback* const no_read_callback = nullptr;  // No read callback provided.
  auto* arena_iter = NewArenaWrappedDbIterator(
      env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
      sv->mutable_cf_options.max_sequential_skip_in_iterations,
      sv->version_number, no_read_callback);

  // The internal iterator is built in the wrapper's arena and then attached
  // underneath it.
  auto* internal_iter = NewInternalIterator(
      read_options, cfd, sv, arena_iter->GetArena(),
      arena_iter->GetRangeDelAggregator(), read_seq);
  arena_iter->SetIterUnderDBIter(internal_iter);
  return arena_iter;
}
|
|
|
|
// Creates one iterator per entry of `column_families`, in the same order, and
// stores them in `*iterators` (which is cleared first).
//
// All iterators share one read sequence number: that of
// read_options.snapshot when a snapshot is supplied, otherwise
// versions_->LastSequence().
//
// Returns InvalidArgument if `iterators` is nullptr, OK otherwise.
Status DBImplReadOnly::NewIterators(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  if (iterators == nullptr) {
    return Status::InvalidArgument("iterators not allowed to be nullptr");
  }
  ReadCallback* const no_read_callback = nullptr;  // No read callback provided.

  iterators->clear();
  iterators->reserve(column_families.size());

  // Default to the latest sequence number; an explicit snapshot overrides it.
  SequenceNumber read_seq = versions_->LastSequence();
  if (read_options.snapshot != nullptr) {
    read_seq =
        reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
  }

  for (ColumnFamilyHandle* handle : column_families) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
    auto* super_version = cfd->GetSuperVersion()->Ref();
    auto* arena_iter = NewArenaWrappedDbIterator(
        env_, read_options, *cfd->ioptions(),
        super_version->mutable_cf_options, read_seq,
        super_version->mutable_cf_options.max_sequential_skip_in_iterations,
        super_version->version_number, no_read_callback);
    auto* internal_iter = NewInternalIterator(
        read_options, cfd, super_version, arena_iter->GetArena(),
        arena_iter->GetRangeDelAggregator(), read_seq);
    arena_iter->SetIterUnderDBIter(internal_iter);
    iterators->push_back(arena_iter);
  }

  return Status::OK();
}
|
|
|
|
// Opens `dbname` read-only with only the default column family.
//
// CompactedDBImpl::Open() is attempted first; if it succeeds its result is
// returned directly. Otherwise the call is delegated to the column-family
// overload of DB::OpenForReadOnly() with a single descriptor for the default
// column family built from `options`. `*dbptr` is reset to nullptr on entry.
//
// NOTE(review): the error_if_log_file_exist argument is unnamed and is not
// forwarded to the delegated call -- confirm this is intentional.
Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
                           DB** dbptr, bool /*error_if_log_file_exist*/) {
  *dbptr = nullptr;

  // Try to first open DB as fully compacted DB
  Status status = CompactedDBImpl::Open(options, dbname, dbptr);
  if (status.ok()) {
    return status;
  }

  const DBOptions db_opts(options);
  const ColumnFamilyOptions cf_opts(options);
  std::vector<ColumnFamilyDescriptor> cf_descriptors;
  cf_descriptors.emplace_back(kDefaultColumnFamilyName, cf_opts);
  std::vector<ColumnFamilyHandle*> cf_handles;

  status =
      DB::OpenForReadOnly(db_opts, dbname, cf_descriptors, &cf_handles, dbptr);
  if (!status.ok()) {
    return status;
  }

  assert(cf_handles.size() == 1);
  // The handle can be deleted here since DBImpl is always holding a
  // reference to the default column family.
  delete cf_handles[0];
  return status;
}
|
|
|
|
// Opens `dbname` read-only with the given column families.
//
// On entry `*dbptr` is set to nullptr and `*handles` is cleared. A
// DBImplReadOnly is created and recovered (in read-only mode) while holding
// its mutex; a handle is then created for every requested column family and a
// new SuperVersion is installed for every column family in the set.
//
// On success `*dbptr` points to the new DB and `*handles` holds one handle per
// entry of `column_families`, in the same order. On failure every handle
// created so far is deleted, `*handles` is left empty, the DB object is
// destroyed and `*dbptr` stays nullptr.
//
// Returns the Status of Recover(), or InvalidArgument if a requested column
// family does not exist in the recovered DB.
Status DB::OpenForReadOnly(
    const DBOptions& db_options, const std::string& dbname,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
    bool error_if_log_file_exist) {
  *dbptr = nullptr;
  handles->clear();

  SuperVersionContext sv_context(/* create_superversion */ true);
  DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
  impl->mutex_.Lock();
  Status s = impl->Recover(column_families, true /* read only */,
                           error_if_log_file_exist);
  if (s.ok()) {
    // set column family handles
    // Iterate by const reference: a descriptor carries its name and a full
    // ColumnFamilyOptions, so copying each one per iteration (while holding
    // the DB mutex) is needless work.
    for (const auto& cf : column_families) {
      auto cfd =
          impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
      if (cfd == nullptr) {
        s = Status::InvalidArgument("Column family not found: ", cf.name);
        break;
      }
      handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
    }
  }
  if (s.ok()) {
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      sv_context.NewSuperVersion();
      cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
    }
  }
  impl->mutex_.Unlock();
  // Clean the SuperVersionContext only after the mutex has been released.
  sv_context.Clean();
  if (s.ok()) {
    *dbptr = impl;
    for (auto* h : *handles) {
      impl->NewThreadStatusCfInfo(
          reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
    }
  } else {
    // Roll back: release any handles created before the failure, then the DB
    // object itself.
    for (auto h : *handles) {
      delete h;
    }
    handles->clear();
    delete impl;
  }
  return s;
}
|
|
|
|
#else // !ROCKSDB_LITE
|
|
|
|
// ROCKSDB_LITE stub: read-only open is not available in this build, so every
// argument is ignored and NotSupported is returned.
Status DB::OpenForReadOnly(const Options& /*options*/,
                           const std::string& /*dbname*/, DB** /*dbptr*/,
                           bool /*error_if_log_file_exist*/) {
  Status unsupported = Status::NotSupported("Not supported in ROCKSDB_LITE.");
  return unsupported;
}
|
|
|
|
// ROCKSDB_LITE stub of the column-family overload: read-only open is not
// available in this build, so every argument is ignored and NotSupported is
// returned.
Status DB::OpenForReadOnly(
    const DBOptions& /*db_options*/, const std::string& /*dbname*/,
    const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
    std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/,
    bool /*error_if_log_file_exist*/) {
  Status unsupported = Status::NotSupported("Not supported in ROCKSDB_LITE.");
  return unsupported;
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|