rebase master

This commit is contained in:
Chris Riccomini 2014-09-30 14:40:04 -07:00
commit 3a0d498a36
37 changed files with 621 additions and 1181 deletions

2
.gitignore vendored
View File

@ -28,9 +28,11 @@ util/build_version.cc
build_tools/VALGRIND_LOGS/
coverage/COVERAGE_REPORT
.gdbhistory
package/
.phutil_module_cache
tags
java/*.log
java/include/org_rocksdb_*.h
unity.cc
java/crossbuild/.vagrant
.vagrant/

11
AUTHORS Normal file
View File

@ -0,0 +1,11 @@
Facebook Inc.
Facebook Engineering Team
Google Inc.
# Initial version authors:
Jeffrey Dean <jeff@google.com>
Sanjay Ghemawat <sanjay@google.com>
# Partial list of contributors:
Kevin Regan <kevin.d.regan@gmail.com>
Johan Bilien <jobi@litl.com>

View File

@ -15,6 +15,10 @@ There are few options when compiling RocksDB:
* `make all` will compile our static library, and all our tools and unit tests. Our tools
depend on gflags. You will need to have gflags installed to run `make all`.
* if Intel SSE instruction set is supported, set USE_SSE=" -msse -msse4.2 " to make sure
SSE4.2 is used to speed up CRC32 when calculating data checksum.
## Dependencies
* You can link RocksDB with following compression libraries:

View File

@ -122,7 +122,6 @@ TESTS = \
reduce_levels_test \
plain_table_db_test \
prefix_test \
simple_table_db_test \
skiplist_test \
stringappend_test \
ttl_test \
@ -165,6 +164,9 @@ endif
LIBRARY = ${LIBNAME}.a
MEMENVLIBRARY = libmemenv.a
ROCKSDB_MAJOR = 3
ROCKSDB_MINOR = 4
default: all
#-----------------------------------------------
@ -179,8 +181,8 @@ SHARED3 = $(SHARED1)
SHARED = $(SHARED1)
else
# Update db.h if you change these.
SHARED_MAJOR = 3
SHARED_MINOR = 4
SHARED_MAJOR = $(ROCKSDB_MAJOR)
SHARED_MINOR = $(ROCKSDB_MINOR)
SHARED1 = ${LIBNAME}.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
@ -196,7 +198,7 @@ $(SHARED3):
endif # PLATFORM_SHARED_EXT
.PHONY: blackbox_crash_test check clean coverage crash_test ldb_tests \
.PHONY: blackbox_crash_test check clean coverage crash_test ldb_tests package \
release tags valgrind_check whitebox_crash_test format static_lib shared_lib all \
dbg rocksdbjavastatic rocksdbjava install uninstall
@ -278,6 +280,9 @@ tags:
format:
build_tools/format-diff.sh
package:
bash build_tools/make_package.sh $(SHARED_MAJOR).$(SHARED_MINOR)
# ---------------------------------------------------------------------------
# Unit tests and tools
# ---------------------------------------------------------------------------
@ -372,9 +377,6 @@ log_write_bench: util/log_write_bench.o $(LIBOBJECTS) $(TESTHARNESS)
plain_table_db_test: db/plain_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/plain_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
simple_table_db_test: db/simple_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/simple_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
table_reader_bench: table/table_reader_bench.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) table/table_reader_bench.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) -pg
@ -639,8 +641,10 @@ ifneq ($(MAKECMDGOALS),clean)
ifneq ($(MAKECMDGOALS),format)
ifneq ($(MAKECMDGOALS),jclean)
ifneq ($(MAKECMDGOALS),jtest)
ifneq ($(MAKECMDGOALS),package)
-include $(DEPFILES)
endif
endif
endif
endif
endif

16
Vagrantfile vendored Normal file
View File

@ -0,0 +1,16 @@
Vagrant.configure("2") do |config|
config.vm.provider "virtualbox" do |v|
v.memory = 4096
v.cpus = 2
end
config.vm.define "ubuntu14" do |box|
box.vm.box = "ubuntu/trusty64"
end
config.vm.define "centos65" do |box|
box.vm.box = "chef/centos-6.5"
end
end

116
build_tools/make_package.sh Executable file
View File

@ -0,0 +1,116 @@
#/usr/bin/env bash
set -e
function log() {
echo "[+] $1"
}
function fatal() {
echo "[!] $1"
exit 1
}
function platform() {
local __resultvar=$1
if [[ -f "/etc/yum.conf" ]]; then
eval $__resultvar="centos"
elif [[ -f "/etc/dpkg/dpkg.cfg" ]]; then
eval $__resultvar="ubuntu"
else
fatal "Unknwon operating system"
fi
}
platform OS
function package() {
if [[ $OS = "ubuntu" ]]; then
if dpkg --get-selections | grep --quiet $1; then
log "$1 is already installed. skipping."
else
apt-get install $@ -y
fi
elif [[ $OS = "centos" ]]; then
if rpm -qa | grep --quiet $1; then
log "$1 is already installed. skipping."
else
yum install $@ -y
fi
fi
}
function detect_fpm_output() {
if [[ $OS = "ubuntu" ]]; then
export FPM_OUTPUT=deb
elif [[ $OS = "centos" ]]; then
export FPM_OUTPUT=rpm
fi
}
detect_fpm_output
function gem_install() {
if gem list | grep --quiet $1; then
log "$1 is already installed. skipping."
else
gem install $@
fi
}
function main() {
if [[ $# -ne 1 ]]; then
fatal "Usage: $0 <rocksdb_version>"
else
log "using rocksdb version: $1"
fi
if [[ -d /vagrant ]]; then
if [[ $OS = "ubuntu" ]]; then
package g++-4.7
export CXX=g++-4.7
# the deb would depend on libgflags2, but the static lib is the only thing
# installed by make install
package libgflags-dev
package ruby-all-dev
elif [[ $OS = "centos" ]]; then
pushd /etc/yum.repos.d
if [[ ! -f /etc/yum.repos.d/devtools-1.1.repo ]]; then
wget http://people.centos.org/tru/devtools-1.1/devtools-1.1.repo
fi
package devtoolset-1.1-gcc --enablerepo=testing-1.1-devtools-6
package devtoolset-1.1-gcc-c++ --enablerepo=testing-1.1-devtools-6
export CC=/opt/centos/devtoolset-1.1/root/usr/bin/gcc
export CPP=/opt/centos/devtoolset-1.1/root/usr/bin/cpp
export CXX=/opt/centos/devtoolset-1.1/root/usr/bin/c++
export PATH=$PATH:/opt/centos/devtoolset-1.1/root/usr/bin
popd
if ! rpm -qa | grep --quiet gflags; then
rpm -i https://github.com/schuhschuh/gflags/releases/download/v2.1.0/gflags-devel-2.1.0-1.amd64.rpm
fi
package ruby
package ruby-devel
package rubygems
package rpm-build
fi
fi
gem_install fpm
make static_lib
make install INSTALL_PATH=package
fpm \
-s dir \
-t $FPM_OUTPUT \
-n rocksdb \
-v $1 \
--prefix /usr \
--url http://rocksdb.org/ \
-m rocksdb@fb.com \
--license BSD \
--vendor Facebook \
--description "RocksDB is an embeddable persistent key-value store for fast storage." \
package
}
main $@

View File

@ -746,15 +746,15 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// default kCompactionStopStyleTotalSize; with
// kCompactionStopStyleSimilarSize, it's simply the size of the last
// picked file.
uint64_t sz = (candidate_size * (100L + ratio)) /100;
if (sz < f->fd.GetFileSize()) {
double sz = candidate_size * (100.0 + ratio) / 100.0;
if (sz < static_cast<double>(f->fd.GetFileSize())) {
break;
}
if (options_->compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) {
// Similar-size stopping rule: also check the last picked file isn't
// far larger than the next candidate file.
sz = (f->fd.GetFileSize() * (100L + ratio)) / 100;
if (sz < candidate_size) {
sz = (f->fd.GetFileSize() * (100.0 + ratio)) / 100.0;
if (sz < static_cast<double>(candidate_size)) {
// If the small file we've encountered begins a run of similar-size
// files, we'll pick them up on a future iteration of the outer
// loop. If it's some lonely straggler, it'll eventually get picked

View File

@ -636,6 +636,14 @@ static void AppendWithSpace(std::string* str, Slice msg) {
str->append(msg.data(), msg.size());
}
struct DBWithColumnFamilies {
std::vector<ColumnFamilyHandle*> cfh;
DB* db;
DBWithColumnFamilies() : db(nullptr) {
cfh.clear();
}
};
class Stats {
private:
int id_;
@ -699,7 +707,7 @@ class Stats {
void SetId(int id) { id_ = id; }
void SetExcludeFromMerge() { exclude_from_merge_ = true; }
void FinishedOps(DB* db, int64_t num_ops) {
void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops) {
if (FLAGS_histogram) {
double now = FLAGS_env->NowMicros();
double micros = now - last_op_finish_;
@ -739,8 +747,17 @@ class Stats {
if (FLAGS_stats_per_interval) {
std::string stats;
if (db && db->GetProperty("rocksdb.stats", &stats))
if (db_with_cfh && db_with_cfh->cfh.size()) {
for (size_t i = 0; i < db_with_cfh->cfh.size(); ++i) {
if (db->GetProperty(db_with_cfh->cfh[i], "rocksdb.cfstats",
&stats))
fprintf(stderr, "%s\n", stats.c_str());
}
} else if (db && db->GetProperty("rocksdb.stats", &stats)) {
fprintf(stderr, "%s\n", stats.c_str());
}
}
fflush(stderr);
@ -859,13 +876,6 @@ class Benchmark {
std::shared_ptr<Cache> compressed_cache_;
std::shared_ptr<const FilterPolicy> filter_policy_;
const SliceTransform* prefix_extractor_;
struct DBWithColumnFamilies {
std::vector<ColumnFamilyHandle*> cfh;
DB* db;
DBWithColumnFamilies() : db(nullptr) {
cfh.clear();
}
};
DBWithColumnFamilies db_;
std::vector<DBWithColumnFamilies> multi_dbs_;
int64_t num_;
@ -1268,7 +1278,8 @@ class Benchmark {
method = &Benchmark::ReadRandomFast;
} else if (name == Slice("multireadrandom")) {
entries_per_batch_ = FLAGS_batch_size;
fprintf(stderr, "entries_per_batch_ = %ld\n", entries_per_batch_);
fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
entries_per_batch_);
method = &Benchmark::MultiReadRandom;
} else if (name == Slice("readmissing")) {
++key_size_;
@ -1480,7 +1491,7 @@ class Benchmark {
uint32_t crc = 0;
while (bytes < 500 * 1048576) {
crc = crc32c::Value(data.data(), size);
thread->stats.FinishedOps(nullptr, 1);
thread->stats.FinishedOps(nullptr, nullptr, 1);
bytes += size;
}
// Print so result is not dead
@ -1499,7 +1510,7 @@ class Benchmark {
unsigned int xxh32 = 0;
while (bytes < 500 * 1048576) {
xxh32 = XXH32(data.data(), size, 0);
thread->stats.FinishedOps(nullptr, 1);
thread->stats.FinishedOps(nullptr, nullptr, 1);
bytes += size;
}
// Print so result is not dead
@ -1520,7 +1531,7 @@ class Benchmark {
ptr = ap.Acquire_Load();
}
count++;
thread->stats.FinishedOps(nullptr, 1);
thread->stats.FinishedOps(nullptr, nullptr, 1);
}
if (ptr == nullptr) exit(1); // Disable unused variable warning.
}
@ -1561,7 +1572,7 @@ class Benchmark {
}
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedOps(nullptr, 1);
thread->stats.FinishedOps(nullptr, nullptr, 1);
}
if (!ok) {
@ -1642,7 +1653,7 @@ class Benchmark {
}
delete[] uncompressed;
bytes += input.size();
thread->stats.FinishedOps(nullptr, 1);
thread->stats.FinishedOps(nullptr, nullptr, 1);
}
if (!ok) {
@ -2022,7 +2033,8 @@ class Benchmark {
bytes += value_size_ + key_size_;
}
s = db_with_cfh->db->Write(write_options_, &batch);
thread->stats.FinishedOps(db_with_cfh->db, entries_per_batch_);
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
entries_per_batch_);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -2047,7 +2059,7 @@ class Benchmark {
int64_t bytes = 0;
for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedOps(db, 1);
thread->stats.FinishedOps(nullptr, db, 1);
++i;
}
delete iter;
@ -2070,7 +2082,7 @@ class Benchmark {
int64_t bytes = 0;
for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedOps(db, 1);
thread->stats.FinishedOps(nullptr, db, 1);
++i;
}
delete iter;
@ -2105,7 +2117,7 @@ class Benchmark {
++nonexist;
}
}
thread->stats.FinishedOps(db, 100);
thread->stats.FinishedOps(nullptr, db, 100);
} while (!duration.Done(100));
char msg[100];
@ -2147,7 +2159,7 @@ class Benchmark {
if (s.ok()) {
found++;
}
thread->stats.FinishedOps(db_with_cfh->db, 1);
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1);
}
char msg[100];
@ -2189,7 +2201,7 @@ class Benchmark {
++found;
}
}
thread->stats.FinishedOps(db, entries_per_batch_);
thread->stats.FinishedOps(nullptr, db, entries_per_batch_);
}
for (auto& k : keys) {
delete k.data();
@ -2208,7 +2220,7 @@ class Benchmark {
DB* db = SelectDB(thread);
Iterator* iter = db->NewIterator(options);
delete iter;
thread->stats.FinishedOps(db, 1);
thread->stats.FinishedOps(nullptr, db, 1);
}
}
@ -2272,7 +2284,7 @@ class Benchmark {
if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
found++;
}
thread->stats.FinishedOps(db_.db, 1);
thread->stats.FinishedOps(&db_, db_.db, 1);
}
delete single_iter;
for (auto iter : multi_iters) {
@ -2312,7 +2324,7 @@ class Benchmark {
batch.Delete(key);
}
auto s = db->Write(write_options_, &batch);
thread->stats.FinishedOps(db, entries_per_batch_);
thread->stats.FinishedOps(nullptr, db, entries_per_batch_);
if (!s.ok()) {
fprintf(stderr, "del error: %s\n", s.ToString().c_str());
exit(1);
@ -2372,7 +2384,7 @@ class Benchmark {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedOps(db_.db, 1);
thread->stats.FinishedOps(&db_, db_.db, 1);
++num_writes;
if (writes_per_second_by_10 && num_writes >= writes_per_second_by_10) {
@ -2532,7 +2544,7 @@ class Benchmark {
deletes_done++;
}
thread->stats.FinishedOps(db_.db, 1);
thread->stats.FinishedOps(&db_, db_.db, 1);
}
char msg[100];
snprintf(msg, sizeof(msg),
@ -2590,7 +2602,7 @@ class Benchmark {
put_weight--;
writes_done++;
}
thread->stats.FinishedOps(db, 1);
thread->stats.FinishedOps(nullptr, db, 1);
}
char msg[100];
snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
@ -2624,7 +2636,7 @@ class Benchmark {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedOps(db, 1);
thread->stats.FinishedOps(nullptr, db, 1);
}
char msg[100];
snprintf(msg, sizeof(msg),
@ -2671,7 +2683,7 @@ class Benchmark {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedOps(db, 1);
thread->stats.FinishedOps(nullptr, db, 1);
}
char msg[100];
@ -2707,7 +2719,7 @@ class Benchmark {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedOps(db, 1);
thread->stats.FinishedOps(nullptr, db, 1);
}
// Print some statistics
@ -2768,7 +2780,7 @@ class Benchmark {
}
thread->stats.FinishedOps(db, 1);
thread->stats.FinishedOps(nullptr, db, 1);
}
char msg[100];

View File

@ -3340,12 +3340,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
Version::LevelSummaryStorage tmp;
LogToBuffer(
log_buffer,
"[%s] compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) "
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s\n",
cfd->GetName().c_str(), cfd->current()->LevelSummary(&tmp),
(stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) /
(double)stats.micros,
(stats.bytes_readn + stats.bytes_readnp1) /
static_cast<double>(stats.micros),
stats.bytes_written / static_cast<double>(stats.micros),
compact->compaction->output_level(), stats.files_in_leveln,
stats.files_in_levelnp1, stats.files_out_levelnp1,
stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,

View File

@ -14,7 +14,8 @@ namespace rocksdb {
DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
const std::string& dbname)
: DBImpl(db_options, dbname) {
Log(db_options_.info_log, "Opening the db in read only mode");
Log(INFO_LEVEL, db_options_.info_log, "Opening the db in read only mode");
LogFlush(db_options_.info_log);
}
DBImplReadOnly::~DBImplReadOnly() {

View File

@ -1349,8 +1349,8 @@ TEST(DBTest, CompactedDB) {
std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
Slice("ggg"), Slice("iii"), Slice("kkk")}),
&values);
ASSERT_EQ(status_list.size(), 6);
ASSERT_EQ(values.size(), 6);
ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
ASSERT_OK(status_list[0]);
ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
ASSERT_TRUE(status_list[1].IsNotFound());

View File

@ -1,815 +0,0 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm>
#include <set>
#include "rocksdb/db.h"
#include "rocksdb/filter_policy.h"
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "rocksdb/statistics.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "table/table_builder.h"
#include "util/hash.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"
using std::unique_ptr;
// IS THIS FILE STILL NEEDED?
namespace rocksdb {
// SimpleTable is a simple table format for UNIT TEST ONLY. It is not built
// as production quality.
// SimpleTable requires the input key size to be fixed 16 bytes, value cannot
// be longer than 150000 bytes and stored data on disk in this format:
// +--------------------------------------------+ <= key1 offset
// | key1 | value_size (4 bytes) | |
// +----------------------------------------+ |
// | value1 |
// | |
// +----------------------------------------+---+ <= key2 offset
// | key2 | value_size (4 bytes) | |
// +----------------------------------------+ |
// | value2 |
// | |
// | ...... |
// +-----------------+--------------------------+ <= index_block_offset
// | key1 | key1 offset (8 bytes) |
// +-----------------+--------------------------+
// | key2 | key2 offset (8 bytes) |
// +-----------------+--------------------------+
// | key3 | key3 offset (8 bytes) |
// +-----------------+--------------------------+
// | ...... |
// +-----------------+------------+-------------+
// | index_block_offset (8 bytes) |
// +------------------------------+
// SimpleTable is a simple table format for UNIT TEST ONLY. It is not built
// as production quality.
class SimpleTableReader: public TableReader {
public:
// Attempt to open the table that is stored in bytes [0..file_size)
// of "file", and read the metadata entries necessary to allow
// retrieving data from the table.
//
// If successful, returns ok and sets "*table" to the newly opened
// table. The client should delete "*table" when no longer needed.
// If there was an error while initializing the table, sets "*table"
// to nullptr and returns a non-ok status. Does not take ownership of
// "*source", but the client must ensure that "source" remains live
// for the duration of the returned table's lifetime.
//
// *file must remain live while this Table is in use.
static Status Open(const ImmutableCFOptions& options,
const EnvOptions& env_options,
unique_ptr<RandomAccessFile> && file, uint64_t file_size,
unique_ptr<TableReader>* table_reader);
Iterator* NewIterator(const ReadOptions&, Arena* arena) override;
Status Get(const ReadOptions&, const Slice& key, void* arg,
bool (*handle_result)(void* arg, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist)(void*) = nullptr) override;
uint64_t ApproximateOffsetOf(const Slice& key) override;
virtual size_t ApproximateMemoryUsage() const override { return 0; }
void SetupForCompaction() override;
std::shared_ptr<const TableProperties> GetTableProperties() const override;
~SimpleTableReader();
private:
struct Rep;
Rep* rep_;
explicit SimpleTableReader(Rep* rep) {
rep_ = rep;
}
friend class TableCache;
friend class SimpleTableIterator;
Status GetOffset(const Slice& target, uint64_t* offset);
// No copying allowed
explicit SimpleTableReader(const TableReader&) = delete;
void operator=(const TableReader&) = delete;
};
// Iterator to iterate SimpleTable
class SimpleTableIterator: public Iterator {
public:
explicit SimpleTableIterator(SimpleTableReader* table);
~SimpleTableIterator();
bool Valid() const;
void SeekToFirst();
void SeekToLast();
void Seek(const Slice& target);
void Next();
void Prev();
Slice key() const;
Slice value() const;
Status status() const;
private:
SimpleTableReader* table_;
uint64_t offset_;
uint64_t next_offset_;
Slice key_;
Slice value_;
char tmp_str_[4];
char* key_str_;
char* value_str_;
int value_str_len_;
Status status_;
// No copying allowed
SimpleTableIterator(const SimpleTableIterator&) = delete;
void operator=(const Iterator&) = delete;
};
struct SimpleTableReader::Rep {
~Rep() {
}
Rep(const ImmutableCFOptions& ioptions, const EnvOptions& env_options,
uint64_t index_start_offset, int num_entries) :
ioptions(ioptions), env_options(env_options),
index_start_offset(index_start_offset), num_entries(num_entries) {
}
const ImmutableCFOptions& ioptions;
const EnvOptions& env_options;
Status status;
unique_ptr<RandomAccessFile> file;
uint64_t index_start_offset;
int num_entries;
std::shared_ptr<TableProperties> table_properties;
const static int user_key_size = 16;
const static int offset_length = 8;
const static int key_footer_len = 8;
static int GetInternalKeyLength() {
return user_key_size + key_footer_len;
}
};
SimpleTableReader::~SimpleTableReader() {
delete rep_;
}
Status SimpleTableReader::Open(const ImmutableCFOptions& ioptions,
const EnvOptions& env_options,
unique_ptr<RandomAccessFile> && file,
uint64_t size,
unique_ptr<TableReader>* table_reader) {
char footer_space[Rep::offset_length];
Slice footer_input;
Status s = file->Read(size - Rep::offset_length, Rep::offset_length,
&footer_input, footer_space);
if (s.ok()) {
uint64_t index_start_offset = DecodeFixed64(footer_space);
int num_entries = (size - Rep::offset_length - index_start_offset)
/ (Rep::GetInternalKeyLength() + Rep::offset_length);
SimpleTableReader::Rep* rep = new SimpleTableReader::Rep(
ioptions, env_options, index_start_offset, num_entries);
rep->file = std::move(file);
table_reader->reset(new SimpleTableReader(rep));
}
return s;
}
void SimpleTableReader::SetupForCompaction() {
}
std::shared_ptr<const TableProperties> SimpleTableReader::GetTableProperties()
const {
return rep_->table_properties;
}
Iterator* SimpleTableReader::NewIterator(const ReadOptions& options,
Arena* arena) {
if (arena == nullptr) {
return new SimpleTableIterator(this);
} else {
auto mem = arena->AllocateAligned(sizeof(SimpleTableIterator));
return new (mem) SimpleTableIterator(this);
}
}
Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) {
uint32_t left = 0;
uint32_t right = rep_->num_entries - 1;
char key_chars[Rep::GetInternalKeyLength()];
Slice tmp_slice;
uint32_t target_offset = 0;
while (left <= right) {
uint32_t mid = (left + right + 1) / 2;
uint64_t offset_to_read = rep_->index_start_offset
+ (Rep::GetInternalKeyLength() + Rep::offset_length) * mid;
Status s = rep_->file->Read(offset_to_read, Rep::GetInternalKeyLength(),
&tmp_slice, key_chars);
if (!s.ok()) {
return s;
}
InternalKeyComparator ikc(rep_->ioptions.comparator);
int compare_result = ikc.Compare(tmp_slice, target);
if (compare_result < 0) {
if (left == right) {
target_offset = right + 1;
break;
}
left = mid;
} else {
if (left == right) {
target_offset = left;
break;
}
right = mid - 1;
}
}
if (target_offset >= (uint32_t) rep_->num_entries) {
*offset = rep_->index_start_offset;
return Status::OK();
}
char value_offset_chars[Rep::offset_length];
int64_t offset_for_value_offset = rep_->index_start_offset
+ (Rep::GetInternalKeyLength() + Rep::offset_length) * target_offset
+ Rep::GetInternalKeyLength();
Status s = rep_->file->Read(offset_for_value_offset, Rep::offset_length,
&tmp_slice, value_offset_chars);
if (s.ok()) {
*offset = DecodeFixed64(value_offset_chars);
}
return s;
}
Status SimpleTableReader::Get(const ReadOptions& options, const Slice& k,
void* arg,
bool (*saver)(void*, const ParsedInternalKey&,
const Slice&),
void (*mark_key_may_exist)(void*)) {
Status s;
SimpleTableIterator* iter = new SimpleTableIterator(this);
for (iter->Seek(k); iter->Valid(); iter->Next()) {
ParsedInternalKey parsed_key;
if (!ParseInternalKey(iter->key(), &parsed_key)) {
return Status::Corruption(Slice());
}
if (!(*saver)(arg, parsed_key, iter->value())) {
break;
}
}
s = iter->status();
delete iter;
return s;
}
uint64_t SimpleTableReader::ApproximateOffsetOf(const Slice& key) {
return 0;
}
SimpleTableIterator::SimpleTableIterator(SimpleTableReader* table) :
table_(table) {
key_str_ = new char[SimpleTableReader::Rep::GetInternalKeyLength()];
value_str_len_ = -1;
SeekToFirst();
}
SimpleTableIterator::~SimpleTableIterator() {
delete[] key_str_;
if (value_str_len_ >= 0) {
delete[] value_str_;
}
}
bool SimpleTableIterator::Valid() const {
return offset_ < table_->rep_->index_start_offset;
}
void SimpleTableIterator::SeekToFirst() {
next_offset_ = 0;
Next();
}
void SimpleTableIterator::SeekToLast() {
assert(false);
}
void SimpleTableIterator::Seek(const Slice& target) {
Status s = table_->GetOffset(target, &next_offset_);
if (!s.ok()) {
status_ = s;
}
Next();
}
void SimpleTableIterator::Next() {
offset_ = next_offset_;
if (offset_ >= table_->rep_->index_start_offset) {
return;
}
Slice result;
int internal_key_size = SimpleTableReader::Rep::GetInternalKeyLength();
Status s = table_->rep_->file->Read(next_offset_, internal_key_size, &result,
key_str_);
next_offset_ += internal_key_size;
key_ = result;
Slice value_size_slice;
s = table_->rep_->file->Read(next_offset_, 4, &value_size_slice, tmp_str_);
next_offset_ += 4;
uint32_t value_size = DecodeFixed32(tmp_str_);
Slice value_slice;
if ((int) value_size > value_str_len_) {
if (value_str_len_ >= 0) {
delete[] value_str_;
}
value_str_ = new char[value_size];
value_str_len_ = value_size;
}
s = table_->rep_->file->Read(next_offset_, value_size, &value_slice,
value_str_);
next_offset_ += value_size;
value_ = value_slice;
}
void SimpleTableIterator::Prev() {
assert(false);
}
Slice SimpleTableIterator::key() const {
Log(table_->rep_->ioptions.info_log, "key!!!!");
return key_;
}
Slice SimpleTableIterator::value() const {
return value_;
}
Status SimpleTableIterator::status() const {
return status_;
}
class SimpleTableBuilder: public TableBuilder {
public:
// Create a builder that will store the contents of the table it is
// building in *file. Does not close the file. It is up to the
// caller to close the file after calling Finish(). The output file
// will be part of level specified by 'level'. A value of -1 means
// that the caller does not know which level the output file will reside.
SimpleTableBuilder(const ImmutableCFOptions& ioptions, WritableFile* file,
CompressionType compression_type);
// REQUIRES: Either Finish() or Abandon() has been called.
~SimpleTableBuilder();
// Add key,value to the table being constructed.
// REQUIRES: key is after any previously added key according to comparator.
// REQUIRES: Finish(), Abandon() have not been called
void Add(const Slice& key, const Slice& value) override;
// Return non-ok iff some error has been detected.
Status status() const override;
// Finish building the table. Stops using the file passed to the
// constructor after this function returns.
// REQUIRES: Finish(), Abandon() have not been called
Status Finish() override;
// Indicate that the contents of this builder should be abandoned. Stops
// using the file passed to the constructor after this function returns.
// If the caller is not going to call Finish(), it must call Abandon()
// before destroying this builder.
// REQUIRES: Finish(), Abandon() have not been called
void Abandon() override;
// Number of calls to Add() so far.
uint64_t NumEntries() const override;
// Size of the file generated so far. If invoked after a successful
// Finish() call, returns the size of the final generated file.
uint64_t FileSize() const override;
private:
struct Rep;
Rep* rep_;
// No copying allowed
SimpleTableBuilder(const SimpleTableBuilder&) = delete;
void operator=(const SimpleTableBuilder&) = delete;
};
struct SimpleTableBuilder::Rep {
const ImmutableCFOptions& ioptions;
WritableFile* file;
uint64_t offset = 0;
Status status;
uint64_t num_entries = 0;
bool closed = false; // Either Finish() or Abandon() has been called.
const static int user_key_size = 16;
const static int offset_length = 8;
const static int key_footer_len = 8;
static int GetInternalKeyLength() {
return user_key_size + key_footer_len;
}
std::string index;
Rep(const ImmutableCFOptions& iopt, WritableFile* f) :
ioptions(iopt), file(f) {
}
~Rep() {
}
};
SimpleTableBuilder::SimpleTableBuilder(const ImmutableCFOptions& ioptions,
WritableFile* file,
CompressionType compression_type) :
rep_(new SimpleTableBuilder::Rep(ioptions, file)) {
}
SimpleTableBuilder::~SimpleTableBuilder() {
delete (rep_);
}
void SimpleTableBuilder::Add(const Slice& key, const Slice& value) {
assert((int ) key.size() == Rep::GetInternalKeyLength());
// Update index
rep_->index.append(key.data(), key.size());
PutFixed64(&(rep_->index), rep_->offset);
// Write key-value pair
rep_->file->Append(key);
rep_->offset += Rep::GetInternalKeyLength();
std::string size;
int value_size = value.size();
PutFixed32(&size, value_size);
Slice sizeSlice(size);
rep_->file->Append(sizeSlice);
rep_->file->Append(value);
rep_->offset += value_size + 4;
rep_->num_entries++;
}
Status SimpleTableBuilder::status() const {
return Status::OK();
}
Status SimpleTableBuilder::Finish() {
Rep* r = rep_;
assert(!r->closed);
r->closed = true;
uint64_t index_offset = rep_->offset;
Slice index_slice(rep_->index);
rep_->file->Append(index_slice);
rep_->offset += index_slice.size();
std::string index_offset_str;
PutFixed64(&index_offset_str, index_offset);
Slice foot_slice(index_offset_str);
rep_->file->Append(foot_slice);
rep_->offset += foot_slice.size();
return Status::OK();
}
void SimpleTableBuilder::Abandon() {
rep_->closed = true;
}
uint64_t SimpleTableBuilder::NumEntries() const {
return rep_->num_entries;
}
uint64_t SimpleTableBuilder::FileSize() const {
return rep_->offset;
}
class SimpleTableFactory: public TableFactory {
public:
~SimpleTableFactory() {
}
SimpleTableFactory() {
}
const char* Name() const override {
return "SimpleTable";
}
Status NewTableReader(const ImmutableCFOptions& ioptions,
const EnvOptions& env_options,
const InternalKeyComparator& internal_key,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const;
TableBuilder* NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_key,
WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts) const;
virtual Status SanitizeDBOptions(const DBOptions* db_opts) const override {
return Status::OK();
}
virtual std::string GetPrintableTableOptions() const override {
return std::string();
}
};
Status SimpleTableFactory::NewTableReader(
const ImmutableCFOptions& ioptions,
const EnvOptions& env_options,
const InternalKeyComparator& internal_key,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const {
return SimpleTableReader::Open(ioptions, env_options, std::move(file),
file_size, table_reader);
}
TableBuilder* SimpleTableFactory::NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_key,
WritableFile* file, const CompressionType compression_type,
const CompressionOptions& compression_opts) const {
return new SimpleTableBuilder(ioptions, file, compression_type);
}
class SimpleTableDBTest {
protected:
public:
std::string dbname_;
Env* env_;
DB* db_;
Options last_options_;
SimpleTableDBTest() :
env_(Env::Default()) {
dbname_ = test::TmpDir() + "/simple_table_db_test";
ASSERT_OK(DestroyDB(dbname_, Options()));
db_ = nullptr;
Reopen();
}
~SimpleTableDBTest() {
delete db_;
ASSERT_OK(DestroyDB(dbname_, Options()));
}
// Return the current option configuration.
Options CurrentOptions() {
Options options;
options.table_factory.reset(new SimpleTableFactory());
return options;
}
DBImpl* dbfull() {
return reinterpret_cast<DBImpl*>(db_);
}
void Reopen(Options* options = nullptr) {
ASSERT_OK(TryReopen(options));
}
void Close() {
delete db_;
db_ = nullptr;
}
void DestroyAndReopen(Options* options = nullptr) {
//Destroy using last options
Destroy(&last_options_);
ASSERT_OK(TryReopen(options));
}
void Destroy(Options* options) {
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, *options));
}
Status PureReopen(Options* options, DB** db) {
return DB::Open(*options, dbname_, db);
}
Status TryReopen(Options* options = nullptr) {
delete db_;
db_ = nullptr;
Options opts;
if (options != nullptr) {
opts = *options;
} else {
opts = CurrentOptions();
opts.create_if_missing = true;
}
last_options_ = opts;
return DB::Open(opts, dbname_, &db_);
}
Status Put(const Slice& k, const Slice& v) {
return db_->Put(WriteOptions(), k, v);
}
Status Delete(const std::string& k) {
return db_->Delete(WriteOptions(), k);
}
std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
ReadOptions options;
options.snapshot = snapshot;
std::string result;
Status s = db_->Get(options, k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
int NumTableFilesAtLevel(int level) {
std::string property;
ASSERT_TRUE(
db_->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
&property));
return atoi(property.c_str());
}
// Return spread of files per level
std::string FilesPerLevel() {
std::string result;
int last_non_zero_offset = 0;
for (int level = 0; level < db_->NumberLevels(); level++) {
int f = NumTableFilesAtLevel(level);
char buf[100];
snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
result += buf;
if (f > 0) {
last_non_zero_offset = result.size();
}
}
result.resize(last_non_zero_offset);
return result;
}
std::string IterStatus(Iterator* iter) {
std::string result;
if (iter->Valid()) {
result = iter->key().ToString() + "->" + iter->value().ToString();
} else {
result = "(invalid)";
}
return result;
}
};
TEST(SimpleTableDBTest, Empty) {
ASSERT_TRUE(db_ != nullptr);
ASSERT_EQ("NOT_FOUND", Get("0000000000000foo"));
}
TEST(SimpleTableDBTest, ReadWrite) {
ASSERT_OK(Put("0000000000000foo", "v1"));
ASSERT_EQ("v1", Get("0000000000000foo"));
ASSERT_OK(Put("0000000000000bar", "v2"));
ASSERT_OK(Put("0000000000000foo", "v3"));
ASSERT_EQ("v3", Get("0000000000000foo"));
ASSERT_EQ("v2", Get("0000000000000bar"));
}
TEST(SimpleTableDBTest, Flush) {
ASSERT_OK(Put("0000000000000foo", "v1"));
ASSERT_OK(Put("0000000000000bar", "v2"));
ASSERT_OK(Put("0000000000000foo", "v3"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v3", Get("0000000000000foo"));
ASSERT_EQ("v2", Get("0000000000000bar"));
}
TEST(SimpleTableDBTest, Flush2) {
ASSERT_OK(Put("0000000000000bar", "b"));
ASSERT_OK(Put("0000000000000foo", "v1"));
dbfull()->TEST_FlushMemTable();
ASSERT_OK(Put("0000000000000foo", "v2"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v2", Get("0000000000000foo"));
ASSERT_OK(Put("0000000000000eee", "v3"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v3", Get("0000000000000eee"));
ASSERT_OK(Delete("0000000000000bar"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("NOT_FOUND", Get("0000000000000bar"));
ASSERT_OK(Put("0000000000000eee", "v5"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v5", Get("0000000000000eee"));
}
static std::string Key(int i) {
char buf[100];
snprintf(buf, sizeof(buf), "key_______%06d", i);
return std::string(buf);
}
static std::string RandomString(Random* rnd, int len) {
std::string r;
test::RandomString(rnd, len, &r);
return r;
}
TEST(SimpleTableDBTest, CompactionTrigger) {
Options options = CurrentOptions();
options.write_buffer_size = 100 << 10; //100KB
options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 3;
Reopen(&options);
Random rnd(301);
for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
num++) {
std::vector<std::string> values;
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(Put(Key(i), values[i]));
}
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(NumTableFilesAtLevel(0), num + 1);
}
//generate one more file in level-0, and should trigger level-0 compaction
std::vector<std::string> values;
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(Put(Key(i), values[i]));
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
}
} // namespace rocksdb
int main(int argc, char** argv) {
return rocksdb::test::RunAllTests();
}

View File

@ -15,6 +15,7 @@
#include "rocksdb/statistics.h"
#include "table/iterator_wrapper.h"
#include "table/table_reader.h"
#include "table/get_context.h"
#include "util/coding.h"
#include "util/stop_watch.h"
@ -132,10 +133,8 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
Status TableCache::Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, const Slice& k, void* arg,
bool (*saver)(void*, const ParsedInternalKey&,
const Slice&),
void (*mark_key_may_exist)(void*)) {
const FileDescriptor& fd, const Slice& k,
GetContext* get_context) {
TableReader* t = fd.table_reader;
Status s;
Cache::Handle* handle = nullptr;
@ -147,13 +146,13 @@ Status TableCache::Get(const ReadOptions& options,
}
}
if (s.ok()) {
s = t->Get(options, k, arg, saver, mark_key_may_exist);
s = t->Get(options, k, get_context);
if (handle != nullptr) {
ReleaseHandle(handle);
}
} else if (options.read_tier && s.IsIncomplete()) {
// Couldnt find Table in cache but treat as kFound if no_io set
(*mark_key_may_exist)(arg);
get_context->MarkKeyMayExist();
return Status::OK();
}
return s;

View File

@ -27,6 +27,7 @@ namespace rocksdb {
class Env;
class Arena;
struct FileDescriptor;
class GetContext;
class TableCache {
public:
@ -52,10 +53,8 @@ class TableCache {
// it returns false.
Status Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& file_fd, const Slice& k, void* arg,
bool (*handle_result)(void*, const ParsedInternalKey&,
const Slice&),
void (*mark_key_may_exist)(void*) = nullptr);
const FileDescriptor& file_fd, const Slice& k,
GetContext* get_context);
// Evict any entry for the specified file number
static void Evict(Cache* cache, uint64_t file_number);

View File

@ -37,6 +37,7 @@
#include "table/format.h"
#include "table/plain_table_factory.h"
#include "table/meta_blocks.h"
#include "table/get_context.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/stop_watch.h"
@ -627,81 +628,6 @@ void Version::AddIterators(const ReadOptions& read_options,
}
// Called from TableCache::Get and Table::Get when file/block in which
// key may exist are not there in TableCache/BlockCache respectively. In this
// case we can't guarantee that key does not exist and are not permitted to do
// IO to be certain.Set the status=kFound and value_found=false to let the
// caller know that key may exist but is not there in memory
void MarkKeyMayExist(void* arg) {
Version::Saver* s = reinterpret_cast<Version::Saver*>(arg);
s->state = Version::kFound;
if (s->value_found != nullptr) {
*(s->value_found) = false;
}
}
bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
const Slice& v) {
Version::Saver* s = reinterpret_cast<Version::Saver*>(arg);
MergeContext* merge_contex = s->merge_context;
std::string merge_result; // temporary area for merge results later
assert(s != nullptr && merge_contex != nullptr);
// TODO: Merge?
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
// Key matches. Process it
switch (parsed_key.type) {
case kTypeValue:
if (Version::kNotFound == s->state) {
s->state = Version::kFound;
s->value->assign(v.data(), v.size());
} else if (Version::kMerge == s->state) {
assert(s->merge_operator != nullptr);
s->state = Version::kFound;
if (!s->merge_operator->FullMerge(s->user_key, &v,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = Version::kCorrupt;
}
} else {
assert(false);
}
return false;
case kTypeDeletion:
if (Version::kNotFound == s->state) {
s->state = Version::kDeleted;
} else if (Version::kMerge == s->state) {
s->state = Version::kFound;
if (!s->merge_operator->FullMerge(s->user_key, nullptr,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = Version::kCorrupt;
}
} else {
assert(false);
}
return false;
case kTypeMerge:
assert(s->state == Version::kNotFound || s->state == Version::kMerge);
s->state = Version::kMerge;
merge_contex->PushOperand(v);
return true;
default:
assert(false);
break;
}
}
// s->state could be Corrupt, merge or notfound
return false;
}
Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
uint64_t version_number)
@ -756,46 +682,42 @@ void Version::Get(const ReadOptions& options,
Slice user_key = k.user_key();
assert(status->ok() || status->IsMergeInProgress());
Saver saver;
saver.state = status->ok()? kNotFound : kMerge;
saver.ucmp = user_comparator_;
saver.user_key = user_key;
saver.value_found = value_found;
saver.value = value;
saver.merge_operator = merge_operator_;
saver.merge_context = merge_context;
saver.logger = info_log_;
saver.statistics = db_statistics_;
GetContext get_context(user_comparator_, merge_operator_, info_log_,
db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge,
user_key, value, value_found, merge_context);
FilePicker fp(files_, user_key, ikey, &file_levels_, num_non_empty_levels_,
&file_indexer_, user_comparator_, internal_comparator_);
FdWithKeyRange* f = fp.GetNextFile();
while (f != nullptr) {
*status = table_cache_->Get(options, *internal_comparator_, f->fd, ikey,
&saver, SaveValue, MarkKeyMayExist);
&get_context);
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
}
switch (saver.state) {
case kNotFound:
break; // Keep searching in other files
case kFound:
switch (get_context.State()) {
case GetContext::kNotFound:
// Keep searching in other files
break;
case GetContext::kFound:
return;
case kDeleted:
*status = Status::NotFound(); // Use empty error message for speed
case GetContext::kDeleted:
// Use empty error message for speed
*status = Status::NotFound();
return;
case kCorrupt:
case GetContext::kCorrupt:
*status = Status::Corruption("corrupted key for ", user_key);
return;
case kMerge:
case GetContext::kMerge:
break;
}
f = fp.GetNextFile();
}
if (kMerge == saver.state) {
if (GetContext::kMerge == get_context.State()) {
if (!merge_operator_) {
*status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
@ -804,7 +726,7 @@ void Version::Get(const ReadOptions& options,
// merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands;
if (merge_operator_->FullMerge(user_key, nullptr,
saver.merge_context->GetOperands(), value,
merge_context->GetOperands(), value,
info_log_)) {
*status = Status::OK();
} else {

View File

@ -241,28 +241,6 @@ class Version {
FileMetaData* file;
};
enum SaverState {
kNotFound,
kFound,
kDeleted,
kCorrupt,
kMerge // saver contains the current merge result (the operands)
};
// Callback from TableCache::Get()
struct Saver {
SaverState state;
const Comparator* ucmp;
Slice user_key;
bool* value_found; // Is value set correctly? Used by KeyMayExist
std::string* value;
const MergeOperator* merge_operator;
// the merge operations encountered;
MergeContext* merge_context;
Logger* logger;
Statistics* statistics;
};
private:
friend class Compaction;
friend class VersionSet;

View File

@ -62,6 +62,10 @@ class Comparator {
// must not be deleted.
extern const Comparator* BytewiseComparator();
// Return a builtin comparator that uses reverse lexicographic byte-wise
// ordering.
extern const Comparator* ReverseBytewiseComparator();
} // namespace rocksdb
#endif // STORAGE_ROCKSDB_INCLUDE_COMPARATOR_H_

View File

@ -18,6 +18,14 @@ public class Options extends RocksObject {
}
static final long DEFAULT_CACHE_SIZE = 8 << 20;
static final int DEFAULT_NUM_SHARD_BITS = -1;
/**
* Builtin RocksDB comparators
*/
public enum BuiltinComparator {
BYTEWISE_COMPARATOR, REVERSE_BYTEWISE_COMPARATOR;
}
/**
* Construct options for opening a RocksDB.
*
@ -78,6 +86,21 @@ public class Options extends RocksObject {
return createIfMissing(nativeHandle_);
}
/**
* Set BuiltinComparator to be used with RocksDB.
*
* Note: Comparator can be set once upon database creation.
*
* Default: BytewiseComparator.
* @param builtinComparator a BuiltinComparator type.
*/
public void setBuiltinComparator(BuiltinComparator builtinComparator) {
assert(isInitialized());
setBuiltinComparator(nativeHandle_, builtinComparator.ordinal());
}
private native void setBuiltinComparator(long handle, int builtinComparator);
/**
* Amount of data to build up in memory (backed by an unsorted log
* on disk) before converting to a sorted on-disk file.

View File

@ -22,6 +22,7 @@
#include "rocksdb/table.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/comparator.h"
/*
* Class: org_rocksdb_Options
@ -63,6 +64,23 @@ jboolean Java_org_rocksdb_Options_createIfMissing(
return reinterpret_cast<rocksdb::Options*>(jhandle)->create_if_missing;
}
/*
* Class: org_rocksdb_Options
* Method: useReverseBytewiseComparator
* Signature: (JI)V
*/
void Java_org_rocksdb_Options_setBuiltinComparator(
JNIEnv* env, jobject jobj, jlong jhandle, jint builtinComparator) {
switch (builtinComparator){
case 1:
reinterpret_cast<rocksdb::Options*>(jhandle)->comparator = rocksdb::ReverseBytewiseComparator();
break;
default:
reinterpret_cast<rocksdb::Options*>(jhandle)->comparator = rocksdb::BytewiseComparator();
break;
}
}
/*
* Class: org_rocksdb_Options
* Method: setWriteBufferSize

View File

@ -36,16 +36,9 @@ class FacebookFbcodeLintEngine extends ArcanistLintEngine {
));
$linters[] = $java_text_linter;
$pep8_options = $this->getPEP8WithTextOptions().',E302';
$python_linter = new ArcanistPEP8Linter();
$python_linter->setConfig(array('options' => $pep8_options));
$linters[] = $python_linter;
$python_2space_linter = new ArcanistPEP8Linter();
$python_2space_linter->setConfig(array('options' => $pep8_options.',E111'));
$linters[] = $python_2space_linter;
// Currently we can't run cpplint in commit hook mode, because it
// depends on having access to the working directory.
if (!$this->getCommitHookMode()) {
@ -119,11 +112,7 @@ class FacebookFbcodeLintEngine extends ArcanistLintEngine {
$dir = dirname($dir);
} while ($dir != '/' && $dir != '.');
if ($space_count == 4) {
$cur_path_linter = $python_linter;
} else {
$cur_path_linter = $python_2space_linter;
}
$cur_path_linter = $python_linter;
$cur_path_linter->addPath($path);
$cur_path_linter->addData($path, $this->loadData($path));

View File

@ -33,6 +33,7 @@
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/two_level_iterator.h"
#include "table/get_context.h"
#include "util/coding.h"
#include "util/perf_context_imp.h"
@ -1100,10 +1101,8 @@ Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options,
}
Status BlockBasedTable::Get(
const ReadOptions& read_options, const Slice& key, void* handle_context,
bool (*result_handler)(void* handle_context, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist_handler)(void* handle_context)) {
const ReadOptions& read_options, const Slice& key,
GetContext* get_context) {
Status s;
auto filter_entry = GetFilter(read_options.read_tier == kBlockCacheTier);
FilterBlockReader* filter = filter_entry.value;
@ -1141,7 +1140,7 @@ Status BlockBasedTable::Get(
// couldn't get block from block_cache
// Update Saver.state to Found because we are only looking for whether
// we can guarantee the key is not there when "no_io" is set
(*mark_key_may_exist_handler)(handle_context);
get_context->MarkKeyMayExist();
break;
}
if (!biter.status().ok()) {
@ -1156,8 +1155,7 @@ Status BlockBasedTable::Get(
s = Status::Corruption(Slice());
}
if (!(*result_handler)(handle_context, parsed_key,
biter.value())) {
if (!get_context->SaveValue(parsed_key, biter.value())) {
done = true;
break;
}

View File

@ -40,6 +40,7 @@ class WritableFile;
struct BlockBasedTableOptions;
struct EnvOptions;
struct ReadOptions;
class GetContext;
using std::unique_ptr;
@ -76,11 +77,7 @@ class BlockBasedTable : public TableReader {
Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override;
Status Get(const ReadOptions& readOptions, const Slice& key,
void* handle_context,
bool (*result_handler)(void* handle_context,
const ParsedInternalKey& k, const Slice& v),
void (*mark_key_may_exist_handler)(void* handle_context) =
nullptr) override;
GetContext* get_context) override;
// Given a key, return an approximate byte offset in the file where
// the data for that key begins (or would begin if the key were

View File

@ -60,9 +60,11 @@ CuckooTableBuilder::CuckooTableBuilder(
hash_table_size_(use_module_hash ? 0 : 2),
is_last_level_file_(false),
has_seen_first_key_(false),
has_seen_first_value_(false),
key_size_(0),
value_size_(0),
num_entries_(0),
num_values_(0),
ucomp_(user_comparator),
use_module_hash_(use_module_hash),
identity_as_first_hash_(identity_as_first_hash),
@ -84,6 +86,12 @@ void CuckooTableBuilder::Add(const Slice& key, const Slice& value) {
status_ = Status::Corruption("Unable to parse key into inernal key.");
return;
}
if (ikey.type != kTypeDeletion && ikey.type != kTypeValue) {
status_ = Status::NotSupported("Unsupported key type " +
std::to_string(ikey.type));
return;
}
// Determine if we can ignore the sequence number and value type from
// internal keys by looking at sequence number from first key. We assume
// that if first key has a zero sequence number, then all the remaining
@ -94,16 +102,38 @@ void CuckooTableBuilder::Add(const Slice& key, const Slice& value) {
smallest_user_key_.assign(ikey.user_key.data(), ikey.user_key.size());
largest_user_key_.assign(ikey.user_key.data(), ikey.user_key.size());
key_size_ = is_last_level_file_ ? ikey.user_key.size() : key.size();
value_size_ = value.size();
}
if (key_size_ != (is_last_level_file_ ? ikey.user_key.size() : key.size())) {
status_ = Status::NotSupported("all keys have to be the same size");
return;
}
// Even if one sequence number is non-zero, then it is not last level.
assert(!is_last_level_file_ || ikey.sequence == 0);
if (is_last_level_file_) {
kvs_.append(ikey.user_key.data(), ikey.user_key.size());
if (ikey.type == kTypeValue) {
if (!has_seen_first_value_) {
has_seen_first_value_ = true;
value_size_ = value.size();
}
if (value_size_ != value.size()) {
status_ = Status::NotSupported("all values have to be the same size");
return;
}
if (is_last_level_file_) {
kvs_.append(ikey.user_key.data(), ikey.user_key.size());
} else {
kvs_.append(key.data(), key.size());
}
kvs_.append(value.data(), value.size());
++num_values_;
} else {
kvs_.append(key.data(), key.size());
if (is_last_level_file_) {
deleted_keys_.append(ikey.user_key.data(), ikey.user_key.size());
} else {
deleted_keys_.append(key.data(), key.size());
}
}
kvs_.append(value.data(), value.size());
++num_entries_;
// In order to fill the empty buckets in the hash table, we identify a
@ -123,15 +153,30 @@ void CuckooTableBuilder::Add(const Slice& key, const Slice& value) {
}
}
bool CuckooTableBuilder::IsDeletedKey(uint64_t idx) const {
assert(closed_);
return idx >= num_values_;
}
Slice CuckooTableBuilder::GetKey(uint64_t idx) const {
assert(closed_);
if (IsDeletedKey(idx)) {
return Slice(&deleted_keys_[(idx - num_values_) * key_size_], key_size_);
}
return Slice(&kvs_[idx * (key_size_ + value_size_)], key_size_);
}
Slice CuckooTableBuilder::GetUserKey(uint64_t idx) const {
assert(closed_);
return is_last_level_file_ ? GetKey(idx) : ExtractUserKey(GetKey(idx));
}
Slice CuckooTableBuilder::GetValue(uint64_t idx) const {
assert(closed_);
if (IsDeletedKey(idx)) {
static std::string empty_value(value_size_, 'a');
return Slice(empty_value);
}
return Slice(&kvs_[idx * (key_size_ + value_size_) + key_size_], value_size_);
}
@ -256,7 +301,9 @@ Status CuckooTableBuilder::Finish() {
++num_added;
s = file_->Append(GetKey(bucket.vector_idx));
if (s.ok()) {
s = file_->Append(GetValue(bucket.vector_idx));
if (value_size_ > 0) {
s = file_->Append(GetValue(bucket.vector_idx));
}
}
}
if (!s.ok()) {

View File

@ -75,6 +75,7 @@ class CuckooTableBuilder: public TableBuilder {
uint64_t* bucket_id);
Status MakeHashTable(std::vector<CuckooBucket>* buckets);
inline bool IsDeletedKey(uint64_t idx) const;
inline Slice GetKey(uint64_t idx) const;
inline Slice GetUserKey(uint64_t idx) const;
inline Slice GetValue(uint64_t idx) const;
@ -88,14 +89,18 @@ class CuckooTableBuilder: public TableBuilder {
uint64_t hash_table_size_;
bool is_last_level_file_;
bool has_seen_first_key_;
bool has_seen_first_value_;
uint64_t key_size_;
uint64_t value_size_;
// A list of fixed-size key-value pairs concatenating into a string.
// Use GetKey(), GetUserKey(), and GetValue() to retrieve a specific
// key / value given an index
std::string kvs_;
// Number of key-value pairs stored in kvs_
std::string deleted_keys_;
// Number of key-value pairs stored in kvs_ + number of deleted keys
uint64_t num_entries_;
// Number of keys that contain value (non-deletion op)
uint64_t num_values_;
Status status_;
TableProperties properties_;
const Comparator* ucomp_;

View File

@ -19,6 +19,7 @@
#include "rocksdb/table.h"
#include "table/meta_blocks.h"
#include "table/cuckoo_table_factory.h"
#include "table/get_context.h"
#include "util/arena.h"
#include "util/coding.h"
@ -126,11 +127,8 @@ CuckooTableReader::CuckooTableReader(
status_ = file_->Read(0, file_size, &file_data_, nullptr);
}
Status CuckooTableReader::Get(
const ReadOptions& readOptions, const Slice& key, void* handle_context,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist_handler)(void* handle_context)) {
Status CuckooTableReader::Get(const ReadOptions& readOptions, const Slice& key,
GetContext* get_context) {
assert(key.size() == key_length_ + (is_last_level_ ? 8 : 0));
Slice user_key = ExtractUserKey(key);
for (uint32_t hash_cnt = 0; hash_cnt < num_hash_func_; ++hash_cnt) {
@ -149,14 +147,12 @@ Status CuckooTableReader::Get(
if (ucomp_->Compare(user_key, Slice(bucket, user_key.size())) == 0) {
Slice value(bucket + key_length_, value_length_);
if (is_last_level_) {
ParsedInternalKey found_ikey(
Slice(bucket, key_length_), 0, kTypeValue);
result_handler(handle_context, found_ikey, value);
get_context->SaveValue(value);
} else {
Slice full_key(bucket, key_length_);
ParsedInternalKey found_ikey;
ParseInternalKey(full_key, &found_ikey);
result_handler(handle_context, found_ikey, value);
get_context->SaveValue(found_ikey, value);
}
// We don't support merge operations. So, we return here.
return Status::OK();

View File

@ -40,12 +40,8 @@ class CuckooTableReader: public TableReader {
Status status() const { return status_; }
Status Get(
const ReadOptions& read_options, const Slice& key, void* handle_context,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist_handler)(void* handle_context) = nullptr)
override;
Status Get(const ReadOptions& read_options, const Slice& key,
GetContext* get_context) override;
Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override;
void Prepare(const Slice& target) override;

View File

@ -25,6 +25,7 @@ int main() {
#include "table/cuckoo_table_builder.h"
#include "table/cuckoo_table_reader.h"
#include "table/cuckoo_table_factory.h"
#include "table/get_context.h"
#include "util/arena.h"
#include "util/random.h"
#include "util/testharness.h"
@ -61,25 +62,6 @@ uint64_t GetSliceHash(const Slice& s, uint32_t index,
return hash_map[s.ToString()][index];
}
// Methods, variables for checking key and values read.
struct ValuesToAssert {
ValuesToAssert(const std::string& key, const Slice& value)
: expected_user_key(key),
expected_value(value),
call_count(0) {}
std::string expected_user_key;
Slice expected_value;
int call_count;
};
bool AssertValues(void* assert_obj,
const ParsedInternalKey& k, const Slice& v) {
ValuesToAssert *ptr = reinterpret_cast<ValuesToAssert*>(assert_obj);
ASSERT_EQ(ptr->expected_value.ToString(), v.ToString());
ASSERT_EQ(ptr->expected_user_key, k.user_key.ToString());
++ptr->call_count;
return false;
}
} // namespace
class CuckooReaderTest {
@ -134,11 +116,14 @@ class CuckooReaderTest {
ucomp,
GetSliceHash);
ASSERT_OK(reader.status());
// Assume no merge/deletion
for (uint32_t i = 0; i < num_items; ++i) {
ValuesToAssert v(user_keys[i], values[i]);
ASSERT_OK(reader.Get(
ReadOptions(), Slice(keys[i]), &v, AssertValues, nullptr));
ASSERT_EQ(1, v.call_count);
std::string value;
GetContext get_context(ucomp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(user_keys[i]), &value,
nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(keys[i]), &get_context));
ASSERT_EQ(values[i], value);
}
}
void UpdateKeys(bool with_zero_seqno) {
@ -329,6 +314,7 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
// Make all hash values collide.
AddHashLookups(user_keys[i], 0, kNumHashFunc);
}
auto* ucmp = BytewiseComparator();
CreateCuckooFileAndCheckReader();
std::unique_ptr<RandomAccessFile> read_file;
ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options));
@ -337,7 +323,7 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
ioptions,
std::move(read_file),
file_size,
BytewiseComparator(),
ucmp,
GetSliceHash);
ASSERT_OK(reader.status());
// Search for a key with colliding hash values.
@ -346,10 +332,11 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
AddHashLookups(not_found_user_key, 0, kNumHashFunc);
ParsedInternalKey ikey(not_found_user_key, 1000, kTypeValue);
AppendInternalKey(&not_found_key, ikey);
ValuesToAssert v("", "");
ASSERT_OK(reader.Get(
ReadOptions(), Slice(not_found_key), &v, AssertValues, nullptr));
ASSERT_EQ(0, v.call_count);
std::string value;
GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound,
Slice(not_found_key), &value, nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(not_found_key), &get_context));
ASSERT_TRUE(value.empty());
ASSERT_OK(reader.status());
// Search for a key with an independent hash value.
std::string not_found_user_key2 = "key" + NumToStr(num_items + 1);
@ -357,9 +344,11 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
ParsedInternalKey ikey2(not_found_user_key2, 1000, kTypeValue);
std::string not_found_key2;
AppendInternalKey(&not_found_key2, ikey2);
ASSERT_OK(reader.Get(
ReadOptions(), Slice(not_found_key2), &v, AssertValues, nullptr));
ASSERT_EQ(0, v.call_count);
GetContext get_context2(ucmp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(not_found_key2), &value,
nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(not_found_key2), &get_context2));
ASSERT_TRUE(value.empty());
ASSERT_OK(reader.status());
// Test read when key is unused key.
@ -369,34 +358,16 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
// Add hash values that map to empty buckets.
AddHashLookups(ExtractUserKey(unused_key).ToString(),
kNumHashFunc, kNumHashFunc);
ASSERT_OK(reader.Get(
ReadOptions(), Slice(unused_key), &v, AssertValues, nullptr));
ASSERT_EQ(0, v.call_count);
GetContext get_context3(ucmp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(unused_key), &value,
nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(unused_key), &get_context3));
ASSERT_TRUE(value.empty());
ASSERT_OK(reader.status());
}
// Performance tests
namespace {
int64_t found_count = 0;
std::string value;
bool DoNothing(void* arg, const ParsedInternalKey& k, const Slice& v) {
// Deliberately empty.
if (*reinterpret_cast<const int32_t*>(k.user_key.data()) ==
*reinterpret_cast<const int32_t*>(v.data())) {
++found_count;
value.assign(v.data(), v.size());
}
return false;
}
bool CheckValue(void* cnt_ptr, const ParsedInternalKey& k, const Slice& v) {
++*reinterpret_cast<int*>(cnt_ptr);
std::string expected_value;
AppendInternalKey(&expected_value, k);
ASSERT_EQ(0, v.compare(Slice(&expected_value[0], v.size())));
return false;
}
void GetKeys(uint64_t num, std::vector<std::string>* keys) {
keys->clear();
IterKey k;
@ -457,13 +428,15 @@ void WriteFile(const std::vector<std::string>& keys,
test::Uint64Comparator(), nullptr);
ASSERT_OK(reader.status());
ReadOptions r_options;
std::string value;
// Assume only the fast path is triggered
GetContext get_context(nullptr, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), &value,
nullptr, nullptr);
for (uint64_t i = 0; i < num; ++i) {
int cnt = 0;
ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &cnt, CheckValue, nullptr));
if (cnt != 1) {
fprintf(stderr, "%" PRIu64 " not found.\n", i);
ASSERT_EQ(1, cnt);
}
value.clear();
ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &get_context));
ASSERT_TRUE(Slice(keys[i]) == Slice(&keys[i][0], 4));
}
}
@ -501,7 +474,11 @@ void ReadKeys(uint64_t num, uint32_t batch_size) {
}
std::random_shuffle(keys.begin(), keys.end());
found_count = 0;
std::string value;
// Assume only the fast path is triggered
GetContext get_context(nullptr, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), &value,
nullptr, nullptr);
uint64_t start_time = env->NowMicros();
if (batch_size > 0) {
for (uint64_t i = 0; i < num; i += batch_size) {
@ -510,20 +487,19 @@ void ReadKeys(uint64_t num, uint32_t batch_size) {
}
for (uint64_t j = i; j < i+batch_size && j < num; ++j) {
reader.Get(r_options, Slice(reinterpret_cast<char*>(&keys[j]), 16),
nullptr, DoNothing, nullptr);
&get_context);
}
}
} else {
for (uint64_t i = 0; i < num; i++) {
reader.Get(r_options, Slice(reinterpret_cast<char*>(&keys[i]), 16),
nullptr, DoNothing, nullptr);
&get_context);
}
}
float time_per_op = (env->NowMicros() - start_time) * 1.0 / num;
fprintf(stderr,
"Time taken per op is %.3fus (%.1f Mqps) with batch size of %u, "
"# of found keys %" PRId64 "\n",
time_per_op, 1.0 / time_per_op, batch_size, found_count);
"Time taken per op is %.3fus (%.1f Mqps) with batch size of %u\n",
time_per_op, 1.0 / time_per_op, batch_size);
}
} // namespace.

101
table/get_context.cc Normal file
View File

@ -0,0 +1,101 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "table/get_context.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "util/statistics.h"
namespace rocksdb {
GetContext::GetContext(const Comparator* ucmp,
const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics,
GetState init_state, const Slice& user_key, std::string* ret_value,
bool* value_found, MergeContext* merge_context)
: ucmp_(ucmp),
merge_operator_(merge_operator),
logger_(logger),
statistics_(statistics),
state_(init_state),
user_key_(user_key),
value_(ret_value),
value_found_(value_found),
merge_context_(merge_context) {
}
// Called from TableCache::Get and Table::Get when file/block in which
// key may exist are not there in TableCache/BlockCache respectively. In this
// case we can't guarantee that key does not exist and are not permitted to do
// IO to be certain.Set the status=kFound and value_found=false to let the
// caller know that key may exist but is not there in memory
void GetContext::MarkKeyMayExist() {
state_ = kFound;
if (value_found_ != nullptr) {
*value_found_ = false;
}
}
void GetContext::SaveValue(const Slice& value) {
state_ = kFound;
value_->assign(value.data(), value.size());
}
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
const Slice& value) {
assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
merge_context_ != nullptr);
if (ucmp_->Compare(parsed_key.user_key, user_key_) == 0) {
// Key matches. Process it
switch (parsed_key.type) {
case kTypeValue:
assert(state_ == kNotFound || state_ == kMerge);
if (kNotFound == state_) {
state_ = kFound;
value_->assign(value.data(), value.size());
} else if (kMerge == state_) {
assert(merge_operator_ != nullptr);
state_ = kFound;
if (!merge_operator_->FullMerge(user_key_, &value,
merge_context_->GetOperands(),
value_, logger_)) {
RecordTick(statistics_, NUMBER_MERGE_FAILURES);
state_ = kCorrupt;
}
}
return false;
case kTypeDeletion:
assert(state_ == kNotFound || state_ == kMerge);
if (kNotFound == state_) {
state_ = kDeleted;
} else if (kMerge == state_) {
state_ = kFound;
if (!merge_operator_->FullMerge(user_key_, nullptr,
merge_context_->GetOperands(),
value_, logger_)) {
RecordTick(statistics_, NUMBER_MERGE_FAILURES);
state_ = kCorrupt;
}
}
return false;
case kTypeMerge:
assert(state_ == kNotFound || state_ == kMerge);
state_ = kMerge;
merge_context_->PushOperand(value);
return true;
default:
assert(false);
break;
}
}
// state_ could be Corrupt, merge or notfound
return false;
}
} // namespace rocksdb

47
table/get_context.h Normal file
View File

@ -0,0 +1,47 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <string>
#include "db/merge_context.h"
namespace rocksdb {
class MergeContext;
class GetContext {
public:
enum GetState {
kNotFound,
kFound,
kDeleted,
kCorrupt,
kMerge // saver contains the current merge result (the operands)
};
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics,
GetState init_state, const Slice& user_key, std::string* ret_value,
bool* value_found, MergeContext* merge_context);
void MarkKeyMayExist();
void SaveValue(const Slice& value);
bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value);
GetState State() const { return state_; }
private:
const Comparator* ucmp_;
const MergeOperator* merge_operator_;
// the merge operations encountered;
Logger* logger_;
Statistics* statistics_;
GetState state_;
Slice user_key_;
std::string* value_;
bool* value_found_; // Is value set correctly? Used by KeyMayExist
MergeContext* merge_context_;
};
} // namespace rocksdb

View File

@ -26,6 +26,7 @@
#include "table/two_level_iterator.h"
#include "table/plain_table_factory.h"
#include "table/plain_table_key_coding.h"
#include "table/get_context.h"
#include "util/arena.h"
#include "util/coding.h"
@ -525,10 +526,7 @@ void PlainTableReader::Prepare(const Slice& target) {
}
Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target,
void* arg,
bool (*saver)(void*, const ParsedInternalKey&,
const Slice&),
void (*mark_key_may_exist)(void*)) {
GetContext* get_context) {
// Check bloom filter first.
Slice prefix_slice;
uint32_t prefix_hash;
@ -580,8 +578,10 @@ Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target,
}
prefix_match = true;
}
// TODO(ljin): since we know the key comparison result here,
// can we enable the fast path?
if (internal_comparator_.Compare(found_key, parsed_target) >= 0) {
if (!(*saver)(arg, found_key, found_value)) {
if (!get_context->SaveValue(found_key, found_value)) {
break;
}
}

View File

@ -36,6 +36,7 @@ class TableCache;
class TableReader;
class InternalKeyComparator;
class PlainTableKeyDecoder;
class GetContext;
using std::unique_ptr;
using std::unordered_map;
@ -65,10 +66,8 @@ class PlainTableReader: public TableReader {
void Prepare(const Slice& target);
Status Get(const ReadOptions&, const Slice& key, void* arg,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist)(void*) = nullptr);
Status Get(const ReadOptions&, const Slice& key,
GetContext* get_context) override;
uint64_t ApproximateOffsetOf(const Slice& key);

View File

@ -18,6 +18,7 @@ class Slice;
class Arena;
struct ReadOptions;
struct TableProperties;
class GetContext;
// A Table is a sorted map from strings to strings. Tables are
// immutable and persistent. A Table may be safely accessed from
@ -55,23 +56,17 @@ class TableReader {
// Report an approximation of how much memory has been used.
virtual size_t ApproximateMemoryUsage() const = 0;
// Calls (*result_handler)(handle_context, ...) repeatedly, starting with
// the entry found after a call to Seek(key), until result_handler returns
// false, where k is the actual internal key for a row found and v as the
// value of the key. May not make such a call if filter policy says that key
// is not present.
// Calls get_context->SaveValue() repeatedly, starting with
// the entry found after a call to Seek(key), until it returns false.
// May not make such a call if filter policy says that key is not present.
//
// mark_key_may_exist_handler needs to be called when it is configured to be
// memory only and the key is not found in the block cache, with
// the parameter to be handle_context.
// get_context->MarkKeyMayExist needs to be called when it is configured to be
// memory only and the key is not found in the block cache.
//
// readOptions is the options for the read
// key is the key to search for
virtual Status Get(
const ReadOptions& readOptions, const Slice& key, void* handle_context,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist_handler)(void* handle_context) = nullptr) = 0;
virtual Status Get(const ReadOptions& readOptions, const Slice& key,
GetContext* get_context) = 0;
};
} // namespace rocksdb

View File

@ -22,6 +22,7 @@ int main() {
#include "table/block_based_table_factory.h"
#include "table/plain_table_factory.h"
#include "table/table_builder.h"
#include "table/get_context.h"
#include "util/histogram.h"
#include "util/testharness.h"
#include "util/testutil.h"
@ -48,11 +49,6 @@ static std::string MakeKey(int i, int j, bool through_db) {
return key.Encode().ToString();
}
static bool DummySaveValue(void* arg, const ParsedInternalKey& ikey,
const Slice& v) {
return false;
}
uint64_t Now(Env* env, bool measured_by_nanosecond) {
return measured_by_nanosecond ? env->NowNanos() : env->NowMicros();
}
@ -131,7 +127,6 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
std::string result;
HistogramImpl hist;
void* arg = nullptr;
for (int it = 0; it < num_iter; it++) {
for (int i = 0; i < num_keys1; i++) {
for (int j = 0; j < num_keys2; j++) {
@ -147,8 +142,13 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
std::string key = MakeKey(r1, r2, through_db);
uint64_t start_time = Now(env, measured_by_nanosecond);
if (!through_db) {
s = table_reader->Get(read_options, key, arg, DummySaveValue,
nullptr);
std::string value;
MergeContext merge_context;
GetContext get_context(ioptions.comparator, ioptions.merge_operator,
ioptions.info_log, ioptions.statistics,
GetContext::kNotFound, Slice(key), &value,
nullptr, &merge_context);
s = table_reader->Get(read_options, key, &get_context);
} else {
s = db->Get(read_options, key, &result);
}

View File

@ -37,6 +37,7 @@
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/plain_table_factory.h"
#include "table/get_context.h"
#include "util/random.h"
#include "util/statistics.h"
@ -1485,8 +1486,11 @@ TEST(BlockBasedTableTest, BlockCacheDisabledTest) {
}
{
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), nullptr,
nullptr, nullptr);
// a hack that just to trigger BlockBasedTable::GetFilter.
reader->Get(ReadOptions(), "non-exist-key", nullptr, nullptr, nullptr);
reader->Get(ReadOptions(), "non-exist-key", &get_context);
BlockCachePropertiesSnapshot props(options.statistics.get());
props.AssertIndexBlockStat(0, 0);
props.AssertFilterBlockStat(0, 0);

View File

@ -69,13 +69,29 @@ class BytewiseComparatorImpl : public Comparator {
// *key is a run of 0xffs. Leave it alone.
}
};
} // namespace
class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl {
public:
ReverseBytewiseComparatorImpl() { }
virtual const char* Name() const {
return "rocksdb.ReverseBytewiseComparator";
}
virtual int Compare(const Slice& a, const Slice& b) const {
return -a.compare(b);
}
};
}// namespace
static port::OnceType once = LEVELDB_ONCE_INIT;
static const Comparator* bytewise;
static const Comparator* rbytewise;
static void InitModule() {
bytewise = new BytewiseComparatorImpl;
rbytewise= new ReverseBytewiseComparatorImpl;
}
const Comparator* BytewiseComparator() {
@ -83,4 +99,9 @@ const Comparator* BytewiseComparator() {
return bytewise;
}
const Comparator* ReverseBytewiseComparator() {
port::InitOnce(&once, InitModule);
return rbytewise;
}
} // namespace rocksdb

View File

@ -7,13 +7,13 @@
#include "utilities/compacted_db/compacted_db_impl.h"
#include "db/db_impl.h"
#include "db/version_set.h"
#include "db/merge_context.h"
#include "table/get_context.h"
namespace rocksdb {
extern void MarkKeyMayExist(void* arg);
extern bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
const Slice& v);
const Slice& v, bool hit_and_return);
CompactedDBImpl::CompactedDBImpl(
const DBOptions& options, const std::string& dbname)
@ -44,25 +44,12 @@ size_t CompactedDBImpl::FindFile(const Slice& key) {
Status CompactedDBImpl::Get(const ReadOptions& options,
ColumnFamilyHandle*, const Slice& key, std::string* value) {
const FdWithKeyRange& f = files_.files[FindFile(key)];
bool value_found;
MergeContext merge_context;
Version::Saver saver;
saver.state = Version::kNotFound;
saver.ucmp = user_comparator_;
saver.user_key = key;
saver.value_found = &value_found;
saver.value = value;
saver.merge_operator = nullptr;
saver.merge_context = &merge_context;
saver.logger = info_log_;
saver.statistics = statistics_;
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr);
LookupKey lkey(key, kMaxSequenceNumber);
f.fd.table_reader->Get(options, lkey.internal_key(),
reinterpret_cast<void*>(&saver), SaveValue,
MarkKeyMayExist);
if (saver.state == Version::kFound) {
files_.files[FindFile(key)].fd.table_reader->Get(
options, lkey.internal_key(), &get_context);
if (get_context.State() == GetContext::kFound) {
return Status::OK();
}
return Status::NotFound();
@ -84,26 +71,15 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
}
std::vector<Status> statuses(keys.size(), Status::NotFound());
values->resize(keys.size());
bool value_found;
MergeContext merge_context;
Version::Saver saver;
saver.ucmp = user_comparator_;
saver.value_found = &value_found;
saver.merge_operator = nullptr;
saver.merge_context = &merge_context;
saver.logger = info_log_;
saver.statistics = statistics_;
int idx = 0;
for (auto* r : reader_list) {
if (r != nullptr) {
saver.state = Version::kNotFound;
saver.user_key = keys[idx];
saver.value = &(*values)[idx];
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, keys[idx], &(*values)[idx],
nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber);
r->Get(options, lkey.internal_key(),
reinterpret_cast<void*>(&saver), SaveValue,
MarkKeyMayExist);
if (saver.state == Version::kFound) {
r->Get(options, lkey.internal_key(), &get_context);
if (get_context.State() == GetContext::kFound) {
statuses[idx] = Status::OK();
}
}
@ -128,8 +104,6 @@ Status CompactedDBImpl::Init(const Options& options) {
}
version_ = cfd_->GetSuperVersion()->current;
user_comparator_ = cfd_->user_comparator();
statistics_ = cfd_->ioptions()->statistics;
info_log_ = cfd_->ioptions()->info_log;
// L0 should not have files
if (version_->file_levels_[0].num_files > 1) {
return Status::NotSupported("L0 contain more than 1 file");
@ -170,8 +144,10 @@ Status CompactedDBImpl::Open(const Options& options,
std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
Status s = db->Init(options);
if (s.ok()) {
Log(INFO_LEVEL, db->db_options_.info_log,
"Opened the db as fully compacted mode");
LogFlush(db->db_options_.info_log);
*dbptr = db.release();
Log(options.info_log, "Opened the db as fully compacted mode");
}
return s;
}

View File

@ -88,9 +88,6 @@ class CompactedDBImpl : public DBImpl {
const Comparator* user_comparator_;
FileLevel files_;
Statistics* statistics_;
Logger* info_log_;
// No copying allowed
CompactedDBImpl(const CompactedDBImpl&);
void operator=(const CompactedDBImpl&);