From edd47c5104278e3b9883f79d8ac1c9a7f2ed8d4f Mon Sep 17 00:00:00 2001 From: sdong Date: Wed, 18 Jun 2014 16:36:48 -0700 Subject: [PATCH] PlainTable to encode to avoid to rewrite prefix when it is the same as the previous key Summary: Add a encoding feature of PlainTable to encode PlainTable's keys to save some bytes for the same prefixes. The data format is documented in table/plain_table_factory.h Test Plan: Add unit test coverage in plain_table_db_test Reviewers: yhchiang, igor, dhruba, ljin, haobo Reviewed By: haobo Subscribers: nkg-, leveldb Differential Revision: https://reviews.facebook.net/D18735 --- db/dbformat.h | 33 +++- db/plain_table_db_test.cc | 122 +++++++++--- include/rocksdb/table.h | 40 +++- table/plain_table_builder.cc | 85 ++++----- table/plain_table_builder.h | 25 ++- table/plain_table_factory.cc | 22 ++- table/plain_table_factory.h | 106 ++++++++++- table/plain_table_key_coding.cc | 323 ++++++++++++++++++++++++++++++++ table/plain_table_key_coding.h | 97 ++++++++++ table/plain_table_reader.cc | 212 ++++++++++++--------- table/plain_table_reader.h | 27 ++- tools/sst_dump.cc | 5 +- 12 files changed, 890 insertions(+), 207 deletions(-) create mode 100644 table/plain_table_key_coding.cc create mode 100644 table/plain_table_key_coding.h diff --git a/db/dbformat.h b/db/dbformat.h index 9640372d7..c7b3ced94 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -263,17 +263,38 @@ class IterKey { key_size_ = size; } + void SetInternalKey(const Slice& key_prefix, const Slice& user_key, + SequenceNumber s, + ValueType value_type = kValueTypeForSeek) { + size_t psize = key_prefix.size(); + size_t usize = user_key.size(); + EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t)); + if (psize > 0) { + memcpy(key_, key_prefix.data(), psize); + } + memcpy(key_ + psize, user_key.data(), usize); + EncodeFixed64(key_ + usize + psize, PackSequenceAndType(s, value_type)); + key_size_ = psize + usize + sizeof(uint64_t); + } + void SetInternalKey(const Slice& user_key, SequenceNumber s, ValueType value_type = kValueTypeForSeek) { - size_t usize = user_key.size(); - EnlargeBufferIfNeeded(usize + sizeof(uint64_t)); - memcpy(key_, user_key.data(), usize); - EncodeFixed64(key_ + usize, PackSequenceAndType(s, value_type)); - key_size_ = usize + sizeof(uint64_t); + SetInternalKey(Slice(), user_key, s, value_type); + } + + void Reserve(size_t size) { + EnlargeBufferIfNeeded(size); + key_size_ = size; } void SetInternalKey(const ParsedInternalKey& parsed_key) { - SetInternalKey(parsed_key.user_key, parsed_key.sequence, parsed_key.type); + SetInternalKey(Slice(), parsed_key); + } + + void SetInternalKey(const Slice& key_prefix, + const ParsedInternalKey& parsed_key_suffix) { + SetInternalKey(key_prefix, parsed_key_suffix.user_key, + parsed_key_suffix.sequence, parsed_key_suffix.type); } private: diff --git a/db/plain_table_db_test.cc b/db/plain_table_db_test.cc index 0df57a41b..b169b1724 100644 --- a/db/plain_table_db_test.cc +++ b/db/plain_table_db_test.cc @@ -61,7 +61,7 @@ class PlainTableDBTest { // Return the current option configuration. Options CurrentOptions() { Options options; - options.table_factory.reset(NewPlainTableFactory(16, 2, 0.8, 3)); + options.table_factory.reset(NewPlainTableFactory(0, 2, 0.8, 3, 0, kPrefix)); options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true)); options.prefix_extractor.reset(NewFixedPrefixTransform(8)); options.allow_mmap_reads = true; @@ -179,17 +179,21 @@ class TestPlainTableReader : public PlainTableReader { public: TestPlainTableReader(const EnvOptions& storage_options, const InternalKeyComparator& icomparator, - uint64_t file_size, int bloom_bits_per_key, - double hash_table_ratio, size_t index_sparseness, + EncodingType encoding_type, uint64_t file_size, + int bloom_bits_per_key, double hash_table_ratio, + size_t index_sparseness, const TableProperties* table_properties, unique_ptr&& file, const Options& options, bool* expect_bloom_not_match) : PlainTableReader(options, std::move(file), storage_options, icomparator, - file_size, table_properties), + encoding_type, file_size, table_properties), expect_bloom_not_match_(expect_bloom_not_match) { - Status s = PopulateIndex(const_cast(table_properties), - bloom_bits_per_key, hash_table_ratio, - index_sparseness, 2 * 1024 * 1024); + Status s = MmapDataFile(); + ASSERT_TRUE(s.ok()); + + s = PopulateIndex(const_cast(table_properties), + bloom_bits_per_key, hash_table_ratio, index_sparseness, + 2 * 1024 * 1024); ASSERT_TRUE(s.ok()); } @@ -211,9 +215,10 @@ class TestPlainTableFactory : public PlainTableFactory { uint32_t user_key_len, int bloom_bits_per_key, double hash_table_ratio, size_t index_sparseness, - size_t huge_page_tlb_size) - : PlainTableFactory(user_key_len, user_key_len, hash_table_ratio, - index_sparseness, huge_page_tlb_size), + size_t huge_page_tlb_size, + EncodingType encoding_type) + : PlainTableFactory(user_key_len, bloom_bits_per_key, hash_table_ratio, + index_sparseness, huge_page_tlb_size, encoding_type), bloom_bits_per_key_(bloom_bits_per_key), hash_table_ratio_(hash_table_ratio), index_sparseness_(index_sparseness), @@ -228,10 +233,17 @@ class TestPlainTableFactory : public PlainTableFactory { options.env, options.info_log.get(), &props); ASSERT_TRUE(s.ok()); + auto& user_props = props->user_collected_properties; + auto encoding_type_prop = + user_props.find(PlainTablePropertyNames::kEncodingType); + assert(encoding_type_prop != user_props.end()); + EncodingType encoding_type = static_cast( + DecodeFixed32(encoding_type_prop->second.c_str())); + std::unique_ptr new_reader(new TestPlainTableReader( - soptions, internal_comparator, file_size, bloom_bits_per_key_, - hash_table_ratio_, index_sparseness_, props, std::move(file), options, - expect_bloom_not_match_)); + soptions, internal_comparator, encoding_type, file_size, + bloom_bits_per_key_, hash_table_ratio_, index_sparseness_, props, + std::move(file), options, expect_bloom_not_match_)); *table = std::move(new_reader); return s; @@ -247,18 +259,22 @@ class TestPlainTableFactory : public PlainTableFactory { TEST(PlainTableDBTest, Flush) { for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024; huge_page_tlb_size += 2 * 1024 * 1024) { + for (EncodingType encoding_type : {kPlain, kPrefix}) { for (int bloom_bits = 0; bloom_bits <= 117; bloom_bits += 117) { for (int total_order = 0; total_order <= 1; total_order++) { + if (encoding_type == kPrefix && total_order == 1) { + continue; + } Options options = CurrentOptions(); options.create_if_missing = true; // Set only one bucket to force bucket conflict. // Test index interval for the same prefix to be 1, 2 and 4 if (total_order) { options.table_factory.reset(NewTotalOrderPlainTableFactory( - 16, bloom_bits, 2, huge_page_tlb_size)); + 0, bloom_bits, 2, huge_page_tlb_size)); } else { options.table_factory.reset(NewPlainTableFactory( - 16, bloom_bits, 0.75, 16, huge_page_tlb_size)); + 0, bloom_bits, 0.75, 16, huge_page_tlb_size, encoding_type)); } DestroyAndReopen(&options); @@ -281,14 +297,19 @@ TEST(PlainTableDBTest, Flush) { ASSERT_EQ("v2", Get("0000000000000bar")); } } + } } } TEST(PlainTableDBTest, Flush2) { for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024; huge_page_tlb_size += 2 * 1024 * 1024) { + for (EncodingType encoding_type : {kPlain, kPrefix}) { for (int bloom_bits = 0; bloom_bits <= 117; bloom_bits += 117) { for (int total_order = 0; total_order <= 1; total_order++) { + if (encoding_type == kPrefix && total_order == 1) { + continue; + } bool expect_bloom_not_match = false; Options options = CurrentOptions(); options.create_if_missing = true; @@ -296,13 +317,13 @@ TEST(PlainTableDBTest, Flush2) { // Test index interval for the same prefix to be 1, 2 and 4 if (total_order) { options.prefix_extractor = nullptr; - options.table_factory.reset( - new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits, - 0, 2, huge_page_tlb_size)); + options.table_factory.reset(new TestPlainTableFactory( + &expect_bloom_not_match, 0, bloom_bits, 0, 2, huge_page_tlb_size, + encoding_type)); } else { - options.table_factory.reset( - new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits, - 0.75, 16, huge_page_tlb_size)); + options.table_factory.reset(new TestPlainTableFactory( + &expect_bloom_not_match, 0, bloom_bits, 0.75, 16, + huge_page_tlb_size, encoding_type)); } DestroyAndReopen(&options); ASSERT_OK(Put("0000000000000bar", "b")); @@ -341,14 +362,19 @@ TEST(PlainTableDBTest, Flush2) { } } } + } } } TEST(PlainTableDBTest, Iterator) { for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024; huge_page_tlb_size += 2 * 1024 * 1024) { + for (EncodingType encoding_type : {kPlain, kPrefix}) { for (int bloom_bits = 0; bloom_bits <= 117; bloom_bits += 117) { for (int total_order = 0; total_order <= 1; total_order++) { + if (encoding_type == kPrefix && total_order == 1) { + continue; + } bool expect_bloom_not_match = false; Options options = CurrentOptions(); options.create_if_missing = true; @@ -356,13 +382,13 @@ TEST(PlainTableDBTest, Iterator) { // Test index interval for the same prefix to be 1, 2 and 4 if (total_order) { options.prefix_extractor = nullptr; - options.table_factory.reset( - new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits, - 0, 2, huge_page_tlb_size)); + options.table_factory.reset(new TestPlainTableFactory( + &expect_bloom_not_match, 16, bloom_bits, 0, 2, huge_page_tlb_size, + encoding_type)); } else { - options.table_factory.reset( - new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits, - 0.75, 16, huge_page_tlb_size)); + options.table_factory.reset(new TestPlainTableFactory( + &expect_bloom_not_match, 16, bloom_bits, 0.75, 16, + huge_page_tlb_size, encoding_type)); } DestroyAndReopen(&options); @@ -449,6 +475,7 @@ TEST(PlainTableDBTest, Iterator) { delete iter; } } + } } } @@ -460,7 +487,7 @@ std::string MakeLongKey(size_t length, char c) { TEST(PlainTableDBTest, IteratorLargeKeys) { Options options = CurrentOptions(); - options.table_factory.reset(NewTotalOrderPlainTableFactory(0, 0, 16)); + options.table_factory.reset(NewTotalOrderPlainTableFactory(0, 0, 16, 0)); options.create_if_missing = true; options.prefix_extractor.reset(); DestroyAndReopen(&options); @@ -496,6 +523,45 @@ TEST(PlainTableDBTest, IteratorLargeKeys) { delete iter; } +namespace { +std::string MakeLongKeyWithPrefix(size_t length, char c) { + return "00000000" + std::string(length - 8, c); +} +} // namespace + +TEST(PlainTableDBTest, IteratorLargeKeysWithPrefix) { + Options options = CurrentOptions(); + options.table_factory.reset(NewPlainTableFactory(16, 0, 0.8, 3, 0, kPrefix)); + options.create_if_missing = true; + DestroyAndReopen(&options); + + std::string key_list[] = { + MakeLongKeyWithPrefix(30, '0'), MakeLongKeyWithPrefix(16, '1'), + MakeLongKeyWithPrefix(32, '2'), MakeLongKeyWithPrefix(60, '3'), + MakeLongKeyWithPrefix(90, '4'), MakeLongKeyWithPrefix(50, '5'), + MakeLongKeyWithPrefix(26, '6')}; + + for (size_t i = 0; i < 7; i++) { + ASSERT_OK(Put(key_list[i], std::to_string(i))); + } + + dbfull()->TEST_FlushMemTable(); + + Iterator* iter = dbfull()->NewIterator(ReadOptions()); + iter->Seek(key_list[0]); + + for (size_t i = 0; i < 7; i++) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(key_list[i], iter->key().ToString()); + ASSERT_EQ(std::to_string(i), iter->value().ToString()); + iter->Next(); + } + + ASSERT_TRUE(!iter->Valid()); + + delete iter; +} + // A test comparator which compare two strings in this way: // (1) first compare prefix of 8 bytes in alphabet order, // (2) if two strings share the same prefix, sort the other part of the string diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index 94aa97a96..b4ef36a07 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -97,6 +97,30 @@ extern TableFactory* NewBlockBasedTableFactory( const BlockBasedTableOptions& table_options = BlockBasedTableOptions()); #ifndef ROCKSDB_LITE + +enum EncodingType : char { + // Always write full keys without any special encoding. + kPlain, + // Find opportunity to write the same prefix once for multiple rows. + // In some cases, when a key follows a previous key with the same prefix, + // instead of writing out the full key, it just writes out the size of the + // shared prefix, as well as other bytes, to save some bytes. + // + // When using this option, the user is required to use the same prefix + // extractor to make sure the same prefix will be extracted from the same key. + // The Name() value of the prefix extractor will be stored in the file. When + // reopening the file, the name of the options.prefix_extractor given will be + // bitwise compared to the prefix extractors stored in the file. An error + // will be returned if the two don't match. + kPrefix, +}; + +// Table Properties that are specific to plain table properties. +struct PlainTablePropertyNames { + static const std::string kPrefixExtractorName; + static const std::string kEncodingType; +}; + // -- Plain Table with prefix-only seek // For this factory, you need to set Options.prefix_extrator properly to make it // work. Look-up will starts with prefix hash lookup for key prefix. Inside the @@ -113,11 +137,22 @@ extern TableFactory* NewBlockBasedTableFactory( // in the hash table // @index_sparseness: inside each prefix, need to build one index record for how // many keys for binary search inside each hash bucket. +// For encoding type kPrefix, the value will be used when +// writing to determine an interval to rewrite the full key. +// It will also be used as a suggestion and satisfied when +// possible. // @huge_page_tlb_size: if <=0, allocate hash indexes and blooms from malloc. // Otherwise from huge page TLB. The user needs to reserve // huge pages for it to be allocated, like: // sysctl -w vm.nr_hugepages=20 // See linux doc Documentation/vm/hugetlbpage.txt +// @encoding_type: how to encode the keys. See enum EncodingType above for +// the choices. The value will determine how to encode keys +// when writing to a new SST file. This value will be stored +// inside the SST file which will be used when reading from the +// file, which makes it possible for users to choose different +// encoding type when reopening a DB. Files with different +// encoding types can co-exist in the same DB and can be read. const uint32_t kPlainTableVariableLength = 0; extern TableFactory* NewPlainTableFactory(uint32_t user_key_len = @@ -125,7 +160,8 @@ extern TableFactory* NewPlainTableFactory(uint32_t user_key_len = int bloom_bits_per_prefix = 10, double hash_table_ratio = 0.75, size_t index_sparseness = 16, - size_t huge_page_tlb_size = 0); + size_t huge_page_tlb_size = 0, + EncodingType encoding_type = kPlain); // -- Plain Table // This factory of plain table ignores Options.prefix_extractor and assumes no @@ -147,7 +183,7 @@ extern TableFactory* NewPlainTableFactory(uint32_t user_key_len = extern TableFactory* NewTotalOrderPlainTableFactory( uint32_t user_key_len = kPlainTableVariableLength, int bloom_bits_per_key = 0, size_t index_sparseness = 16, - size_t huge_page_tlb_size = 0); + size_t huge_page_tlb_size = 0, bool full_scan_mode = false); #endif // ROCKSDB_LITE diff --git a/table/plain_table_builder.cc b/table/plain_table_builder.cc index 12037cf6a..e9c3f3624 100644 --- a/table/plain_table_builder.cc +++ b/table/plain_table_builder.cc @@ -1,6 +1,7 @@ -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. #ifndef ROCKSDB_LITE #include "table/plain_table_builder.h" @@ -12,6 +13,7 @@ #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" #include "rocksdb/options.h" +#include "rocksdb/table.h" #include "table/plain_table_factory.h" #include "db/dbformat.h" #include "table/block_builder.h" @@ -52,10 +54,14 @@ Status WriteBlock( extern const uint64_t kPlainTableMagicNumber = 0x8242229663bf9564ull; extern const uint64_t kLegacyPlainTableMagicNumber = 0x4f3418eb7a8f13b8ull; -PlainTableBuilder::PlainTableBuilder(const Options& options, - WritableFile* file, - uint32_t user_key_len) : - options_(options), file_(file), user_key_len_(user_key_len) { +PlainTableBuilder::PlainTableBuilder(const Options& options, WritableFile* file, + uint32_t user_key_len, + EncodingType encoding_type, + size_t index_sparseness) + : options_(options), + file_(file), + encoder_(encoding_type, user_key_len, options.prefix_extractor.get(), + index_sparseness) { properties_.fixed_key_len = user_key_len; // for plain table, we put all the data in a big chuck. @@ -64,7 +70,20 @@ PlainTableBuilder::PlainTableBuilder(const Options& options, // filter block. properties_.index_size = 0; properties_.filter_size = 0; - properties_.format_version = 0; + // To support roll-back to previous version, now still use version 0 for + // plain encoding. + properties_.format_version = (encoding_type == kPlain) ? 0 : 1; + + if (options_.prefix_extractor) { + properties_.user_collected_properties + [PlainTablePropertyNames::kPrefixExtractorName] = + options_.prefix_extractor->Name(); + } + + std::string val; + PutFixed32(&val, static_cast(encoder_.GetEncodingType())); + properties_.user_collected_properties + [PlainTablePropertyNames::kEncodingType] = val; for (auto& collector_factories : options.table_properties_collector_factories) { @@ -77,51 +96,25 @@ PlainTableBuilder::~PlainTableBuilder() { } void PlainTableBuilder::Add(const Slice& key, const Slice& value) { - size_t user_key_size = key.size() - 8; - assert(user_key_len_ == 0 || user_key_size == user_key_len_); + // temp buffer for metadata bytes between key and value. + char meta_bytes_buf[6]; + size_t meta_bytes_buf_size = 0; - if (!IsFixedLength()) { - // Write key length - char key_size_buf[5]; // tmp buffer for key size as varint32 - char* ptr = EncodeVarint32(key_size_buf, user_key_size); - assert(ptr <= key_size_buf + sizeof(key_size_buf)); - auto len = ptr - key_size_buf; - file_->Append(Slice(key_size_buf, len)); - offset_ += len; - } - - // Write key - ParsedInternalKey parsed_key; - if (!ParseInternalKey(key, &parsed_key)) { - status_ = Status::Corruption(Slice()); - return; - } - // For value size as varint32 (up to 5 bytes). - // If the row is of value type with seqId 0, flush the special flag together - // in this buffer to safe one file append call, which takes 1 byte. - char value_size_buf[6]; - size_t value_size_buf_size = 0; - if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) { - file_->Append(Slice(key.data(), user_key_size)); - offset_ += user_key_size; - value_size_buf[0] = PlainTableFactory::kValueTypeSeqId0; - value_size_buf_size = 1; - } else { - file_->Append(key); - offset_ += key.size(); - } + // Write out the key + encoder_.AppendKey(key, file_, &offset_, meta_bytes_buf, + &meta_bytes_buf_size); // Write value length int value_size = value.size(); char* end_ptr = - EncodeVarint32(value_size_buf + value_size_buf_size, value_size); - assert(end_ptr <= value_size_buf + sizeof(value_size_buf)); - value_size_buf_size = end_ptr - value_size_buf; - file_->Append(Slice(value_size_buf, value_size_buf_size)); + EncodeVarint32(meta_bytes_buf + meta_bytes_buf_size, value_size); + assert(end_ptr <= meta_bytes_buf + sizeof(meta_bytes_buf)); + meta_bytes_buf_size = end_ptr - meta_bytes_buf; + file_->Append(Slice(meta_bytes_buf, meta_bytes_buf_size)); // Write value file_->Append(value); - offset_ += value_size + value_size_buf_size; + offset_ += value_size + meta_bytes_buf_size; properties_.num_entries++; properties_.raw_key_size += key.size(); @@ -150,6 +143,8 @@ Status PlainTableBuilder::Finish() { // -- Add basic properties property_block_builder.AddTableProperty(properties_); + property_block_builder.Add(properties_.user_collected_properties); + // -- Add user collected properties NotifyCollectTableCollectorsOnFinish(table_properties_collectors_, options_.info_log.get(), diff --git a/table/plain_table_builder.h b/table/plain_table_builder.h index 9b0f46080..a0bc1513a 100644 --- a/table/plain_table_builder.h +++ b/table/plain_table_builder.h @@ -1,9 +1,7 @@ -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. -// -// IndexedTable is a simple table format for UNIT TEST ONLY. It is not built -// as production quality. +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. #pragma once #ifndef ROCKSDB_LITE @@ -12,6 +10,8 @@ #include "rocksdb/options.h" #include "rocksdb/status.h" #include "table/table_builder.h" +#include "table/plain_table_key_coding.h" +#include "rocksdb/table.h" #include "rocksdb/table_properties.h" namespace rocksdb { @@ -22,14 +22,15 @@ class WritableFile; class TableBuilder; class PlainTableBuilder: public TableBuilder { -public: + public: // Create a builder that will store the contents of the table it is // building in *file. Does not close the file. It is up to the // caller to close the file after calling Finish(). The output file // will be part of level specified by 'level'. A value of -1 means // that the caller does not know which level the output file will reside. PlainTableBuilder(const Options& options, WritableFile* file, - uint32_t user_key_size); + uint32_t user_key_size, EncodingType encoding_type, + size_t index_sparseness); // REQUIRES: Either Finish() or Abandon() has been called. ~PlainTableBuilder(); @@ -61,7 +62,7 @@ public: // Finish() call, returns the size of the final generated file. uint64_t FileSize() const override; -private: + private: Options options_; std::vector> table_properties_collectors_; @@ -69,14 +70,10 @@ private: uint64_t offset_ = 0; Status status_; TableProperties properties_; + PlainTableKeyEncoder encoder_; - const size_t user_key_len_; bool closed_ = false; // Either Finish() or Abandon() has been called. - bool IsFixedLength() const { - return user_key_len_ > 0; - } - // No copying allowed PlainTableBuilder(const PlainTableBuilder&) = delete; void operator=(const PlainTableBuilder&) = delete; diff --git a/table/plain_table_factory.cc b/table/plain_table_factory.cc index f9d88e9ef..38cedd70c 100644 --- a/table/plain_table_factory.cc +++ b/table/plain_table_factory.cc @@ -23,32 +23,42 @@ Status PlainTableFactory::NewTableReader(const Options& options, return PlainTableReader::Open(options, soptions, icomp, std::move(file), file_size, table, bloom_bits_per_key_, hash_table_ratio_, index_sparseness_, - huge_page_tlb_size_); + huge_page_tlb_size_, full_scan_mode_); } TableBuilder* PlainTableFactory::NewTableBuilder( const Options& options, const InternalKeyComparator& internal_comparator, WritableFile* file, CompressionType compression_type) const { - return new PlainTableBuilder(options, file, user_key_len_); + return new PlainTableBuilder(options, file, user_key_len_, encoding_type_, + index_sparseness_); } extern TableFactory* NewPlainTableFactory(uint32_t user_key_len, int bloom_bits_per_key, double hash_table_ratio, size_t index_sparseness, - size_t huge_page_tlb_size) { + size_t huge_page_tlb_size, + EncodingType encoding_type) { return new PlainTableFactory(user_key_len, bloom_bits_per_key, hash_table_ratio, index_sparseness, - huge_page_tlb_size); + huge_page_tlb_size, encoding_type); } extern TableFactory* NewTotalOrderPlainTableFactory(uint32_t user_key_len, int bloom_bits_per_key, size_t index_sparseness, - size_t huge_page_tlb_size) { + size_t huge_page_tlb_size, + bool full_scan_mode) { return new PlainTableFactory(user_key_len, bloom_bits_per_key, 0, - index_sparseness, huge_page_tlb_size); + index_sparseness, huge_page_tlb_size, kPlain, + full_scan_mode); } +const std::string PlainTablePropertyNames::kPrefixExtractorName = + "rocksdb.prefix.extractor.name"; + +const std::string PlainTablePropertyNames::kEncodingType = + "rocksdb.plain.table.encoding.type"; + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/table/plain_table_factory.h b/table/plain_table_factory.h index 06ddbf4ea..4b02a8e03 100644 --- a/table/plain_table_factory.h +++ b/table/plain_table_factory.h @@ -6,6 +6,7 @@ #ifndef ROCKSDB_LITE #include +#include #include #include "rocksdb/options.h" @@ -27,20 +28,103 @@ class TableBuilder; // parameter of the factory class. Output file format: // +-------------+-----------------+ // | version | user_key_length | -// +------------++------------------------------+ <= key1 offset -// | [key_size] | key1 | value_size | | +// +------------++------------+-----------------+ <= key1 offset +// | encoded key1 | value_size | | // +------------+-------------+-------------+ | // | value1 | // | | -// +----------------------------------------+---+ <= key2 offset -// | [key_size] | key2 | value_size | | +// +--------------------------+-------------+---+ <= key2 offset +// | encoded key2 | value_size | | // +------------+-------------+-------------+ | // | value2 | // | | // | ...... | // +-----------------+--------------------------+ -// If user_key_length = kPlainTableVariableLength, it means the key is variable -// length, there will be an extra field for key size encoded before every key. +// +// When the key encoding type is kPlain. Key part is encoded as: +// +------------+--------------------+ +// | [key_size] | internal key | +// +------------+--------------------+ +// for the case of user_key_len = kPlainTableVariableLength case, +// and simply: +// +----------------------+ +// | internal key | +// +----------------------+ +// for user_key_len != kPlainTableVariableLength case. +// +// If key encoding type is kPrefix. Keys are encoding in this format. +// There are three ways to encode a key: +// (1) Full Key +// +---------------+---------------+-------------------+ +// | Full Key Flag | Full Key Size | Full Internal Key | +// +---------------+---------------+-------------------+ +// which simply encodes a full key +// +// (2) A key shared the same prefix as the previous key, which is encoded as +// format of (1). +// +-------------+-------------+-------------+-------------+------------+ +// | Prefix Flag | Prefix Size | Suffix Flag | Suffix Size | Key Suffix | +// +-------------+-------------+-------------+-------------+------------+ +// where key is the suffix part of the key, including the internal bytes. +// the actual key will be constructed by concatenating prefix part of the +// previous key, with the suffix part of the key here, with sizes given here. +// +// (3) A key shared the same prefix as the previous key, which is encoded as +// the format of (2). +// +-----------------+-----------------+------------------------+ +// | Key Suffix Flag | Key Suffix Size | Suffix of Internal Key | +// +-----------------+-----------------+------------------------+ +// The key will be constructed by concatenating previous key's prefix (which is +// also a prefix which the last key encoded in the format of (1)) and the +// key given here. +// +// For example, we for following keys (prefix and suffix are separated by +// spaces): +// 0000 0001 +// 0000 00021 +// 0000 0002 +// 00011 00 +// 0002 0001 +// Will be encoded like this: +// FK 8 00000001 +// PF 4 SF 5 00021 +// SF 4 0002 +// FK 7 0001100 +// FK 8 00020001 +// (where FK means full key flag, PF means prefix flag and SF means suffix flag) +// +// All those "key flag + key size" shown above are in this format: +// The 8 bits of the first byte: +// +----+----+----+----+----+----+----+----+ +// | Type | Size | +// +----+----+----+----+----+----+----+----+ +// Type indicates: full key, prefix, or suffix. +// The last 6 bits are for size. If the size bits are not all 1, it means the +// size of the key. Otherwise, varint32 is read after this byte. This varint +// value + 0x3F (the value of all 1) will be the key size. +// +// For example, full key with length 16 will be encoded as (binary): +// 00 010000 +// (00 means full key) +// and a prefix with 100 bytes will be encoded as: +// 01 111111 00100101 +// (63) (37) +// (01 means key suffix) +// +// All the internal keys above (including kPlain and kPrefix) are encoded in +// this format: +// There are two types: +// (1) normal internal key format +// +----------- ...... -------------+----+---+---+---+---+---+---+---+ +// | user key |type| sequence ID | +// +----------- ..... --------------+----+---+---+---+---+---+---+---+ +// (2) Special case for keys whose sequence ID is 0 and is value type +// +----------- ...... -------------+----+ +// | user key |0x80| +// +----------- ..... --------------+----+ +// To save 7 bytes for the special case where sequence ID = 0. +// +// class PlainTableFactory : public TableFactory { public: ~PlainTableFactory() {} @@ -63,12 +147,16 @@ class PlainTableFactory : public TableFactory { int bloom_bits_per_key = 0, double hash_table_ratio = 0.75, size_t index_sparseness = 16, - size_t huge_page_tlb_size = 0) + size_t huge_page_tlb_size = 0, + EncodingType encoding_type = kPlain, + bool full_scan_mode = false) : user_key_len_(user_key_len), bloom_bits_per_key_(bloom_bits_per_key), hash_table_ratio_(hash_table_ratio), index_sparseness_(index_sparseness), - huge_page_tlb_size_(huge_page_tlb_size) {} + huge_page_tlb_size_(huge_page_tlb_size), + encoding_type_(encoding_type), + full_scan_mode_(full_scan_mode) {} const char* Name() const override { return "PlainTable"; } Status NewTableReader(const Options& options, const EnvOptions& soptions, const InternalKeyComparator& internal_comparator, @@ -88,6 +176,8 @@ class PlainTableFactory : public TableFactory { double hash_table_ratio_; size_t index_sparseness_; size_t huge_page_tlb_size_; + EncodingType encoding_type_; + bool full_scan_mode_; }; } // namespace rocksdb diff --git a/table/plain_table_key_coding.cc b/table/plain_table_key_coding.cc new file mode 100644 index 000000000..51849b3e3 --- /dev/null +++ b/table/plain_table_key_coding.cc @@ -0,0 +1,323 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE +#include "table/plain_table_key_coding.h" + +#include "table/plain_table_factory.h" +#include "db/dbformat.h" + +namespace rocksdb { + +namespace { + +enum EntryType : unsigned char { + kFullKey = 0, + kPrefixFromPreviousKey = 1, + kKeySuffix = 2, +}; + +// Control byte: +// First two bits indicate type of entry +// Other bytes are inlined sizes. If all bits are 1 (0x03F), overflow bytes +// are used. key_size-0x3F will be encoded as a variint32 after this bytes. + +const unsigned char kSizeInlineLimit = 0x3F; + +// Return 0 for error +size_t EncodeSize(EntryType type, uint32_t key_size, char* out_buffer) { + out_buffer[0] = type << 6; + + if (key_size < 0x3F) { + // size inlined + out_buffer[0] |= static_cast(key_size); + return 1; + } else { + out_buffer[0] |= kSizeInlineLimit; + char* ptr = EncodeVarint32(out_buffer + 1, key_size - kSizeInlineLimit); + return ptr - out_buffer; + } +} + +// Return position after the size byte(s). nullptr means error +const char* DecodeSize(const char* offset, const char* limit, + EntryType* entry_type, size_t* key_size) { + assert(offset < limit); + *entry_type = static_cast( + (static_cast(offset[0]) & ~kSizeInlineLimit) >> 6); + char inline_key_size = offset[0] & kSizeInlineLimit; + if (inline_key_size < kSizeInlineLimit) { + *key_size = inline_key_size; + return offset + 1; + } else { + uint32_t extra_size; + const char* ptr = GetVarint32Ptr(offset + 1, limit, &extra_size); + if (ptr == nullptr) { + return nullptr; + } + *key_size = kSizeInlineLimit + extra_size; + return ptr; + } +} +} // namespace + +Status PlainTableKeyEncoder::AppendKey(const Slice& key, WritableFile* file, + uint64_t* offset, char* meta_bytes_buf, + size_t* meta_bytes_buf_size) { + ParsedInternalKey parsed_key; + if (!ParseInternalKey(key, &parsed_key)) { + return Status::Corruption(Slice()); + } + + Slice key_to_write = key; // Portion of internal key to write out. + + size_t user_key_size = fixed_user_key_len_; + if (encoding_type_ == kPlain) { + if (fixed_user_key_len_ == kPlainTableVariableLength) { + user_key_size = key.size() - 8; + // Write key length + char key_size_buf[5]; // tmp buffer for key size as varint32 + char* ptr = EncodeVarint32(key_size_buf, user_key_size); + assert(ptr <= key_size_buf + sizeof(key_size_buf)); + auto len = ptr - key_size_buf; + Status s = file->Append(Slice(key_size_buf, len)); + if (!s.ok()) { + return s; + } + *offset += len; + } + } else { + assert(encoding_type_ == kPrefix); + char size_bytes[12]; + size_t size_bytes_pos = 0; + + user_key_size = key.size() - 8; + + Slice prefix = + prefix_extractor_->Transform(Slice(key.data(), user_key_size)); + if (key_count_for_prefix == 0 || prefix != pre_prefix_.GetKey() || + key_count_for_prefix % index_sparseness_ == 0) { + key_count_for_prefix = 1; + pre_prefix_.SetKey(prefix); + size_bytes_pos += EncodeSize(kFullKey, user_key_size, size_bytes); + Status s = file->Append(Slice(size_bytes, size_bytes_pos)); + if (!s.ok()) { + return s; + } + *offset += size_bytes_pos; + } else { + key_count_for_prefix++; + if (key_count_for_prefix == 2) { + // For second key within a prefix, need to encode prefix length + size_bytes_pos += + EncodeSize(kPrefixFromPreviousKey, pre_prefix_.GetKey().size(), + size_bytes + size_bytes_pos); + } + size_t prefix_len = pre_prefix_.GetKey().size(); + size_bytes_pos += EncodeSize(kKeySuffix, user_key_size - prefix_len, + size_bytes + size_bytes_pos); + Status s = file->Append(Slice(size_bytes, size_bytes_pos)); + if (!s.ok()) { + return s; + } + *offset += size_bytes_pos; + key_to_write = Slice(key.data() + prefix_len, key.size() - prefix_len); + } + } + + // Encode full key + // For value size as varint32 (up to 5 bytes). + // If the row is of value type with seqId 0, flush the special flag together + // in this buffer to safe one file append call, which takes 1 byte. + if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) { + Status s = + file->Append(Slice(key_to_write.data(), key_to_write.size() - 8)); + if (!s.ok()) { + return s; + } + *offset += key_to_write.size() - 8; + meta_bytes_buf[*meta_bytes_buf_size] = PlainTableFactory::kValueTypeSeqId0; + *meta_bytes_buf_size += 1; + } else { + file->Append(key_to_write); + *offset += key_to_write.size(); + } + + return Status::OK(); +} + +namespace { +Status ReadInternalKey(const char* key_ptr, const char* limit, + uint32_t user_key_size, ParsedInternalKey* parsed_key, + size_t* bytes_read, bool* internal_key_valid, + Slice* internal_key) { + if (key_ptr + user_key_size + 1 >= limit) { + return Status::Corruption("Unexpected EOF when reading the next key"); + } + if (*(key_ptr + user_key_size) == PlainTableFactory::kValueTypeSeqId0) { + // Special encoding for the row with seqID=0 + parsed_key->user_key = Slice(key_ptr, user_key_size); + parsed_key->sequence = 0; + parsed_key->type = kTypeValue; + *bytes_read += user_key_size + 1; + *internal_key_valid = false; + } else { + if (key_ptr + user_key_size + 8 >= limit) { + return Status::Corruption( + "Unexpected EOF when reading internal bytes of the next key"); + } + *internal_key_valid = true; + *internal_key = Slice(key_ptr, user_key_size + 8); + if (!ParseInternalKey(*internal_key, parsed_key)) { + return Status::Corruption( + Slice("Incorrect value type found when reading the next key")); + } + *bytes_read += user_key_size + 8; + } + return Status::OK(); +} +} // namespace + +Status PlainTableKeyDecoder::NextPlainEncodingKey( + const char* start, const char* limit, ParsedInternalKey* parsed_key, + Slice* internal_key, size_t* bytes_read, bool* seekable) { + const char* key_ptr = start; + size_t user_key_size = 0; + if (fixed_user_key_len_ != kPlainTableVariableLength) { + user_key_size = fixed_user_key_len_; + key_ptr = start; + } else { + uint32_t tmp_size = 0; + key_ptr = GetVarint32Ptr(start, limit, &tmp_size); + if (key_ptr == nullptr) { + return Status::Corruption( + "Unexpected EOF when reading the next key's size"); + } + user_key_size = static_cast(tmp_size); + *bytes_read = key_ptr - start; + } + bool decoded_internal_key_valid; + Slice decoded_internal_key; + Status s = + ReadInternalKey(key_ptr, limit, user_key_size, parsed_key, bytes_read, + &decoded_internal_key_valid, &decoded_internal_key); + if (!s.ok()) { + return s; + } + if (internal_key != nullptr) { + if (decoded_internal_key_valid) { + *internal_key = decoded_internal_key; + } else { + // Need to copy out the internal key + cur_key_.SetInternalKey(*parsed_key); + *internal_key = cur_key_.GetKey(); + } + } + return Status::OK(); +} + +Status PlainTableKeyDecoder::NextPrefixEncodingKey( + const char* start, const char* limit, ParsedInternalKey* parsed_key, + Slice* internal_key, size_t* bytes_read, bool* seekable) { + const char* key_ptr = start; + EntryType entry_type; + + bool expect_suffix = false; + do { + size_t size = 0; + bool decoded_internal_key_valid; + const char* pos = DecodeSize(key_ptr, limit, &entry_type, &size); + if (pos == nullptr) { + return Status::Corruption("Unexpected EOF when reading size of the key"); + } + *bytes_read += pos - key_ptr; + key_ptr = pos; + + switch (entry_type) { + case kFullKey: { + expect_suffix = false; + Slice decoded_internal_key; + Status s = + ReadInternalKey(key_ptr, limit, size, parsed_key, bytes_read, + &decoded_internal_key_valid, &decoded_internal_key); + if (!s.ok()) { + return s; + } + saved_user_key_ = parsed_key->user_key; + if (internal_key != nullptr) { + if (decoded_internal_key_valid) { + *internal_key = decoded_internal_key; + } else { + cur_key_.SetInternalKey(*parsed_key); + *internal_key = cur_key_.GetKey(); + } + } + break; + } + case kPrefixFromPreviousKey: { + if (seekable != nullptr) { + *seekable = false; + } + prefix_len_ = size; + assert(prefix_extractor_ == nullptr || + prefix_extractor_->Transform(saved_user_key_).size() == + prefix_len_); + // Need read another size flag for suffix + expect_suffix = true; + break; + } + case kKeySuffix: { + expect_suffix = false; + if (seekable != nullptr) { + *seekable = false; + } + assert(prefix_len_ >= 0); + cur_key_.Reserve(prefix_len_ + size); + + Slice tmp_slice; + Status s = ReadInternalKey(key_ptr, limit, size, parsed_key, bytes_read, + &decoded_internal_key_valid, &tmp_slice); + if (!s.ok()) { + return s; + } + cur_key_.SetInternalKey(Slice(saved_user_key_.data(), prefix_len_), + *parsed_key); + assert( + prefix_extractor_ == nullptr || + prefix_extractor_->Transform(ExtractUserKey(cur_key_.GetKey())) == + Slice(saved_user_key_.data(), prefix_len_)); + parsed_key->user_key = ExtractUserKey(cur_key_.GetKey()); + if (internal_key != nullptr) { + *internal_key = cur_key_.GetKey(); + } + break; + } + default: + return Status::Corruption("Identified size flag."); + } + } while (expect_suffix); // Another round if suffix is expected. + return Status::OK(); +} + +Status PlainTableKeyDecoder::NextKey(const char* start, const char* limit, + ParsedInternalKey* parsed_key, + Slice* internal_key, size_t* bytes_read, + bool* seekable) { + *bytes_read = 0; + if (seekable != nullptr) { + *seekable = true; + } + if (encoding_type_ == kPlain) { + return NextPlainEncodingKey(start, limit, parsed_key, internal_key, + bytes_read, seekable); + } else { + assert(encoding_type_ == kPrefix); + return NextPrefixEncodingKey(start, limit, parsed_key, internal_key, + bytes_read, seekable); + } +} + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/table/plain_table_key_coding.h b/table/plain_table_key_coding.h new file mode 100644 index 000000000..393dae71f --- /dev/null +++ b/table/plain_table_key_coding.h @@ -0,0 +1,97 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE +#pragma once + +#include "rocksdb/slice.h" +#include "db/dbformat.h" + +namespace rocksdb { + +class WritableFile; +class ParsedInternalKey; + +// Helper class to write out a key to an output file +// Actual data format of the key is documented in plain_table_factory.h +class PlainTableKeyEncoder { + public: + explicit PlainTableKeyEncoder(EncodingType encoding_type, + uint32_t user_key_len, + const SliceTransform* prefix_extractor, + size_t index_sparseness) + : encoding_type_((prefix_extractor != nullptr) ? encoding_type : kPlain), + fixed_user_key_len_(user_key_len), + prefix_extractor_(prefix_extractor), + index_sparseness_((index_sparseness > 1) ? index_sparseness : 1), + key_count_for_prefix(0) {} + // key: the key to write out, in the format of internal key. + // file: the output file to write out + // offset: offset in the file. Needs to be updated after appending bytes + // for the key + // meta_bytes_buf: buffer for extra meta bytes + // meta_bytes_buf_size: offset to append extra meta bytes. Will be updated + // if meta_bytes_buf is updated. + Status AppendKey(const Slice& key, WritableFile* file, uint64_t* offset, + char* meta_bytes_buf, size_t* meta_bytes_buf_size); + + // Return actual encoding type to be picked + EncodingType GetEncodingType() { return encoding_type_; } + + private: + EncodingType encoding_type_; + uint32_t fixed_user_key_len_; + const SliceTransform* prefix_extractor_; + const size_t index_sparseness_; + size_t key_count_for_prefix; + IterKey pre_prefix_; +}; + +// A helper class to decode keys from input buffer +// Actual data format of the key is documented in plain_table_factory.h +class PlainTableKeyDecoder { + public: + explicit PlainTableKeyDecoder(EncodingType encoding_type, + uint32_t user_key_len, + const SliceTransform* prefix_extractor) + : encoding_type_(encoding_type), + prefix_len_(0), + fixed_user_key_len_(user_key_len), + prefix_extractor_(prefix_extractor), + in_prefix_(false) {} + // Find the next key. + // start: char array where the key starts. + // limit: boundary of the char array + // parsed_key: the output of the result key + // internal_key: if not null, fill with the output of the result key in + // un-parsed format + // bytes_read: how many bytes read from start. Output + // seekable: whether key can be read from this place. Used when building + // indexes. Output. + Status NextKey(const char* start, const char* limit, + ParsedInternalKey* parsed_key, Slice* internal_key, + size_t* bytes_read, bool* seekable = nullptr); + EncodingType encoding_type_; + uint32_t prefix_len_; + uint32_t fixed_user_key_len_; + Slice saved_user_key_; + IterKey cur_key_; + const SliceTransform* prefix_extractor_; + bool in_prefix_; + + private: + Status NextPlainEncodingKey(const char* start, const char* limit, + ParsedInternalKey* parsed_key, + Slice* internal_key, size_t* bytes_read, + bool* seekable = nullptr); + Status NextPrefixEncodingKey(const char* start, const char* limit, + ParsedInternalKey* parsed_key, + Slice* internal_key, size_t* bytes_read, + bool* seekable = nullptr); +}; + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index e0da7e8c6..b3223805f 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -23,6 +23,7 @@ #include "table/meta_blocks.h" #include "table/two_level_iterator.h" #include "table/plain_table_factory.h" +#include "table/plain_table_key_coding.h" #include "util/arena.h" #include "util/coding.h" @@ -43,6 +44,7 @@ inline uint32_t GetSliceHash(const Slice& s) { } inline uint32_t GetBucketIdFromHash(uint32_t hash, uint32_t num_buckets) { + assert(num_buckets >= 0); return hash % num_buckets; } @@ -51,7 +53,6 @@ inline uint32_t GetBucketIdFromHash(uint32_t hash, uint32_t num_buckets) { inline uint32_t GetFixed32Element(const char* base, size_t offset) { return DecodeFixed32(base + offset * sizeof(uint32_t)); } - } // namespace // Iterator to iterate IndexedTable @@ -80,10 +81,11 @@ class PlainTableIterator : public Iterator { private: PlainTableReader* table_; + PlainTableKeyDecoder decoder_; bool use_prefix_seek_; uint32_t offset_; uint32_t next_offset_; - IterKey key_; + Slice key_; Slice value_; Status status_; // No copying allowed @@ -96,9 +98,11 @@ PlainTableReader::PlainTableReader(const Options& options, unique_ptr&& file, const EnvOptions& storage_options, const InternalKeyComparator& icomparator, + EncodingType encoding_type, uint64_t file_size, const TableProperties* table_properties) : internal_comparator_(icomparator), + encoding_type_(encoding_type), data_end_offset_(table_properties->data_size), user_key_len_(table_properties->fixed_key_len), prefix_extractor_(options.prefix_extractor.get()), @@ -120,7 +124,7 @@ Status PlainTableReader::Open(const Options& options, unique_ptr* table_reader, const int bloom_bits_per_key, double hash_table_ratio, size_t index_sparseness, - size_t huge_page_tlb_size) { + size_t huge_page_tlb_size, bool full_scan_mode) { assert(options.allow_mmap_reads); if (file_size > kMaxFileSize) { @@ -135,17 +139,53 @@ Status PlainTableReader::Open(const Options& options, } assert(hash_table_ratio >= 0.0); - std::unique_ptr new_reader( - new PlainTableReader(options, std::move(file), soptions, - internal_comparator, file_size, props)); + auto& user_props = props->user_collected_properties; + auto prefix_extractor_in_file = + user_props.find(PlainTablePropertyNames::kPrefixExtractorName); - // -- Populate Index - s = new_reader->PopulateIndex(props, bloom_bits_per_key, hash_table_ratio, - index_sparseness, huge_page_tlb_size); + if (!full_scan_mode && prefix_extractor_in_file != user_props.end()) { + if (!options.prefix_extractor) { + return Status::InvalidArgument( + "Prefix extractor is missing when opening a PlainTable built " + "using a prefix extractor"); + } else if (prefix_extractor_in_file->second.compare( + options.prefix_extractor->Name()) != 0) { + return Status::InvalidArgument( + "Prefix extractor given doesn't match the one used to build " + "PlainTable"); + } + } + + EncodingType encoding_type = kPlain; + auto encoding_type_prop = + user_props.find(PlainTablePropertyNames::kEncodingType); + if (encoding_type_prop != user_props.end()) { + encoding_type = static_cast( + DecodeFixed32(encoding_type_prop->second.c_str())); + } + + std::unique_ptr new_reader(new PlainTableReader( + options, std::move(file), soptions, internal_comparator, encoding_type, + file_size, props)); + + s = new_reader->MmapDataFile(); if (!s.ok()) { return s; } + // -- Populate Index + if (!full_scan_mode) { + s = new_reader->PopulateIndex(props, bloom_bits_per_key, hash_table_ratio, + index_sparseness, huge_page_tlb_size); + if (!s.ok()) { + return s; + } + } else { + // Flag to indicate it is a full scan mode so that none of the indexes + // can be used. + new_reader->index_size_ = kFullScanModeFlag; + } + *table_reader = std::move(new_reader); return s; } @@ -156,11 +196,10 @@ void PlainTableReader::SetupForCompaction() { Iterator* PlainTableReader::NewIterator(const ReadOptions& options, Arena* arena) { if (arena == nullptr) { - return new PlainTableIterator(this, options_.prefix_extractor != nullptr); + return new PlainTableIterator(this, prefix_extractor_ != nullptr); } else { auto mem = arena->AllocateAligned(sizeof(PlainTableIterator)); - return new (mem) - PlainTableIterator(this, options_.prefix_extractor != nullptr); + return new (mem) PlainTableIterator(this, prefix_extractor_ != nullptr); } } @@ -234,11 +273,15 @@ Status PlainTableReader::PopulateIndexRecordList(IndexRecordList* record_list, // are in order. *num_prefixes = 0; + PlainTableKeyDecoder decoder(encoding_type_, user_key_len_, + options_.prefix_extractor.get()); + bool due_index = false; while (pos < data_end_offset_) { uint32_t key_offset = pos; ParsedInternalKey key; Slice value_slice; - Status s = Next(&pos, &key, &value_slice); + bool seekable = false; + Status s = Next(&decoder, &pos, &key, nullptr, &value_slice, &seekable); if (!s.ok()) { return s; } @@ -256,12 +299,21 @@ Status PlainTableReader::PopulateIndexRecordList(IndexRecordList* record_list, num_keys_per_prefix = 0; prev_key_prefix_slice = key_prefix_slice; prev_key_prefix_hash = GetSliceHash(key_prefix_slice); + due_index = true; } - if (index_sparseness == 0 || - num_keys_per_prefix++ % index_sparseness == 0) { + if (due_index) { + if (!seekable) { + return Status::Corruption("Key for a prefix is not seekable"); + } // Add an index key for every kIndexIntervalForSamePrefixKeys keys record_list->AddRecord(prev_key_prefix_hash, key_offset); + due_index = false; + } + + num_keys_per_prefix++; + if (index_sparseness == 0 || num_keys_per_prefix % index_sparseness == 0) { + due_index = true; } is_first_record = false; } @@ -381,6 +433,11 @@ void PlainTableReader::FillIndexes( index_size_, kSubIndexSize); } +Status PlainTableReader::MmapDataFile() { + // Get mmapped memory to file_data_. + return file_->Read(0, file_size_, &file_data_, nullptr); +} + Status PlainTableReader::PopulateIndex(TableProperties* props, int bloom_bits_per_key, double hash_table_ratio, @@ -395,12 +452,6 @@ Status PlainTableReader::PopulateIndex(TableProperties* props, "PlainTable requires a prefix extractor enable prefix hash mode."); } - // Get mmapped memory to file_data_. - Status s = file_->Read(0, file_size_, &file_data_, nullptr); - if (!s.ok()) { - return s; - } - IndexRecordList record_list(kRecordsPerGroup); // First, read the whole file, for every kIndexIntervalForSamePrefixKeys rows // for a prefix (starting from the first one), generate a record of (hash, @@ -419,8 +470,8 @@ Status PlainTableReader::PopulateIndex(TableProperties* props, } } - s = PopulateIndexRecordList(&record_list, &num_prefixes, bloom_bits_per_key, - index_sparseness); + Status s = PopulateIndexRecordList(&record_list, &num_prefixes, + bloom_bits_per_key, index_sparseness); if (!s.ok()) { return s; } @@ -484,7 +535,11 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix, uint32_t mid = (high + low) / 2; uint32_t file_offset = GetFixed32Element(base_ptr, mid); size_t tmp; - Status s = ReadKey(file_data_.data() + file_offset, &mid_key, &tmp); + Status s = PlainTableKeyDecoder(encoding_type_, user_key_len_, + options_.prefix_extractor.get()) + .NextKey(file_data_.data() + file_offset, + file_data_.data() + data_end_offset_, &mid_key, + nullptr, &tmp); if (!s.ok()) { return s; } @@ -509,7 +564,15 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix, ParsedInternalKey low_key; size_t tmp; uint32_t low_key_offset = GetFixed32Element(base_ptr, low); - Status s = ReadKey(file_data_.data() + low_key_offset, &low_key, &tmp); + Status s = PlainTableKeyDecoder(encoding_type_, user_key_len_, + options_.prefix_extractor.get()) + .NextKey(file_data_.data() + low_key_offset, + file_data_.data() + data_end_offset_, &low_key, + nullptr, &tmp); + if (!s.ok()) { + return s; + } + if (GetPrefix(low_key) == prefix) { prefix_matched = true; *offset = low_key_offset; @@ -533,52 +596,10 @@ Slice PlainTableReader::GetPrefix(const ParsedInternalKey& target) const { return GetPrefixFromUserKey(target.user_key); } -Status PlainTableReader::ReadKey(const char* start, ParsedInternalKey* key, - size_t* bytes_read) const { - const char* key_ptr = nullptr; - *bytes_read = 0; - size_t user_key_size = 0; - if (IsFixedLength()) { - user_key_size = user_key_len_; - key_ptr = start; - } else { - uint32_t tmp_size = 0; - key_ptr = - GetVarint32Ptr(start, file_data_.data() + data_end_offset_, &tmp_size); - if (key_ptr == nullptr) { - return Status::Corruption( - "Unexpected EOF when reading the next key's size"); - } - user_key_size = (size_t)tmp_size; - *bytes_read = key_ptr - start; - } - if (key_ptr + user_key_size + 1 >= file_data_.data() + data_end_offset_) { - return Status::Corruption("Unexpected EOF when reading the next key"); - } - - if (*(key_ptr + user_key_size) == PlainTableFactory::kValueTypeSeqId0) { - // Special encoding for the row with seqID=0 - key->user_key = Slice(key_ptr, user_key_size); - key->sequence = 0; - key->type = kTypeValue; - *bytes_read += user_key_size + 1; - } else { - if (start + user_key_size + 8 >= file_data_.data() + data_end_offset_) { - return Status::Corruption( - "Unexpected EOF when reading internal bytes of the next key"); - } - if (!ParseInternalKey(Slice(key_ptr, user_key_size + 8), key)) { - return Status::Corruption( - Slice("Incorrect value type found when reading the next key")); - } - *bytes_read += user_key_size + 8; - } - - return Status::OK(); -} - -Status PlainTableReader::Next(uint32_t* offset, ParsedInternalKey* key, - Slice* value) const { +Status PlainTableReader::Next(PlainTableKeyDecoder* decoder, uint32_t* offset, + ParsedInternalKey* parsed_key, + Slice* internal_key, Slice* value, + bool* seekable) const { if (*offset == data_end_offset_) { *offset = data_end_offset_; return Status::OK(); @@ -590,7 +611,9 @@ Status PlainTableReader::Next(uint32_t* offset, ParsedInternalKey* key, const char* start = file_data_.data() + *offset; size_t bytes_for_key; - Status s = ReadKey(start, key, &bytes_for_key); + Status s = + decoder->NextKey(start, file_data_.data() + data_end_offset_, parsed_key, + internal_key, &bytes_for_key, seekable); if (!s.ok()) { return s; } @@ -626,6 +649,11 @@ Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target, Slice prefix_slice; uint32_t prefix_hash; if (IsTotalOrderMode()) { + if (index_size_ == kFullScanModeFlag) { + // Full Scan Mode + status_ = + Status::InvalidArgument("Get() is not allowed in full scan mode."); + } // Match whole user key for bloom filter check. if (!MatchBloom(GetSliceHash(GetUserKey(target)))) { return Status::OK(); @@ -655,8 +683,10 @@ Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target, } Slice found_value; + PlainTableKeyDecoder decoder(encoding_type_, user_key_len_, + options_.prefix_extractor.get()); while (offset < data_end_offset_) { - Status s = Next(&offset, &found_key, &found_value); + Status s = Next(&decoder, &offset, &found_key, nullptr, &found_value); if (!s.ok()) { return s; } @@ -683,7 +713,10 @@ uint64_t PlainTableReader::ApproximateOffsetOf(const Slice& key) { PlainTableIterator::PlainTableIterator(PlainTableReader* table, bool use_prefix_seek) - : table_(table), use_prefix_seek_(use_prefix_seek) { + : table_(table), + decoder_(table_->encoding_type_, table_->user_key_len_, + table_->prefix_extractor_), + use_prefix_seek_(use_prefix_seek) { next_offset_ = offset_ = table_->data_end_offset_; } @@ -712,12 +745,21 @@ void PlainTableIterator::SeekToLast() { void PlainTableIterator::Seek(const Slice& target) { // If the user doesn't set prefix seek option and we are not able to do a // total Seek(). assert failure. - if (!use_prefix_seek_ && table_->index_size_ > 1) { - assert(false); - status_ = Status::NotSupported( - "PlainTable cannot issue non-prefix seek unless in total order mode."); - offset_ = next_offset_ = table_->data_end_offset_; - return; + if (!use_prefix_seek_) { + if (table_->index_size_ == PlainTableReader::kFullScanModeFlag) { + // Full Scan Mode. + status_ = + Status::InvalidArgument("Seek() is not allowed in full scan mode."); + offset_ = next_offset_ = table_->data_end_offset_; + return; + } else if (table_->index_size_ > 1) { + assert(false); + status_ = Status::NotSupported( + "PlainTable cannot issue non-prefix seek unless in total order " + "mode."); + offset_ = next_offset_ = table_->data_end_offset_; + return; + } } Slice prefix_slice = table_->GetPrefix(target); @@ -762,11 +804,9 @@ void PlainTableIterator::Next() { if (offset_ < table_->data_end_offset_) { Slice tmp_slice; ParsedInternalKey parsed_key; - status_ = table_->Next(&next_offset_, &parsed_key, &value_); - if (status_.ok()) { - // Make a copy in this case. TODO optimize. - key_.SetInternalKey(parsed_key); - } else { + status_ = + table_->Next(&decoder_, &next_offset_, &parsed_key, &key_, &value_); + if (!status_.ok()) { offset_ = next_offset_ = table_->data_end_offset_; } } @@ -778,7 +818,7 @@ void PlainTableIterator::Prev() { Slice PlainTableIterator::key() const { assert(Valid()); - return key_.GetKey(); + return key_; } Slice PlainTableIterator::value() const { diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h index 9936847e3..8e5a3ee52 100644 --- a/table/plain_table_reader.h +++ b/table/plain_table_reader.h @@ -33,6 +33,7 @@ struct ReadOptions; class TableCache; class TableReader; class InternalKeyComparator; +class PlainTableKeyDecoder; using std::unique_ptr; using std::unordered_map; @@ -53,7 +54,8 @@ class PlainTableReader: public TableReader { unique_ptr&& file, uint64_t file_size, unique_ptr* table, const int bloom_bits_per_key, double hash_table_ratio, - size_t index_sparseness, size_t huge_page_tlb_size); + size_t index_sparseness, size_t huge_page_tlb_size, + bool full_scan_mode); Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override; @@ -75,7 +77,8 @@ class PlainTableReader: public TableReader { PlainTableReader(const Options& options, unique_ptr&& file, const EnvOptions& storage_options, const InternalKeyComparator& internal_comparator, - uint64_t file_size, const TableProperties* table_properties); + EncodingType encoding_type, uint64_t file_size, + const TableProperties* table_properties); virtual ~PlainTableReader(); protected: @@ -128,6 +131,7 @@ class PlainTableReader: public TableReader { Status PopulateIndex(TableProperties* props, int bloom_bits_per_key, double hash_table_ratio, size_t index_sparseness, size_t huge_page_tlb_size); + Status MmapDataFile(); private: struct IndexRecord; @@ -143,6 +147,7 @@ class PlainTableReader: public TableReader { int index_size_ = 0; char* sub_index_; const InternalKeyComparator internal_comparator_; + EncodingType encoding_type_; // represents plain table's current status. Status status_; Slice file_data_; @@ -159,6 +164,7 @@ class PlainTableReader: public TableReader { static const size_t kOffsetLen = sizeof(uint32_t); static const uint64_t kMaxFileSize = 1u << 31; static const size_t kRecordsPerGroup = 256; + static const int kFullScanModeFlag = -1; // Bloom filter is used to rule out non-existent key bool enable_bloom_; @@ -213,14 +219,17 @@ class PlainTableReader: public TableReader { const std::vector& entries_per_bucket, size_t huge_page_tlb_size); - // Read a plain table key from the position `start`. The read content - // will be written to `key` and the size of read bytes will be populated - // in `bytes_read`. - Status ReadKey(const char* row_ptr, ParsedInternalKey* key, - size_t* bytes_read) const; - // Read the key and value at `offset` to parameters `key` and `value`. + // Read the key and value at `offset` to parameters for keys, the and + // `seekable`. // On success, `offset` will be updated as the offset for the next key. - Status Next(uint32_t* offset, ParsedInternalKey* key, Slice* value) const; + // `parsed_key` will be key in parsed format. + // if `internal_key` is not empty, it will be filled with key with slice + // format. + // if `seekable` is not null, it will return whether we can directly read + // data using this offset. + Status Next(PlainTableKeyDecoder* decoder, uint32_t* offset, + ParsedInternalKey* parsed_key, Slice* internal_key, Slice* value, + bool* seekable = nullptr) const; // Get file offset for key target. // return value prefix_matched is set to true if the offset is confirmed // for a key with the same prefix as target. diff --git a/tools/sst_dump.cc b/tools/sst_dump.cc index 63fb80d9e..ea87a0607 100644 --- a/tools/sst_dump.cc +++ b/tools/sst_dump.cc @@ -157,9 +157,8 @@ Status SstFileReader::SetTableOptionsByMagicNumber( } else if (table_magic_number == kPlainTableMagicNumber || table_magic_number == kLegacyPlainTableMagicNumber) { options_.allow_mmap_reads = true; - options_.table_factory = std::make_shared( - table_properties_->fixed_key_len, 2, 0.8); - options_.prefix_extractor.reset(NewNoopTransform()); + options_.table_factory.reset(NewTotalOrderPlainTableFactory( + kPlainTableVariableLength, 0, 1, 0, true)); fprintf(stdout, "Sst file format: plain table\n"); } else { char error_msg_buffer[80];