Merge branch 'master' into columnfamilies

This commit is contained in:
Igor Canadi 2014-03-04 09:39:14 -08:00
commit fa34697237
10 changed files with 323 additions and 17 deletions

View File

@ -10,7 +10,7 @@ INSTALL_PATH ?= $(CURDIR)
ifneq ($(MAKECMDGOALS),dbg)
OPT += -O2 -fno-omit-frame-pointer -momit-leaf-frame-pointer
else
OPT += -fno-omit-frame-pointer -momit-leaf-frame-pointer
# intentionally left blank
endif
ifeq ($(MAKECMDGOALS),shared_lib)
@ -55,6 +55,7 @@ VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full
TESTS = \
db_test \
block_hash_index_test \
autovector_test \
column_family_test \
table_properties_collector_test \
@ -228,6 +229,9 @@ $(LIBRARY): $(LIBOBJECTS)
db_bench: db/db_bench.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
block_hash_index_test: table/block_hash_index_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) table/block_hash_index_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
db_stress: tools/db_stress.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) tools/db_stress.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

View File

@ -26,15 +26,6 @@
#include "util/statistics.h"
#include "util/stop_watch.h"
namespace std {
template <>
struct hash<rocksdb::Slice> {
size_t operator()(const rocksdb::Slice& slice) const {
return MurmurHash(slice.data(), slice.size(), 0);
}
};
}
namespace rocksdb {
MemTable::MemTable(const InternalKeyComparator& cmp,
@ -167,7 +158,8 @@ Iterator* MemTable::NewIterator(const ReadOptions& options) {
}
port::RWMutex* MemTable::GetLock(const Slice& key) {
return &locks_[std::hash<Slice>()(key) % locks_.size()];
static murmur_hash hash;
return &locks_[hash(key) % locks_.size()];
}
void MemTable::Add(SequenceNumber s, ValueType type,

112
table/block_hash_index.cc Normal file
View File

@ -0,0 +1,112 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <algorithm>
#include "table/block_hash_index.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/slice_transform.h"
namespace rocksdb {
BlockHashIndex* CreateBlockHashIndex(Iterator* index_iter, Iterator* data_iter,
const uint32_t num_restarts,
const Comparator* comparator,
const SliceTransform* hash_key_extractor) {
assert(hash_key_extractor);
auto hash_index = new BlockHashIndex(hash_key_extractor);
uint64_t current_restart_index = 0;
std::string pending_entry_prefix;
// pending_block_num == 0 also implies there is no entry inserted at all.
uint32_t pending_block_num = 0;
uint32_t pending_entry_index = 0;
// scan all the entries and create a hash index based on their prefixes.
data_iter->SeekToFirst();
for (index_iter->SeekToFirst();
index_iter->Valid() && current_restart_index < num_restarts;
index_iter->Next()) {
Slice last_key_in_block = index_iter->key();
assert(data_iter->Valid() && data_iter->status().ok());
// scan through all entries within a data block.
while (data_iter->Valid() &&
comparator->Compare(data_iter->key(), last_key_in_block) <= 0) {
auto key_prefix = hash_key_extractor->Transform(data_iter->key());
bool is_first_entry = pending_block_num == 0;
// Keys may share the prefix
if (is_first_entry || pending_entry_prefix != key_prefix) {
if (!is_first_entry) {
bool succeeded = hash_index->Add(
pending_entry_prefix, pending_entry_index, pending_block_num);
if (!succeeded) {
delete hash_index;
return nullptr;
}
}
// update the status.
// needs a hard copy otherwise the underlying data changes all the time.
pending_entry_prefix = key_prefix.ToString();
pending_block_num = 1;
pending_entry_index = current_restart_index;
} else {
// entry number increments when keys share the prefix reside in
// differnt data blocks.
auto last_restart_index = pending_entry_index + pending_block_num - 1;
assert(last_restart_index <= current_restart_index);
if (last_restart_index != current_restart_index) {
++pending_block_num;
}
}
data_iter->Next();
}
++current_restart_index;
}
// make sure all entries has been scaned.
assert(!index_iter->Valid());
assert(!data_iter->Valid());
if (pending_block_num > 0) {
auto succeeded = hash_index->Add(pending_entry_prefix, pending_entry_index,
pending_block_num);
if (!succeeded) {
delete hash_index;
return nullptr;
}
}
return hash_index;
}
bool BlockHashIndex::Add(const Slice& prefix, uint32_t restart_index,
uint32_t num_blocks) {
auto prefix_ptr = arena_.Allocate(prefix.size());
std::copy(prefix.data() /* begin */, prefix.data() + prefix.size() /* end */,
prefix_ptr /* destination */);
auto result =
restart_indices_.insert({Slice(prefix_ptr, prefix.size()),
RestartIndex(restart_index, num_blocks)});
return result.second;
}
const BlockHashIndex::RestartIndex* BlockHashIndex::GetRestartIndex(
const Slice& key) {
auto key_prefix = hash_key_extractor_->Transform(key);
auto pos = restart_indices_.find(key_prefix);
if (pos == restart_indices_.end()) {
return nullptr;
}
return &pos->second;
}
} // namespace rocksdb

72
table/block_hash_index.h Normal file
View File

@ -0,0 +1,72 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <string>
#include <unordered_map>
#include "util/arena.h"
#include "util/murmurhash.h"
namespace rocksdb {
class Comparator;
class Iterator;
class Slice;
class SliceTransform;
// Build a hash-based index to speed up the lookup for "index block".
// BlockHashIndex accepts a key and, if found, returns its restart index within
// that index block.
class BlockHashIndex {
public:
// Represents a restart index in the index block's restart array.
struct RestartIndex {
explicit RestartIndex(uint32_t first_index, uint32_t num_blocks = 1)
: first_index(first_index), num_blocks(num_blocks) {}
// For a given prefix, what is the restart index for the first data block
// that contains it.
uint32_t first_index = 0;
// How many data blocks contains this prefix?
uint32_t num_blocks = 1;
};
explicit BlockHashIndex(const SliceTransform* hash_key_extractor)
: hash_key_extractor_(hash_key_extractor) {}
// Maps a key to its restart first_index.
// Returns nullptr if the restart first_index is found
const RestartIndex* GetRestartIndex(const Slice& key);
bool Add(const Slice& key_prefix, uint32_t restart_index,
uint32_t num_blocks);
size_t ApproximateMemoryUsage() const {
return arena_.ApproximateMemoryUsage();
}
private:
const SliceTransform* hash_key_extractor_;
std::unordered_map<Slice, RestartIndex, murmur_hash> restart_indices_;
Arena arena_;
};
// Create hash index by scanning the entries in index as well as the whole
// dataset.
// @params index_iter: an iterator with the pointer to the first entry in a
// block.
// @params data_iter: an iterator that can scan all the entries reside in a
// table.
// @params num_restarts: used for correctness verification.
// @params hash_key_extractor: extract the hashable part of a given key.
// On error, nullptr will be returned.
BlockHashIndex* CreateBlockHashIndex(Iterator* index_iter, Iterator* data_iter,
const uint32_t num_restarts,
const Comparator* comparator,
const SliceTransform* hash_key_extractor);
} // namespace rocksdb

View File

@ -0,0 +1,117 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <map>
#include <memory>
#include <vector>
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/slice_transform.h"
#include "table/block_hash_index.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
typedef std::map<std::string, std::string> Data;
class MapIterator : public Iterator {
public:
explicit MapIterator(const Data& data) : data_(data), pos_(data_.end()) {}
virtual bool Valid() const { return pos_ != data_.end(); }
virtual void SeekToFirst() { pos_ = data_.begin(); }
virtual void SeekToLast() {
pos_ = data_.end();
--pos_;
}
virtual void Seek(const Slice& target) {
pos_ = data_.find(target.ToString());
}
virtual void Next() { ++pos_; }
virtual void Prev() { --pos_; }
virtual Slice key() const { return pos_->first; }
virtual Slice value() const { return pos_->second; }
virtual Status status() const { return Status::OK(); }
private:
const Data& data_;
Data::const_iterator pos_;
};
class BlockTest {};
TEST(BlockTest, BasicTest) {
const size_t keys_per_block = 4;
const size_t prefix_size = 2;
std::vector<std::string> keys = {/* block 1 */
"0101", "0102", "0103", "0201",
/* block 2 */
"0202", "0203", "0301", "0401",
/* block 3 */
"0501", "0601", "0701", "0801",
/* block 4 */
"0802", "0803", "0804", "0805",
/* block 5 */
"0806", "0807", "0808", "0809", };
Data data_entries;
for (const auto key : keys) {
data_entries.insert({key, key});
}
Data index_entries;
for (size_t i = 3; i < keys.size(); i += keys_per_block) {
// simply ignore the value part
index_entries.insert({keys[i], ""});
}
MapIterator data_iter(data_entries);
MapIterator index_iter(index_entries);
auto prefix_extractor = NewFixedPrefixTransform(prefix_size);
std::unique_ptr<BlockHashIndex> block_hash_index(
CreateBlockHashIndex(&index_iter, &data_iter, index_entries.size(),
BytewiseComparator(), prefix_extractor));
std::map<std::string, BlockHashIndex::RestartIndex> expected = {
{"01xx", BlockHashIndex::RestartIndex(0, 1)},
{"02yy", BlockHashIndex::RestartIndex(0, 2)},
{"03zz", BlockHashIndex::RestartIndex(1, 1)},
{"04pp", BlockHashIndex::RestartIndex(1, 1)},
{"05ww", BlockHashIndex::RestartIndex(2, 1)},
{"06xx", BlockHashIndex::RestartIndex(2, 1)},
{"07pp", BlockHashIndex::RestartIndex(2, 1)},
{"08xz", BlockHashIndex::RestartIndex(2, 3)}, };
const BlockHashIndex::RestartIndex* index = nullptr;
// search existed prefixes
for (const auto& item : expected) {
index = block_hash_index->GetRestartIndex(item.first);
ASSERT_TRUE(index != nullptr);
ASSERT_EQ(item.second.first_index, index->first_index);
ASSERT_EQ(item.second.num_blocks, index->num_blocks);
}
// search non exist prefixes
ASSERT_TRUE(!block_hash_index->GetRestartIndex("00xx"));
ASSERT_TRUE(!block_hash_index->GetRestartIndex("10yy"));
ASSERT_TRUE(!block_hash_index->GetRestartIndex("20zz"));
delete prefix_extractor;
}
} // namespace rocksdb
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }

View File

@ -122,6 +122,7 @@ class MergingIterator : public Iterator {
// one, or null if there is no first child.
current_ = first_child;
}
direction_ = kForward;
}
virtual void Next() {
@ -228,6 +229,8 @@ class MergingIterator : public Iterator {
// If the value is true, both of iterators in the heap and current_
// contain valid rows. If it is false, only current_ can possibly contain
// valid rows.
// This flag is always true for reverse direction, as we always use heap for
// the reverse iterating case.
bool use_heap_;
Env* const env_;
// Which direction is the iterator moving?

View File

@ -961,7 +961,6 @@ class BlockBasedTableTest : public TableTest {};
class PlainTableTest : public TableTest {};
class TablePropertyTest {};
/*
// This test serves as the living tutorial for the prefix scan of user collected
// properties.
TEST(TablePropertyTest, PrefixScanTest) {
@ -1123,7 +1122,6 @@ TEST(BlockBasedTableTest, NumBlockStat) {
ASSERT_EQ(kvmap.size(),
c.table_reader()->GetTableProperties()->num_data_blocks);
}
*/
// A simple tool that takes the snapshot of block cache statistics.
class BlockCachePropertiesSnapshot {

View File

@ -39,12 +39,12 @@ class Arena {
// Returns an estimate of the total memory usage of data allocated
// by the arena (exclude the space allocated but not yet used for future
// allocations).
const size_t ApproximateMemoryUsage() {
size_t ApproximateMemoryUsage() const {
return blocks_memory_ + blocks_.capacity() * sizeof(char*) -
alloc_bytes_remaining_;
}
const size_t MemoryAllocatedBytes() { return blocks_memory_; }
size_t MemoryAllocatedBytes() const { return blocks_memory_; }
private:
// Number of bytes allocated in one block

View File

@ -46,5 +46,4 @@ uint32_t Hash(const char* data, size_t n, uint32_t seed) {
return h;
}
} // namespace rocksdb

View File

@ -11,6 +11,7 @@
*/
#pragma once
#include <stdint.h>
#include "rocksdb/slice.h"
#if defined(__x86_64__)
#define MURMUR_HASH MurmurHash64A
@ -29,5 +30,13 @@ typedef unsigned int murmur_t;
unsigned int MurmurHashNeutral2 ( const void * key, int len, unsigned int seed );
#define MurmurHash MurmurHashNeutral2
typedef unsigned int murmur_t;
#endif
// Allow slice to be hashable by murmur hash.
namespace rocksdb {
struct murmur_hash {
size_t operator()(const Slice& slice) const {
return MurmurHash(slice.data(), slice.size(), 0);
}
};
} // rocksdb