rocksdb/db/compacted_db_impl.cc
Huisheng Liu 904a60ff63 return timestamp from get (#6409)
Summary:
Added new Get() methods that return timestamp. Dummy implementation is given so that classes derived from DB don't need to be touched to provide their implementation. MultiGet is not included.

ReadRandom perf test (10 minutes) on the same development machine ram drive with the same DB data shows no regression (within margin of error). The test is adapted from https://github.com/facebook/rocksdb/wiki/RocksDB-In-Memory-Workload-Performance-Benchmarks.
    base line (commit 72ee067b9):
        101.712 micros/op 314602 ops/sec;   36.0 MB/s (5658999 of 5658999 found)
    This PR:
        100.288 micros/op 319071 ops/sec;   36.5 MB/s (5674999 of 5674999 found)

./db_bench --db=r:\rocksdb.github --num_levels=6 --key_size=20 --prefix_size=20 --keys_per_prefix=0 --value_size=100 --cache_size=2147483648 --cache_numshardbits=6 --compression_type=none --compression_ratio=1 --min_level_to_compress=-1 --disable_seek_compaction=1 --hard_rate_limit=2 --write_buffer_size=134217728 --max_write_buffer_number=2 --level0_file_num_compaction_trigger=8 --target_file_size_base=134217728 --max_bytes_for_level_base=1073741824 --disable_wal=0 --wal_dir=r:\rocksdb.github\WAL_LOG --sync=0 --verify_checksum=1 --delete_obsolete_files_period_micros=314572800 --max_background_compactions=4 --max_background_flushes=0 --level0_slowdown_writes_trigger=16 --level0_stop_writes_trigger=24 --statistics=0 --stats_per_interval=0 --stats_interval=1048576 --histogram=0 --use_plain_table=1 --open_files=-1 --mmap_read=1 --mmap_write=0 --memtablerep=prefix_hash --bloom_bits=10 --bloom_locality=1 --duration=600 --benchmarks=readrandom --use_existing_db=1 --num=25000000 --threads=32
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6409

Differential Revision: D20200086

Pulled By: riversand963

fbshipit-source-id: 490edd74d924f62bd8ae9c29c2a6bbbb8410ca50
2020-03-02 16:01:00 -08:00

161 lines
5.7 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "db/compacted_db_impl.h"
#include "db/db_impl/db_impl.h"
#include "db/version_set.h"
#include "table/get_context.h"
namespace ROCKSDB_NAMESPACE {
// Forward declarations of two callbacks whose definitions live outside this
// file. Neither is referenced by the code visible in this translation unit.
// NOTE(review): presumably legacy hooks from an older lookup path -- confirm
// before removing.
extern void MarkKeyMayExist(void* arg);
// `arg` is an opaque pointer; `parsed_key` and `v` are the entry's parsed
// internal key and value. The meaning of the bool result and of
// `hit_and_return` is defined by the out-of-file implementation.
extern bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
const Slice& v, bool hit_and_return);
// Builds an instance on top of DBImpl. The column family data, version and
// user comparator pointers start out null; Init() is what assigns them.
CompactedDBImpl::CompactedDBImpl(const DBOptions& options,
                                 const std::string& dbname)
    : DBImpl(options, dbname),
      cfd_(nullptr),
      version_(nullptr),
      user_comparator_(nullptr) {}
// Nothing to release here beyond what the members and DBImpl clean up
// themselves.
CompactedDBImpl::~CompactedDBImpl() = default;
size_t CompactedDBImpl::FindFile(const Slice& key) {
size_t right = files_.num_files - 1;
auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
return user_comparator_->Compare(ExtractUserKey(f.largest_key), k) < 0;
};
return static_cast<size_t>(std::lower_bound(files_.files,
files_.files + right, key, cmp) - files_.files);
}
// Point lookup of `key` in the table files recorded by Init().
//
// The column family handle argument is ignored. On a hit the value is
// delivered through `value` by the GetContext.
//
// Returns:
//   - OK        when the GetContext ends in the kFound state;
//   - NotFound  when the key lies before the candidate file's smallest key or
//               the table reader finishes without finding it;
//   - any other non-OK status reported by the table reader (for example an
//     I/O or corruption error), propagated unchanged rather than being
//     reported as NotFound.
Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
                            const Slice& key, PinnableSlice* value) {
  const FdWithKeyRange& f = files_.files[FindFile(key)];
  // Same range check MultiGet performs: a key that sorts before the file's
  // smallest user key cannot be stored in that file, so skip the table read.
  if (user_comparator_->Compare(key, ExtractUserKey(f.smallest_key)) < 0) {
    return Status::NotFound();
  }

  GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
                         GetContext::kNotFound, key, value, nullptr, nullptr,
                         nullptr, true, nullptr, nullptr);
  LookupKey lkey(key, kMaxSequenceNumber);
  Status s = f.fd.table_reader->Get(options, lkey.internal_key(), &get_context,
                                    nullptr);
  // A read failure must not be disguised as a missing key.
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  if (get_context.State() == GetContext::kFound) {
    return Status::OK();
  }
  return Status::NotFound();
}
std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
autovector<TableReader*, 16> reader_list;
for (const auto& key : keys) {
const FdWithKeyRange& f = files_.files[FindFile(key)];
if (user_comparator_->Compare(key, ExtractUserKey(f.smallest_key)) < 0) {
reader_list.push_back(nullptr);
} else {
LookupKey lkey(key, kMaxSequenceNumber);
f.fd.table_reader->Prepare(lkey.internal_key());
reader_list.push_back(f.fd.table_reader);
}
}
std::vector<Status> statuses(keys.size(), Status::NotFound());
values->resize(keys.size());
int idx = 0;
for (auto* r : reader_list) {
if (r != nullptr) {
PinnableSlice pinnable_val;
std::string& value = (*values)[idx];
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, keys[idx], &pinnable_val,
nullptr, nullptr, nullptr, true, nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber);
r->Get(options, lkey.internal_key(), &get_context, nullptr);
value.assign(pinnable_val.data(), pinnable_val.size());
if (get_context.State() == GetContext::kFound) {
statuses[idx] = Status::OK();
}
}
++idx;
}
return statuses;
}
// Recovers the default column family in read-only mode, installs a super
// version, and selects the one level whose files Get()/MultiGet() will search.
//
// Returns the Recover() status if recovery fails. Returns NotSupported when
//   - there is no non-empty level, or the selected level holds no file;
//   - L0 holds more than one file;
//   - L0 holds exactly one file and some other level is non-empty;
//   - L0 is empty and any level strictly between L0 and the last non-empty
//     level holds files.
// Otherwise stores the chosen level's file list in files_ and returns OK.
Status CompactedDBImpl::Init(const Options& options) {
  SuperVersionContext sv_context(/* create_superversion */ true);

  mutex_.Lock();
  ColumnFamilyDescriptor default_cf(kDefaultColumnFamilyName,
                                    ColumnFamilyOptions(options));
  Status recover_status =
      Recover({default_cf}, true /* read only */, false, true);
  const bool recovered = recover_status.ok();
  if (recovered) {
    auto* default_handle =
        reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
    cfd_ = default_handle->cfd();
    cfd_->InstallSuperVersion(&sv_context, &mutex_);
  }
  mutex_.Unlock();
  // Clean() is invoked only once mutex_ has been released.
  sv_context.Clean();
  if (!recovered) {
    return recover_status;
  }

  NewThreadStatusCfInfo(cfd_);
  version_ = cfd_->GetSuperVersion()->current;
  user_comparator_ = cfd_->user_comparator();

  auto* storage = version_->storage_info();
  const int non_empty_levels = storage->num_non_empty_levels();
  if (non_empty_levels == 0) {
    return Status::NotSupported("no file exists");
  }

  // L0 may hold at most one file, and only if it is the sole non-empty level.
  const LevelFilesBrief& level0 = storage->LevelFilesBrief(0);
  if (level0.num_files > 1) {
    return Status::NotSupported("L0 contain more than 1 file");
  }
  if (level0.num_files == 1) {
    if (non_empty_levels > 1) {
      return Status::NotSupported("Both L0 and other level contain files");
    }
    files_ = level0;
    return Status::OK();
  }

  // L0 is empty: every level between L0 and the last non-empty level must be
  // empty as well.
  const int last_level = non_empty_levels - 1;
  for (int lvl = 1; lvl < last_level; ++lvl) {
    if (storage->LevelFilesBrief(lvl).num_files > 0) {
      return Status::NotSupported("Other levels also contain files");
    }
  }

  const LevelFilesBrief& last_level_files = storage->LevelFilesBrief(last_level);
  if (last_level_files.num_files == 0) {
    return Status::NotSupported("no file exists");
  }
  files_ = last_level_files;
  return Status::OK();
}
// Opens the database at `dbname` as a CompactedDBImpl.
//
// `*dbptr` is set to nullptr up front and only overwritten on success, in
// which case ownership of the new instance passes to the caller.
//
// Returns InvalidArgument unless options.max_open_files is -1 and no merge
// operator is configured; otherwise returns the status produced by Init().
Status CompactedDBImpl::Open(const Options& options,
                             const std::string& dbname, DB** dbptr) {
  *dbptr = nullptr;

  // Validate the options before constructing anything.
  if (options.max_open_files != -1) {
    return Status::InvalidArgument("require max_open_files = -1");
  }
  if (options.merge_operator != nullptr) {
    return Status::InvalidArgument("merge operator is not supported");
  }

  DBOptions db_options(options);
  std::unique_ptr<CompactedDBImpl> compacted_db(
      new CompactedDBImpl(db_options, dbname));

  Status init_status = compacted_db->Init(options);
  if (!init_status.ok()) {
    // compacted_db is destroyed here; *dbptr stays null.
    return init_status;
  }

  compacted_db->StartTimedTasks();
  ROCKS_LOG_INFO(compacted_db->immutable_db_options_.info_log,
                 "Opened the db as fully compacted mode");
  LogFlush(compacted_db->immutable_db_options_.info_log);
  *dbptr = compacted_db.release();
  return init_status;
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE