Compare commits
20 Commits
main
...
2.8.fb.tru
Author | SHA1 | Date | |
---|---|---|---|
|
fcfb666d17 | ||
|
808928fc99 | ||
|
eb96dc003a | ||
|
8db376d494 | ||
|
73895c9478 | ||
|
7b37f0d5af | ||
|
8cd08fdca4 | ||
|
ce068c09dd | ||
|
4b7b1949d4 | ||
|
9e04ce7645 | ||
|
13dc9c7f56 | ||
|
034b494774 | ||
|
d705755e52 | ||
|
26f36347d0 | ||
|
0f7daf5fb4 | ||
|
22f396798e | ||
|
258eac1772 | ||
|
c80c3f3b05 | ||
|
14534f0d7b | ||
|
b405cb886b |
@ -2540,9 +2540,9 @@ Status DBImpl::ProcessKeyValueCompaction(
|
||||
Status status;
|
||||
std::string compaction_filter_value;
|
||||
ParsedInternalKey ikey;
|
||||
std::string current_user_key;
|
||||
IterKey current_user_key;
|
||||
bool has_current_user_key = false;
|
||||
std::vector<char> delete_key; // for compaction filter
|
||||
IterKey delete_key;
|
||||
SequenceNumber last_sequence_for_key __attribute__((unused)) =
|
||||
kMaxSequenceNumber;
|
||||
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
|
||||
@ -2612,16 +2612,16 @@ Status DBImpl::ProcessKeyValueCompaction(
|
||||
// Do not hide error keys
|
||||
// TODO: error key stays in db forever? Figure out the intention/rationale
|
||||
// v10 error v8 : we cannot hide v8 even though it's pretty obvious.
|
||||
current_user_key.clear();
|
||||
current_user_key.Clear();
|
||||
has_current_user_key = false;
|
||||
last_sequence_for_key = kMaxSequenceNumber;
|
||||
visible_in_snapshot = kMaxSequenceNumber;
|
||||
} else {
|
||||
if (!has_current_user_key ||
|
||||
user_comparator()->Compare(ikey.user_key,
|
||||
Slice(current_user_key)) != 0) {
|
||||
current_user_key.GetKey()) != 0) {
|
||||
// First occurrence of this user key
|
||||
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
|
||||
current_user_key.SetUserKey(ikey.user_key);
|
||||
has_current_user_key = true;
|
||||
last_sequence_for_key = kMaxSequenceNumber;
|
||||
visible_in_snapshot = kMaxSequenceNumber;
|
||||
@ -2642,13 +2642,11 @@ Status DBImpl::ProcessKeyValueCompaction(
|
||||
&compaction_filter_value,
|
||||
&value_changed);
|
||||
if (to_delete) {
|
||||
// make a copy of the original key
|
||||
delete_key.assign(key.data(), key.data() + key.size());
|
||||
// convert it to a delete
|
||||
UpdateInternalKey(&delete_key[0], delete_key.size(),
|
||||
ikey.sequence, kTypeDeletion);
|
||||
// make a copy of the original key and convert it to a delete
|
||||
delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
|
||||
kTypeDeletion);
|
||||
// anchor the key again
|
||||
key = Slice(&delete_key[0], delete_key.size());
|
||||
key = delete_key.GetKey();
|
||||
// needed because ikey is backed by key
|
||||
ParseInternalKey(key, &ikey);
|
||||
// no value associated with delete
|
||||
@ -3455,7 +3453,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
|
||||
StartPerfTimer(&from_files_timer);
|
||||
|
||||
sv->current->Get(options, lkey, value, &s, &merge_context, &stats,
|
||||
options_, value_found);
|
||||
value_found);
|
||||
have_stat_update = true;
|
||||
BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer);
|
||||
RecordTick(options_.statistics.get(), MEMTABLE_MISS);
|
||||
@ -3559,7 +3557,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
|
||||
// Done
|
||||
} else {
|
||||
get_version->current->Get(options, lkey, value, &s, &merge_context,
|
||||
&stats, options_);
|
||||
&stats);
|
||||
have_stat_update = true;
|
||||
}
|
||||
|
||||
|
@ -64,7 +64,7 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
|
||||
} else {
|
||||
Version::GetStats stats;
|
||||
super_version->current->Get(options, lkey, value, &s, &merge_context,
|
||||
&stats, options_);
|
||||
&stats);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -39,71 +39,6 @@ static void DumpInternalIter(Iterator* iter) {
|
||||
|
||||
namespace {
|
||||
|
||||
class IterLookupKey {
|
||||
public:
|
||||
IterLookupKey() : key_(space_), buf_size_(sizeof(space_)), key_size_(0) {}
|
||||
|
||||
~IterLookupKey() { Clear(); }
|
||||
|
||||
Slice GetKey() const {
|
||||
if (key_ != nullptr) {
|
||||
return Slice(key_, key_size_);
|
||||
} else {
|
||||
return Slice();
|
||||
}
|
||||
}
|
||||
|
||||
bool Valid() const { return key_ != nullptr; }
|
||||
|
||||
void Clear() {
|
||||
if (key_ != nullptr && key_ != space_) {
|
||||
delete[] key_;
|
||||
}
|
||||
key_ = space_;
|
||||
buf_size_ = sizeof(buf_size_);
|
||||
}
|
||||
|
||||
// Enlarge the buffer size if needed based on key_size.
|
||||
// By default, static allocated buffer is used. Once there is a key
|
||||
// larger than the static allocated buffer, another buffer is dynamically
|
||||
// allocated, until a larger key buffer is requested. In that case, we
|
||||
// reallocate buffer and delete the old one.
|
||||
void EnlargeBufferIfNeeded(size_t key_size) {
|
||||
// If size is smaller than buffer size, continue using current buffer,
|
||||
// or the static allocated one, as default
|
||||
if (key_size > buf_size_) {
|
||||
// Need to enlarge the buffer.
|
||||
Clear();
|
||||
key_ = new char[key_size];
|
||||
buf_size_ = key_size;
|
||||
}
|
||||
key_size_ = key_size;
|
||||
}
|
||||
|
||||
void SetUserKey(const Slice& user_key) {
|
||||
size_t size = user_key.size();
|
||||
EnlargeBufferIfNeeded(size);
|
||||
memcpy(key_, user_key.data(), size);
|
||||
}
|
||||
|
||||
void SetInternalKey(const Slice& user_key, SequenceNumber s) {
|
||||
size_t usize = user_key.size();
|
||||
EnlargeBufferIfNeeded(usize + sizeof(uint64_t));
|
||||
memcpy(key_, user_key.data(), usize);
|
||||
EncodeFixed64(key_ + usize, PackSequenceAndType(s, kValueTypeForSeek));
|
||||
}
|
||||
|
||||
private:
|
||||
char* key_;
|
||||
size_t buf_size_;
|
||||
size_t key_size_;
|
||||
char space_[32]; // Avoid allocation for short keys
|
||||
|
||||
// No copying allowed
|
||||
IterLookupKey(const IterLookupKey&) = delete;
|
||||
void operator=(const LookupKey&) = delete;
|
||||
};
|
||||
|
||||
// Memtables and sstables that make the DB representation contain
|
||||
// (userkey,seq,type) => uservalue entries. DBIter
|
||||
// combines multiple entries for the same userkey found in the DB
|
||||
@ -191,7 +126,7 @@ class DBIter: public Iterator {
|
||||
SequenceNumber const sequence_;
|
||||
|
||||
Status status_;
|
||||
IterLookupKey saved_key_; // == current key when direction_==kReverse
|
||||
IterKey saved_key_; // == current key when direction_==kReverse
|
||||
std::string saved_value_; // == current raw value when direction_==kReverse
|
||||
std::string skip_key_;
|
||||
Direction direction_;
|
||||
|
@ -266,6 +266,9 @@ class DBTest {
|
||||
// Sequence of option configurations to try
|
||||
enum OptionConfig {
|
||||
kDefault,
|
||||
kBlockBasedTableWithPrefixHashIndex,
|
||||
// TODO(kailiu) figure this out
|
||||
// kBlockBasedTableWithWholeKeyHashIndex,
|
||||
kPlainTableFirstBytePrefix,
|
||||
kPlainTableAllBytesPrefix,
|
||||
kVectorRep,
|
||||
@ -302,7 +305,8 @@ class DBTest {
|
||||
kSkipDeletesFilterFirst = 1,
|
||||
kSkipUniversalCompaction = 2,
|
||||
kSkipMergePut = 4,
|
||||
kSkipPlainTable = 8
|
||||
kSkipPlainTable = 8,
|
||||
kSkipHashIndex = 16
|
||||
};
|
||||
|
||||
DBTest() : option_config_(kDefault),
|
||||
@ -343,6 +347,11 @@ class DBTest {
|
||||
|| option_config_ == kPlainTableFirstBytePrefix)) {
|
||||
continue;
|
||||
}
|
||||
if ((skip_mask & kSkipPlainTable) &&
|
||||
option_config_ == kBlockBasedTableWithPrefixHashIndex) {
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
@ -429,7 +438,7 @@ class DBTest {
|
||||
break;
|
||||
case kHashLinkList:
|
||||
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
|
||||
options.memtable_factory.reset(NewHashLinkListRepFactory(4));
|
||||
options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0));
|
||||
break;
|
||||
case kUniversalCompaction:
|
||||
options.compaction_style = kCompactionStyleUniversal;
|
||||
@ -441,6 +450,21 @@ class DBTest {
|
||||
case kInfiniteMaxOpenFiles:
|
||||
options.max_open_files = -1;
|
||||
break;
|
||||
case kBlockBasedTableWithPrefixHashIndex: {
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.index_type = BlockBasedTableOptions::kHashSearch;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
|
||||
break;
|
||||
}
|
||||
// TODO(kailiu) figure out why it's failing and fix
|
||||
// case kBlockBasedTableWithWholeKeyHashIndex: {
|
||||
// BlockBasedTableOptions table_options;
|
||||
// table_options.index_type = BlockBasedTableOptions::kHashSearch;
|
||||
// options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
// options.prefix_extractor.reset(NewNoopTransform());
|
||||
// break;
|
||||
// }
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@ -885,13 +909,29 @@ TEST(DBTest, Empty) {
|
||||
options.write_buffer_size = 100000; // Small write buffer
|
||||
Reopen(&options);
|
||||
|
||||
std::string num;
|
||||
ASSERT_TRUE(dbfull()->GetProperty(
|
||||
"rocksdb.num-entries-active-mem-table", &num));
|
||||
ASSERT_EQ("0", num);
|
||||
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
ASSERT_EQ("v1", Get("foo"));
|
||||
ASSERT_TRUE(dbfull()->GetProperty(
|
||||
"rocksdb.num-entries-active-mem-table", &num));
|
||||
ASSERT_EQ("1", num);
|
||||
|
||||
env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls
|
||||
Put("k1", std::string(100000, 'x')); // Fill memtable
|
||||
ASSERT_TRUE(dbfull()->GetProperty(
|
||||
"rocksdb.num-entries-active-mem-table", &num));
|
||||
ASSERT_EQ("2", num);
|
||||
|
||||
Put("k2", std::string(100000, 'y')); // Trigger compaction
|
||||
ASSERT_EQ("v1", Get("foo"));
|
||||
ASSERT_TRUE(dbfull()->GetProperty(
|
||||
"rocksdb.num-entries-active-mem-table", &num));
|
||||
ASSERT_EQ("1", num);
|
||||
|
||||
env_->delay_sstable_sync_.Release_Store(nullptr); // Release sync calls
|
||||
} while (ChangeOptions());
|
||||
}
|
||||
@ -1251,7 +1291,7 @@ TEST(DBTest, KeyMayExist) {
|
||||
|
||||
// KeyMayExist function only checks data in block caches, which is not used
|
||||
// by plain table format.
|
||||
} while (ChangeOptions(kSkipPlainTable));
|
||||
} while (ChangeOptions(kSkipPlainTable | kSkipHashIndex));
|
||||
}
|
||||
|
||||
TEST(DBTest, NonBlockingIteration) {
|
||||
@ -2064,6 +2104,9 @@ TEST(DBTest, NumImmutableMemTable) {
|
||||
ASSERT_OK(dbfull()->Put(writeOpt, "k1", big_value));
|
||||
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
|
||||
ASSERT_EQ(num, "0");
|
||||
ASSERT_TRUE(dbfull()->GetProperty(
|
||||
"rocksdb.num-entries-active-mem-table", &num));
|
||||
ASSERT_EQ(num, "1");
|
||||
perf_context.Reset();
|
||||
Get("k1");
|
||||
ASSERT_EQ(1, (int) perf_context.get_from_memtable_count);
|
||||
@ -2071,6 +2114,13 @@ TEST(DBTest, NumImmutableMemTable) {
|
||||
ASSERT_OK(dbfull()->Put(writeOpt, "k2", big_value));
|
||||
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
|
||||
ASSERT_EQ(num, "1");
|
||||
ASSERT_TRUE(dbfull()->GetProperty(
|
||||
"rocksdb.num-entries-active-mem-table", &num));
|
||||
ASSERT_EQ(num, "1");
|
||||
ASSERT_TRUE(dbfull()->GetProperty(
|
||||
"rocksdb.num-entries-imm-mem-tables", &num));
|
||||
ASSERT_EQ(num, "1");
|
||||
|
||||
perf_context.Reset();
|
||||
Get("k1");
|
||||
ASSERT_EQ(2, (int) perf_context.get_from_memtable_count);
|
||||
@ -2083,6 +2133,12 @@ TEST(DBTest, NumImmutableMemTable) {
|
||||
&num));
|
||||
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
|
||||
ASSERT_EQ(num, "2");
|
||||
ASSERT_TRUE(dbfull()->GetProperty(
|
||||
"rocksdb.num-entries-active-mem-table", &num));
|
||||
ASSERT_EQ(num, "1");
|
||||
ASSERT_TRUE(dbfull()->GetProperty(
|
||||
"rocksdb.num-entries-imm-mem-tables", &num));
|
||||
ASSERT_EQ(num, "2");
|
||||
perf_context.Reset();
|
||||
Get("k2");
|
||||
ASSERT_EQ(2, (int) perf_context.get_from_memtable_count);
|
||||
@ -5882,7 +5938,8 @@ TEST(DBTest, Randomized) {
|
||||
int minimum = 0;
|
||||
if (option_config_ == kHashSkipList ||
|
||||
option_config_ == kHashLinkList ||
|
||||
option_config_ == kPlainTableFirstBytePrefix) {
|
||||
option_config_ == kPlainTableFirstBytePrefix ||
|
||||
option_config_ == kBlockBasedTableWithPrefixHashIndex) {
|
||||
minimum = 1;
|
||||
}
|
||||
if (p < 45) { // Put
|
||||
@ -5924,6 +5981,7 @@ TEST(DBTest, Randomized) {
|
||||
if ((step % 100) == 0) {
|
||||
ASSERT_TRUE(CompareIterators(step, &model, db_, nullptr, nullptr));
|
||||
ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap));
|
||||
|
||||
// Save a snapshot from each DB this time that we'll use next
|
||||
// time we compare things, to make sure the current state is
|
||||
// preserved with the snapshot
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/filter_policy.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "rocksdb/slice_transform.h"
|
||||
#include "rocksdb/table.h"
|
||||
#include "rocksdb/types.h"
|
||||
#include "util/coding.h"
|
||||
@ -235,4 +236,100 @@ inline LookupKey::~LookupKey() {
|
||||
if (start_ != space_) delete[] start_;
|
||||
}
|
||||
|
||||
// IterKey is a reusable, non-copyable buffer holding a single key: either a
// bare user key (SetUserKey) or a user key followed by an 8-byte packed
// sequence/type trailer (SetInternalKey). Keys that fit in the inline array
// space_ are stored there; larger keys live in a buffer obtained with new[].
class IterKey {
|
||||
public:
|
||||
// Starts empty, backed by the inline array space_.
IterKey() : key_(space_), buf_size_(sizeof(space_)), key_size_(0) {}
|
||||
|
||||
// Releases the heap buffer, if one was allocated.
~IterKey() { ResetBuffer(); }
|
||||
|
||||
// Returns a Slice over the currently stored bytes. The Slice points into
// this object's buffer, so it stops being usable once the buffer is replaced
// (see EnlargeBufferIfNeeded) or the object is destroyed.
Slice GetKey() const { return Slice(key_, key_size_); }
|
||||
|
||||
// Marks the key as empty. The current buffer (inline or heap) is kept so it
// can be reused by the next Set* call.
void Clear() { key_size_ = 0; }
|
||||
|
||||
// Stores a copy of user_key as the current key.
void SetUserKey(const Slice& user_key) {
|
||||
size_t size = user_key.size();
|
||||
EnlargeBufferIfNeeded(size);
|
||||
memcpy(key_, user_key.data(), size);
|
||||
key_size_ = size;
|
||||
}
|
||||
|
||||
// Stores user_key followed by the 8-byte fixed-width encoding of
// PackSequenceAndType(s, value_type). value_type defaults to
// kValueTypeForSeek.
// NOTE(review): if user_key points into this object's own heap buffer and
// usize + 8 exceeds buf_size_, EnlargeBufferIfNeeded frees that buffer before
// the memcpy below reads from it -- confirm callers never pass such a slice.
void SetInternalKey(const Slice& user_key, SequenceNumber s,
|
||||
ValueType value_type = kValueTypeForSeek) {
|
||||
size_t usize = user_key.size();
|
||||
EnlargeBufferIfNeeded(usize + sizeof(uint64_t));
|
||||
memcpy(key_, user_key.data(), usize);
|
||||
EncodeFixed64(key_ + usize, PackSequenceAndType(s, value_type));
|
||||
key_size_ = usize + sizeof(uint64_t);
|
||||
}
|
||||
|
||||
// Convenience overload: forwards the three fields of parsed_key to the
// overload above.
void SetInternalKey(const ParsedInternalKey& parsed_key) {
|
||||
SetInternalKey(parsed_key.user_key, parsed_key.sequence, parsed_key.type);
|
||||
}
|
||||
|
||||
private:
|
||||
// Points at space_ or at a heap buffer allocated in EnlargeBufferIfNeeded.
char* key_;
|
||||
// Capacity, in bytes, of the buffer key_ points at.
size_t buf_size_;
|
||||
// Number of valid key bytes currently stored in key_.
size_t key_size_;
|
||||
char space_[32]; // Avoid allocation for short keys
|
||||
|
||||
// Frees the heap buffer (if key_ is not the inline array), then points key_
// back at space_ and resets both the capacity and the stored key size.
void ResetBuffer() {
|
||||
if (key_ != nullptr && key_ != space_) {
|
||||
delete[] key_;
|
||||
}
|
||||
key_ = space_;
|
||||
buf_size_ = sizeof(space_);
|
||||
key_size_ = 0;
|
||||
}
|
||||
|
||||
// Enlarge the buffer size if needed based on key_size.
|
||||
// By default, static allocated buffer is used. Once there is a key
|
||||
// larger than the static allocated buffer, another buffer is dynamically
|
||||
// allocated, until a larger key buffer is requested. In that case, we
|
||||
// reallocate buffer and delete the old one.
|
||||
// The old contents are not copied into the new buffer, and key_size_ is left
// at 0 by ResetBuffer(); the Set* callers assign key_size_ after writing.
void EnlargeBufferIfNeeded(size_t key_size) {
|
||||
// If size is smaller than buffer size, continue using current buffer,
|
||||
// or the static allocated one, as default
|
||||
if (key_size > buf_size_) {
|
||||
// Need to enlarge the buffer.
|
||||
ResetBuffer();
|
||||
key_ = new char[key_size];
|
||||
buf_size_ = key_size;
|
||||
}
|
||||
}
|
||||
|
||||
// No copying allowed
|
||||
IterKey(const IterKey&) = delete;
|
||||
void operator=(const IterKey&) = delete;
|
||||
};
|
||||
|
||||
// Adapter that lets a SliceTransform written for user keys be applied to
// internal keys: each forwarding method first passes its argument through
// ExtractUserKey() and then calls the wrapped transform with the result.
class InternalKeySliceTransform : public SliceTransform {
|
||||
public:
|
||||
// Wraps transform without taking ownership (see the note on transform_).
explicit InternalKeySliceTransform(const SliceTransform* transform)
|
||||
: transform_(transform) {}
|
||||
|
||||
// Reports the wrapped transform's name unchanged.
virtual const char* Name() const { return transform_->Name(); }
|
||||
|
||||
// Applies the wrapped transform to the user-key portion of src.
virtual Slice Transform(const Slice& src) const {
|
||||
auto user_key = ExtractUserKey(src);
|
||||
return transform_->Transform(user_key);
|
||||
}
|
||||
|
||||
// Asks the wrapped transform whether the user-key portion of src is in its
// domain.
virtual bool InDomain(const Slice& src) const {
|
||||
auto user_key = ExtractUserKey(src);
|
||||
return transform_->InDomain(user_key);
|
||||
}
|
||||
|
||||
// NOTE(review): dst is also passed through ExtractUserKey() before being
// forwarded, i.e. it is treated as an internal key rather than as an
// already-transformed value -- confirm this matches what callers pass.
virtual bool InRange(const Slice& dst) const {
|
||||
auto user_key = ExtractUserKey(dst);
|
||||
return transform_->InRange(user_key);
|
||||
}
|
||||
|
||||
// Returns the wrapped user-key transform.
const SliceTransform* user_prefix_extractor() const { return transform_; }
|
||||
|
||||
private:
|
||||
// Like comparator, InternalKeySliceTransform will not take care of the
|
||||
// deletion of transform_
|
||||
const SliceTransform* const transform_;
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -38,6 +38,10 @@ DBPropertyType GetPropertyType(const Slice& property) {
|
||||
return kBackgroundErrors;
|
||||
} else if (in == "cur-size-active-mem-table") {
|
||||
return kCurSizeActiveMemTable;
|
||||
} else if (in == "num-entries-active-mem-table") {
|
||||
return kNumEntriesInMutableMemtable;
|
||||
} else if (in == "num-entries-imm-mem-tables") {
|
||||
return kNumEntriesInImmutableMemtable;
|
||||
}
|
||||
return kUnknown;
|
||||
}
|
||||
@ -47,7 +51,7 @@ bool InternalStats::GetProperty(DBPropertyType property_type,
|
||||
DBImpl* db) {
|
||||
VersionSet* version_set = db->versions_.get();
|
||||
Version* current = version_set->current();
|
||||
const MemTableList& imm = db->imm_;
|
||||
MemTableList& imm = db->imm_;
|
||||
Slice in = property;
|
||||
|
||||
switch (property_type) {
|
||||
@ -353,6 +357,14 @@ bool InternalStats::GetProperty(DBPropertyType property_type,
|
||||
// Current size of the active memtable
|
||||
*value = std::to_string(db->mem_->ApproximateMemoryUsage());
|
||||
return true;
|
||||
case kNumEntriesInMutableMemtable:
|
||||
// Number of entries in the active (mutable) memtable
|
||||
*value = std::to_string(db->mem_->GetNumEntries());
|
||||
return true;
|
||||
case kNumEntriesInImmutableMemtable:
|
||||
// Total number of entries across all immutable memtables
|
||||
*value = std::to_string(imm.current()->GetTotalNumEntries());
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
|
@ -28,11 +28,14 @@ enum DBPropertyType {
|
||||
kSsTables, // Return a human readable string of current SST files
|
||||
kNumImmutableMemTable, // Return number of immutable mem tables
|
||||
kMemtableFlushPending, // Return 1 if mem table flushing is pending,
|
||||
// otherwise
|
||||
// 0.
|
||||
// otherwise 0.
|
||||
kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0.
|
||||
kBackgroundErrors, // Return accumulated background errors encountered.
|
||||
kCurSizeActiveMemTable, // Return current size of the active memtable
|
||||
kNumEntriesInMutableMemtable, // Return number of entries in the mutable
|
||||
// memtable.
|
||||
kNumEntriesInImmutableMemtable, // Return sum of number of entries in all
|
||||
// the immutable mem tables.
|
||||
kUnknown,
|
||||
};
|
||||
|
||||
|
@ -37,6 +37,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
|
||||
arena_(options.arena_block_size),
|
||||
table_(options.memtable_factory->CreateMemTableRep(
|
||||
comparator_, &arena_, options.prefix_extractor.get())),
|
||||
num_entries_(0),
|
||||
flush_in_progress_(false),
|
||||
flush_completed_(false),
|
||||
file_number_(0),
|
||||
@ -51,9 +52,10 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
|
||||
// gone wrong already.
|
||||
assert(!should_flush_);
|
||||
if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
|
||||
prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits,
|
||||
options.bloom_locality,
|
||||
options.memtable_prefix_bloom_probes));
|
||||
prefix_bloom_.reset(new DynamicBloom(
|
||||
options.memtable_prefix_bloom_bits, options.bloom_locality,
|
||||
options.memtable_prefix_bloom_probes, nullptr,
|
||||
options.memtable_prefix_bloom_huge_page_tlb_size));
|
||||
}
|
||||
}
|
||||
|
||||
@ -260,6 +262,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
|
||||
memcpy(p, value.data(), val_size);
|
||||
assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
|
||||
table_->Insert(handle);
|
||||
num_entries_++;
|
||||
|
||||
if (prefix_bloom_) {
|
||||
assert(prefix_extractor_);
|
||||
|
@ -132,6 +132,9 @@ class MemTable {
|
||||
// key in the memtable.
|
||||
size_t CountSuccessiveMergeEntries(const LookupKey& key);
|
||||
|
||||
// Get total number of entries in the mem table.
|
||||
uint64_t GetNumEntries() const { return num_entries_; }
|
||||
|
||||
// Returns the edits area that is needed for flushing the memtable
|
||||
VersionEdit* GetEdits() { return &edit_; }
|
||||
|
||||
@ -182,6 +185,8 @@ class MemTable {
|
||||
Arena arena_;
|
||||
unique_ptr<MemTableRep> table_;
|
||||
|
||||
uint64_t num_entries_;
|
||||
|
||||
// These are used to manage memtable flushes to storage
|
||||
bool flush_in_progress_; // started the flush
|
||||
bool flush_completed_; // finished the flush
|
||||
|
@ -77,6 +77,14 @@ void MemTableListVersion::AddIterators(const ReadOptions& options,
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the sum of GetNumEntries() over every memtable in memlist_
// (0 when the list is empty).
uint64_t MemTableListVersion::GetTotalNumEntries() const {
|
||||
uint64_t total_num = 0;
|
||||
for (auto& m : memlist_) {
|
||||
total_num += m->GetNumEntries();
|
||||
}
|
||||
return total_num;
|
||||
}
|
||||
|
||||
// caller is responsible for referencing m
|
||||
void MemTableListVersion::Add(MemTable* m) {
|
||||
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
|
||||
|
@ -43,6 +43,8 @@ class MemTableListVersion {
|
||||
void AddIterators(const ReadOptions& options,
|
||||
std::vector<Iterator*>* iterator_list);
|
||||
|
||||
uint64_t GetTotalNumEntries() const;
|
||||
|
||||
private:
|
||||
// REQUIRE: m is mutable memtable
|
||||
void Add(MemTable* m);
|
||||
|
@ -188,9 +188,9 @@ class TestPlainTableReader : public PlainTableReader {
|
||||
const Options& options, bool* expect_bloom_not_match)
|
||||
: PlainTableReader(options, std::move(file), storage_options, icomparator,
|
||||
file_size, bloom_bits_per_key, hash_table_ratio,
|
||||
index_sparseness, table_properties),
|
||||
index_sparseness, table_properties, 2 * 1024 * 1024),
|
||||
expect_bloom_not_match_(expect_bloom_not_match) {
|
||||
Status s = PopulateIndex();
|
||||
Status s = PopulateIndex(const_cast<TableProperties*>(table_properties));
|
||||
ASSERT_TRUE(s.ok());
|
||||
}
|
||||
|
||||
@ -209,13 +209,12 @@ extern const uint64_t kPlainTableMagicNumber;
|
||||
class TestPlainTableFactory : public PlainTableFactory {
|
||||
public:
|
||||
explicit TestPlainTableFactory(bool* expect_bloom_not_match,
|
||||
uint32_t user_key_len =
|
||||
kPlainTableVariableLength,
|
||||
int bloom_bits_per_key = 0,
|
||||
double hash_table_ratio = 0.75,
|
||||
size_t index_sparseness = 16)
|
||||
uint32_t user_key_len, int bloom_bits_per_key,
|
||||
double hash_table_ratio,
|
||||
size_t index_sparseness,
|
||||
size_t huge_page_tlb_size)
|
||||
: PlainTableFactory(user_key_len, user_key_len, hash_table_ratio,
|
||||
hash_table_ratio),
|
||||
index_sparseness, huge_page_tlb_size),
|
||||
bloom_bits_per_key_(bloom_bits_per_key),
|
||||
hash_table_ratio_(hash_table_ratio),
|
||||
index_sparseness_(index_sparseness),
|
||||
@ -247,6 +246,8 @@ class TestPlainTableFactory : public PlainTableFactory {
|
||||
};
|
||||
|
||||
TEST(PlainTableDBTest, Flush) {
|
||||
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
|
||||
huge_page_tlb_size += 2 * 1024 * 1024) {
|
||||
for (int bloom_bits = 0; bloom_bits <= 117; bloom_bits += 117) {
|
||||
for (int total_order = 0; total_order <= 1; total_order++) {
|
||||
Options options = CurrentOptions();
|
||||
@ -254,10 +255,11 @@ TEST(PlainTableDBTest, Flush) {
|
||||
// Set only one bucket to force bucket conflict.
|
||||
// Test index interval for the same prefix to be 1, 2 and 4
|
||||
if (total_order) {
|
||||
options.table_factory.reset(
|
||||
NewTotalOrderPlainTableFactory(16, bloom_bits, 2));
|
||||
options.table_factory.reset(NewTotalOrderPlainTableFactory(
|
||||
16, bloom_bits, 2, huge_page_tlb_size));
|
||||
} else {
|
||||
options.table_factory.reset(NewPlainTableFactory(16, bloom_bits));
|
||||
options.table_factory.reset(NewPlainTableFactory(
|
||||
16, bloom_bits, 0.75, 16, huge_page_tlb_size));
|
||||
}
|
||||
DestroyAndReopen(&options);
|
||||
|
||||
@ -265,13 +267,27 @@ TEST(PlainTableDBTest, Flush) {
|
||||
ASSERT_OK(Put("0000000000000bar", "v2"));
|
||||
ASSERT_OK(Put("1000000000000foo", "v3"));
|
||||
dbfull()->TEST_FlushMemTable();
|
||||
|
||||
TablePropertiesCollection ptc;
|
||||
reinterpret_cast<DB*>(dbfull())->GetPropertiesOfAllTables(&ptc);
|
||||
ASSERT_EQ(1U, ptc.size());
|
||||
auto row = ptc.begin();
|
||||
auto tp = row->second;
|
||||
ASSERT_EQ(total_order ? "4" : "12", (tp->user_collected_properties).at(
|
||||
"plain_table_hash_table_size"));
|
||||
ASSERT_EQ(total_order ? "9" : "0", (tp->user_collected_properties).at(
|
||||
"plain_table_sub_index_size"));
|
||||
|
||||
ASSERT_EQ("v3", Get("1000000000000foo"));
|
||||
ASSERT_EQ("v2", Get("0000000000000bar"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST(PlainTableDBTest, Flush2) {
|
||||
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
|
||||
huge_page_tlb_size += 2 * 1024 * 1024) {
|
||||
for (int bloom_bits = 0; bloom_bits <= 117; bloom_bits += 117) {
|
||||
for (int total_order = 0; total_order <= 1; total_order++) {
|
||||
bool expect_bloom_not_match = false;
|
||||
@ -281,11 +297,13 @@ TEST(PlainTableDBTest, Flush2) {
|
||||
// Test index interval for the same prefix to be 1, 2 and 4
|
||||
if (total_order) {
|
||||
options.prefix_extractor = nullptr;
|
||||
options.table_factory.reset(new TestPlainTableFactory(
|
||||
&expect_bloom_not_match, 16, bloom_bits, 0, 2));
|
||||
options.table_factory.reset(
|
||||
new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits,
|
||||
0, 2, huge_page_tlb_size));
|
||||
} else {
|
||||
options.table_factory.reset(
|
||||
new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits));
|
||||
new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits,
|
||||
0.75, 16, huge_page_tlb_size));
|
||||
}
|
||||
DestroyAndReopen(&options);
|
||||
ASSERT_OK(Put("0000000000000bar", "b"));
|
||||
@ -325,8 +343,11 @@ TEST(PlainTableDBTest, Flush2) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST(PlainTableDBTest, Iterator) {
|
||||
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
|
||||
huge_page_tlb_size += 2 * 1024 * 1024) {
|
||||
for (int bloom_bits = 0; bloom_bits <= 117; bloom_bits += 117) {
|
||||
for (int total_order = 0; total_order <= 1; total_order++) {
|
||||
bool expect_bloom_not_match = false;
|
||||
@ -337,10 +358,11 @@ TEST(PlainTableDBTest, Iterator) {
|
||||
if (total_order) {
|
||||
options.prefix_extractor = nullptr;
|
||||
options.table_factory.reset(new TestPlainTableFactory(
|
||||
&expect_bloom_not_match, 16, bloom_bits, 0, 2));
|
||||
&expect_bloom_not_match, 16, bloom_bits, 0, 2, huge_page_tlb_size));
|
||||
} else {
|
||||
options.table_factory.reset(
|
||||
new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits));
|
||||
new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits,
|
||||
0.75, 16, huge_page_tlb_size));
|
||||
}
|
||||
DestroyAndReopen(&options);
|
||||
|
||||
@ -428,6 +450,49 @@ TEST(PlainTableDBTest, Iterator) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test helper: returns a key made of `length` copies of the character `c`.
std::string MakeLongKey(size_t length, char c) {
|
||||
return std::string(length, c);
|
||||
}
|
||||
|
||||
// Writes seven keys of differing lengths (16 to 90 bytes) through a
// total-order plain table, flushes them, and checks that a forward iterator
// returns every key/value pair in order.
TEST(PlainTableDBTest, IteratorLargeKeys) {
|
||||
Options options = CurrentOptions();
|
||||
options.table_factory.reset(NewTotalOrderPlainTableFactory(0, 0, 16));
|
||||
options.create_if_missing = true;
|
||||
// No prefix extractor is configured for this test.
options.prefix_extractor.reset();
|
||||
DestroyAndReopen(&options);
|
||||
|
||||
// Each key is filled with a distinct digit '0'..'6', so array order is also
// ascending byte order regardless of the key lengths.
std::string key_list[] = {
|
||||
MakeLongKey(30, '0'),
|
||||
MakeLongKey(16, '1'),
|
||||
MakeLongKey(32, '2'),
|
||||
MakeLongKey(60, '3'),
|
||||
MakeLongKey(90, '4'),
|
||||
MakeLongKey(50, '5'),
|
||||
MakeLongKey(26, '6')
|
||||
};
|
||||
|
||||
// The value stored for key i is the decimal string of i.
for (size_t i = 0; i < 7; i++) {
|
||||
ASSERT_OK(Put(key_list[i], std::to_string(i)));
|
||||
}
|
||||
|
||||
dbfull()->TEST_FlushMemTable();
|
||||
|
||||
Iterator* iter = dbfull()->NewIterator(ro_);
|
||||
iter->Seek(key_list[0]);
|
||||
|
||||
// Expect exactly the seven entries, in the same order as key_list.
for (size_t i = 0; i < 7; i++) {
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_EQ(key_list[i], iter->key().ToString());
|
||||
ASSERT_EQ(std::to_string(i), iter->value().ToString());
|
||||
iter->Next();
|
||||
}
|
||||
|
||||
// Nothing may follow the last key.
ASSERT_TRUE(!iter->Valid());
|
||||
|
||||
delete iter;
|
||||
}
|
||||
|
||||
// A test comparator which compare two strings in this way:
|
||||
// (1) first compare prefix of 8 bytes in alphabet order,
|
||||
@ -527,12 +592,15 @@ TEST(PlainTableDBTest, IteratorReverseSuffixComparator) {
|
||||
}
|
||||
|
||||
TEST(PlainTableDBTest, HashBucketConflict) {
|
||||
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
|
||||
huge_page_tlb_size += 2 * 1024 * 1024) {
|
||||
for (unsigned char i = 1; i <= 3; i++) {
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
// Set only one bucket to force bucket conflict.
|
||||
// Test index interval for the same prefix to be 1, 2 and 4
|
||||
options.table_factory.reset(NewTotalOrderPlainTableFactory(16, 0, 2 ^ i));
|
||||
options.table_factory.reset(
|
||||
NewTotalOrderPlainTableFactory(16, 0, 2 ^ i, huge_page_tlb_size));
|
||||
DestroyAndReopen(&options);
|
||||
ASSERT_OK(Put("5000000000000fo0", "v1"));
|
||||
ASSERT_OK(Put("5000000000000fo1", "v2"));
|
||||
@ -606,8 +674,11 @@ TEST(PlainTableDBTest, HashBucketConflict) {
|
||||
delete iter;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST(PlainTableDBTest, HashBucketConflictReverseSuffixComparator) {
|
||||
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
|
||||
huge_page_tlb_size += 2 * 1024 * 1024) {
|
||||
for (unsigned char i = 1; i <= 3; i++) {
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
@ -615,7 +686,8 @@ TEST(PlainTableDBTest, HashBucketConflictReverseSuffixComparator) {
|
||||
options.comparator = &comp;
|
||||
// Set only one bucket to force bucket conflict.
|
||||
// Test index interval for the same prefix to be 1, 2 and 4
|
||||
options.table_factory.reset(NewTotalOrderPlainTableFactory(16, 0, 2 ^ i));
|
||||
options.table_factory.reset(
|
||||
NewTotalOrderPlainTableFactory(16, 0, 2 ^ i, huge_page_tlb_size));
|
||||
DestroyAndReopen(&options);
|
||||
ASSERT_OK(Put("5000000000000fo0", "v1"));
|
||||
ASSERT_OK(Put("5000000000000fo1", "v2"));
|
||||
@ -688,6 +760,7 @@ TEST(PlainTableDBTest, HashBucketConflictReverseSuffixComparator) {
|
||||
delete iter;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST(PlainTableDBTest, NonExistingKeyToNonEmptyBucket) {
|
||||
Options options = CurrentOptions();
|
||||
|
@ -31,6 +31,7 @@ DEFINE_int64(min_write_buffer_number_to_merge, 1, "");
|
||||
DEFINE_int32(skiplist_height, 4, "");
|
||||
DEFINE_int32(memtable_prefix_bloom_bits, 10000000, "");
|
||||
DEFINE_int32(memtable_prefix_bloom_probes, 10, "");
|
||||
DEFINE_int32(memtable_prefix_bloom_huge_page_tlb_size, 2 * 1024 * 1024, "");
|
||||
DEFINE_int32(value_size, 40, "");
|
||||
|
||||
// Path to the database on file system
|
||||
@ -147,6 +148,8 @@ class PrefixTest {
|
||||
|
||||
options.memtable_prefix_bloom_bits = FLAGS_memtable_prefix_bloom_bits;
|
||||
options.memtable_prefix_bloom_probes = FLAGS_memtable_prefix_bloom_probes;
|
||||
options.memtable_prefix_bloom_huge_page_tlb_size =
|
||||
FLAGS_memtable_prefix_bloom_huge_page_tlb_size;
|
||||
|
||||
Status s = DB::Open(options, kDbName, &db);
|
||||
ASSERT_OK(s);
|
||||
@ -171,6 +174,10 @@ class PrefixTest {
|
||||
options.memtable_factory.reset(
|
||||
NewHashLinkListRepFactory(bucket_count));
|
||||
return true;
|
||||
case kHashLinkListHugePageTlb:
|
||||
options.memtable_factory.reset(
|
||||
NewHashLinkListRepFactory(bucket_count, 2 * 1024 * 1024));
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
@ -189,6 +196,7 @@ class PrefixTest {
|
||||
kBegin,
|
||||
kHashSkipList,
|
||||
kHashLinkList,
|
||||
kHashLinkListHugePageTlb,
|
||||
kEnd
|
||||
};
|
||||
int option_config_;
|
||||
|
@ -111,19 +111,20 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
|
||||
if (table_reader_ptr != nullptr) {
|
||||
*table_reader_ptr = nullptr;
|
||||
}
|
||||
Cache::Handle* handle = file_meta.table_reader_handle;
|
||||
TableReader* table_reader = file_meta.table_reader;
|
||||
Cache::Handle* handle = nullptr;
|
||||
Status s;
|
||||
if (!handle) {
|
||||
if (table_reader == nullptr) {
|
||||
s = FindTable(toptions, icomparator, file_meta.number, file_meta.file_size,
|
||||
&handle, nullptr, options.read_tier == kBlockCacheTier);
|
||||
}
|
||||
if (!s.ok()) {
|
||||
return NewErrorIterator(s);
|
||||
}
|
||||
table_reader = GetTableReaderFromHandle(handle);
|
||||
}
|
||||
|
||||
TableReader* table_reader = GetTableReaderFromHandle(handle);
|
||||
Iterator* result = table_reader->NewIterator(options);
|
||||
if (!file_meta.table_reader_handle) {
|
||||
if (handle != nullptr) {
|
||||
result->RegisterCleanup(&UnrefEntry, cache_.get(), handle);
|
||||
}
|
||||
if (table_reader_ptr != nullptr) {
|
||||
@ -143,17 +144,20 @@ Status TableCache::Get(const ReadOptions& options,
|
||||
bool (*saver)(void*, const ParsedInternalKey&,
|
||||
const Slice&, bool),
|
||||
bool* table_io, void (*mark_key_may_exist)(void*)) {
|
||||
Cache::Handle* handle = file_meta.table_reader_handle;
|
||||
TableReader* t = file_meta.table_reader;
|
||||
Status s;
|
||||
if (!handle) {
|
||||
Cache::Handle* handle = nullptr;
|
||||
if (!t) {
|
||||
s = FindTable(storage_options_, internal_comparator, file_meta.number,
|
||||
file_meta.file_size, &handle, table_io,
|
||||
options.read_tier == kBlockCacheTier);
|
||||
if (s.ok()) {
|
||||
t = GetTableReaderFromHandle(handle);
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
TableReader* t = GetTableReaderFromHandle(handle);
|
||||
s = t->Get(options, k, arg, saver, mark_key_may_exist);
|
||||
if (!file_meta.table_reader_handle) {
|
||||
if (handle != nullptr) {
|
||||
ReleaseHandle(handle);
|
||||
}
|
||||
} else if (options.read_tier && s.IsIncomplete()) {
|
||||
@ -169,15 +173,16 @@ Status TableCache::GetTableProperties(
|
||||
const FileMetaData& file_meta,
|
||||
std::shared_ptr<const TableProperties>* properties, bool no_io) {
|
||||
Status s;
|
||||
auto table_handle = file_meta.table_reader_handle;
|
||||
auto table_reader = file_meta.table_reader;
|
||||
// table already been pre-loaded?
|
||||
if (table_handle) {
|
||||
auto table = GetTableReaderFromHandle(table_handle);
|
||||
*properties = table->GetTableProperties();
|
||||
if (table_reader) {
|
||||
*properties = table_reader->GetTableProperties();
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
bool table_io;
|
||||
Cache::Handle* table_handle = nullptr;
|
||||
s = FindTable(toptions, internal_comparator, file_meta.number,
|
||||
file_meta.file_size, &table_handle, &table_io, no_io);
|
||||
if (!s.ok()) {
|
||||
@ -195,20 +200,21 @@ bool TableCache::PrefixMayMatch(const ReadOptions& options,
|
||||
const FileMetaData& file_meta,
|
||||
const Slice& internal_prefix, bool* table_io) {
|
||||
bool may_match = true;
|
||||
auto table_handle = file_meta.table_reader_handle;
|
||||
if (table_handle == nullptr) {
|
||||
auto table_reader = file_meta.table_reader;
|
||||
Cache::Handle* table_handle = nullptr;
|
||||
if (table_reader == nullptr) {
|
||||
// Need to get table handle from file number
|
||||
Status s = FindTable(storage_options_, icomparator, file_meta.number,
|
||||
file_meta.file_size, &table_handle, table_io);
|
||||
if (!s.ok()) {
|
||||
return may_match;
|
||||
}
|
||||
table_reader = GetTableReaderFromHandle(table_handle);
|
||||
}
|
||||
|
||||
auto table = GetTableReaderFromHandle(table_handle);
|
||||
may_match = table->PrefixMayMatch(internal_prefix);
|
||||
may_match = table_reader->PrefixMayMatch(internal_prefix);
|
||||
|
||||
if (file_meta.table_reader_handle == nullptr) {
|
||||
if (table_handle != nullptr) {
|
||||
// Need to release handle if it is generated from here.
|
||||
ReleaseHandle(table_handle);
|
||||
}
|
||||
|
@ -31,10 +31,13 @@ struct FileMetaData {
|
||||
|
||||
// Needs to be disposed when refs becomes 0.
|
||||
Cache::Handle* table_reader_handle;
|
||||
// Table reader in table_reader_handle
|
||||
TableReader* table_reader;
|
||||
|
||||
FileMetaData(uint64_t number, uint64_t file_size) :
|
||||
refs(0), allowed_seeks(1 << 30), number(number), file_size(file_size),
|
||||
being_compacted(false), table_reader_handle(nullptr) {
|
||||
being_compacted(false), table_reader_handle(nullptr),
|
||||
table_reader(nullptr) {
|
||||
}
|
||||
FileMetaData() : FileMetaData(0, 0) { }
|
||||
};
|
||||
|
@ -148,7 +148,7 @@ namespace {
|
||||
struct EncodedFileMetaData {
|
||||
uint64_t number; // file number
|
||||
uint64_t file_size; // file size
|
||||
Cache::Handle* table_reader_handle; // cached table reader's handler
|
||||
TableReader* table_reader; // cached table reader
|
||||
};
|
||||
} // namespace
|
||||
|
||||
@ -196,7 +196,7 @@ class Version::LevelFileNumIterator : public Iterator {
|
||||
auto* file_meta = (*flist_)[index_];
|
||||
current_value_.number = file_meta->number;
|
||||
current_value_.file_size = file_meta->file_size;
|
||||
current_value_.table_reader_handle = file_meta->table_reader_handle;
|
||||
current_value_.table_reader = file_meta->table_reader;
|
||||
return Slice(reinterpret_cast<const char*>(&current_value_),
|
||||
sizeof(EncodedFileMetaData));
|
||||
}
|
||||
@ -228,7 +228,7 @@ static Iterator* GetFileIterator(void* arg, const ReadOptions& options,
|
||||
const EncodedFileMetaData* encoded_meta =
|
||||
reinterpret_cast<const EncodedFileMetaData*>(file_value.data());
|
||||
FileMetaData meta(encoded_meta->number, encoded_meta->file_size);
|
||||
meta.table_reader_handle = encoded_meta->table_reader_handle;
|
||||
meta.table_reader = encoded_meta->table_reader;
|
||||
return cache->NewIterator(
|
||||
options.prefix ? options_copy : options, soptions, icomparator, meta,
|
||||
nullptr /* don't need reference to table*/, for_compaction);
|
||||
@ -254,7 +254,7 @@ bool Version::PrefixMayMatch(const ReadOptions& options,
|
||||
reinterpret_cast<const EncodedFileMetaData*>(
|
||||
level_iter->value().data());
|
||||
FileMetaData meta(encoded_meta->number, encoded_meta->file_size);
|
||||
meta.table_reader_handle = encoded_meta->table_reader_handle;
|
||||
meta.table_reader = encoded_meta->table_reader;
|
||||
may_match = vset_->table_cache_->PrefixMayMatch(options, vset_->icmp_, meta,
|
||||
internal_prefix, nullptr);
|
||||
}
|
||||
@ -478,6 +478,12 @@ bool BySmallestKey(FileMetaData* a, FileMetaData* b,
|
||||
|
||||
Version::Version(VersionSet* vset, uint64_t version_number)
|
||||
: vset_(vset),
|
||||
internal_comparator_(&(vset->icmp_)),
|
||||
user_comparator_(internal_comparator_->user_comparator()),
|
||||
table_cache_(vset->table_cache_),
|
||||
merge_operator_(vset->options_->merge_operator.get()),
|
||||
info_log_(vset->options_->info_log.get()),
|
||||
db_statistics_(vset->options_->statistics.get()),
|
||||
next_(this),
|
||||
prev_(this),
|
||||
refs_(0),
|
||||
@ -497,27 +503,22 @@ void Version::Get(const ReadOptions& options,
|
||||
Status* status,
|
||||
MergeContext* merge_context,
|
||||
GetStats* stats,
|
||||
const Options& db_options,
|
||||
bool* value_found) {
|
||||
Slice ikey = k.internal_key();
|
||||
Slice user_key = k.user_key();
|
||||
const Comparator* ucmp = vset_->icmp_.user_comparator();
|
||||
|
||||
auto merge_operator = db_options.merge_operator.get();
|
||||
auto logger = db_options.info_log.get();
|
||||
|
||||
assert(status->ok() || status->IsMergeInProgress());
|
||||
Saver saver;
|
||||
saver.state = status->ok()? kNotFound : kMerge;
|
||||
saver.ucmp = ucmp;
|
||||
saver.ucmp = user_comparator_;
|
||||
saver.user_key = user_key;
|
||||
saver.value_found = value_found;
|
||||
saver.value = value;
|
||||
saver.merge_operator = merge_operator;
|
||||
saver.merge_operator = merge_operator_;
|
||||
saver.merge_context = merge_context;
|
||||
saver.logger = logger;
|
||||
saver.logger = info_log_;
|
||||
saver.didIO = false;
|
||||
saver.statistics = db_options.statistics.get();
|
||||
saver.statistics = db_statistics_;
|
||||
|
||||
stats->seek_file = nullptr;
|
||||
stats->seek_file_level = -1;
|
||||
@ -548,7 +549,7 @@ void Version::Get(const ReadOptions& options,
|
||||
// On Level-n (n>=1), files are sorted.
|
||||
// Binary search to find earliest index whose largest key >= ikey.
|
||||
// We will also stop when the file no longer overlaps ikey
|
||||
start_index = FindFile(vset_->icmp_, files_[level], ikey);
|
||||
start_index = FindFile(*internal_comparator_, files_[level], ikey);
|
||||
}
|
||||
|
||||
// Traverse each relevant file to find the desired key
|
||||
@ -557,8 +558,10 @@ void Version::Get(const ReadOptions& options,
|
||||
#endif
|
||||
for (uint32_t i = start_index; i < num_files; ++i) {
|
||||
FileMetaData* f = files[i];
|
||||
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0 ||
|
||||
ucmp->Compare(user_key, f->largest.user_key()) > 0) {
|
||||
// Skip key range filtering for level 0 if there are few level 0 files.
|
||||
if ((level > 0 || num_files > 2) &&
|
||||
(user_comparator_->Compare(user_key, f->smallest.user_key()) < 0 ||
|
||||
user_comparator_->Compare(user_key, f->largest.user_key()) > 0)) {
|
||||
// Only process overlapping files.
|
||||
if (level > 0) {
|
||||
// If on Level-n (n>=1) then the files are sorted.
|
||||
@ -574,7 +577,8 @@ void Version::Get(const ReadOptions& options,
|
||||
// Sanity check to make sure that the files are correctly sorted
|
||||
if (prev_file) {
|
||||
if (level != 0) {
|
||||
int comp_sign = vset_->icmp_.Compare(prev_file->largest, f->smallest);
|
||||
int comp_sign =
|
||||
internal_comparator_->Compare(prev_file->largest, f->smallest);
|
||||
assert(comp_sign < 0);
|
||||
} else {
|
||||
// level == 0, the current file cannot be newer than the previous one.
|
||||
@ -588,9 +592,8 @@ void Version::Get(const ReadOptions& options,
|
||||
prev_file = f;
|
||||
#endif
|
||||
bool tableIO = false;
|
||||
*status =
|
||||
vset_->table_cache_->Get(options, vset_->icmp_, *f, ikey, &saver,
|
||||
SaveValue, &tableIO, MarkKeyMayExist);
|
||||
*status = table_cache_->Get(options, *internal_comparator_, *f, ikey,
|
||||
&saver, SaveValue, &tableIO, MarkKeyMayExist);
|
||||
// TODO: examine the behavior for corrupted key
|
||||
if (!status->ok()) {
|
||||
return;
|
||||
@ -635,12 +638,12 @@ void Version::Get(const ReadOptions& options,
|
||||
if (kMerge == saver.state) {
|
||||
// merge_operands are in saver and we hit the beginning of the key history
|
||||
// do a final merge of nullptr and operands;
|
||||
if (merge_operator->FullMerge(user_key, nullptr,
|
||||
saver.merge_context->GetOperands(),
|
||||
value, logger)) {
|
||||
if (merge_operator_->FullMerge(user_key, nullptr,
|
||||
saver.merge_context->GetOperands(), value,
|
||||
info_log_)) {
|
||||
*status = Status::OK();
|
||||
} else {
|
||||
RecordTick(db_options.statistics.get(), NUMBER_MERGE_FAILURES);
|
||||
RecordTick(db_statistics_, NUMBER_MERGE_FAILURES);
|
||||
*status = Status::Corruption("could not perform end-of-key merge for ",
|
||||
user_key);
|
||||
}
|
||||
@ -1447,6 +1450,12 @@ class VersionSet::Builder {
|
||||
file_meta->number, file_meta->file_size,
|
||||
&file_meta->table_reader_handle,
|
||||
&table_io, false);
|
||||
if (file_meta->table_reader_handle != nullptr) {
|
||||
// Load table_reader
|
||||
file_meta->table_reader =
|
||||
vset_->table_cache_->GetTableReaderFromHandle(
|
||||
file_meta->table_reader_handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -83,8 +83,7 @@ class Version {
|
||||
int seek_file_level;
|
||||
};
|
||||
void Get(const ReadOptions&, const LookupKey& key, std::string* val,
|
||||
Status* status, MergeContext* merge_context,
|
||||
GetStats* stats, const Options& db_option,
|
||||
Status* status, MergeContext* merge_context, GetStats* stats,
|
||||
bool* value_found = nullptr);
|
||||
|
||||
// Adds "stats" into the current state. Returns true if a new
|
||||
@ -224,6 +223,12 @@ class Version {
|
||||
void UpdateFilesBySize();
|
||||
|
||||
VersionSet* vset_; // VersionSet to which this Version belongs
|
||||
const InternalKeyComparator* internal_comparator_;
|
||||
const Comparator* user_comparator_;
|
||||
TableCache* table_cache_;
|
||||
const MergeOperator* merge_operator_;
|
||||
Logger* info_log_;
|
||||
Statistics* db_statistics_;
|
||||
Version* next_; // Next version in linked list
|
||||
Version* prev_; // Previous version in linked list
|
||||
int refs_; // Number of live refs to this version
|
||||
|
@ -223,8 +223,13 @@ extern MemTableRepFactory* NewHashSkipListRepFactory(
|
||||
// The factory is to create memtables with a hashed linked list:
|
||||
// it contains a fixed array of buckets, each pointing to a sorted single
|
||||
// linked list (null if the bucket is empty).
|
||||
// bucket_count: number of fixed array buckets
|
||||
// @bucket_count: number of fixed array buckets
|
||||
// @huge_page_tlb_size: if <=0, allocate the hash table bytes from malloc.
|
||||
// Otherwise from huge page TLB. The user needs to reserve
|
||||
// huge pages for it to be allocated, like:
|
||||
// sysctl -w vm.nr_hugepages=20
|
||||
// See linux doc Documentation/vm/hugetlbpage.txt
|
||||
extern MemTableRepFactory* NewHashLinkListRepFactory(
|
||||
size_t bucket_count = 50000);
|
||||
size_t bucket_count = 50000, size_t huge_page_tlb_size = 0);
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -719,6 +719,14 @@ struct Options {
|
||||
// number of hash probes per key
|
||||
uint32_t memtable_prefix_bloom_probes;
|
||||
|
||||
// Page size for huge page TLB for bloom in memtable. If <=0, do not allocate
|
||||
// from huge page TLB but from malloc.
|
||||
// Need to reserve huge pages for it to be allocated. For example:
|
||||
// sysctl -w vm.nr_hugepages=20
|
||||
// See linux doc Documentation/vm/hugetlbpage.txt
|
||||
|
||||
size_t memtable_prefix_bloom_huge_page_tlb_size;
|
||||
|
||||
// Control locality of bloom filter probes to improve cache miss rate.
|
||||
// This option only applies to memtable prefix bloom and plaintable
|
||||
// prefix bloom. It essentially limits the max number of cache lines each
|
||||
|
@ -60,6 +60,12 @@ struct BlockBasedTableOptions {
|
||||
// A space efficient index block that is optimized for
|
||||
// binary-search-based index.
|
||||
kBinarySearch,
|
||||
|
||||
// The hash index, if enabled, will do the hash lookup when
|
||||
// `ReadOption.prefix_seek == true`. User should also specify
|
||||
// `Options.prefix_extractor` to allow the index block to correctly
|
||||
// extract the prefix of the given key and perform hash table lookup.
|
||||
kHashSearch,
|
||||
};
|
||||
|
||||
IndexType index_type = kBinarySearch;
|
||||
@ -91,12 +97,19 @@ extern TableFactory* NewBlockBasedTableFactory(
|
||||
// in the hash table
|
||||
// @index_sparseness: inside each prefix, need to build one index record for how
|
||||
// many keys for binary search inside each hash bucket.
|
||||
// @huge_page_tlb_size: if <=0, allocate hash indexes and blooms from malloc.
|
||||
// Otherwise from huge page TLB. The user needs to reserve
|
||||
// huge pages for it to be allocated, like:
|
||||
// sysctl -w vm.nr_hugepages=20
|
||||
// See linux doc Documentation/vm/hugetlbpage.txt
|
||||
|
||||
const uint32_t kPlainTableVariableLength = 0;
|
||||
extern TableFactory* NewPlainTableFactory(uint32_t user_key_len =
|
||||
kPlainTableVariableLength,
|
||||
int bloom_bits_per_prefix = 10,
|
||||
double hash_table_ratio = 0.75,
|
||||
size_t index_sparseness = 16);
|
||||
size_t index_sparseness = 16,
|
||||
size_t huge_page_tlb_size = 0);
|
||||
|
||||
// -- Plain Table
|
||||
// This factory of plain table ignores Options.prefix_extractor and assumes no
|
||||
@ -110,9 +123,15 @@ extern TableFactory* NewPlainTableFactory(uint32_t user_key_len =
|
||||
// disable it by passing a zero.
|
||||
// @index_sparseness: need to build one index record for how many keys for
|
||||
// binary search.
|
||||
// @huge_page_tlb_size: if <=0, allocate hash indexes and blooms from malloc.
|
||||
// Otherwise from huge page TLB. The user needs to reserve
|
||||
// huge pages for it to be allocated, like:
|
||||
// sysctl -w vm.nr_hugepages=20
|
||||
// See linux doc Documentation/vm/hugetlbpage.txt
|
||||
extern TableFactory* NewTotalOrderPlainTableFactory(
|
||||
uint32_t user_key_len = kPlainTableVariableLength,
|
||||
int bloom_bits_per_key = 0, size_t index_sparseness = 16);
|
||||
int bloom_bits_per_key = 0, size_t index_sparseness = 16,
|
||||
size_t huge_page_tlb_size = 0);
|
||||
|
||||
// A base class for table factories.
|
||||
class TableFactory {
|
||||
|
@ -23,7 +23,7 @@ namespace rocksdb {
|
||||
// ++pos) {
|
||||
// ...
|
||||
// }
|
||||
typedef std::map<std::string, std::string> UserCollectedProperties;
|
||||
typedef std::map<const std::string, std::string> UserCollectedProperties;
|
||||
|
||||
// TableProperties contains a bunch of read-only properties of its associated
|
||||
// table.
|
||||
|
119
table/block.cc
119
table/block.cc
@ -11,16 +11,20 @@
|
||||
|
||||
#include "table/block.h"
|
||||
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "rocksdb/comparator.h"
|
||||
#include "table/block_hash_index.h"
|
||||
#include "table/format.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/logging.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
inline uint32_t Block::NumRestarts() const {
|
||||
uint32_t Block::NumRestarts() const {
|
||||
assert(size_ >= 2*sizeof(uint32_t));
|
||||
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
|
||||
}
|
||||
@ -92,6 +96,7 @@ class Block::Iter : public Iterator {
|
||||
std::string key_;
|
||||
Slice value_;
|
||||
Status status_;
|
||||
BlockHashIndex* hash_index_;
|
||||
|
||||
inline int Compare(const Slice& a, const Slice& b) const {
|
||||
return comparator_->Compare(a, b);
|
||||
@ -118,16 +123,15 @@ class Block::Iter : public Iterator {
|
||||
}
|
||||
|
||||
public:
|
||||
Iter(const Comparator* comparator,
|
||||
const char* data,
|
||||
uint32_t restarts,
|
||||
uint32_t num_restarts)
|
||||
Iter(const Comparator* comparator, const char* data, uint32_t restarts,
|
||||
uint32_t num_restarts, BlockHashIndex* hash_index)
|
||||
: comparator_(comparator),
|
||||
data_(data),
|
||||
restarts_(restarts),
|
||||
num_restarts_(num_restarts),
|
||||
current_(restarts_),
|
||||
restart_index_(num_restarts_) {
|
||||
restart_index_(num_restarts_),
|
||||
hash_index_(hash_index) {
|
||||
assert(num_restarts_ > 0);
|
||||
}
|
||||
|
||||
@ -169,45 +173,22 @@ class Block::Iter : public Iterator {
|
||||
}
|
||||
|
||||
virtual void Seek(const Slice& target) {
|
||||
// Binary search in restart array to find the first restart point
|
||||
// with a key >= target
|
||||
uint32_t left = 0;
|
||||
uint32_t right = num_restarts_ - 1;
|
||||
while (left < right) {
|
||||
uint32_t mid = (left + right + 1) / 2;
|
||||
uint32_t region_offset = GetRestartPoint(mid);
|
||||
uint32_t shared, non_shared, value_length;
|
||||
const char* key_ptr = DecodeEntry(data_ + region_offset,
|
||||
data_ + restarts_,
|
||||
&shared, &non_shared, &value_length);
|
||||
if (key_ptr == nullptr || (shared != 0)) {
|
||||
CorruptionError();
|
||||
uint32_t index = 0;
|
||||
bool ok = hash_index_ ? HashSeek(target, &index)
|
||||
: BinarySeek(target, 0, num_restarts_ - 1, &index);
|
||||
|
||||
if (!ok) {
|
||||
return;
|
||||
}
|
||||
Slice mid_key(key_ptr, non_shared);
|
||||
if (Compare(mid_key, target) < 0) {
|
||||
// Key at "mid" is smaller than "target". Therefore all
|
||||
// blocks before "mid" are uninteresting.
|
||||
left = mid;
|
||||
} else {
|
||||
// Key at "mid" is >= "target". Therefore all blocks at or
|
||||
// after "mid" are uninteresting.
|
||||
right = mid - 1;
|
||||
}
|
||||
}
|
||||
|
||||
SeekToRestartPoint(index);
|
||||
// Linear search (within restart block) for first key >= target
|
||||
SeekToRestartPoint(left);
|
||||
while (true) {
|
||||
if (!ParseNextKey()) {
|
||||
return;
|
||||
}
|
||||
if (Compare(key_, target) >= 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while (true) {
|
||||
if (!ParseNextKey() || Compare(key_, target) >= 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
virtual void SeekToFirst() {
|
||||
SeekToRestartPoint(0);
|
||||
ParseNextKey();
|
||||
@ -257,6 +238,53 @@ class Block::Iter : public Iterator {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// Binary search in restart array to find the first restart point
|
||||
// with a key >= target
|
||||
bool BinarySeek(const Slice& target, uint32_t left, uint32_t right,
|
||||
uint32_t* index) {
|
||||
assert(left <= right);
|
||||
|
||||
while (left < right) {
|
||||
uint32_t mid = (left + right + 1) / 2;
|
||||
uint32_t region_offset = GetRestartPoint(mid);
|
||||
uint32_t shared, non_shared, value_length;
|
||||
const char* key_ptr =
|
||||
DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
|
||||
&non_shared, &value_length);
|
||||
if (key_ptr == nullptr || (shared != 0)) {
|
||||
CorruptionError();
|
||||
return false;
|
||||
}
|
||||
Slice mid_key(key_ptr, non_shared);
|
||||
if (Compare(mid_key, target) < 0) {
|
||||
// Key at "mid" is smaller than "target". Therefore all
|
||||
// blocks before "mid" are uninteresting.
|
||||
left = mid;
|
||||
} else {
|
||||
// Key at "mid" is >= "target". Therefore all blocks at or
|
||||
// after "mid" are uninteresting.
|
||||
right = mid - 1;
|
||||
}
|
||||
}
|
||||
|
||||
*index = left;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool HashSeek(const Slice& target, uint32_t* index) {
|
||||
assert(hash_index_);
|
||||
auto restart_index = hash_index_->GetRestartIndex(target);
|
||||
if (restart_index == nullptr) {
|
||||
current_ = restarts_;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// the elements in restart_array[index : index + num_blocks]
|
||||
// are all with same prefix. We'll do binary search in that small range.
|
||||
auto left = restart_index->first_index;
|
||||
auto right = restart_index->first_index + restart_index->num_blocks - 1;
|
||||
return BinarySeek(target, left, right, index);
|
||||
}
|
||||
};
|
||||
|
||||
Iterator* Block::NewIterator(const Comparator* cmp) {
|
||||
@ -267,8 +295,13 @@ Iterator* Block::NewIterator(const Comparator* cmp) {
|
||||
if (num_restarts == 0) {
|
||||
return NewEmptyIterator();
|
||||
} else {
|
||||
return new Iter(cmp, data_, restart_offset_, num_restarts);
|
||||
return new Iter(cmp, data_, restart_offset_, num_restarts,
|
||||
hash_index_.get());
|
||||
}
|
||||
}
|
||||
|
||||
void Block::SetBlockHashIndex(BlockHashIndex* hash_index) {
|
||||
hash_index_.reset(hash_index);
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -10,6 +10,7 @@
|
||||
#pragma once
|
||||
#include <stddef.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include "rocksdb/iterator.h"
|
||||
#include "rocksdb/options.h"
|
||||
|
||||
@ -17,6 +18,7 @@ namespace rocksdb {
|
||||
|
||||
struct BlockContents;
|
||||
class Comparator;
|
||||
class BlockHashIndex;
|
||||
|
||||
class Block {
|
||||
public:
|
||||
@ -26,20 +28,28 @@ class Block {
|
||||
~Block();
|
||||
|
||||
size_t size() const { return size_; }
|
||||
const char* data() const { return data_; }
|
||||
bool cachable() const { return cachable_; }
|
||||
uint32_t NumRestarts() const;
|
||||
CompressionType compression_type() const { return compression_type_; }
|
||||
|
||||
// If hash index lookup is enabled and `use_hash_index` is true, this block
|
||||
// will do hash lookup for the key prefix.
|
||||
//
|
||||
// NOTE: for the hash based lookup, if a key prefix doesn't match any key,
|
||||
// the iterator will simply be set as "invalid", rather than returning
|
||||
// the key that is just past the target key.
|
||||
Iterator* NewIterator(const Comparator* comparator);
|
||||
const char* data() { return data_; }
|
||||
void SetBlockHashIndex(BlockHashIndex* hash_index);
|
||||
|
||||
private:
|
||||
uint32_t NumRestarts() const;
|
||||
|
||||
const char* data_;
|
||||
size_t size_;
|
||||
uint32_t restart_offset_; // Offset in data_ of restart array
|
||||
bool owned_; // Block owns data_[]
|
||||
bool cachable_;
|
||||
CompressionType compression_type_;
|
||||
std::unique_ptr<BlockHashIndex> hash_index_;
|
||||
|
||||
// No copying allowed
|
||||
Block(const Block&);
|
||||
|
@ -88,8 +88,7 @@ class IndexBuilder {
|
||||
const Comparator* comparator_;
|
||||
};
|
||||
|
||||
// This index builder builds space-efficient index block for binary-search-based
|
||||
// index.
|
||||
// This index builder builds space-efficient index block.
|
||||
//
|
||||
// Optimizations:
|
||||
// 1. Made block's `block_restart_interval` to be 1, which will avoid linear
|
||||
@ -97,9 +96,9 @@ class IndexBuilder {
|
||||
// 2. Shorten the key length for index block. Other than honestly using the
|
||||
// last key in the data block as the index key, we instead find a shortest
|
||||
// substitute key that serves the same function.
|
||||
class BinarySearchIndexBuilder : public IndexBuilder {
|
||||
class ShortenedIndexBuilder : public IndexBuilder {
|
||||
public:
|
||||
explicit BinarySearchIndexBuilder(const Comparator* comparator)
|
||||
explicit ShortenedIndexBuilder(const Comparator* comparator)
|
||||
: IndexBuilder(comparator),
|
||||
index_block_builder_(1 /* block_restart_interval == 1 */, comparator) {}
|
||||
|
||||
@ -128,11 +127,37 @@ class BinarySearchIndexBuilder : public IndexBuilder {
|
||||
BlockBuilder index_block_builder_;
|
||||
};
|
||||
|
||||
// FullKeyIndexBuilder is also based on BlockBuilder. It works pretty much like
|
||||
// ShortenedIndexBuilder, but preserves the full key instead of the substitute key.
|
||||
class FullKeyIndexBuilder : public IndexBuilder {
|
||||
public:
|
||||
explicit FullKeyIndexBuilder(const Comparator* comparator)
|
||||
: IndexBuilder(comparator),
|
||||
index_block_builder_(1 /* block_restart_interval == 1 */, comparator) {}
|
||||
|
||||
virtual void AddEntry(std::string* last_key_in_current_block,
|
||||
const Slice* first_key_in_next_block,
|
||||
const BlockHandle& block_handle) override {
|
||||
std::string handle_encoding;
|
||||
block_handle.EncodeTo(&handle_encoding);
|
||||
index_block_builder_.Add(*last_key_in_current_block, handle_encoding);
|
||||
}
|
||||
|
||||
virtual Slice Finish() override { return index_block_builder_.Finish(); }
|
||||
|
||||
virtual size_t EstimatedSize() const {
|
||||
return index_block_builder_.CurrentSizeEstimate();
|
||||
}
|
||||
|
||||
private:
|
||||
BlockBuilder index_block_builder_;
|
||||
};
|
||||
|
||||
// Create an index builder based on its type.
|
||||
IndexBuilder* CreateIndexBuilder(IndexType type, const Comparator* comparator) {
|
||||
switch (type) {
|
||||
case BlockBasedTableOptions::kBinarySearch: {
|
||||
return new BinarySearchIndexBuilder(comparator);
|
||||
return new ShortenedIndexBuilder(comparator);
|
||||
}
|
||||
default: {
|
||||
assert(!"Do not recognize the index type ");
|
||||
@ -296,13 +321,16 @@ struct BlockBasedTableBuilder::Rep {
|
||||
}
|
||||
};
|
||||
|
||||
// TODO(sdong): Currently only write out binary search index. In
|
||||
// BlockBasedTableReader, Hash index will be built using binary search index.
|
||||
BlockBasedTableBuilder::BlockBasedTableBuilder(
|
||||
const Options& options, const BlockBasedTableOptions& table_options,
|
||||
const InternalKeyComparator& internal_comparator, WritableFile* file,
|
||||
CompressionType compression_type)
|
||||
: rep_(new Rep(options, internal_comparator, file,
|
||||
table_options.flush_block_policy_factory.get(),
|
||||
compression_type, table_options.index_type)) {
|
||||
compression_type,
|
||||
BlockBasedTableOptions::IndexType::kBinarySearch)) {
|
||||
if (rep_->filter_block != nullptr) {
|
||||
rep_->filter_block->StartBlock(0);
|
||||
}
|
||||
|
@ -25,6 +25,7 @@
|
||||
|
||||
#include "table/block.h"
|
||||
#include "table/filter_block.h"
|
||||
#include "table/block_hash_index.h"
|
||||
#include "table/format.h"
|
||||
#include "table/meta_blocks.h"
|
||||
#include "table/two_level_iterator.h"
|
||||
@ -180,19 +181,51 @@ class BinarySearchIndexReader : public IndexReader {
|
||||
std::unique_ptr<Block> index_block_;
|
||||
};
|
||||
|
||||
// TODO(kailiu) This class is only a stub for now. And the comment below is also
|
||||
// not completed.
|
||||
// Index that leverages an internal hash table to quicken the lookup for a given
|
||||
// key.
|
||||
// @param data_iter_gen, equivalent to BlockBasedTable::NewIterator(). But that
|
||||
// function requires index to be initialized. To avoid this problem external
|
||||
// caller will pass a function that can create the iterator over the entries
|
||||
// without the table to be fully initialized.
|
||||
class HashIndexReader : public IndexReader {
|
||||
public:
|
||||
static Status Create(RandomAccessFile* file, const BlockHandle& index_handle,
|
||||
Env* env, const Comparator* comparator,
|
||||
BlockBasedTable* table,
|
||||
std::function<Iterator*(Iterator*)> data_iter_gen,
|
||||
const SliceTransform* prefix_extractor,
|
||||
IndexReader** index_reader) {
|
||||
return Status::NotSupported("not implemented yet!");
|
||||
assert(prefix_extractor);
|
||||
Block* index_block = nullptr;
|
||||
auto s =
|
||||
ReadBlockFromFile(file, ReadOptions(), index_handle, &index_block, env);
|
||||
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
*index_reader = new HashIndexReader(comparator, index_block);
|
||||
std::unique_ptr<Iterator> index_iter(index_block->NewIterator(nullptr));
|
||||
std::unique_ptr<Iterator> data_iter(
|
||||
data_iter_gen(index_block->NewIterator(nullptr)));
|
||||
auto hash_index = CreateBlockHashIndex(index_iter.get(), data_iter.get(),
|
||||
index_block->NumRestarts(),
|
||||
comparator, prefix_extractor);
|
||||
index_block->SetBlockHashIndex(hash_index);
|
||||
return s;
|
||||
}
|
||||
|
||||
virtual Iterator* NewIterator() override {
|
||||
return index_block_->NewIterator(comparator_);
|
||||
}
|
||||
|
||||
virtual size_t size() const override { return index_block_->size(); }
|
||||
|
||||
private:
|
||||
HashIndexReader(const Comparator* comparator, Block* index_block)
|
||||
: IndexReader(comparator), index_block_(index_block) {
|
||||
assert(index_block_ != nullptr);
|
||||
}
|
||||
std::unique_ptr<Block> index_block_;
|
||||
};
|
||||
|
||||
|
||||
@ -223,6 +256,11 @@ struct BlockBasedTable::Rep {
|
||||
|
||||
std::shared_ptr<const TableProperties> table_properties;
|
||||
BlockBasedTableOptions::IndexType index_type;
|
||||
// TODO(kailiu) It is very ugly to use internal key in table, since table
|
||||
// module should not be relying on db module. However to make things easier
|
||||
// and compatible with existing code, we introduce a wrapper that allows
|
||||
// block to extract prefix without knowing if a key is internal or not.
|
||||
unique_ptr<SliceTransform> internal_prefix_transform;
|
||||
};
|
||||
|
||||
BlockBasedTable::~BlockBasedTable() {
|
||||
@ -327,8 +365,20 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions,
|
||||
s = ReadMetaBlock(rep, &meta, &meta_iter);
|
||||
|
||||
// Read the properties
|
||||
bool found_properties_block = true;
|
||||
meta_iter->Seek(kPropertiesBlock);
|
||||
if (meta_iter->Valid() && meta_iter->key() == kPropertiesBlock) {
|
||||
if (meta_iter->status().ok() &&
|
||||
(!meta_iter->Valid() || meta_iter->key() != kPropertiesBlock)) {
|
||||
meta_iter->Seek(kPropertiesBlockOldName);
|
||||
if (meta_iter->status().ok() &&
|
||||
(!meta_iter->Valid() || meta_iter->key() != kPropertiesBlockOldName)) {
|
||||
found_properties_block = false;
|
||||
Log(WARN, rep->options.info_log,
|
||||
"Cannot find Properties block from file.");
|
||||
}
|
||||
}
|
||||
|
||||
if (found_properties_block) {
|
||||
s = meta_iter->status();
|
||||
TableProperties* table_properties = nullptr;
|
||||
if (s.ok()) {
|
||||
@ -747,8 +797,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
|
||||
return { filter, cache_handle };
|
||||
}
|
||||
|
||||
Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options)
|
||||
const {
|
||||
Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) {
|
||||
// index reader has already been pre-populated.
|
||||
if (rep_->index_reader) {
|
||||
return rep_->index_reader->NewIterator();
|
||||
@ -954,11 +1003,14 @@ Status BlockBasedTable::Get(
|
||||
return s;
|
||||
}
|
||||
|
||||
namespace {
|
||||
bool SaveDidIO(void* arg, const ParsedInternalKey& key, const Slice& value,
|
||||
bool didIO) {
|
||||
*reinterpret_cast<bool*>(arg) = didIO;
|
||||
return false;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
|
||||
const Slice& key) {
|
||||
// We use Get() as it has logic that checks whether we read the
|
||||
@ -975,22 +1027,51 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
|
||||
// 3. options
|
||||
// 4. internal_comparator
|
||||
// 5. index_type
|
||||
Status BlockBasedTable::CreateIndexReader(IndexReader** index_reader) const {
|
||||
Status BlockBasedTable::CreateIndexReader(IndexReader** index_reader) {
|
||||
// Some old version of block-based tables don't have index type present in
|
||||
// table properties. If that's the case we can safely use the kBinarySearch.
|
||||
auto index_type = BlockBasedTableOptions::kBinarySearch;
|
||||
|
||||
auto index_type_on_file = BlockBasedTableOptions::kBinarySearch;
|
||||
if (rep_->table_properties) {
|
||||
auto& props = rep_->table_properties->user_collected_properties;
|
||||
auto pos = props.find(BlockBasedTablePropertyNames::kIndexType);
|
||||
if (pos != props.end()) {
|
||||
index_type = static_cast<BlockBasedTableOptions::IndexType>(
|
||||
index_type_on_file = static_cast<BlockBasedTableOptions::IndexType>(
|
||||
DecodeFixed32(pos->second.c_str()));
|
||||
}
|
||||
}
|
||||
|
||||
switch (index_type) {
|
||||
// TODO(sdong): Currently binary index is the only index type we support in
|
||||
// files. Hash index is built on top of binary index too.
|
||||
if (index_type_on_file != BlockBasedTableOptions::kBinarySearch) {
|
||||
return Status::NotSupported("File Contains not supported index type: ",
|
||||
std::to_string(index_type_on_file));
|
||||
}
|
||||
|
||||
auto file = rep_->file.get();
|
||||
const auto& index_handle = rep_->index_handle;
|
||||
auto env = rep_->options.env;
|
||||
auto comparator = &rep_->internal_comparator;
|
||||
|
||||
switch (rep_->index_type) {
|
||||
case BlockBasedTableOptions::kBinarySearch: {
|
||||
return BinarySearchIndexReader::Create(
|
||||
rep_->file.get(), rep_->index_handle, rep_->options.env,
|
||||
&rep_->internal_comparator, index_reader);
|
||||
return BinarySearchIndexReader::Create(file, index_handle, env,
|
||||
comparator, index_reader);
|
||||
}
|
||||
case BlockBasedTableOptions::kHashSearch: {
|
||||
// We need to wrap data with internal_prefix_transform to make sure it can
|
||||
// handle prefix correctly.
|
||||
rep_->internal_prefix_transform.reset(
|
||||
new InternalKeySliceTransform(rep_->options.prefix_extractor.get()));
|
||||
return HashIndexReader::Create(
|
||||
file, index_handle, env, comparator,
|
||||
[&](Iterator* index_iter) {
|
||||
return NewTwoLevelIterator(
|
||||
index_iter, &BlockBasedTable::DataBlockReader,
|
||||
const_cast<BlockBasedTable*>(this), ReadOptions(),
|
||||
rep_->soptions, rep_->internal_comparator);
|
||||
},
|
||||
rep_->internal_prefix_transform.get(), index_reader);
|
||||
}
|
||||
default: {
|
||||
std::string error_message =
|
||||
@ -1023,7 +1104,10 @@ uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key) {
|
||||
// key is past the last key in the file. If table_properties is not
|
||||
// available, approximate the offset by returning the offset of the
|
||||
// metaindex block (which is right near the end of the file).
|
||||
result = 0;
|
||||
if (rep_->table_properties) {
|
||||
result = rep_->table_properties->data_size;
|
||||
}
|
||||
// table_properties is not present in the table.
|
||||
if (result == 0) {
|
||||
result = rep_->metaindex_handle.offset();
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <stdint.h>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
#include <string>
|
||||
|
||||
#include "rocksdb/statistics.h"
|
||||
#include "rocksdb/status.h"
|
||||
@ -131,7 +132,7 @@ class BlockBasedTable : public TableReader {
|
||||
// 2. index is not present in block cache.
|
||||
// 3. We disallowed any io to be performed, that is, read_options ==
|
||||
// kBlockCacheTier
|
||||
Iterator* NewIndexIterator(const ReadOptions& read_options) const;
|
||||
Iterator* NewIndexIterator(const ReadOptions& read_options);
|
||||
|
||||
// Read block cache from block caches (if set): block_cache and
|
||||
// block_cache_compressed.
|
||||
@ -164,7 +165,7 @@ class BlockBasedTable : public TableReader {
|
||||
|
||||
void ReadMeta(const Footer& footer);
|
||||
void ReadFilter(const Slice& filter_handle_value);
|
||||
Status CreateIndexReader(IndexReader** index_reader) const;
|
||||
Status CreateIndexReader(IndexReader** index_reader);
|
||||
|
||||
// Read the meta block from sst.
|
||||
static Status ReadMetaBlock(
|
||||
@ -198,4 +199,8 @@ class BlockBasedTable : public TableReader {
|
||||
void operator=(const TableReader&) = delete;
|
||||
};
|
||||
|
||||
// Backward compatible properties block name. Limited in block based
|
||||
// table.
|
||||
extern const std::string kPropertiesBlockOldName;
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -3,7 +3,10 @@
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
#include <stdio.h>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "db/dbformat.h"
|
||||
#include "db/memtable.h"
|
||||
#include "db/write_batch_internal.h"
|
||||
@ -11,9 +14,11 @@
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/iterator.h"
|
||||
#include "rocksdb/table.h"
|
||||
#include "rocksdb/slice_transform.h"
|
||||
#include "table/block.h"
|
||||
#include "table/block_builder.h"
|
||||
#include "table/format.h"
|
||||
#include "table/block_hash_index.h"
|
||||
#include "util/random.h"
|
||||
#include "util/testharness.h"
|
||||
#include "util/testutil.h"
|
||||
@ -25,6 +30,40 @@ static std::string RandomString(Random* rnd, int len) {
|
||||
test::RandomString(rnd, len, &r);
|
||||
return r;
|
||||
}
|
||||
std::string GenerateKey(int primary_key, int secondary_key, int padding_size,
|
||||
Random *rnd) {
|
||||
char buf[50];
|
||||
char *p = &buf[0];
|
||||
snprintf(buf, sizeof(buf), "%6d%4d", primary_key, secondary_key);
|
||||
std::string k(p);
|
||||
if (padding_size) {
|
||||
k += RandomString(rnd, padding_size);
|
||||
}
|
||||
|
||||
return k;
|
||||
}
|
||||
|
||||
// Generate random key value pairs.
|
||||
// The generated keys will be sorted. You can tune the parameters to generate
|
||||
// different kinds of test key/value pairs for different scenarios.
|
||||
void GenerateRandomKVs(std::vector<std::string> *keys,
|
||||
std::vector<std::string> *values, const int from,
|
||||
const int len, const int step = 1,
|
||||
const int padding_size = 0,
|
||||
const int keys_share_prefix = 1) {
|
||||
Random rnd(302);
|
||||
|
||||
// generate different prefix
|
||||
for (int i = from; i < from + len; i += step) {
|
||||
// generating keys that share the prefix
|
||||
for (int j = 0; j < keys_share_prefix; ++j) {
|
||||
keys->emplace_back(GenerateKey(i, j, padding_size, &rnd));
|
||||
|
||||
// 100 bytes values
|
||||
values->emplace_back(RandomString(&rnd, 100));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class BlockTest {};
|
||||
|
||||
@ -39,24 +78,11 @@ TEST(BlockTest, SimpleTest) {
|
||||
std::vector<std::string> values;
|
||||
BlockBuilder builder(options, ic.get());
|
||||
int num_records = 100000;
|
||||
char buf[10];
|
||||
char* p = &buf[0];
|
||||
|
||||
GenerateRandomKVs(&keys, &values, 0, num_records);
|
||||
// add a bunch of records to a block
|
||||
for (int i = 0; i < num_records; i++) {
|
||||
// generate random kvs
|
||||
sprintf(p, "%6d", i);
|
||||
std::string k(p);
|
||||
std::string v = RandomString(&rnd, 100); // 100 byte values
|
||||
|
||||
// write kvs to the block
|
||||
Slice key(k);
|
||||
Slice value(v);
|
||||
builder.Add(key, value);
|
||||
|
||||
// remember kvs in a lookaside array
|
||||
keys.push_back(k);
|
||||
values.push_back(v);
|
||||
builder.Add(keys[i], values[i]);
|
||||
}
|
||||
|
||||
// read serialized contents of the block
|
||||
@ -101,6 +127,114 @@ TEST(BlockTest, SimpleTest) {
|
||||
delete iter;
|
||||
}
|
||||
|
||||
// return the block contents
|
||||
BlockContents GetBlockContents(std::unique_ptr<BlockBuilder> *builder,
|
||||
const std::vector<std::string> &keys,
|
||||
const std::vector<std::string> &values,
|
||||
const int prefix_group_size = 1) {
|
||||
builder->reset(
|
||||
new BlockBuilder(1 /* restart interval */, BytewiseComparator()));
|
||||
|
||||
// Add all of the keys
|
||||
for (size_t i = 0; i < keys.size(); ++i) {
|
||||
(*builder)->Add(keys[i], values[i]);
|
||||
}
|
||||
Slice rawblock = (*builder)->Finish();
|
||||
|
||||
BlockContents contents;
|
||||
contents.data = rawblock;
|
||||
contents.cachable = false;
|
||||
contents.heap_allocated = false;
|
||||
|
||||
return contents;
|
||||
}
|
||||
|
||||
void CheckBlockContents(BlockContents contents, const int max_key,
|
||||
const std::vector<std::string> &keys,
|
||||
const std::vector<std::string> &values) {
|
||||
const size_t prefix_size = 6;
|
||||
// create block reader
|
||||
Block reader1(contents);
|
||||
Block reader2(contents);
|
||||
|
||||
std::unique_ptr<const SliceTransform> prefix_extractor(
|
||||
NewFixedPrefixTransform(prefix_size));
|
||||
|
||||
{
|
||||
auto iter1 = reader1.NewIterator(nullptr);
|
||||
auto iter2 = reader1.NewIterator(nullptr);
|
||||
reader1.SetBlockHashIndex(CreateBlockHashIndex(iter1, iter2, keys.size(),
|
||||
BytewiseComparator(),
|
||||
prefix_extractor.get()));
|
||||
|
||||
delete iter1;
|
||||
delete iter2;
|
||||
}
|
||||
|
||||
std::unique_ptr<Iterator> hash_iter(
|
||||
reader1.NewIterator(BytewiseComparator()));
|
||||
|
||||
std::unique_ptr<Iterator> regular_iter(
|
||||
reader2.NewIterator(BytewiseComparator()));
|
||||
|
||||
// Seek existent keys
|
||||
for (size_t i = 0; i < keys.size(); i++) {
|
||||
hash_iter->Seek(keys[i]);
|
||||
ASSERT_OK(hash_iter->status());
|
||||
ASSERT_TRUE(hash_iter->Valid());
|
||||
|
||||
Slice v = hash_iter->value();
|
||||
ASSERT_EQ(v.ToString().compare(values[i]), 0);
|
||||
}
|
||||
|
||||
// Seek non-existent keys.
|
||||
// For hash index, if no key with a given prefix is found, iterator will
|
||||
// simply be set as invalid; whereas the binary search based iterator will
|
||||
// return the one that is closest.
|
||||
for (int i = 1; i < max_key - 1; i += 2) {
|
||||
auto key = GenerateKey(i, 0, 0, nullptr);
|
||||
hash_iter->Seek(key);
|
||||
ASSERT_TRUE(!hash_iter->Valid());
|
||||
|
||||
regular_iter->Seek(key);
|
||||
ASSERT_TRUE(regular_iter->Valid());
|
||||
}
|
||||
}
|
||||
|
||||
// In this test case, no two key share same prefix.
|
||||
TEST(BlockTest, SimpleIndexHash) {
|
||||
const int kMaxKey = 100000;
|
||||
std::vector<std::string> keys;
|
||||
std::vector<std::string> values;
|
||||
GenerateRandomKVs(&keys, &values, 0 /* first key id */,
|
||||
kMaxKey /* last key id */, 2 /* step */,
|
||||
8 /* padding size (8 bytes randomly generated suffix) */);
|
||||
|
||||
std::unique_ptr<BlockBuilder> builder;
|
||||
auto contents = GetBlockContents(&builder, keys, values);
|
||||
|
||||
CheckBlockContents(contents, kMaxKey, keys, values);
|
||||
}
|
||||
|
||||
TEST(BlockTest, IndexHashWithSharedPrefix) {
|
||||
const int kMaxKey = 100000;
|
||||
// for each prefix, there will be 5 keys that start with it.
|
||||
const int kPrefixGroup = 5;
|
||||
std::vector<std::string> keys;
|
||||
std::vector<std::string> values;
|
||||
// Generate keys with same prefix.
|
||||
GenerateRandomKVs(&keys, &values, 0, // first key id
|
||||
kMaxKey, // last key id
|
||||
2, // step
|
||||
10, // padding size,
|
||||
kPrefixGroup);
|
||||
|
||||
std::unique_ptr<BlockBuilder> builder;
|
||||
auto contents = GetBlockContents(&builder, keys, values, kPrefixGroup);
|
||||
|
||||
CheckBlockContents(contents, kMaxKey, keys, values);
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -244,6 +244,8 @@ Status ReadTableProperties(RandomAccessFile* file, uint64_t file_size,
|
||||
metaindex_block.NewIterator(BytewiseComparator()));
|
||||
|
||||
// -- Read property block
|
||||
// This function is not used by BlockBasedTable, so we don't have to
|
||||
// worry about old properties block name.
|
||||
meta_iter->Seek(kPropertiesBlock);
|
||||
TableProperties table_properties;
|
||||
if (meta_iter->Valid() &&
|
||||
|
@ -74,10 +74,12 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) {
|
||||
|
||||
if (!IsFixedLength()) {
|
||||
// Write key length
|
||||
key_size_str_.clear();
|
||||
PutVarint32(&key_size_str_, user_key_size);
|
||||
file_->Append(key_size_str_);
|
||||
offset_ += key_size_str_.length();
|
||||
char key_size_buf[5]; // tmp buffer for key size as varint32
|
||||
char* ptr = EncodeVarint32(key_size_buf, user_key_size);
|
||||
assert(ptr <= key_size_buf + sizeof(key_size_buf));
|
||||
auto len = ptr - key_size_buf;
|
||||
file_->Append(Slice(key_size_buf, len));
|
||||
offset_ += len;
|
||||
}
|
||||
|
||||
// Write key
|
||||
@ -86,25 +88,32 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) {
|
||||
status_ = Status::Corruption(Slice());
|
||||
return;
|
||||
}
|
||||
// For value size as varint32 (up to 5 bytes).
|
||||
// If the row is of value type with seqId 0, flush the special flag together
|
||||
// in this buffer to save one file append call, which takes 1 byte.
|
||||
char value_size_buf[6];
|
||||
size_t value_size_buf_size = 0;
|
||||
if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) {
|
||||
file_->Append(Slice(key.data(), user_key_size));
|
||||
char tmp_char = PlainTableFactory::kValueTypeSeqId0;
|
||||
file_->Append(Slice(&tmp_char, 1));
|
||||
offset_ += key.size() - 7;
|
||||
offset_ += user_key_size;
|
||||
value_size_buf[0] = PlainTableFactory::kValueTypeSeqId0;
|
||||
value_size_buf_size = 1;
|
||||
} else {
|
||||
file_->Append(key);
|
||||
offset_ += key.size();
|
||||
}
|
||||
|
||||
// Write value length
|
||||
value_size_str_.clear();
|
||||
int value_size = value.size();
|
||||
PutVarint32(&value_size_str_, value_size);
|
||||
file_->Append(value_size_str_);
|
||||
char* end_ptr =
|
||||
EncodeVarint32(value_size_buf + value_size_buf_size, value_size);
|
||||
assert(end_ptr <= value_size_buf + sizeof(value_size_buf));
|
||||
value_size_buf_size = end_ptr - value_size_buf;
|
||||
file_->Append(Slice(value_size_buf, value_size_buf_size));
|
||||
|
||||
// Write value
|
||||
file_->Append(value);
|
||||
offset_ += value_size + value_size_str_.length();
|
||||
offset_ += value_size + value_size_buf_size;
|
||||
|
||||
properties_.num_entries++;
|
||||
properties_.raw_key_size += key.size();
|
||||
|
@ -69,9 +69,6 @@ private:
|
||||
const size_t user_key_len_;
|
||||
bool closed_ = false; // Either Finish() or Abandon() has been called.
|
||||
|
||||
std::string key_size_str_;
|
||||
std::string value_size_str_;
|
||||
|
||||
bool IsFixedLength() const {
|
||||
return user_key_len_ > 0;
|
||||
}
|
||||
|
@ -21,7 +21,8 @@ Status PlainTableFactory::NewTableReader(const Options& options,
|
||||
unique_ptr<TableReader>* table) const {
|
||||
return PlainTableReader::Open(options, soptions, icomp, std::move(file),
|
||||
file_size, table, bloom_bits_per_key_,
|
||||
hash_table_ratio_, index_sparseness_);
|
||||
hash_table_ratio_, index_sparseness_,
|
||||
huge_page_tlb_size_);
|
||||
}
|
||||
|
||||
TableBuilder* PlainTableFactory::NewTableBuilder(
|
||||
@ -33,16 +34,19 @@ TableBuilder* PlainTableFactory::NewTableBuilder(
|
||||
extern TableFactory* NewPlainTableFactory(uint32_t user_key_len,
|
||||
int bloom_bits_per_key,
|
||||
double hash_table_ratio,
|
||||
size_t index_sparseness) {
|
||||
size_t index_sparseness,
|
||||
size_t huge_page_tlb_size) {
|
||||
return new PlainTableFactory(user_key_len, bloom_bits_per_key,
|
||||
hash_table_ratio, index_sparseness);
|
||||
hash_table_ratio, index_sparseness,
|
||||
huge_page_tlb_size);
|
||||
}
|
||||
|
||||
extern TableFactory* NewTotalOrderPlainTableFactory(uint32_t user_key_len,
|
||||
int bloom_bits_per_key,
|
||||
size_t index_sparseness) {
|
||||
size_t index_sparseness,
|
||||
size_t huge_page_tlb_size) {
|
||||
return new PlainTableFactory(user_key_len, bloom_bits_per_key, 0,
|
||||
index_sparseness);
|
||||
index_sparseness, huge_page_tlb_size);
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -54,14 +54,19 @@ class PlainTableFactory : public TableFactory {
|
||||
// inside the same prefix. It will be the maximum number of linear search
|
||||
// required after hash and binary search.
|
||||
// index_sparseness = 0 means index for every key.
|
||||
// huge_page_tlb_size determines whether to allocate hash indexes from huge
|
||||
// page TLB and the page size if allocating from there. See comments of
|
||||
// Arena::AllocateAligned() for details.
|
||||
explicit PlainTableFactory(uint32_t user_key_len = kPlainTableVariableLength,
|
||||
int bloom_bits_per_key = 0,
|
||||
double hash_table_ratio = 0.75,
|
||||
size_t index_sparseness = 16)
|
||||
size_t index_sparseness = 16,
|
||||
size_t huge_page_tlb_size = 2 * 1024 * 1024)
|
||||
: user_key_len_(user_key_len),
|
||||
bloom_bits_per_key_(bloom_bits_per_key),
|
||||
hash_table_ratio_(hash_table_ratio),
|
||||
index_sparseness_(index_sparseness) {}
|
||||
index_sparseness_(index_sparseness),
|
||||
huge_page_tlb_size_(huge_page_tlb_size) {}
|
||||
const char* Name() const override { return "PlainTable"; }
|
||||
Status NewTableReader(const Options& options, const EnvOptions& soptions,
|
||||
const InternalKeyComparator& internal_comparator,
|
||||
@ -80,6 +85,7 @@ class PlainTableFactory : public TableFactory {
|
||||
int bloom_bits_per_key_;
|
||||
double hash_table_ratio_;
|
||||
size_t index_sparseness_;
|
||||
size_t huge_page_tlb_size_;
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "table/two_level_iterator.h"
|
||||
#include "table/plain_table_factory.h"
|
||||
|
||||
#include "util/arena.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/dynamic_bloom.h"
|
||||
#include "util/hash.h"
|
||||
@ -81,10 +82,9 @@ class PlainTableIterator : public Iterator {
|
||||
bool use_prefix_seek_;
|
||||
uint32_t offset_;
|
||||
uint32_t next_offset_;
|
||||
Slice key_;
|
||||
IterKey key_;
|
||||
Slice value_;
|
||||
Status status_;
|
||||
std::string tmp_str_;
|
||||
// No copying allowed
|
||||
PlainTableIterator(const PlainTableIterator&) = delete;
|
||||
void operator=(const Iterator&) = delete;
|
||||
@ -95,7 +95,8 @@ PlainTableReader::PlainTableReader(
|
||||
const Options& options, unique_ptr<RandomAccessFile>&& file,
|
||||
const EnvOptions& storage_options, const InternalKeyComparator& icomparator,
|
||||
uint64_t file_size, int bloom_bits_per_key, double hash_table_ratio,
|
||||
size_t index_sparseness, const TableProperties* table_properties)
|
||||
size_t index_sparseness, const TableProperties* table_properties,
|
||||
size_t huge_page_tlb_size)
|
||||
: options_(options),
|
||||
soptions_(storage_options),
|
||||
file_(std::move(file)),
|
||||
@ -104,21 +105,25 @@ PlainTableReader::PlainTableReader(
|
||||
kHashTableRatio(hash_table_ratio),
|
||||
kBloomBitsPerKey(bloom_bits_per_key),
|
||||
kIndexIntervalForSamePrefixKeys(index_sparseness),
|
||||
table_properties_(table_properties),
|
||||
data_end_offset_(table_properties_->data_size),
|
||||
user_key_len_(table_properties->fixed_key_len) {
|
||||
table_properties_(nullptr),
|
||||
data_end_offset_(table_properties->data_size),
|
||||
user_key_len_(table_properties->fixed_key_len),
|
||||
huge_page_tlb_size_(huge_page_tlb_size) {
|
||||
assert(kHashTableRatio >= 0.0);
|
||||
}
|
||||
|
||||
PlainTableReader::~PlainTableReader() {
|
||||
}
|
||||
|
||||
Status PlainTableReader::Open(
|
||||
const Options& options, const EnvOptions& soptions,
|
||||
Status PlainTableReader::Open(const Options& options,
|
||||
const EnvOptions& soptions,
|
||||
const InternalKeyComparator& internal_comparator,
|
||||
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
|
||||
unique_ptr<TableReader>* table_reader, const int bloom_bits_per_key,
|
||||
double hash_table_ratio, size_t index_sparseness) {
|
||||
unique_ptr<RandomAccessFile>&& file,
|
||||
uint64_t file_size,
|
||||
unique_ptr<TableReader>* table_reader,
|
||||
const int bloom_bits_per_key,
|
||||
double hash_table_ratio, size_t index_sparseness,
|
||||
size_t huge_page_tlb_size) {
|
||||
assert(options.allow_mmap_reads);
|
||||
|
||||
if (file_size > kMaxFileSize) {
|
||||
@ -134,10 +139,11 @@ Status PlainTableReader::Open(
|
||||
|
||||
std::unique_ptr<PlainTableReader> new_reader(new PlainTableReader(
|
||||
options, std::move(file), soptions, internal_comparator, file_size,
|
||||
bloom_bits_per_key, hash_table_ratio, index_sparseness, props));
|
||||
bloom_bits_per_key, hash_table_ratio, index_sparseness, props,
|
||||
huge_page_tlb_size));
|
||||
|
||||
// -- Populate Index
|
||||
s = new_reader->PopulateIndex();
|
||||
s = new_reader->PopulateIndex(props);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
@ -265,12 +271,11 @@ Status PlainTableReader::PopulateIndexRecordList(IndexRecordList* record_list,
|
||||
}
|
||||
|
||||
void PlainTableReader::AllocateIndexAndBloom(int num_prefixes) {
|
||||
index_.reset();
|
||||
|
||||
if (options_.prefix_extractor.get() != nullptr) {
|
||||
uint32_t bloom_total_bits = num_prefixes * kBloomBitsPerKey;
|
||||
if (bloom_total_bits > 0) {
|
||||
bloom_.reset(new DynamicBloom(bloom_total_bits, options_.bloom_locality));
|
||||
bloom_.reset(new DynamicBloom(bloom_total_bits, options_.bloom_locality,
|
||||
6, nullptr, huge_page_tlb_size_));
|
||||
}
|
||||
}
|
||||
|
||||
@ -282,7 +287,6 @@ void PlainTableReader::AllocateIndexAndBloom(int num_prefixes) {
|
||||
double hash_table_size_multipier = 1.0 / kHashTableRatio;
|
||||
index_size_ = num_prefixes * hash_table_size_multipier + 1;
|
||||
}
|
||||
index_.reset(new uint32_t[index_size_]);
|
||||
}
|
||||
|
||||
size_t PlainTableReader::BucketizeIndexesAndFillBloom(
|
||||
@ -326,7 +330,12 @@ void PlainTableReader::FillIndexes(
|
||||
const std::vector<uint32_t>& entries_per_bucket) {
|
||||
Log(options_.info_log, "Reserving %zu bytes for plain table's sub_index",
|
||||
kSubIndexSize);
|
||||
sub_index_.reset(new char[kSubIndexSize]);
|
||||
auto total_allocate_size = sizeof(uint32_t) * index_size_ + kSubIndexSize;
|
||||
char* allocated =
|
||||
arena_.AllocateAligned(total_allocate_size, huge_page_tlb_size_);
|
||||
index_ = reinterpret_cast<uint32_t*>(allocated);
|
||||
sub_index_ = allocated + sizeof(uint32_t) * index_size_;
|
||||
|
||||
size_t sub_index_offset = 0;
|
||||
for (int i = 0; i < index_size_; i++) {
|
||||
uint32_t num_keys_for_bucket = entries_per_bucket[i];
|
||||
@ -364,7 +373,10 @@ void PlainTableReader::FillIndexes(
|
||||
index_size_, kSubIndexSize);
|
||||
}
|
||||
|
||||
Status PlainTableReader::PopulateIndex() {
|
||||
Status PlainTableReader::PopulateIndex(TableProperties* props) {
|
||||
assert(props != nullptr);
|
||||
table_properties_.reset(props);
|
||||
|
||||
// options.prefix_extractor is required for a hash-based look-up.
|
||||
if (options_.prefix_extractor.get() == nullptr && kHashTableRatio != 0) {
|
||||
return Status::NotSupported(
|
||||
@ -388,7 +400,8 @@ Status PlainTableReader::PopulateIndex() {
|
||||
if (IsTotalOrderMode()) {
|
||||
uint32_t num_bloom_bits = table_properties_->num_entries * kBloomBitsPerKey;
|
||||
if (num_bloom_bits > 0) {
|
||||
bloom_.reset(new DynamicBloom(num_bloom_bits, options_.bloom_locality));
|
||||
bloom_.reset(new DynamicBloom(num_bloom_bits, options_.bloom_locality, 6,
|
||||
nullptr, huge_page_tlb_size_));
|
||||
}
|
||||
}
|
||||
|
||||
@ -409,6 +422,14 @@ Status PlainTableReader::PopulateIndex() {
|
||||
// From the temp data structure, populate indexes.
|
||||
FillIndexes(sub_index_size_needed, hash_to_offsets, entries_per_bucket);
|
||||
|
||||
// Fill two table properties.
|
||||
// TODO(sdong): after we have the feature of storing index in file, these
|
||||
// properties need to be populated to index_size instead.
|
||||
props->user_collected_properties["plain_table_hash_table_size"] =
|
||||
std::to_string(index_size_ * 4U);
|
||||
props->user_collected_properties["plain_table_sub_index_size"] =
|
||||
std::to_string(sub_index_size_needed);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -720,9 +741,7 @@ void PlainTableIterator::Next() {
|
||||
status_ = table_->Next(&next_offset_, &parsed_key, &value_);
|
||||
if (status_.ok()) {
|
||||
// Make a copy in this case. TODO optimize.
|
||||
tmp_str_.clear();
|
||||
AppendInternalKey(&tmp_str_, parsed_key);
|
||||
key_ = Slice(tmp_str_);
|
||||
key_.SetInternalKey(parsed_key);
|
||||
} else {
|
||||
offset_ = next_offset_ = table_->data_end_offset_;
|
||||
}
|
||||
@ -735,7 +754,7 @@ void PlainTableIterator::Prev() {
|
||||
|
||||
Slice PlainTableIterator::key() const {
|
||||
assert(Valid());
|
||||
return key_;
|
||||
return key_.GetKey();
|
||||
}
|
||||
|
||||
Slice PlainTableIterator::value() const {
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "rocksdb/table_properties.h"
|
||||
#include "table/table_reader.h"
|
||||
#include "table/plain_table_factory.h"
|
||||
#include "util/arena.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -50,7 +51,7 @@ class PlainTableReader: public TableReader {
|
||||
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
|
||||
unique_ptr<TableReader>* table,
|
||||
const int bloom_bits_per_key, double hash_table_ratio,
|
||||
size_t index_sparseness);
|
||||
size_t index_sparseness, size_t huge_page_tlb_size);
|
||||
|
||||
bool PrefixMayMatch(const Slice& internal_prefix);
|
||||
|
||||
@ -74,7 +75,8 @@ class PlainTableReader: public TableReader {
|
||||
const InternalKeyComparator& internal_comparator,
|
||||
uint64_t file_size, int bloom_num_bits,
|
||||
double hash_table_ratio, size_t index_sparseness,
|
||||
const TableProperties* table_properties);
|
||||
const TableProperties* table_properties,
|
||||
size_t huge_page_tlb_size);
|
||||
virtual ~PlainTableReader();
|
||||
|
||||
protected:
|
||||
@ -86,6 +88,9 @@ class PlainTableReader: public TableReader {
|
||||
// PopulateIndex() builds index of keys. It must be called before any query
|
||||
// to the table.
|
||||
//
|
||||
// props: the table properties object that needs to be stored. Ownership of
|
||||
// the object will be passed.
|
||||
//
|
||||
// index_ contains buckets size of index_size_, each is a
|
||||
// 32-bit integer. The lower 31 bits contain an offset value (explained below)
|
||||
// and the first bit of the integer indicates type of the offset.
|
||||
@ -121,7 +126,7 @@ class PlainTableReader: public TableReader {
|
||||
// ....
|
||||
// record N file offset: fixedint32
|
||||
// <end>
|
||||
Status PopulateIndex();
|
||||
Status PopulateIndex(TableProperties* props);
|
||||
|
||||
private:
|
||||
struct IndexRecord;
|
||||
@ -133,9 +138,9 @@ class PlainTableReader: public TableReader {
|
||||
// For more details about the in-memory index, please refer to:
|
||||
// https://github.com/facebook/rocksdb/wiki/PlainTable-Format
|
||||
// #wiki-in-memory-index-format
|
||||
std::unique_ptr<uint32_t[]> index_;
|
||||
uint32_t* index_;
|
||||
int index_size_ = 0;
|
||||
std::unique_ptr<char[]> sub_index_;
|
||||
char* sub_index_;
|
||||
|
||||
Options options_;
|
||||
const EnvOptions& soptions_;
|
||||
@ -156,6 +161,7 @@ class PlainTableReader: public TableReader {
|
||||
const size_t kIndexIntervalForSamePrefixKeys = 16;
|
||||
// Bloom filter is used to rule out non-existent key
|
||||
unique_ptr<DynamicBloom> bloom_;
|
||||
Arena arena_;
|
||||
|
||||
std::shared_ptr<const TableProperties> table_properties_;
|
||||
// data_start_offset_ and data_end_offset_ defines the range of the
|
||||
@ -163,6 +169,7 @@ class PlainTableReader: public TableReader {
|
||||
const uint32_t data_start_offset_ = 0;
|
||||
const uint32_t data_end_offset_;
|
||||
const size_t user_key_len_;
|
||||
const size_t huge_page_tlb_size_;
|
||||
|
||||
static const size_t kNumInternalBytes = 8;
|
||||
static const uint32_t kSubIndexMask = 0x80000000;
|
||||
|
@ -91,5 +91,7 @@ const std::string TablePropertiesNames::kFixedKeyLen =
|
||||
"rocksdb.fixed.key.length";
|
||||
|
||||
extern const std::string kPropertiesBlock = "rocksdb.properties";
|
||||
// Old property block name for backward compatibility
|
||||
extern const std::string kPropertiesBlockOldName = "rocksdb.stats";
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -1055,6 +1055,116 @@ static std::string RandomString(Random* rnd, int len) {
|
||||
return r;
|
||||
}
|
||||
|
||||
// Adds one entry to the table constructor `c`: an internal key built from
// `prefix` followed by `suffix_len` random bytes (sequence number 0, type
// kTypeValue), mapped to the value "v".
//
// c:          constructor that receives the encoded key/value pair.
// prefix:     leading bytes of the user key.
// suffix_len: number of random bytes appended to `prefix`; the default of 800
//             matches the length that was previously hard-coded.
void AddInternalKey(TableConstructor* c, const std::string prefix,
                    int suffix_len = 800) {
  // Function-local static, so successive calls share one Random instance.
  static Random rnd(1023);
  // Honor the caller-supplied suffix length instead of ignoring the parameter.
  InternalKey k(prefix + RandomString(&rnd, suffix_len), 0, kTypeValue);
  c->Add(k.Encode().ToString(), "v");
}
|
||||
|
||||
// Builds a block-based table that uses the kHashSearch index type with a
// 3-byte fixed prefix extractor, then checks iterator Seek() results for:
//   1. bare prefixes that are shorter than any stored key,
//   2. every key that was actually stored,
//   3. keys that sort after all stored keys of a given prefix,
//   4. prefixes that no stored key has.
TEST(TableTest, HashIndexTest) {
|
||||
TableConstructor c(BytewiseComparator());
|
||||
|
||||
// keys with prefix length 3, make sure the key/value is big enough to fill
|
||||
// one block
|
||||
// (each call below adds one key; blank rows group the keys in pairs, and the
// test later asserts that exactly 5 data blocks were produced)
AddInternalKey(&c, "0015");
|
||||
AddInternalKey(&c, "0035");
|
||||
|
||||
AddInternalKey(&c, "0054");
|
||||
AddInternalKey(&c, "0055");
|
||||
|
||||
AddInternalKey(&c, "0056");
|
||||
AddInternalKey(&c, "0057");
|
||||
|
||||
AddInternalKey(&c, "0058");
|
||||
AddInternalKey(&c, "0075");
|
||||
|
||||
AddInternalKey(&c, "0076");
|
||||
AddInternalKey(&c, "0095");
|
||||
|
||||
// Filled in by c.Finish() below.
// NOTE(review): the index-based expectations further down assume `keys`
// comes back in the same (sorted) order the keys were added -- confirm
// against TableConstructor::Finish.
std::vector<std::string> keys;
|
||||
KVMap kvmap;
|
||||
Options options;
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.index_type = BlockBasedTableOptions::kHashSearch;
|
||||
options.table_factory.reset(new BlockBasedTableFactory(table_options));
|
||||
|
||||
// The prefix used by the hash index is the first 3 bytes of the key.
options.prefix_extractor.reset(NewFixedPrefixTransform(3));
|
||||
options.block_cache = NewLRUCache(1024);
|
||||
options.block_size = 1700;
|
||||
|
||||
std::unique_ptr<InternalKeyComparator> comparator(
|
||||
new InternalKeyComparator(BytewiseComparator()));
|
||||
c.Finish(options, *comparator, &keys, &kvmap);
|
||||
auto reader = c.table_reader();
|
||||
|
||||
// Sanity check on the table layout: the 10 keys must end up in 5 data blocks.
auto props = c.table_reader()->GetTableProperties();
|
||||
ASSERT_EQ(5u, props->num_data_blocks);
|
||||
|
||||
std::unique_ptr<Iterator> hash_iter(reader->NewIterator(ReadOptions()));
|
||||
|
||||
// -- Seek keys that do not exist themselves but share a prefix with stored
// keys.
|
||||
std::vector<std::string> prefixes = {"001", "003", "005", "007", "009"};
|
||||
// lower_bound[i] is the key the iterator is expected to land on when seeking
// to the bare prefix prefixes[i].
std::vector<std::string> lower_bound = {keys[0], keys[1], keys[2],
|
||||
keys[7], keys[9], };
|
||||
|
||||
// find the lower bound of the prefix
|
||||
for (size_t i = 0; i < prefixes.size(); ++i) {
|
||||
hash_iter->Seek(InternalKey(prefixes[i], 0, kTypeValue).Encode());
|
||||
ASSERT_OK(hash_iter->status());
|
||||
ASSERT_TRUE(hash_iter->Valid());
|
||||
|
||||
// seek the first element in the block
|
||||
ASSERT_EQ(lower_bound[i], hash_iter->key().ToString());
|
||||
ASSERT_EQ("v", hash_iter->value().ToString());
|
||||
}
|
||||
|
||||
// find the upper bound of prefixes
|
||||
// upper_bound[i] is the expected landing key when seeking past every stored
// key of prefixes[i]; it is consumed by the third loop below. The last prefix
// has no entry because that seek is expected to leave the iterator invalid.
std::vector<std::string> upper_bound = {keys[1], keys[2], keys[7], keys[9], };
|
||||
|
||||
// find existing keys
|
||||
// Note that the seek target here is the user key extracted from the stored
// internal key, while the assertion compares against the full stored key.
for (const auto& item : kvmap) {
|
||||
auto ukey = ExtractUserKey(item.first).ToString();
|
||||
hash_iter->Seek(ukey);
|
||||
|
||||
// ASSERT_OK(regular_iter->status());
|
||||
ASSERT_OK(hash_iter->status());
|
||||
|
||||
// ASSERT_TRUE(regular_iter->Valid());
|
||||
ASSERT_TRUE(hash_iter->Valid());
|
||||
|
||||
ASSERT_EQ(item.first, hash_iter->key().ToString());
|
||||
ASSERT_EQ(item.second, hash_iter->value().ToString());
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < prefixes.size(); ++i) {
|
||||
// the key is greater than any existing keys.
|
||||
// (every stored key's 4th character is at most '8', so prefix + "9" sorts
// after all stored keys that share the prefix)
auto key = prefixes[i] + "9";
|
||||
hash_iter->Seek(InternalKey(key, 0, kTypeValue).Encode());
|
||||
|
||||
ASSERT_OK(hash_iter->status());
|
||||
if (i == prefixes.size() - 1) {
|
||||
// last key
|
||||
// nothing is stored after the last prefix, so the iterator must be invalid
ASSERT_TRUE(!hash_iter->Valid());
|
||||
} else {
|
||||
ASSERT_TRUE(hash_iter->Valid());
|
||||
// seek the first element in the block
|
||||
ASSERT_EQ(upper_bound[i], hash_iter->key().ToString());
|
||||
ASSERT_EQ("v", hash_iter->value().ToString());
|
||||
}
|
||||
}
|
||||
|
||||
// find keys with prefix that don't match any of the existing prefixes.
|
||||
// The expectation is an invalid iterator with an OK status, i.e. "not found"
// rather than an error.
std::vector<std::string> non_exist_prefixes = {"002", "004", "006", "008"};
|
||||
for (const auto& prefix : non_exist_prefixes) {
|
||||
hash_iter->Seek(InternalKey(prefix, 0, kTypeValue).Encode());
|
||||
// regular_iter->Seek(prefix);
|
||||
|
||||
ASSERT_OK(hash_iter->status());
|
||||
ASSERT_TRUE(!hash_iter->Valid());
|
||||
}
|
||||
}
|
||||
|
||||
// It's very hard to figure out the index block size of a block accurately.
|
||||
// To make sure we get the index size, we just make sure as key number
|
||||
// grows, the filter block size also grows.
|
||||
|
@ -8,6 +8,7 @@
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "util/arena.h"
|
||||
#include <sys/mman.h>
|
||||
#include <algorithm>
|
||||
|
||||
namespace rocksdb {
|
||||
@ -38,6 +39,13 @@ Arena::~Arena() {
|
||||
for (const auto& block : blocks_) {
|
||||
delete[] block;
|
||||
}
|
||||
for (const auto& mmap_info : huge_blocks_) {
|
||||
auto ret = munmap(mmap_info.addr_, mmap_info.length_);
|
||||
if (ret != 0) {
|
||||
// TODO(sdong): Better handling
|
||||
perror("munmap");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
char* Arena::AllocateFallback(size_t bytes, bool aligned) {
|
||||
@ -63,9 +71,29 @@ char* Arena::AllocateFallback(size_t bytes, bool aligned) {
|
||||
}
|
||||
}
|
||||
|
||||
char* Arena::AllocateAligned(size_t bytes) {
|
||||
char* Arena::AllocateAligned(size_t bytes, size_t huge_page_tlb_size) {
|
||||
assert((kAlignUnit & (kAlignUnit - 1)) ==
|
||||
0); // Pointer size should be a power of 2
|
||||
|
||||
#ifdef OS_LINUX
|
||||
if (huge_page_tlb_size > 0 && bytes > 0) {
|
||||
// Allocate from a huge page TLB table.
|
||||
size_t reserved_size =
|
||||
((bytes - 1U) / huge_page_tlb_size + 1U) * huge_page_tlb_size;
|
||||
assert(reserved_size >= bytes);
|
||||
void* addr = mmap(nullptr, reserved_size, (PROT_READ | PROT_WRITE),
|
||||
(MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB), 0, 0);
|
||||
if (addr == MAP_FAILED) {
|
||||
perror("mmap");
|
||||
// fall back to malloc
|
||||
} else {
|
||||
blocks_memory_ += reserved_size;
|
||||
huge_blocks_.push_back(MmapInfo(addr, reserved_size));
|
||||
return reinterpret_cast<char*>(addr);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
size_t current_mod =
|
||||
reinterpret_cast<uintptr_t>(aligned_alloc_ptr_) & (kAlignUnit - 1);
|
||||
size_t slop = (current_mod == 0 ? 0 : kAlignUnit - current_mod);
|
||||
|
17
util/arena.h
17
util/arena.h
@ -34,7 +34,14 @@ class Arena {
|
||||
|
||||
char* Allocate(size_t bytes);
|
||||
|
||||
char* AllocateAligned(size_t bytes);
|
||||
// huge_page_tlb_size: if >0, allocate bytes from huge page TLB; the value is
|
||||
// the huge page size. Bytes will be rounded up to a multiple of that size
|
||||
// (e.g. 2MB) and allocated as huge pages through anonymous mmap with MAP_HUGETLB.
|
||||
// The extra space allocated will be wasted. To enable it, need to reserve
|
||||
// huge pages for it to be allocated, like:
|
||||
// sysctl -w vm.nr_hugepages=20
|
||||
// See linux doc Documentation/vm/hugetlbpage.txt for details.
|
||||
char* AllocateAligned(size_t bytes, size_t huge_page_tlb_size = 0);
|
||||
|
||||
// Returns an estimate of the total memory usage of data allocated
|
||||
// by the arena (exclude the space allocated but not yet used for future
|
||||
@ -60,6 +67,14 @@ class Arena {
|
||||
// Array of new[] allocated memory blocks
|
||||
typedef std::vector<char*> Blocks;
|
||||
Blocks blocks_;
|
||||
|
||||
// Bookkeeping record for one mmap()-ed region: the mapped address and the
// length of the mapping.
struct MmapInfo {
  MmapInfo(void* addr, size_t length) : addr_(addr), length_(length) {}

  void* addr_;     // start address of the mapping
  size_t length_;  // size of the mapping in bytes
};
|
||||
std::vector<MmapInfo> huge_blocks_;
|
||||
size_t irregular_block_num = 0;
|
||||
|
||||
// Stats for current active block.
|
||||
|
@ -19,15 +19,16 @@ static uint32_t BloomHash(const Slice& key) {
|
||||
}
|
||||
}
|
||||
|
||||
DynamicBloom::DynamicBloom(uint32_t total_bits,
|
||||
uint32_t cl_per_block,
|
||||
DynamicBloom::DynamicBloom(uint32_t total_bits, uint32_t cl_per_block,
|
||||
uint32_t num_probes,
|
||||
uint32_t (*hash_func)(const Slice& key))
|
||||
uint32_t (*hash_func)(const Slice& key),
|
||||
size_t huge_page_tlb_size)
|
||||
: kBlocked(cl_per_block > 0),
|
||||
kBitsPerBlock(std::min(cl_per_block, num_probes) * CACHE_LINE_SIZE * 8),
|
||||
kTotalBits((kBlocked ? (total_bits + kBitsPerBlock - 1) / kBitsPerBlock
|
||||
* kBitsPerBlock :
|
||||
total_bits + 7) / 8 * 8),
|
||||
kTotalBits((kBlocked ? (total_bits + kBitsPerBlock - 1) / kBitsPerBlock *
|
||||
kBitsPerBlock
|
||||
: total_bits + 7) /
|
||||
8 * 8),
|
||||
kNumBlocks(kBlocked ? kTotalBits / kBitsPerBlock : 1),
|
||||
kNumProbes(num_probes),
|
||||
hash_func_(hash_func == nullptr ? &BloomHash : hash_func) {
|
||||
@ -38,7 +39,9 @@ DynamicBloom::DynamicBloom(uint32_t total_bits,
|
||||
if (kBlocked) {
|
||||
sz += CACHE_LINE_SIZE - 1;
|
||||
}
|
||||
raw_ = new unsigned char[sz]();
|
||||
raw_ = reinterpret_cast<unsigned char*>(
|
||||
arena_.AllocateAligned(sz, huge_page_tlb_size));
|
||||
memset(raw_, 0, sz);
|
||||
if (kBlocked && (reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE)) {
|
||||
data_ = raw_ + CACHE_LINE_SIZE -
|
||||
reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE;
|
||||
|
@ -8,6 +8,8 @@
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
|
||||
#include <util/arena.h>
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
class Slice;
|
||||
@ -19,13 +21,17 @@ class DynamicBloom {
|
||||
// cl_per_block: block size in cache lines. When this is non-zero, a
|
||||
// query/set is done within a block to improve cache locality.
|
||||
// hash_func: customized hash function
|
||||
// huge_page_tlb_size: if >0, try to allocate bloom bytes from huge page TLB
|
||||
// within this page size. Need to reserve huge pages for
|
||||
// it to be allocated, like:
|
||||
// sysctl -w vm.nr_hugepages=20
|
||||
// See linux doc Documentation/vm/hugetlbpage.txt
|
||||
explicit DynamicBloom(uint32_t total_bits, uint32_t cl_per_block = 0,
|
||||
uint32_t num_probes = 6,
|
||||
uint32_t (*hash_func)(const Slice& key) = nullptr);
|
||||
uint32_t (*hash_func)(const Slice& key) = nullptr,
|
||||
size_t huge_page_tlb_size = 0);
|
||||
|
||||
~DynamicBloom() {
|
||||
delete[] raw_;
|
||||
}
|
||||
~DynamicBloom() {}
|
||||
|
||||
// Assuming single threaded access to this function.
|
||||
void Add(const Slice& key);
|
||||
@ -49,6 +55,8 @@ class DynamicBloom {
|
||||
uint32_t (*hash_func_)(const Slice& key);
|
||||
unsigned char* data_;
|
||||
unsigned char* raw_;
|
||||
|
||||
Arena arena_;
|
||||
};
|
||||
|
||||
inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); }
|
||||
|
@ -761,6 +761,10 @@ class PosixWritableFile : public WritableFile {
|
||||
}
|
||||
|
||||
virtual Status Sync() {
|
||||
Status s = Flush();
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
TEST_KILL_RANDOM(rocksdb_kill_odds);
|
||||
if (pending_sync_ && fdatasync(fd_) < 0) {
|
||||
return IOError(filename_, errno);
|
||||
@ -771,6 +775,10 @@ class PosixWritableFile : public WritableFile {
|
||||
}
|
||||
|
||||
virtual Status Fsync() {
|
||||
Status s = Flush();
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
TEST_KILL_RANDOM(rocksdb_kill_odds);
|
||||
if (pending_fsync_ && fsync(fd_) < 0) {
|
||||
return IOError(filename_, errno);
|
||||
|
@ -290,7 +290,6 @@ TEST(EnvPosixTest, AllocateTest) {
|
||||
// allocate 100 MB
|
||||
size_t kPreallocateSize = 100 * 1024 * 1024;
|
||||
size_t kBlockSize = 512;
|
||||
size_t kPageSize = 4096;
|
||||
std::string data = "test";
|
||||
wfile->SetPreallocationBlockSize(kPreallocateSize);
|
||||
ASSERT_OK(wfile->Append(Slice(data)));
|
||||
@ -299,8 +298,9 @@ TEST(EnvPosixTest, AllocateTest) {
|
||||
struct stat f_stat;
|
||||
stat(fname.c_str(), &f_stat);
|
||||
ASSERT_EQ((unsigned int)data.size(), f_stat.st_size);
|
||||
auto st_blocks = f_stat.st_blocks;
|
||||
// verify that blocks are preallocated
|
||||
ASSERT_EQ((unsigned int)(kPreallocateSize / kBlockSize), f_stat.st_blocks);
|
||||
ASSERT_LE((unsigned int)(kPreallocateSize / kBlockSize), st_blocks);
|
||||
|
||||
// close the file, should deallocate the blocks
|
||||
wfile.reset();
|
||||
@ -308,8 +308,7 @@ TEST(EnvPosixTest, AllocateTest) {
|
||||
stat(fname.c_str(), &f_stat);
|
||||
ASSERT_EQ((unsigned int)data.size(), f_stat.st_size);
|
||||
// verify that preallocated blocks were deallocated on file close
|
||||
size_t data_blocks_pages = ((data.size() + kPageSize - 1) / kPageSize);
|
||||
ASSERT_EQ((unsigned int)(data_blocks_pages * kPageSize / kBlockSize), f_stat.st_blocks);
|
||||
ASSERT_LT(f_stat.st_blocks, st_blocks);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -52,7 +52,8 @@ struct Node {
|
||||
class HashLinkListRep : public MemTableRep {
|
||||
public:
|
||||
HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena,
|
||||
const SliceTransform* transform, size_t bucket_size);
|
||||
const SliceTransform* transform, size_t bucket_size,
|
||||
size_t huge_page_tlb_size);
|
||||
|
||||
virtual KeyHandle Allocate(const size_t len, char** buf) override;
|
||||
|
||||
@ -308,13 +309,13 @@ class HashLinkListRep : public MemTableRep {
|
||||
|
||||
HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare,
|
||||
Arena* arena, const SliceTransform* transform,
|
||||
size_t bucket_size)
|
||||
size_t bucket_size, size_t huge_page_tlb_size)
|
||||
: MemTableRep(arena),
|
||||
bucket_size_(bucket_size),
|
||||
transform_(transform),
|
||||
compare_(compare) {
|
||||
char* mem = arena_->AllocateAligned(
|
||||
sizeof(port::AtomicPointer) * bucket_size);
|
||||
char* mem = arena_->AllocateAligned(sizeof(port::AtomicPointer) * bucket_size,
|
||||
huge_page_tlb_size);
|
||||
|
||||
buckets_ = new (mem) port::AtomicPointer[bucket_size];
|
||||
|
||||
@ -476,11 +477,13 @@ Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head,
|
||||
MemTableRep* HashLinkListRepFactory::CreateMemTableRep(
|
||||
const MemTableRep::KeyComparator& compare, Arena* arena,
|
||||
const SliceTransform* transform) {
|
||||
return new HashLinkListRep(compare, arena, transform, bucket_count_);
|
||||
return new HashLinkListRep(compare, arena, transform, bucket_count_,
|
||||
huge_page_tlb_size_);
|
||||
}
|
||||
|
||||
MemTableRepFactory* NewHashLinkListRepFactory(size_t bucket_count) {
|
||||
return new HashLinkListRepFactory(bucket_count);
|
||||
MemTableRepFactory* NewHashLinkListRepFactory(size_t bucket_count,
|
||||
size_t huge_page_tlb_size) {
|
||||
return new HashLinkListRepFactory(bucket_count, huge_page_tlb_size);
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -14,8 +14,9 @@ namespace rocksdb {
|
||||
|
||||
class HashLinkListRepFactory : public MemTableRepFactory {
|
||||
public:
|
||||
explicit HashLinkListRepFactory(size_t bucket_count)
|
||||
: bucket_count_(bucket_count) { }
|
||||
explicit HashLinkListRepFactory(size_t bucket_count,
|
||||
size_t huge_page_tlb_size)
|
||||
: bucket_count_(bucket_count), huge_page_tlb_size_(huge_page_tlb_size) {}
|
||||
|
||||
virtual ~HashLinkListRepFactory() {}
|
||||
|
||||
@ -29,6 +30,7 @@ class HashLinkListRepFactory : public MemTableRepFactory {
|
||||
|
||||
private:
|
||||
const size_t bucket_count_;
|
||||
const size_t huge_page_tlb_size_;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -317,6 +317,10 @@ Options::Dump(Logger* log) const
|
||||
memtable_prefix_bloom_bits);
|
||||
Log(log, " Options.memtable_prefix_bloom_probes: %d",
|
||||
memtable_prefix_bloom_probes);
|
||||
Log(log, " Options.memtable_prefix_bloom_huge_page_tlb_size: %zu",
|
||||
memtable_prefix_bloom_huge_page_tlb_size);
|
||||
Log(log, " Options.bloom_locality: %d",
|
||||
bloom_locality);
|
||||
Log(log, " Options.max_successive_merges: %zd",
|
||||
max_successive_merges);
|
||||
} // Options::Dump
|
||||
|
Loading…
Reference in New Issue
Block a user