diff --git a/db/db_test.cc b/db/db_test.cc index 27e90f110..10babbac6 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -23,6 +23,7 @@ #include "rocksdb/env.h" #include "rocksdb/table.h" #include "rocksdb/perf_context.h" +#include "rocksdb/plain_table_factory.h" #include "util/hash.h" #include "util/logging.h" #include "util/mutexlock.h" @@ -244,6 +245,8 @@ class DBTest { // Sequence of option configurations to try enum OptionConfig { kDefault, + kPlainTableFirstBytePrefix, + kPlainTableAllBytesPrefix, kVectorRep, kMergePut, kFilter, @@ -275,7 +278,8 @@ class DBTest { kNoSkip = 0, kSkipDeletesFilterFirst = 1, kSkipUniversalCompaction = 2, - kSkipMergePut = 4 + kSkipMergePut = 4, + kSkipPlainTable = 8 }; DBTest() : option_config_(kDefault), @@ -297,20 +301,27 @@ class DBTest { // Switch to a fresh database with the next option configuration to // test. Return false if there are no more configurations to test. bool ChangeOptions(int skip_mask = kNoSkip) { - option_config_++; - // skip some options - if (skip_mask & kSkipDeletesFilterFirst && - option_config_ == kDeletesFilterFirst) { - option_config_++; - } - if (skip_mask & kSkipUniversalCompaction && - option_config_ == kUniversalCompaction) { - option_config_++; - } - if (skip_mask & kSkipMergePut && option_config_ == kMergePut) { - option_config_++; + for(option_config_++; option_config_ < kEnd; option_config_++) { + if ((skip_mask & kSkipDeletesFilterFirst) && + option_config_ == kDeletesFilterFirst) { + continue; + } + if ((skip_mask & kSkipUniversalCompaction) && + option_config_ == kUniversalCompaction) { + continue; + } + if ((skip_mask & kSkipMergePut) && option_config_ == kMergePut) { + continue; + } + if ((skip_mask & kSkipPlainTable) + && (option_config_ == kPlainTableAllBytesPrefix + || option_config_ == kPlainTableFirstBytePrefix)) { + continue; + } + break; } + if (option_config_ >= kEnd) { Destroy(&last_options_); return false; @@ -343,6 +354,18 @@ class DBTest { options.memtable_factory.reset( NewHashSkipListRepFactory(NewFixedPrefixTransform(1))); break; + case kPlainTableFirstBytePrefix: + options.table_factory.reset(new PlainTableFactory()); + options.prefix_extractor = NewFixedPrefixTransform(1); + options.allow_mmap_reads = true; + options.max_sequential_skip_in_iterations = 999999; + break; + case kPlainTableAllBytesPrefix: + options.table_factory.reset(new PlainTableFactory()); + options.prefix_extractor = NewNoopTransform(); + options.allow_mmap_reads = true; + options.max_sequential_skip_in_iterations = 999999; + break; case kMergePut: options.merge_operator = MergeOperators::CreatePutOperator(); break; @@ -1009,7 +1032,10 @@ TEST(DBTest, KeyMayExist) { options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD)); delete options.filter_policy; - } while (ChangeOptions()); + + // KeyMayExist function only checks data in block caches, which is not used + // by plain table format. + } while (ChangeOptions(kSkipPlainTable)); } TEST(DBTest, NonBlockingIteration) { @@ -1073,7 +1099,9 @@ TEST(DBTest, NonBlockingIteration) { options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD)); delete iter; - } while (ChangeOptions()); + // This test verifies block cache behaviors, which is not used by plain + // table format. + } while (ChangeOptions(kSkipPlainTable)); } // A delete is skipped for key if KeyMayExist(key) returns False @@ -2932,7 +2960,8 @@ TEST(DBTest, ApproximateSizes) { ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_GT(NumTableFilesAtLevel(1), 0); } - } while (ChangeOptions(kSkipUniversalCompaction)); + // ApproximateOffsetOf() is not yet implemented in plain table format. + } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable)); } TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) { @@ -2970,7 +2999,8 @@ TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) { dbfull()->TEST_CompactRange(0, nullptr, nullptr); } - } while (ChangeOptions()); + // ApproximateOffsetOf() is not yet implemented in plain table format. + } while (ChangeOptions(kSkipPlainTable)); } TEST(DBTest, IteratorPinsRef) { @@ -3054,7 +3084,9 @@ TEST(DBTest, HiddenValuesAreRemoved) { ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]"); ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000)); - } while (ChangeOptions(kSkipUniversalCompaction)); + // ApproximateOffsetOf() is not yet implemented in plain table format, + // which is used by Size(). + } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable)); } TEST(DBTest, CompactBetweenSnapshots) { @@ -4626,7 +4658,8 @@ TEST(DBTest, Randomized) { // TODO(sanjay): Test Get() works int p = rnd.Uniform(100); int minimum = 0; - if (option_config_ == kHashSkipList) { + if (option_config_ == kHashSkipList || + option_config_ == kPlainTableFirstBytePrefix) { minimum = 1; } if (p < 45) { // Put diff --git a/db/plain_table_db_test.cc b/db/plain_table_db_test.cc index 0baf56ecd..17f871e4c 100644 --- a/db/plain_table_db_test.cc +++ b/db/plain_table_db_test.cc @@ -59,7 +59,8 @@ public: // Return the current option configuration. Options CurrentOptions() { Options options; - options.table_factory.reset(new PlainTableFactory(16, 8, 2, 0.8)); + options.table_factory.reset(new PlainTableFactory(16, 2, 0.8)); + options.prefix_extractor = NewFixedPrefixTransform(8); options.allow_mmap_reads = true; return options; } diff --git a/include/rocksdb/plain_table_factory.h b/include/rocksdb/plain_table_factory.h index 2355e43d4..5cf59d23a 100644 --- a/include/rocksdb/plain_table_factory.h +++ b/include/rocksdb/plain_table_factory.h @@ -23,41 +23,37 @@ class TableBuilder; // IndexedTable requires fixed length key, configured as a constructor // parameter of the factory class. Output file format: -// +-------------+ -// | version | -// +-------------+------------------------------+ <= key1 offset -// | key1 | value_size (4 bytes) | | -// +----------------------------------------+ | +// +-------------+-----------------+ +// | version | user_key_length | +// +------------++------------------------------+ <= key1 offset +// | [key_size] | key1 | value_size | | +// +------------+-------------+-------------+ | // | value1 | // | | // +----------------------------------------+---+ <= key2 offset -// | key2 | value_size (4 bytes) | | -// +----------------------------------------+ | +// | [key_size] | key2 | value_size | | +// +------------+-------------+-------------+ | // | value2 | // | | // | ...... | -// +-----------------+--------------------------+ <= index_block_offset -// | key1 | key1 offset (8 bytes) | // +-----------------+--------------------------+ -// | key2 | key2 offset (8 bytes) | -// +-----------------+--------------------------+ -// | key3 | key3 offset (8 bytes) | -// +-----------------+--------------------------+ -// | ...... | -// +-----------------+------------+-------------+ +// If user_key_length = kVariableLength, it means the key is variable length, +// there will be an extra field for key size encoded before every key. class PlainTableFactory: public TableFactory { public: ~PlainTableFactory() { } - // user_key_size is the length of the user key. key_prefix_len is the - // length of the prefix used for in-memory indexes. bloom_num_bits is + // user_key_size is the length of the user key. If it is set to be + // kVariableLength, then it means variable length. Otherwise, all the + // keys need to have the fix length of this value. bloom_num_bits is // number of bits used for bloom filer per key. hash_table_ratio is - // the desired ultilization of the hash table used for prefix hashing. + // the desired utilization of the hash table used for prefix hashing. // hash_table_ratio = number of prefixes / #buckets in the hash table - PlainTableFactory(int user_key_size, int key_prefix_len, - int bloom_num_bits = 0, double hash_table_ratio = 0.75) : - user_key_size_(user_key_size), key_prefix_len_(key_prefix_len), - bloom_num_bits_(bloom_num_bits), hash_table_ratio_(hash_table_ratio) { + explicit PlainTableFactory(uint32_t user_key_len = kVariableLength, + int bloom_num_bits = 0, + double hash_table_ratio = 0.75) : + user_key_len_(user_key_len), bloom_num_bits_(bloom_num_bits), + hash_table_ratio_(hash_table_ratio) { } const char* Name() const override { return "PlainTable"; @@ -70,9 +66,10 @@ public: TableBuilder* GetTableBuilder(const Options& options, WritableFile* file, CompressionType compression_type) const override; + + static const uint32_t kVariableLength = 0; private: - int user_key_size_; - int key_prefix_len_; + uint32_t user_key_len_; int bloom_num_bits_; double hash_table_ratio_; }; diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index c2570acf6..b1b52e87a 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -37,6 +37,10 @@ struct TableProperties { uint64_t num_data_blocks = 0; // the number of entries in this table uint64_t num_entries = 0; + // format version, reserved for backward compatibility + uint64_t format_version = 0; + // If 0, key is variable length. Otherwise number of bytes for each key. + uint64_t fixed_key_len = 0; // The name of the filter policy used in this table. // If no filter policy is used, `filter_policy_name` will be an empty string. @@ -61,6 +65,8 @@ struct TablePropertiesNames { static const std::string kRawValueSize; static const std::string kNumDataBlocks; static const std::string kNumEntries; + static const std::string kFormatVersion; + static const std::string kFixedKeyLen; static const std::string kFilterPolicy; }; diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 95eb6c4ab..5d2d94175 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -67,6 +67,8 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) { Add(TablePropertiesNames::kNumEntries, props.num_entries); Add(TablePropertiesNames::kNumDataBlocks, props.num_data_blocks); Add(TablePropertiesNames::kFilterSize, props.filter_size); + Add(TablePropertiesNames::kFormatVersion, props.format_version); + Add(TablePropertiesNames::kFixedKeyLen, props.fixed_key_len); if (!props.filter_policy_name.empty()) { Add(TablePropertiesNames::kFilterPolicy, @@ -175,6 +177,8 @@ Status ReadProperties( { TablePropertiesNames::kNumDataBlocks, &table_properties->num_data_blocks }, { TablePropertiesNames::kNumEntries, &table_properties->num_entries }, + { TablePropertiesNames::kFormatVersion, &table_properties->format_version }, + { TablePropertiesNames::kFixedKeyLen, &table_properties->fixed_key_len }, }; std::string last_key; diff --git a/table/plain_table_builder.cc b/table/plain_table_builder.cc index 970599a9b..5c3252360 100644 --- a/table/plain_table_builder.cc +++ b/table/plain_table_builder.cc @@ -50,12 +50,9 @@ extern const uint64_t kPlainTableMagicNumber = 0x4f3418eb7a8f13b8ull; PlainTableBuilder::PlainTableBuilder(const Options& options, WritableFile* file, - int user_key_size, int key_prefix_len) : - options_(options), file_(file), user_key_size_(user_key_size) { - std::string version; - PutFixed32(&version, 1 | 0x80000000); - file_->Append(Slice(version)); - offset_ = 4; + uint32_t user_key_len) : + options_(options), file_(file), user_key_len_(user_key_len) { + properties_.fixed_key_len = user_key_len; // for plain table, we put all the data in a big chuck. properties_.num_data_blocks = 1; @@ -63,25 +60,37 @@ PlainTableBuilder::PlainTableBuilder(const Options& options, // filter block. properties_.index_size = 0; properties_.filter_size = 0; + properties_.format_version = 0; } PlainTableBuilder::~PlainTableBuilder() { } void PlainTableBuilder::Add(const Slice& key, const Slice& value) { - assert((int) key.size() == GetInternalKeyLength()); + assert(user_key_len_ == 0 || key.size() == user_key_len_ + 8); - // Write key-value pair + if (!IsFixedLength()) { + // Write key length + int key_size = key.size(); + key_size_str_.clear(); + PutVarint32(&key_size_str_, key_size); + file_->Append(key_size_str_); + offset_ += key_size_str_.length(); + } + + // Write key file_->Append(key); - offset_ += GetInternalKeyLength(); + offset_ += key.size(); - std::string size; + // Write value length + value_size_str_.clear(); int value_size = value.size(); - PutVarint32(&size, value_size); - Slice sizeSlice(size); - file_->Append(sizeSlice); + PutVarint32(&value_size_str_, value_size); + file_->Append(value_size_str_); + + // Write value file_->Append(value); - offset_ += value_size + size.length(); + offset_ += value_size + value_size_str_.length(); properties_.num_entries++; properties_.raw_key_size += key.size(); diff --git a/table/plain_table_builder.h b/table/plain_table_builder.h index b8a2bbe3b..f4be46828 100644 --- a/table/plain_table_builder.h +++ b/table/plain_table_builder.h @@ -27,7 +27,7 @@ public: // will be part of level specified by 'level'. A value of -1 means // that the caller does not know which level the output file will reside. PlainTableBuilder(const Options& options, WritableFile* file, - int user_key_size, int key_prefix_len); + uint32_t user_key_size); // REQUIRES: Either Finish() or Abandon() has been called. ~PlainTableBuilder(); @@ -66,11 +66,14 @@ private: Status status_; TableProperties properties_; - const size_t user_key_size_; + const size_t user_key_len_; bool closed_ = false; // Either Finish() or Abandon() has been called. - int GetInternalKeyLength() { - return user_key_size_ + 8; + std::string key_size_str_; + std::string value_size_str_; + + bool IsFixedLength() const { + return user_key_len_ > 0; } // No copying allowed diff --git a/table/plain_table_factory.cc b/table/plain_table_factory.cc index 08e75c4ec..bf941a62d 100644 --- a/table/plain_table_factory.cc +++ b/table/plain_table_factory.cc @@ -19,13 +19,12 @@ Status PlainTableFactory::GetTableReader(const Options& options, unique_ptr* table) const { return PlainTableReader::Open(options, soptions, std::move(file), file_size, - table, user_key_size_, key_prefix_len_, - bloom_num_bits_, hash_table_ratio_); + table, bloom_num_bits_, hash_table_ratio_); } TableBuilder* PlainTableFactory::GetTableBuilder( const Options& options, WritableFile* file, CompressionType compression_type) const { - return new PlainTableBuilder(options, file, user_key_size_, key_prefix_len_); + return new PlainTableBuilder(options, file, user_key_len_); } } // namespace rocksdb diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 5f68a183a..4c396a359 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -15,6 +15,7 @@ #include "rocksdb/filter_policy.h" #include "rocksdb/options.h" #include "rocksdb/statistics.h" +#include "rocksdb/plain_table_factory.h" #include "table/block.h" #include "table/filter_block.h" @@ -23,6 +24,7 @@ #include "table/two_level_iterator.h" #include "util/coding.h" +#include "util/dynamic_bloom.h" #include "util/hash.h" #include "util/histogram.h" #include "util/murmurhash.h" @@ -30,46 +32,36 @@ #include "util/stop_watch.h" -namespace std { -template<> -struct hash { -public: - std::size_t operator()(rocksdb::Slice const& s) const { - return MurmurHash(s.data(), s.size(), 397); - } -}; -} - namespace rocksdb { extern const uint64_t kPlainTableMagicNumber; -static uint32_t getBucketId(Slice const& s, size_t prefix_len, - uint32_t num_buckets) { - return MurmurHash(s.data(), prefix_len, 397) % num_buckets; + +static uint32_t GetSliceHash(Slice const& s) { + return Hash(s.data(), s.size(), 397) ; +} +static uint32_t getBucketIdFromHash(uint32_t hash, uint32_t num_buckets) { + return hash % num_buckets; } PlainTableReader::PlainTableReader(const EnvOptions& storage_options, - uint64_t file_size, int user_key_size, - int key_prefix_len, int bloom_bits_per_key, + uint64_t file_size, int bloom_bits_per_key, double hash_table_ratio, const TableProperties& table_properties) : hash_table_size_(0), soptions_(storage_options), file_size_(file_size), - user_key_size_(user_key_size), key_prefix_len_(key_prefix_len), hash_table_ratio_(hash_table_ratio), - filter_policy_(bloom_bits_per_key > 0 ? - NewBloomFilterPolicy(bloom_bits_per_key) : nullptr), - table_properties_(table_properties), - data_start_offset_(0), - data_end_offset_(table_properties_.data_size) { + bloom_bits_per_key_(bloom_bits_per_key), + table_properties_(table_properties), data_start_offset_(0), + data_end_offset_(table_properties_.data_size), + user_key_len_(table_properties.fixed_key_len) { + hash_table_ = nullptr; + bloom_ = nullptr; + sub_index_ = nullptr; } PlainTableReader::~PlainTableReader() { - if (hash_table_ != nullptr) { - delete[] hash_table_; - } - if (filter_policy_ != nullptr) { - delete filter_policy_; - } + delete[] hash_table_; + delete[] sub_index_; + delete bloom_; } Status PlainTableReader::Open(const Options& options, @@ -77,8 +69,6 @@ Status PlainTableReader::Open(const Options& options, unique_ptr && file, uint64_t file_size, unique_ptr* table_reader, - const int user_key_size, - const int key_prefix_len, const int bloom_num_bits, double hash_table_ratio) { assert(options.allow_mmap_reads); @@ -103,8 +93,6 @@ Status PlainTableReader::Open(const Options& options, std::unique_ptr new_reader(new PlainTableReader( soptions, file_size, - user_key_size, - key_prefix_len, bloom_num_bits, hash_table_ratio, table_properties @@ -133,22 +121,69 @@ Iterator* PlainTableReader::NewIterator(const ReadOptions& options) { return new PlainTableIterator(this); } -Status PlainTableReader::PopulateIndex() { - // Get mmapped memory to file_data_. - Status s = file_->Read(0, file_size_, &file_data_, nullptr); - if (!s.ok()) { - return s; - } - version_ = DecodeFixed32(file_data_.data()); - version_ ^= 0x80000000; - assert(version_ == 1); - data_start_offset_ = 4; +struct PlainTableReader::IndexRecord { + uint32_t hash; // hash of the prefix + uint32_t offset; // offset of a row + IndexRecord* next; +}; +// Helper class to track all the index records +class PlainTableReader::IndexRecordList { +public: + explicit IndexRecordList(size_t num_records_per_group) : + num_records_per_group_(num_records_per_group), + current_group_(nullptr), + num_records_in_current_group_(num_records_per_group) { + } + + ~IndexRecordList() { + for (size_t i = 0; i < groups_.size(); i++) { + delete[] groups_[i]; + } + } + + void AddRecord(murmur_t hash, uint32_t offset) { + if (num_records_in_current_group_ == num_records_per_group_) { + current_group_ = AllocateNewGroup(); + num_records_in_current_group_ = 0; + } + auto& new_record = current_group_[num_records_in_current_group_]; + new_record.hash = hash; + new_record.offset = offset; + new_record.next = nullptr; + num_records_in_current_group_++; + } + + size_t GetNumRecords() { + return (groups_.size() - 1) * num_records_per_group_ + + num_records_in_current_group_; + } + IndexRecord* At(size_t index) { + return &(groups_[index / num_records_per_group_] + [index % num_records_per_group_]); + } + + IndexRecord* AllocateNewGroup() { + IndexRecord* result = new IndexRecord[num_records_per_group_]; + groups_.push_back(result); + return result; + } +private: + const size_t num_records_per_group_; + IndexRecord* current_group_; + // List of arrays allocated + std::vector groups_; + size_t num_records_in_current_group_; +}; + +int PlainTableReader::PopulateIndexRecordList( + IndexRecordList& record_list) { Slice key_slice; Slice key_prefix_slice; Slice key_suffix_slice; Slice value_slice; Slice prev_key_prefix_slice; + uint32_t prev_key_prefix_hash = 0; uint32_t pos = data_start_offset_; int key_index_within_prefix = 0; bool first = true; @@ -156,72 +191,104 @@ Status PlainTableReader::PopulateIndex() { HistogramImpl keys_per_prefix_hist; // Need map to be ordered to make sure sub indexes generated // are in order. - std::vector> prefix_index_pairs; - std::string current_prefix_index; + + int num_prefixes = 0; + while (pos < data_end_offset_) { uint32_t key_offset = pos; status_ = Next(pos, &key_slice, &value_slice, pos); - key_prefix_slice = Slice(key_slice.data(), key_prefix_len_); + key_prefix_slice = GetPrefix(key_slice); if (first || prev_key_prefix_slice != key_prefix_slice) { + num_prefixes++; if (!first) { keys_per_prefix_hist.Add(key_index_within_prefix); - prefix_index_pairs.push_back( - std::make_pair( - std::move(prev_key_prefix_slice), - std::move(current_prefix_index))); - current_prefix_index.clear(); } key_index_within_prefix = 0; prev_key_prefix_slice = key_prefix_slice; + prev_key_prefix_hash = GetSliceHash(key_prefix_slice); } - if (key_index_within_prefix++ % 8 == 0) { - // Add an index key for every 8 keys - PutFixed32(¤t_prefix_index, key_offset); + if (key_index_within_prefix++ % 16 == 0) { + // Add an index key for every 16 keys + record_list.AddRecord(prev_key_prefix_hash, key_offset); } first = false; } - prefix_index_pairs.push_back( - std::make_pair(std::move(prev_key_prefix_slice), - std::move(current_prefix_index))); - keys_per_prefix_hist.Add(key_index_within_prefix); + Log(options_.info_log, "Number of Keys per prefix Histogram: %s", + keys_per_prefix_hist.ToString().c_str()); + + return num_prefixes; +} + +void PlainTableReader::Allocate(int num_prefixes) { if (hash_table_ != nullptr) { delete[] hash_table_; } - std::vector filter_entries(0); // for creating bloom filter; - if (filter_policy_ != nullptr) { - filter_entries.reserve(prefix_index_pairs.size()); + if (bloom_bits_per_key_ > 0) { + bloom_ = new DynamicBloom(num_prefixes * bloom_bits_per_key_); } double hash_table_size_multipier = (hash_table_ratio_ > 1.0) ? 1.0 : 1.0 / hash_table_ratio_; - hash_table_size_ = prefix_index_pairs.size() * hash_table_size_multipier + 1; + hash_table_size_ = num_prefixes * hash_table_size_multipier + 1; hash_table_ = new uint32_t[hash_table_size_]; - std::vector hash2map(hash_table_size_); +} +size_t PlainTableReader::BucketizeIndexesAndFillBloom( + IndexRecordList& record_list, int num_prefixes, + std::vector& hash2offsets, + std::vector& bucket_count) { size_t sub_index_size_needed = 0; - for (auto& p: prefix_index_pairs) { - auto& sub_index = hash2map[getBucketId(p.first, key_prefix_len_, - hash_table_size_)]; - if (sub_index.length() > 0 || p.second.length() > kOffsetLen) { - if (sub_index.length() <= kOffsetLen) { - sub_index_size_needed += sub_index.length() + 4; + bool first = true; + uint32_t prev_hash = 0; + size_t num_records = record_list.GetNumRecords(); + for (size_t i = 0; i < num_records; i++) { + IndexRecord* index_record = record_list.At(i); + uint32_t cur_hash = index_record->hash; + if (first || prev_hash != cur_hash) { + prev_hash = cur_hash; + first = false; + if (bloom_) { + bloom_->AddHash(cur_hash); } - sub_index_size_needed += p.second.length(); } - sub_index.append(p.second); - if (filter_policy_ != nullptr) { - filter_entries.push_back(p.first); + uint32_t bucket = getBucketIdFromHash(cur_hash, hash_table_size_); + IndexRecord* prev_bucket_head = hash2offsets[bucket]; + index_record->next = prev_bucket_head; + hash2offsets[bucket] = index_record; + if (bucket_count[bucket] > 0) { + if (bucket_count[bucket] == 1) { + sub_index_size_needed += kOffsetLen + 1; + } + if (bucket_count[bucket] == 127) { + // Need more than one byte for length + sub_index_size_needed++; + } + sub_index_size_needed += kOffsetLen; } + bucket_count[bucket]++; } + return sub_index_size_needed; +} - sub_index_.clear(); +void PlainTableReader::FillIndexes(size_t sub_index_size_needed, + std::vector& hash2offsets, + std::vector& bucket_count) { Log(options_.info_log, "Reserving %zu bytes for sub index", sub_index_size_needed); - sub_index_.reserve(sub_index_size_needed); + // 4 bytes buffer for variable length size + size_t buffer_size = 64; + size_t buffer_used = 0; + sub_index_size_needed += buffer_size; + sub_index_ = new char[sub_index_size_needed]; + size_t sub_index_offset = 0; + char* prev_ptr; + char* cur_ptr; + uint32_t* sub_index_ptr; + IndexRecord* record; for (int i = 0; i < hash_table_size_; i++) { - uint32_t num_keys_for_bucket = hash2map[i].length() / kOffsetLen; + uint32_t num_keys_for_bucket = bucket_count[i]; switch (num_keys_for_bucket) { case 0: // No key for bucket @@ -229,58 +296,131 @@ Status PlainTableReader::PopulateIndex() { break; case 1: // point directly to the file offset - hash_table_[i] = DecodeFixed32(hash2map[i].data()); + hash_table_[i] = hash2offsets[i]->offset; break; default: - // point to index block - hash_table_[i] = sub_index_.length() | kSubIndexMask; - PutFixed32(&sub_index_, num_keys_for_bucket); - sub_index_.append(hash2map[i]); + // point to second level indexes. + hash_table_[i] = sub_index_offset | kSubIndexMask; + prev_ptr = sub_index_ + sub_index_offset; + cur_ptr = EncodeVarint32(prev_ptr, num_keys_for_bucket); + sub_index_offset += cur_ptr - prev_ptr; + if (cur_ptr - prev_ptr > 2 + || (cur_ptr - prev_ptr == 2 && num_keys_for_bucket <= 127)) { + // Need to resize sub_index. Exponentially grow buffer. + buffer_used += cur_ptr - prev_ptr - 1; + if (buffer_used + 4 > buffer_size) { + Log(options_.info_log, "Recalculate suffix_map length to %zu", + sub_index_size_needed); + + sub_index_size_needed += buffer_size; + buffer_size *= 2; + char* new_sub_index = new char[sub_index_size_needed]; + memcpy(new_sub_index, sub_index_, sub_index_offset); + delete[] sub_index_; + sub_index_ = new_sub_index; + } + } + sub_index_ptr = (uint32_t*) (sub_index_ + sub_index_offset); + record = hash2offsets[i]; + int j; + for (j = num_keys_for_bucket - 1; + j >= 0 && record; j--, record = record->next) { + sub_index_ptr[j] = record->offset; + } + assert(j == -1 && record == nullptr); + sub_index_offset += kOffsetLen * num_keys_for_bucket; + break; } } - if (filter_policy_ != nullptr) { - filter_str_.clear(); - filter_policy_->CreateFilter(&filter_entries[0], filter_entries.size(), - &filter_str_); - filter_slice_ = Slice(filter_str_.data(), filter_str_.size()); - } Log(options_.info_log, "hash table size: %d, suffix_map length %zu", - hash_table_size_, sub_index_.length()); - Log(options_.info_log, "Number of Keys per prefix Histogram: %s", - keys_per_prefix_hist.ToString().c_str()); + hash_table_size_, sub_index_size_needed); +} + +// PopulateIndex() builds index of keys. +// hash_table_ contains buckets size of hash_table_size_, each is a 32-bit +// integer. The lower 31 bits contain an offset value (explained below) and +// the first bit of the integer indicates type of the offset: +// +// 0 indicates that the bucket contains only one prefix (no conflict when +// hashing this prefix), whose first row starts from this offset of the file. +// 1 indicates that the bucket contains more than one prefixes, or there +// are too many rows for one prefix so we need a binary search for it. In +// this case, the offset indicates the offset of sub_index_ holding the +// binary search indexes of keys for those rows. Those binary search indexes +// are organized in this way: +// +// The first 4 bytes, indicates how many indexes (N) are stored after it. After +// it, there are N 32-bit integers, each points of an offset of the file, which +// points to starting of a row. Those offsets need to be guaranteed to be in +// ascending order so the keys they are pointing to are also in ascending order +// to make sure we can use them to do binary searches. +Status PlainTableReader::PopulateIndex() { + // Get mmapped memory to file_data_. + Status s = file_->Read(0, file_size_, &file_data_, nullptr); + if (!s.ok()) { + return s; + } + + IndexRecordList record_list(256); + // First, read the whole file, for every 16 rows for a prefix (starting from + // the first one), generate a record of (hash, offset) and append it to + // IndexRecordList, which is a data structure created to store them. + int num_prefixes = PopulateIndexRecordList(record_list); + // Calculated hash table and bloom filter size and allocate memory for indexes + // and bloom filter based on the number of prefixes. + Allocate(num_prefixes); + + // Bucketize all the index records to a temp data structure, in which for + // each bucket, we generate a linked list of IndexRecord, in reversed order. + std::vector hash2offsets(hash_table_size_, nullptr); + std::vector bucket_count(hash_table_size_, 0); + size_t sub_index_size_needed = BucketizeIndexesAndFillBloom(record_list, + num_prefixes, + hash2offsets, + bucket_count); + // From the temp data structure, populate indexes. + FillIndexes(sub_index_size_needed, hash2offsets, bucket_count); return Status::OK(); } -uint32_t PlainTableReader::GetOffset(const Slice& target, - bool& prefix_matched) { +Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix, + uint32_t prefix_hash, bool& prefix_matched, + uint32_t& ret_offset) { prefix_matched = false; - int bucket = getBucketId(target, key_prefix_len_, hash_table_size_); + int bucket = getBucketIdFromHash(prefix_hash, hash_table_size_); uint32_t bucket_value = hash_table_[bucket]; if (bucket_value == data_end_offset_) { - return data_end_offset_; + ret_offset = data_end_offset_; + return Status::OK(); } else if ((bucket_value & kSubIndexMask) == 0) { // point directly to the file - return bucket_value; + ret_offset = bucket_value; + return Status::OK(); } - // point to sub-index, need to do a binary search + // point to sub-index, need to do a binary search uint32_t low = 0; uint64_t prefix_index_offset = bucket_value ^ kSubIndexMask; - uint32_t upper_bound = DecodeFixed32(sub_index_.data() + prefix_index_offset); + + const char* index_ptr = sub_index_ + prefix_index_offset; + uint32_t upper_bound; + const uint32_t* base_ptr = (const uint32_t*) GetVarint32Ptr(index_ptr, + index_ptr + 4, + &upper_bound); uint32_t high = upper_bound; - uint64_t base_offset = prefix_index_offset + 4; Slice mid_key; // The key is between [low, high). Do a binary search between it. while (high - low > 1) { uint32_t mid = (high + low) / 2; - const char* index_offset = sub_index_.data() + base_offset - + kOffsetLen * mid; - uint32_t file_offset = DecodeFixed32(index_offset); - mid_key = Slice(file_data_.data() + file_offset, GetInternalKeyLength()); - + uint32_t file_offset = base_ptr[mid]; + size_t tmp; + Status s = ReadKey(file_data_.data() + file_offset, &mid_key, tmp); + if (!s.ok()) { + return s; + } int cmp_result = options_.comparator->Compare(target, mid_key); if (cmp_result > 0) { low = mid; @@ -289,38 +429,61 @@ uint32_t PlainTableReader::GetOffset(const Slice& target, // Happen to have found the exact key or target is smaller than the // first key after base_offset. prefix_matched = true; - return file_offset; + ret_offset = file_offset; + return Status::OK(); } else { high = mid; } } } - - // The key is between low and low+1 (if exists). Both of them can have the - // correct prefix. Need to rule out at least one, to avoid to miss the - // correct one. - uint32_t low_key_offset = DecodeFixed32( - sub_index_.data() + base_offset + kOffsetLen * low); - if (low + 1 < upper_bound) { - if (Slice(file_data_.data() + low_key_offset, key_prefix_len_) - == Slice(target.data(), key_prefix_len_)) { - prefix_matched = true; - } else { - prefix_matched = false; - return DecodeFixed32( - sub_index_.data() + base_offset + kOffsetLen * (low + 1)); - } - } else { + // Both of the key at the position low or low+1 could share the same + // prefix as target. We need to rule out one of them to avoid to go + // to the wrong prefix. + Slice low_key; + size_t tmp; + uint32_t low_key_offset = base_ptr[low]; + Status s = ReadKey(file_data_.data() + low_key_offset, &low_key, tmp); + if (GetPrefix(low_key) == prefix) { + prefix_matched = true; + ret_offset = low_key_offset; + } else if (low + 1 < upper_bound) { + // There is possible a next prefix, return it prefix_matched = false; + ret_offset = base_ptr[low + 1]; + } else { + // target is larger than a key of the last prefix in this bucket + // but with a different prefix. Key does not exist. + ret_offset = data_end_offset_; } - return low_key_offset; + return Status::OK(); } -bool PlainTableReader::MayHavePrefix(const Slice& target_prefix) { - return filter_policy_ == nullptr - || filter_policy_->KeyMayMatch(target_prefix, filter_slice_); +bool PlainTableReader::MayHavePrefix(uint32_t hash) { + return bloom_ == nullptr || bloom_->MayContainHash(hash); } +Status PlainTableReader::ReadKey(const char* row_ptr, Slice* key, + size_t& bytes_read) { + const char* key_ptr; + bytes_read = 0; + size_t internal_key_size; + if (IsFixedLength()) { + internal_key_size = GetFixedInternalKeyLength(); + key_ptr = row_ptr; + } else { + uint32_t key_size; + key_ptr = GetVarint32Ptr(row_ptr, file_data_.data() + data_end_offset_, + &key_size); + internal_key_size = (size_t) key_size; + bytes_read = key_ptr - row_ptr; + } + if (row_ptr + internal_key_size >= file_data_.data() + data_end_offset_) { + return Status::Corruption("Unable to read the next key"); + } + *key = Slice(key_ptr, internal_key_size); + bytes_read += internal_key_size; + return Status::OK(); +} Status PlainTableReader::Next(uint32_t offset, Slice* key, Slice* value, uint32_t& next_offset) { @@ -333,22 +496,17 @@ Status PlainTableReader::Next(uint32_t offset, Slice* key, Slice* value, return Status::Corruption("Offset is out of file size"); } - int internal_key_size = GetInternalKeyLength(); - if (offset + internal_key_size >= data_end_offset_) { - return Status::Corruption("Un able to read the next key"); - } - - const char* key_ptr = file_data_.data() + offset; - *key = Slice(key_ptr, internal_key_size); - + const char* row_ptr = file_data_.data() + offset; + size_t bytes_for_key; + Status s = ReadKey(row_ptr, key, bytes_for_key); uint32_t value_size; - const char* value_ptr = GetVarint32Ptr(key_ptr + internal_key_size, + const char* value_ptr = GetVarint32Ptr(row_ptr + bytes_for_key, file_data_.data() + data_end_offset_, &value_size); if (value_ptr == nullptr) { return Status::Corruption("Error reading value length."); } - next_offset = offset + (value_ptr - key_ptr) + value_size; + next_offset = offset + (value_ptr - row_ptr) + value_size; if (next_offset > data_end_offset_) { return Status::Corruption("Reach end of file when reading value"); } @@ -362,13 +520,17 @@ Status PlainTableReader::Get( bool (*saver)(void*, const Slice&, const Slice&, bool), void (*mark_key_may_exist)(void*)) { // Check bloom filter first. - if (!MayHavePrefix(Slice(target.data(), key_prefix_len_))) { + Slice prefix_slice = GetPrefix(target); + uint32_t prefix_hash = GetSliceHash(prefix_slice); + if (!MayHavePrefix(prefix_hash)) { return Status::OK(); } - uint32_t offset; bool prefix_match; - offset = GetOffset(target, prefix_match); + Status s = GetOffset(target, prefix_slice, prefix_hash, prefix_match, offset); + if (!s.ok()) { + return s; + } Slice found_key; Slice found_value; while (offset < data_end_offset_) { @@ -379,8 +541,8 @@ Status PlainTableReader::Get( if (!prefix_match) { // Need to verify prefix for the first key found if it is not yet // checked. - if (!target.starts_with(Slice(found_key.data(), key_prefix_len_))) { - break; + if (GetPrefix(found_key) != prefix_slice) { + return Status::OK(); } prefix_match = true; } @@ -403,7 +565,7 @@ uint64_t PlainTableReader::ApproximateOffsetOf(const Slice& key) { PlainTableIterator::PlainTableIterator(PlainTableReader* table) : table_(table) { - SeekToFirst(); + next_offset_ = offset_ = table_->data_end_offset_; } PlainTableIterator::~PlainTableIterator() { @@ -416,7 +578,11 @@ bool PlainTableIterator::Valid() const { void PlainTableIterator::SeekToFirst() { next_offset_ = table_->data_start_offset_; - Next(); + if (next_offset_ >= table_->data_end_offset_) { + next_offset_ = offset_ = table_->data_end_offset_; + } else { + Next(); + } } void PlainTableIterator::SeekToLast() { @@ -424,18 +590,25 @@ void PlainTableIterator::SeekToLast() { } void PlainTableIterator::Seek(const Slice& target) { - if (!table_->MayHavePrefix(Slice(target.data(), table_->key_prefix_len_))) { + Slice prefix_slice = table_->GetPrefix(target); + uint32_t prefix_hash = GetSliceHash(prefix_slice); + if (!table_->MayHavePrefix(prefix_hash)) { offset_ = next_offset_ = table_->data_end_offset_; return; } bool prefix_match; - next_offset_ = table_->GetOffset(target, prefix_match); + status_ = table_->GetOffset(target, prefix_slice, prefix_hash, prefix_match, + next_offset_); + if (!status_.ok()) { + offset_ = next_offset_ = table_->data_end_offset_; + return; + } if (next_offset_ < table_-> data_end_offset_) { for (Next(); status_.ok() && Valid(); Next()) { if (!prefix_match) { // Need to verify the first key's prefix - if (!target.starts_with(Slice(key().data(), table_->key_prefix_len_))) { + if (table_->GetPrefix(key()) != prefix_slice) { offset_ = next_offset_ = table_->data_end_offset_; break; } diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h index 26a506d14..6d2efc7da 100644 --- a/table/plain_table_reader.h +++ b/table/plain_table_reader.h @@ -9,6 +9,7 @@ #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/table.h" +#include "rocksdb/plain_table_factory.h" namespace rocksdb { @@ -20,33 +21,12 @@ class RandomAccessFile; struct ReadOptions; class TableCache; class TableReader; +class DynamicBloom; using std::unique_ptr; using std::unordered_map; -// Based on following output file format: -// +-------------+ -// | version | -// +-------------+------------------------------+ <= key1_data_offset -// | key1 | value_size (4 bytes) | | -// +----------------------------------------+ | -// | value1 | -// | | -// +----------------------------------------+---+ <= key2_data_offset -// | key2 | value_size (4 bytes) | | -// +----------------------------------------+ | -// | value2 | -// | | -// | ...... | -// +-----------------+--------------------------+ <= index_block_offset -// | key1 | key1 offset (8 bytes) | -// +-----------------+--------------------------+ <= key2_index_offset -// | key2 | key2 offset (8 bytes) | -// +-----------------+--------------------------+ <= key3_index_offset -// | key3 | key3 offset (8 bytes) | -// +-----------------+--------------------------+ <= key4_index_offset -// | ...... | -// +-----------------+------------+-------------+ +// Based on following output file format shown in plain_table_factory.h // When opening the output file, IndexedTableReader creates a hash table // from key prefixes to offset of the output file. IndexedTable will decide // whether it points to the data offset of the first key with the key prefix @@ -58,8 +38,7 @@ class PlainTableReader: public TableReader { public: static Status Open(const Options& options, const EnvOptions& soptions, unique_ptr && file, uint64_t file_size, - unique_ptr* table, const int user_key_size, - const int key_prefix_len, const int bloom_num_bits, + unique_ptr* table, const int bloom_num_bits, double hash_table_ratio); bool PrefixMayMatch(const Slice& internal_prefix); @@ -81,20 +60,18 @@ public: return table_properties_; } - PlainTableReader( - const EnvOptions& storage_options, - uint64_t file_size, - int user_key_size, - int key_prefix_len, - int bloom_num_bits, - double hash_table_ratio, - const TableProperties& table_properties); + PlainTableReader(const EnvOptions& storage_options, uint64_t file_size, + int bloom_num_bits, double hash_table_ratio, + const TableProperties& table_properties); ~PlainTableReader(); private: + struct IndexRecord; + class IndexRecordList; + uint32_t* hash_table_ = nullptr; int hash_table_size_; - std::string sub_index_; + char* sub_index_ = nullptr; Options options_; const EnvOptions& soptions_; @@ -104,37 +81,67 @@ private: Slice file_data_; uint32_t version_; uint32_t file_size_; - const size_t user_key_size_; - const size_t key_prefix_len_; + const double hash_table_ratio_; - const FilterPolicy* filter_policy_; - std::string filter_str_; - Slice filter_slice_; + const int bloom_bits_per_key_; + DynamicBloom* bloom_; TableProperties table_properties_; - uint32_t data_start_offset_; - uint32_t data_end_offset_; + const uint32_t data_start_offset_; + const uint32_t data_end_offset_; + const size_t user_key_len_; static const size_t kNumInternalBytes = 8; static const uint32_t kSubIndexMask = 0x80000000; static const size_t kOffsetLen = sizeof(uint32_t); - inline size_t GetInternalKeyLength() { - return user_key_size_ + kNumInternalBytes; + bool IsFixedLength() { + return user_key_len_ != PlainTableFactory::kVariableLength; + } + + size_t GetFixedInternalKeyLength() { + return user_key_len_ + kNumInternalBytes; } friend class TableCache; friend class PlainTableIterator; + // Internal helper function to generate an IndexRecordList object from all + // the rows, which contains index records as a list. + int PopulateIndexRecordList(IndexRecordList& record_list); + + // Internal helper function to allocate memory for indexes and bloom filters + void Allocate(int num_prefixes); + + // Internal helper function to bucket index record list to hash buckets. + // hash2offsets is sized of of hash_table_size_, each contains a linked list + // of offsets for the hash, in reversed order. + // bucket_count is sized of hash_table_size_. The value is how many index + // records are there in hash2offsets for the same bucket. + size_t BucketizeIndexesAndFillBloom( + IndexRecordList& record_list, int num_prefixes, + std::vector& hash2offsets, + std::vector& bucket_count); + + // Internal helper class to fill the indexes and bloom filters to internal + // data structures. hash2offsets and bucket_count are bucketized indexes and + // counts generated by BucketizeIndexesAndFillBloom(). + void FillIndexes(size_t sub_index_size_needed, + std::vector& hash2offsets, + std::vector& bucket_count); + // Populate the internal indexes. It must be called before // any query to the table. // This query will populate the hash table hash_table_, the second // level of indexes sub_index_ and bloom filter filter_slice_ if enabled. Status PopulateIndex(); - // Check bloom filter to see whether it might contain this prefix - bool MayHavePrefix(const Slice& target_prefix); + // Check bloom filter to see whether it might contain this prefix. + // The hash of the prefix is given, since it can be reused for index lookup + // too. + bool MayHavePrefix(uint32_t hash); + Status ReadKey(const char* row_ptr, Slice* key, size_t& bytes_read); // Read the key and value at offset to key and value. // tmp_slice is a tmp slice. // return next_offset as the offset for the next key. @@ -142,7 +149,15 @@ private: // Get file offset for key target. // return value prefix_matched is set to true if the offset is confirmed // for a key with the same prefix as target. - uint32_t GetOffset(const Slice& target, bool& prefix_matched); + Status GetOffset(const Slice& target, const Slice& prefix, + uint32_t prefix_hash, bool& prefix_matched, + uint32_t& ret_offset); + + Slice GetPrefix(const Slice& target) { + assert(target.size() >= 8); // target is internal key + return options_.prefix_extractor->Transform( + Slice(target.data(), target.size() - 8)); + } // No copying allowed explicit PlainTableReader(const TableReader&) = delete; diff --git a/table/table_properties.cc b/table/table_properties.cc index 47e7f8b33..414b15681 100644 --- a/table/table_properties.cc +++ b/table/table_properties.cc @@ -104,6 +104,10 @@ const std::string TablePropertiesNames::kNumEntries = "rocksdb.num.entries"; const std::string TablePropertiesNames::kFilterPolicy = "rocksdb.filter.policy"; +const std::string TablePropertiesNames::kFormatVersion = + "rocksdb.format.version"; +const std::string TablePropertiesNames::kFixedKeyLen = + "rocksdb.fixed.key.length"; extern const std::string kPropertiesBlock = "rocksdb.properties"; diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index 7e7e6b7da..a491d168f 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -8,7 +8,6 @@ #include "rocksdb/db.h" #include "rocksdb/slice_transform.h" #include "rocksdb/table.h" -#include "rocksdb/slice_transform.h" #include "db/db_impl.h" #include "db/dbformat.h" #include "port/atomic_pointer.h" @@ -242,9 +241,10 @@ int main(int argc, char** argv) { if (FLAGS_plain_table) { options.allow_mmap_reads = true; env_options.use_mmap_reads = true; - tf = new rocksdb::PlainTableFactory(16, FLAGS_prefix_len, - (FLAGS_prefix_len == 16) ? 0 : 8, + tf = new rocksdb::PlainTableFactory(16, (FLAGS_prefix_len == 16) ? 0 : 8, 0.75); + options.prefix_extractor = rocksdb::NewFixedPrefixTransform( + FLAGS_prefix_len); } else { tf = new rocksdb::BlockBasedTableFactory(); } diff --git a/table/table_test.cc b/table/table_test.cc index 7711ed8ad..bff8ee529 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -22,8 +22,8 @@ #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/memtablerep.h" - #include "table/meta_blocks.h" +#include "rocksdb/plain_table_factory.h" #include "table/block_based_table_builder.h" #include "table/block_based_table_factory.h" #include "table/block_based_table_reader.h" @@ -124,8 +124,9 @@ class StringSink: public WritableFile { class StringSource: public RandomAccessFile { public: - StringSource(const Slice& contents, uint64_t uniq_id) - : contents_(contents.data(), contents.size()), uniq_id_(uniq_id) { + StringSource(const Slice& contents, uint64_t uniq_id, bool mmap) + : contents_(contents.data(), contents.size()), uniq_id_(uniq_id), + mmap_(mmap) { } virtual ~StringSource() { } @@ -140,8 +141,12 @@ class StringSource: public RandomAccessFile { if (offset + n > contents_.size()) { n = contents_.size() - offset; } - memcpy(scratch, &contents_[offset], n); - *result = Slice(scratch, n); + if (!mmap_) { + memcpy(scratch, &contents_[offset], n); + *result = Slice(scratch, n); + } else { + *result = Slice(&contents_[offset], n); + } return Status::OK(); } @@ -159,6 +164,7 @@ class StringSource: public RandomAccessFile { private: std::string contents_; uint64_t uniq_id_; + bool mmap_; }; typedef std::map KVMap; @@ -245,89 +251,6 @@ class BlockConstructor: public Constructor { BlockConstructor(); }; -class BlockBasedTableConstructor: public Constructor { - public: - explicit BlockBasedTableConstructor( - const Comparator* cmp) - : Constructor(cmp) { - } - ~BlockBasedTableConstructor() { - Reset(); - } - virtual Status FinishImpl(const Options& options, const KVMap& data) { - Reset(); - sink_.reset(new StringSink()); - std::unique_ptr flush_policy_factory( - new FlushBlockBySizePolicyFactory(options.block_size, - options.block_size_deviation)); - - BlockBasedTableBuilder builder( - options, - sink_.get(), - flush_policy_factory.get(), - options.compression); - - for (KVMap::const_iterator it = data.begin(); - it != data.end(); - ++it) { - builder.Add(it->first, it->second); - ASSERT_TRUE(builder.status().ok()); - } - Status s = builder.Finish(); - ASSERT_TRUE(s.ok()) << s.ToString(); - - ASSERT_EQ(sink_->contents().size(), builder.FileSize()); - - // Open the table - uniq_id_ = cur_uniq_id_++; - source_.reset(new StringSource(sink_->contents(), uniq_id_)); - unique_ptr table_factory; - return options.table_factory->GetTableReader(options, soptions, - std::move(source_), - sink_->contents().size(), - &table_reader_); - } - - virtual Iterator* NewIterator() const { - return table_reader_->NewIterator(ReadOptions()); - } - - uint64_t ApproximateOffsetOf(const Slice& key) const { - return table_reader_->ApproximateOffsetOf(key); - } - - virtual Status Reopen(const Options& options) { - source_.reset(new StringSource(sink_->contents(), uniq_id_)); - return options.table_factory->GetTableReader(options, soptions, - std::move(source_), - sink_->contents().size(), - &table_reader_); - } - - virtual TableReader* table_reader() { - return table_reader_.get(); - } - - private: - void Reset() { - uniq_id_ = 0; - table_reader_.reset(); - sink_.reset(); - source_.reset(); - } - - uint64_t uniq_id_; - unique_ptr sink_; - unique_ptr source_; - unique_ptr table_reader_; - - BlockBasedTableConstructor(); - - static uint64_t cur_uniq_id_; - const EnvOptions soptions; -}; -uint64_t BlockBasedTableConstructor::cur_uniq_id_ = 1; - // A helper class that converts internal format keys into user keys class KeyConvertingIterator: public Iterator { public: @@ -369,6 +292,102 @@ class KeyConvertingIterator: public Iterator { void operator=(const KeyConvertingIterator&); }; +class TableConstructor: public Constructor { + public: + explicit TableConstructor( + const Comparator* cmp, bool convert_to_internal_key = false) + : Constructor(cmp), + convert_to_internal_key_(convert_to_internal_key) { + } + ~TableConstructor() { + Reset(); + } + virtual Status FinishImpl(const Options& options, const KVMap& data) { + Reset(); + sink_.reset(new StringSink()); + unique_ptr builder; + builder.reset( + options.table_factory->GetTableBuilder(options, sink_.get(), + options.compression)); + + for (KVMap::const_iterator it = data.begin(); + it != data.end(); + ++it) { + if (convert_to_internal_key_) { + ParsedInternalKey ikey(it->first, kMaxSequenceNumber, kTypeValue); + std::string encoded; + AppendInternalKey(&encoded, ikey); + builder->Add(encoded, it->second); + } else { + builder->Add(it->first, it->second); + } + ASSERT_TRUE(builder->status().ok()); + } + Status s = builder->Finish(); + ASSERT_TRUE(s.ok()) << s.ToString(); + + ASSERT_EQ(sink_->contents().size(), builder->FileSize()); + + // Open the table + uniq_id_ = cur_uniq_id_++; + source_.reset( + new StringSource(sink_->contents(), uniq_id_, + options.allow_mmap_reads)); + unique_ptr table_factory; + return options.table_factory->GetTableReader(options, soptions, + std::move(source_), + sink_->contents().size(), + &table_reader_); + } + + virtual Iterator* NewIterator() const { + Iterator* iter = table_reader_->NewIterator(ReadOptions()); + if (convert_to_internal_key_) { + return new KeyConvertingIterator(iter); + } else { + return iter; + } + } + + uint64_t ApproximateOffsetOf(const Slice& key) const { + return table_reader_->ApproximateOffsetOf(key); + } + + virtual Status Reopen(const Options& options) { + source_.reset( + new StringSource(sink_->contents(), uniq_id_, + options.allow_mmap_reads)); + return options.table_factory->GetTableReader(options, soptions, + std::move(source_), + sink_->contents().size(), + &table_reader_); + } + + virtual TableReader* table_reader() { + return table_reader_.get(); + } + + private: + void Reset() { + uniq_id_ = 0; + table_reader_.reset(); + sink_.reset(); + source_.reset(); + } + bool convert_to_internal_key_; + + uint64_t uniq_id_; + unique_ptr sink_; + unique_ptr source_; + unique_ptr table_reader_; + + TableConstructor(); + + static uint64_t cur_uniq_id_; + const EnvOptions soptions; +}; +uint64_t TableConstructor::cur_uniq_id_ = 1; + class MemTableConstructor: public Constructor { public: explicit MemTableConstructor(const Comparator* cmp) @@ -481,7 +500,9 @@ static bool BZip2CompressionSupported() { #endif enum TestType { - TABLE_TEST, + BLOCK_BASED_TABLE_TEST, + PLAIN_TABLE_SEMI_FIXED_PREFIX, + PLAIN_TABLE_FULL_STR_PREFIX, BLOCK_TEST, MEMTABLE_TEST, DB_TEST @@ -497,8 +518,10 @@ struct TestArgs { static std::vector GenerateArgList() { std::vector ret; - TestType test_type[4] = {TABLE_TEST, BLOCK_TEST, MEMTABLE_TEST, DB_TEST}; - int test_type_len = 4; + TestType test_type[6] = { BLOCK_BASED_TABLE_TEST, + PLAIN_TABLE_SEMI_FIXED_PREFIX, PLAIN_TABLE_FULL_STR_PREFIX, BLOCK_TEST, + MEMTABLE_TEST, DB_TEST }; + int test_type_len = 6; bool reverse_compare[2] = {false, true}; int reverse_compare_len = 2; int restart_interval[3] = {16, 1, 1024}; @@ -523,20 +546,66 @@ static std::vector GenerateArgList() { #endif for(int i =0; i < test_type_len; i++) - for (int j =0; j < reverse_compare_len; j++) - for (int k =0; k < restart_interval_len; k++) - for (unsigned int n =0; n < compression_types.size(); n++) { - TestArgs one_arg; - one_arg.type = test_type[i]; - one_arg.reverse_compare = reverse_compare[j]; - one_arg.restart_interval = restart_interval[k]; - one_arg.compression = compression_types[n]; - ret.push_back(one_arg); - } + for (int j =0; j < reverse_compare_len; j++) { + if (test_type[i] == PLAIN_TABLE_SEMI_FIXED_PREFIX + || test_type[i] == PLAIN_TABLE_FULL_STR_PREFIX) { + // Plain table doesn't use restart index or compression. + TestArgs one_arg; + one_arg.type = test_type[i]; + one_arg.reverse_compare = reverse_compare[0]; + one_arg.restart_interval = restart_interval[0]; + one_arg.compression = compression_types[0]; + ret.push_back(one_arg); + continue; + } + for (int k = 0; k < restart_interval_len; k++) + for (unsigned int n = 0; n < compression_types.size(); n++) { + TestArgs one_arg; + one_arg.type = test_type[i]; + one_arg.reverse_compare = reverse_compare[j]; + one_arg.restart_interval = restart_interval[k]; + one_arg.compression = compression_types[n]; + ret.push_back(one_arg); + } + } return ret; } +// In order to make all tests run for plain table format, including +// those operating on empty keys, create a new prefix transformer which +// return fixed prefix if the slice is not shorter than the prefix length, +// and the full slice if it is shorter. +class FixedOrLessPrefixTransform : public SliceTransform { + private: + const size_t prefix_len_; + + public: + explicit FixedOrLessPrefixTransform(size_t prefix_len) : + prefix_len_(prefix_len) { + } + + virtual const char* Name() const { + return "rocksdb.FixedPrefix"; + } + + virtual Slice Transform(const Slice& src) const { + assert(InDomain(src)); + if (src.size() < prefix_len_) { + return src; + } + return Slice(src.data(), prefix_len_); + } + + virtual bool InDomain(const Slice& src) const { + return true; + } + + virtual bool InRange(const Slice& dst) const { + return (dst.size() <= prefix_len_); + } +}; + class Harness { public: Harness() : constructor_(nullptr) { } @@ -554,9 +623,35 @@ class Harness { if (args.reverse_compare) { options_.comparator = &reverse_key_comparator; } + internal_comparator_.reset(new InternalKeyComparator(options_.comparator)); + support_prev_ = true; + only_support_prefix_seek_ = false; + BlockBasedTableFactory::TableOptions table_options; switch (args.type) { - case TABLE_TEST: - constructor_ = new BlockBasedTableConstructor(options_.comparator); + case BLOCK_BASED_TABLE_TEST: + table_options.flush_block_policy_factory.reset( + new FlushBlockBySizePolicyFactory(options_.block_size, + options_.block_size_deviation)); + options_.table_factory.reset(new BlockBasedTableFactory(table_options)); + constructor_ = new TableConstructor(options_.comparator); + break; + case PLAIN_TABLE_SEMI_FIXED_PREFIX: + support_prev_ = false; + only_support_prefix_seek_ = true; + options_.prefix_extractor = new FixedOrLessPrefixTransform(2); + options_.allow_mmap_reads = true; + options_.table_factory.reset(new PlainTableFactory()); + constructor_ = new TableConstructor(options_.comparator, true); + options_.comparator = internal_comparator_.get(); + break; + case PLAIN_TABLE_FULL_STR_PREFIX: + support_prev_ = false; + only_support_prefix_seek_ = true; + options_.prefix_extractor = NewNoopTransform(); + options_.allow_mmap_reads = true; + options_.table_factory.reset(new PlainTableFactory()); + constructor_ = new TableConstructor(options_.comparator, true); + options_.comparator = internal_comparator_.get(); break; case BLOCK_TEST: constructor_ = new BlockConstructor(options_.comparator); @@ -584,7 +679,9 @@ class Harness { constructor_->Finish(options_, &keys, &data); TestForwardScan(keys, data); - TestBackwardScan(keys, data); + if (support_prev_) { + TestBackwardScan(keys, data); + } TestRandomAccess(rnd, keys, data); } @@ -627,7 +724,7 @@ class Harness { KVMap::const_iterator model_iter = data.begin(); if (kVerbose) fprintf(stderr, "---\n"); for (int i = 0; i < 200; i++) { - const int toss = rnd->Uniform(5); + const int toss = rnd->Uniform(support_prev_ ? 5 : 3); switch (toss) { case 0: { if (iter->Valid()) { @@ -719,17 +816,20 @@ class Harness { } else { const int index = rnd->Uniform(keys.size()); std::string result = keys[index]; - switch (rnd->Uniform(3)) { + switch (rnd->Uniform(support_prev_ ? 3 : 1)) { case 0: // Return an existing key break; case 1: { // Attempt to return something smaller than an existing key - if (result.size() > 0 && result[result.size()-1] > '\0') { - result[result.size()-1]--; + if (result.size() > 0 && result[result.size() - 1] > '\0' + && (!only_support_prefix_seek_ + || options_.prefix_extractor->Transform(result).size() + < result.size())) { + result[result.size() - 1]--; } break; - } + } case 2: { // Return something larger than an existing key Increment(options_.comparator, &result); @@ -746,6 +846,9 @@ class Harness { private: Options options_ = Options(); Constructor* constructor_; + bool support_prev_; + bool only_support_prefix_seek_; + shared_ptr internal_comparator_; }; static bool Between(uint64_t val, uint64_t low, uint64_t high) { @@ -763,8 +866,8 @@ class TableTest { }; // This test include all the basic checks except those for index size and block // size, which will be conducted in separated unit tests. -TEST(TableTest, BasicBlockedBasedTableProperties) { - BlockBasedTableConstructor c(BytewiseComparator()); +TEST(TableTest, BasicTableProperties) { + TableConstructor c(BytewiseComparator()); c.Add("a1", "val1"); c.Add("b2", "val2"); @@ -824,7 +927,7 @@ TEST(TableTest, BasicPlainTableProperties) { } ASSERT_OK(builder->Finish()); - StringSource source(sink.contents(), 72242); + StringSource source(sink.contents(), 72242, true); TableProperties props; auto s = ReadTableProperties( @@ -849,7 +952,7 @@ TEST(TableTest, BasicPlainTableProperties) { } TEST(TableTest, FilterPolicyNameProperties) { - BlockBasedTableConstructor c(BytewiseComparator()); + TableConstructor c(BytewiseComparator()); c.Add("a1", "val1"); std::vector keys; KVMap kvmap; @@ -889,7 +992,7 @@ TEST(TableTest, IndexSizeStat) { // Each time we load one more key to the table. the table index block // size is expected to be larger than last time's. for (size_t i = 1; i < keys.size(); ++i) { - BlockBasedTableConstructor c(BytewiseComparator()); + TableConstructor c(BytewiseComparator()); for (size_t j = 0; j < i; ++j) { c.Add(keys[j], "val"); } @@ -910,7 +1013,7 @@ TEST(TableTest, IndexSizeStat) { TEST(TableTest, NumBlockStat) { Random rnd(test::RandomSeed()); - BlockBasedTableConstructor c(BytewiseComparator()); + TableConstructor c(BytewiseComparator()); Options options; options.compression = kNoCompression; options.block_restart_interval = 1; @@ -986,7 +1089,7 @@ TEST(TableTest, BlockCacheTest) { std::vector keys; KVMap kvmap; - BlockBasedTableConstructor c(BytewiseComparator()); + TableConstructor c(BytewiseComparator()); c.Add("key", "value"); c.Finish(options, &keys, &kvmap); @@ -1107,7 +1210,7 @@ TEST(TableTest, BlockCacheTest) { } TEST(TableTest, ApproximateOffsetOfPlain) { - BlockBasedTableConstructor c(BytewiseComparator()); + TableConstructor c(BytewiseComparator()); c.Add("k01", "hello"); c.Add("k02", "hello2"); c.Add("k03", std::string(10000, 'x')); @@ -1138,7 +1241,7 @@ TEST(TableTest, ApproximateOffsetOfPlain) { static void Do_Compression_Test(CompressionType comp) { Random rnd(301); - BlockBasedTableConstructor c(BytewiseComparator()); + TableConstructor c(BytewiseComparator()); std::string tmp; c.Add("k01", "hello"); c.Add("k02", test::CompressibleString(&rnd, 0.25, 10000, &tmp)); @@ -1156,7 +1259,7 @@ static void Do_Compression_Test(CompressionType comp) { ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"), 0, 0)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), 2000, 3000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), 2000, 3000)); - ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 4000, 6000)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 4000, 6100)); } TEST(TableTest, ApproximateOffsetOfCompressed) { @@ -1194,7 +1297,7 @@ TEST(TableTest, BlockCacheLeak) { opt.block_cache = NewLRUCache(16*1024*1024); // big enough so we don't ever // lose cached values. - BlockBasedTableConstructor c(BytewiseComparator()); + TableConstructor c(BytewiseComparator()); c.Add("k01", "hello"); c.Add("k02", "hello2"); c.Add("k03", std::string(10000, 'x')); diff --git a/util/dynamic_bloom.cc b/util/dynamic_bloom.cc index be47ab55a..84f964d9e 100644 --- a/util/dynamic_bloom.cc +++ b/util/dynamic_bloom.cc @@ -39,7 +39,10 @@ DynamicBloom::DynamicBloom(uint32_t total_bits, } void DynamicBloom::Add(const Slice& key) { - uint32_t h = hash_func_(key); + AddHash(hash_func_(key)); +} + +void DynamicBloom::AddHash(uint32_t h) { const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits for (uint32_t i = 0; i < num_probes_; i++) { const uint32_t bitpos = h % total_bits_; @@ -49,7 +52,10 @@ void DynamicBloom::Add(const Slice& key) { } bool DynamicBloom::MayContain(const Slice& key) { - uint32_t h = hash_func_(key); + return (MayContainHash(hash_func_(key))); +} + +bool DynamicBloom::MayContainHash(uint32_t h) { const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits for (uint32_t i = 0; i < num_probes_; i++) { const uint32_t bitpos = h % total_bits_; diff --git a/util/dynamic_bloom.h b/util/dynamic_bloom.h index c496e2ce7..aa29a4ae7 100644 --- a/util/dynamic_bloom.h +++ b/util/dynamic_bloom.h @@ -28,9 +28,14 @@ class DynamicBloom { // Assuming single threaded access to Add void Add(const Slice& key); + // Assuming single threaded access to Add + void AddHash(uint32_t hash); + // Multithreaded access to MayContain is OK bool MayContain(const Slice& key); + // Multithreaded access to MayContain is OK + bool MayContainHash(uint32_t hash); private: uint32_t (*hash_func_)(const Slice& key);