PlainTable to encode to avoid to rewrite prefix when it is the same as the previous key

Summary:
Add a encoding feature of PlainTable to encode PlainTable's keys to save some bytes for the same prefixes.
The data format is documented in table/plain_table_factory.h

Test Plan: Add unit test coverage in plain_table_db_test

Reviewers: yhchiang, igor, dhruba, ljin, haobo

Reviewed By: haobo

Subscribers: nkg-, leveldb

Differential Revision: https://reviews.facebook.net/D18735
This commit is contained in:
sdong 2014-06-18 16:36:48 -07:00
parent 0f0076ed5a
commit edd47c5104
12 changed files with 890 additions and 207 deletions

View File

@ -263,17 +263,38 @@ class IterKey {
key_size_ = size;
}
void SetInternalKey(const Slice& key_prefix, const Slice& user_key,
SequenceNumber s,
ValueType value_type = kValueTypeForSeek) {
size_t psize = key_prefix.size();
size_t usize = user_key.size();
EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t));
if (psize > 0) {
memcpy(key_, key_prefix.data(), psize);
}
memcpy(key_ + psize, user_key.data(), usize);
EncodeFixed64(key_ + usize + psize, PackSequenceAndType(s, value_type));
key_size_ = psize + usize + sizeof(uint64_t);
}
void SetInternalKey(const Slice& user_key, SequenceNumber s,
ValueType value_type = kValueTypeForSeek) {
size_t usize = user_key.size();
EnlargeBufferIfNeeded(usize + sizeof(uint64_t));
memcpy(key_, user_key.data(), usize);
EncodeFixed64(key_ + usize, PackSequenceAndType(s, value_type));
key_size_ = usize + sizeof(uint64_t);
SetInternalKey(Slice(), user_key, s, value_type);
}
void Reserve(size_t size) {
EnlargeBufferIfNeeded(size);
key_size_ = size;
}
void SetInternalKey(const ParsedInternalKey& parsed_key) {
SetInternalKey(parsed_key.user_key, parsed_key.sequence, parsed_key.type);
SetInternalKey(Slice(), parsed_key);
}
void SetInternalKey(const Slice& key_prefix,
const ParsedInternalKey& parsed_key_suffix) {
SetInternalKey(key_prefix, parsed_key_suffix.user_key,
parsed_key_suffix.sequence, parsed_key_suffix.type);
}
private:

View File

@ -61,7 +61,7 @@ class PlainTableDBTest {
// Return the current option configuration.
Options CurrentOptions() {
Options options;
options.table_factory.reset(NewPlainTableFactory(16, 2, 0.8, 3));
options.table_factory.reset(NewPlainTableFactory(0, 2, 0.8, 3, 0, kPrefix));
options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true));
options.prefix_extractor.reset(NewFixedPrefixTransform(8));
options.allow_mmap_reads = true;
@ -179,17 +179,21 @@ class TestPlainTableReader : public PlainTableReader {
public:
TestPlainTableReader(const EnvOptions& storage_options,
const InternalKeyComparator& icomparator,
uint64_t file_size, int bloom_bits_per_key,
double hash_table_ratio, size_t index_sparseness,
EncodingType encoding_type, uint64_t file_size,
int bloom_bits_per_key, double hash_table_ratio,
size_t index_sparseness,
const TableProperties* table_properties,
unique_ptr<RandomAccessFile>&& file,
const Options& options, bool* expect_bloom_not_match)
: PlainTableReader(options, std::move(file), storage_options, icomparator,
file_size, table_properties),
encoding_type, file_size, table_properties),
expect_bloom_not_match_(expect_bloom_not_match) {
Status s = PopulateIndex(const_cast<TableProperties*>(table_properties),
bloom_bits_per_key, hash_table_ratio,
index_sparseness, 2 * 1024 * 1024);
Status s = MmapDataFile();
ASSERT_TRUE(s.ok());
s = PopulateIndex(const_cast<TableProperties*>(table_properties),
bloom_bits_per_key, hash_table_ratio, index_sparseness,
2 * 1024 * 1024);
ASSERT_TRUE(s.ok());
}
@ -211,9 +215,10 @@ class TestPlainTableFactory : public PlainTableFactory {
uint32_t user_key_len, int bloom_bits_per_key,
double hash_table_ratio,
size_t index_sparseness,
size_t huge_page_tlb_size)
: PlainTableFactory(user_key_len, user_key_len, hash_table_ratio,
index_sparseness, huge_page_tlb_size),
size_t huge_page_tlb_size,
EncodingType encoding_type)
: PlainTableFactory(user_key_len, bloom_bits_per_key, hash_table_ratio,
index_sparseness, huge_page_tlb_size, encoding_type),
bloom_bits_per_key_(bloom_bits_per_key),
hash_table_ratio_(hash_table_ratio),
index_sparseness_(index_sparseness),
@ -228,10 +233,17 @@ class TestPlainTableFactory : public PlainTableFactory {
options.env, options.info_log.get(), &props);
ASSERT_TRUE(s.ok());
auto& user_props = props->user_collected_properties;
auto encoding_type_prop =
user_props.find(PlainTablePropertyNames::kEncodingType);
assert(encoding_type_prop != user_props.end());
EncodingType encoding_type = static_cast<EncodingType>(
DecodeFixed32(encoding_type_prop->second.c_str()));
std::unique_ptr<PlainTableReader> new_reader(new TestPlainTableReader(
soptions, internal_comparator, file_size, bloom_bits_per_key_,
hash_table_ratio_, index_sparseness_, props, std::move(file), options,
expect_bloom_not_match_));
soptions, internal_comparator, encoding_type, file_size,
bloom_bits_per_key_, hash_table_ratio_, index_sparseness_, props,
std::move(file), options, expect_bloom_not_match_));
*table = std::move(new_reader);
return s;
@ -247,18 +259,22 @@ class TestPlainTableFactory : public PlainTableFactory {
TEST(PlainTableDBTest, Flush) {
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
huge_page_tlb_size += 2 * 1024 * 1024) {
for (EncodingType encoding_type : {kPlain, kPrefix}) {
for (int bloom_bits = 0; bloom_bits <= 117; bloom_bits += 117) {
for (int total_order = 0; total_order <= 1; total_order++) {
if (encoding_type == kPrefix && total_order == 1) {
continue;
}
Options options = CurrentOptions();
options.create_if_missing = true;
// Set only one bucket to force bucket conflict.
// Test index interval for the same prefix to be 1, 2 and 4
if (total_order) {
options.table_factory.reset(NewTotalOrderPlainTableFactory(
16, bloom_bits, 2, huge_page_tlb_size));
0, bloom_bits, 2, huge_page_tlb_size));
} else {
options.table_factory.reset(NewPlainTableFactory(
16, bloom_bits, 0.75, 16, huge_page_tlb_size));
0, bloom_bits, 0.75, 16, huge_page_tlb_size, encoding_type));
}
DestroyAndReopen(&options);
@ -281,14 +297,19 @@ TEST(PlainTableDBTest, Flush) {
ASSERT_EQ("v2", Get("0000000000000bar"));
}
}
}
}
}
TEST(PlainTableDBTest, Flush2) {
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
huge_page_tlb_size += 2 * 1024 * 1024) {
for (EncodingType encoding_type : {kPlain, kPrefix}) {
for (int bloom_bits = 0; bloom_bits <= 117; bloom_bits += 117) {
for (int total_order = 0; total_order <= 1; total_order++) {
if (encoding_type == kPrefix && total_order == 1) {
continue;
}
bool expect_bloom_not_match = false;
Options options = CurrentOptions();
options.create_if_missing = true;
@ -296,13 +317,13 @@ TEST(PlainTableDBTest, Flush2) {
// Test index interval for the same prefix to be 1, 2 and 4
if (total_order) {
options.prefix_extractor = nullptr;
options.table_factory.reset(
new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits,
0, 2, huge_page_tlb_size));
options.table_factory.reset(new TestPlainTableFactory(
&expect_bloom_not_match, 0, bloom_bits, 0, 2, huge_page_tlb_size,
encoding_type));
} else {
options.table_factory.reset(
new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits,
0.75, 16, huge_page_tlb_size));
options.table_factory.reset(new TestPlainTableFactory(
&expect_bloom_not_match, 0, bloom_bits, 0.75, 16,
huge_page_tlb_size, encoding_type));
}
DestroyAndReopen(&options);
ASSERT_OK(Put("0000000000000bar", "b"));
@ -341,14 +362,19 @@ TEST(PlainTableDBTest, Flush2) {
}
}
}
}
}
}
TEST(PlainTableDBTest, Iterator) {
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
huge_page_tlb_size += 2 * 1024 * 1024) {
for (EncodingType encoding_type : {kPlain, kPrefix}) {
for (int bloom_bits = 0; bloom_bits <= 117; bloom_bits += 117) {
for (int total_order = 0; total_order <= 1; total_order++) {
if (encoding_type == kPrefix && total_order == 1) {
continue;
}
bool expect_bloom_not_match = false;
Options options = CurrentOptions();
options.create_if_missing = true;
@ -356,13 +382,13 @@ TEST(PlainTableDBTest, Iterator) {
// Test index interval for the same prefix to be 1, 2 and 4
if (total_order) {
options.prefix_extractor = nullptr;
options.table_factory.reset(
new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits,
0, 2, huge_page_tlb_size));
options.table_factory.reset(new TestPlainTableFactory(
&expect_bloom_not_match, 16, bloom_bits, 0, 2, huge_page_tlb_size,
encoding_type));
} else {
options.table_factory.reset(
new TestPlainTableFactory(&expect_bloom_not_match, 16, bloom_bits,
0.75, 16, huge_page_tlb_size));
options.table_factory.reset(new TestPlainTableFactory(
&expect_bloom_not_match, 16, bloom_bits, 0.75, 16,
huge_page_tlb_size, encoding_type));
}
DestroyAndReopen(&options);
@ -449,6 +475,7 @@ TEST(PlainTableDBTest, Iterator) {
delete iter;
}
}
}
}
}
@ -460,7 +487,7 @@ std::string MakeLongKey(size_t length, char c) {
TEST(PlainTableDBTest, IteratorLargeKeys) {
Options options = CurrentOptions();
options.table_factory.reset(NewTotalOrderPlainTableFactory(0, 0, 16));
options.table_factory.reset(NewTotalOrderPlainTableFactory(0, 0, 16, 0));
options.create_if_missing = true;
options.prefix_extractor.reset();
DestroyAndReopen(&options);
@ -496,6 +523,45 @@ TEST(PlainTableDBTest, IteratorLargeKeys) {
delete iter;
}
namespace {
std::string MakeLongKeyWithPrefix(size_t length, char c) {
return "00000000" + std::string(length - 8, c);
}
} // namespace
TEST(PlainTableDBTest, IteratorLargeKeysWithPrefix) {
Options options = CurrentOptions();
options.table_factory.reset(NewPlainTableFactory(16, 0, 0.8, 3, 0, kPrefix));
options.create_if_missing = true;
DestroyAndReopen(&options);
std::string key_list[] = {
MakeLongKeyWithPrefix(30, '0'), MakeLongKeyWithPrefix(16, '1'),
MakeLongKeyWithPrefix(32, '2'), MakeLongKeyWithPrefix(60, '3'),
MakeLongKeyWithPrefix(90, '4'), MakeLongKeyWithPrefix(50, '5'),
MakeLongKeyWithPrefix(26, '6')};
for (size_t i = 0; i < 7; i++) {
ASSERT_OK(Put(key_list[i], std::to_string(i)));
}
dbfull()->TEST_FlushMemTable();
Iterator* iter = dbfull()->NewIterator(ReadOptions());
iter->Seek(key_list[0]);
for (size_t i = 0; i < 7; i++) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(key_list[i], iter->key().ToString());
ASSERT_EQ(std::to_string(i), iter->value().ToString());
iter->Next();
}
ASSERT_TRUE(!iter->Valid());
delete iter;
}
// A test comparator which compare two strings in this way:
// (1) first compare prefix of 8 bytes in alphabet order,
// (2) if two strings share the same prefix, sort the other part of the string

View File

@ -97,6 +97,30 @@ extern TableFactory* NewBlockBasedTableFactory(
const BlockBasedTableOptions& table_options = BlockBasedTableOptions());
#ifndef ROCKSDB_LITE
enum EncodingType : char {
// Always write full keys without any special encoding.
kPlain,
// Find opportunity to write the same prefix once for multiple rows.
// In some cases, when a key follows a previous key with the same prefix,
// instead of writing out the full key, it just writes out the size of the
// shared prefix, as well as other bytes, to save some bytes.
//
// When using this option, the user is required to use the same prefix
// extractor to make sure the same prefix will be extracted from the same key.
// The Name() value of the prefix extractor will be stored in the file. When
// reopening the file, the name of the options.prefix_extractor given will be
// bitwise compared to the prefix extractors stored in the file. An error
// will be returned if the two don't match.
kPrefix,
};
// Table Properties that are specific to plain table properties.
struct PlainTablePropertyNames {
static const std::string kPrefixExtractorName;
static const std::string kEncodingType;
};
// -- Plain Table with prefix-only seek
// For this factory, you need to set Options.prefix_extrator properly to make it
// work. Look-up will starts with prefix hash lookup for key prefix. Inside the
@ -113,11 +137,22 @@ extern TableFactory* NewBlockBasedTableFactory(
// in the hash table
// @index_sparseness: inside each prefix, need to build one index record for how
// many keys for binary search inside each hash bucket.
// For encoding type kPrefix, the value will be used when
// writing to determine an interval to rewrite the full key.
// It will also be used as a suggestion and satisfied when
// possible.
// @huge_page_tlb_size: if <=0, allocate hash indexes and blooms from malloc.
// Otherwise from huge page TLB. The user needs to reserve
// huge pages for it to be allocated, like:
// sysctl -w vm.nr_hugepages=20
// See linux doc Documentation/vm/hugetlbpage.txt
// @encoding_type: how to encode the keys. See enum EncodingType above for
// the choices. The value will determine how to encode keys
// when writing to a new SST file. This value will be stored
// inside the SST file which will be used when reading from the
// file, which makes it possible for users to choose different
// encoding type when reopening a DB. Files with different
// encoding types can co-exist in the same DB and can be read.
const uint32_t kPlainTableVariableLength = 0;
extern TableFactory* NewPlainTableFactory(uint32_t user_key_len =
@ -125,7 +160,8 @@ extern TableFactory* NewPlainTableFactory(uint32_t user_key_len =
int bloom_bits_per_prefix = 10,
double hash_table_ratio = 0.75,
size_t index_sparseness = 16,
size_t huge_page_tlb_size = 0);
size_t huge_page_tlb_size = 0,
EncodingType encoding_type = kPlain);
// -- Plain Table
// This factory of plain table ignores Options.prefix_extractor and assumes no
@ -147,7 +183,7 @@ extern TableFactory* NewPlainTableFactory(uint32_t user_key_len =
extern TableFactory* NewTotalOrderPlainTableFactory(
uint32_t user_key_len = kPlainTableVariableLength,
int bloom_bits_per_key = 0, size_t index_sparseness = 16,
size_t huge_page_tlb_size = 0);
size_t huge_page_tlb_size = 0, bool full_scan_mode = false);
#endif // ROCKSDB_LITE

View File

@ -1,6 +1,7 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include "table/plain_table_builder.h"
@ -12,6 +13,7 @@
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "table/plain_table_factory.h"
#include "db/dbformat.h"
#include "table/block_builder.h"
@ -52,10 +54,14 @@ Status WriteBlock(
extern const uint64_t kPlainTableMagicNumber = 0x8242229663bf9564ull;
extern const uint64_t kLegacyPlainTableMagicNumber = 0x4f3418eb7a8f13b8ull;
PlainTableBuilder::PlainTableBuilder(const Options& options,
WritableFile* file,
uint32_t user_key_len) :
options_(options), file_(file), user_key_len_(user_key_len) {
PlainTableBuilder::PlainTableBuilder(const Options& options, WritableFile* file,
uint32_t user_key_len,
EncodingType encoding_type,
size_t index_sparseness)
: options_(options),
file_(file),
encoder_(encoding_type, user_key_len, options.prefix_extractor.get(),
index_sparseness) {
properties_.fixed_key_len = user_key_len;
// for plain table, we put all the data in a big chuck.
@ -64,7 +70,20 @@ PlainTableBuilder::PlainTableBuilder(const Options& options,
// filter block.
properties_.index_size = 0;
properties_.filter_size = 0;
properties_.format_version = 0;
// To support roll-back to previous version, now still use version 0 for
// plain encoding.
properties_.format_version = (encoding_type == kPlain) ? 0 : 1;
if (options_.prefix_extractor) {
properties_.user_collected_properties
[PlainTablePropertyNames::kPrefixExtractorName] =
options_.prefix_extractor->Name();
}
std::string val;
PutFixed32(&val, static_cast<uint32_t>(encoder_.GetEncodingType()));
properties_.user_collected_properties
[PlainTablePropertyNames::kEncodingType] = val;
for (auto& collector_factories :
options.table_properties_collector_factories) {
@ -77,51 +96,25 @@ PlainTableBuilder::~PlainTableBuilder() {
}
void PlainTableBuilder::Add(const Slice& key, const Slice& value) {
size_t user_key_size = key.size() - 8;
assert(user_key_len_ == 0 || user_key_size == user_key_len_);
// temp buffer for metadata bytes between key and value.
char meta_bytes_buf[6];
size_t meta_bytes_buf_size = 0;
if (!IsFixedLength()) {
// Write key length
char key_size_buf[5]; // tmp buffer for key size as varint32
char* ptr = EncodeVarint32(key_size_buf, user_key_size);
assert(ptr <= key_size_buf + sizeof(key_size_buf));
auto len = ptr - key_size_buf;
file_->Append(Slice(key_size_buf, len));
offset_ += len;
}
// Write key
ParsedInternalKey parsed_key;
if (!ParseInternalKey(key, &parsed_key)) {
status_ = Status::Corruption(Slice());
return;
}
// For value size as varint32 (up to 5 bytes).
// If the row is of value type with seqId 0, flush the special flag together
// in this buffer to safe one file append call, which takes 1 byte.
char value_size_buf[6];
size_t value_size_buf_size = 0;
if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) {
file_->Append(Slice(key.data(), user_key_size));
offset_ += user_key_size;
value_size_buf[0] = PlainTableFactory::kValueTypeSeqId0;
value_size_buf_size = 1;
} else {
file_->Append(key);
offset_ += key.size();
}
// Write out the key
encoder_.AppendKey(key, file_, &offset_, meta_bytes_buf,
&meta_bytes_buf_size);
// Write value length
int value_size = value.size();
char* end_ptr =
EncodeVarint32(value_size_buf + value_size_buf_size, value_size);
assert(end_ptr <= value_size_buf + sizeof(value_size_buf));
value_size_buf_size = end_ptr - value_size_buf;
file_->Append(Slice(value_size_buf, value_size_buf_size));
EncodeVarint32(meta_bytes_buf + meta_bytes_buf_size, value_size);
assert(end_ptr <= meta_bytes_buf + sizeof(meta_bytes_buf));
meta_bytes_buf_size = end_ptr - meta_bytes_buf;
file_->Append(Slice(meta_bytes_buf, meta_bytes_buf_size));
// Write value
file_->Append(value);
offset_ += value_size + value_size_buf_size;
offset_ += value_size + meta_bytes_buf_size;
properties_.num_entries++;
properties_.raw_key_size += key.size();
@ -150,6 +143,8 @@ Status PlainTableBuilder::Finish() {
// -- Add basic properties
property_block_builder.AddTableProperty(properties_);
property_block_builder.Add(properties_.user_collected_properties);
// -- Add user collected properties
NotifyCollectTableCollectorsOnFinish(table_properties_collectors_,
options_.info_log.get(),

View File

@ -1,9 +1,7 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// IndexedTable is a simple table format for UNIT TEST ONLY. It is not built
// as production quality.
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
@ -12,6 +10,8 @@
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "table/table_builder.h"
#include "table/plain_table_key_coding.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
namespace rocksdb {
@ -22,14 +22,15 @@ class WritableFile;
class TableBuilder;
class PlainTableBuilder: public TableBuilder {
public:
public:
// Create a builder that will store the contents of the table it is
// building in *file. Does not close the file. It is up to the
// caller to close the file after calling Finish(). The output file
// will be part of level specified by 'level'. A value of -1 means
// that the caller does not know which level the output file will reside.
PlainTableBuilder(const Options& options, WritableFile* file,
uint32_t user_key_size);
uint32_t user_key_size, EncodingType encoding_type,
size_t index_sparseness);
// REQUIRES: Either Finish() or Abandon() has been called.
~PlainTableBuilder();
@ -61,7 +62,7 @@ public:
// Finish() call, returns the size of the final generated file.
uint64_t FileSize() const override;
private:
private:
Options options_;
std::vector<std::unique_ptr<TablePropertiesCollector>>
table_properties_collectors_;
@ -69,14 +70,10 @@ private:
uint64_t offset_ = 0;
Status status_;
TableProperties properties_;
PlainTableKeyEncoder encoder_;
const size_t user_key_len_;
bool closed_ = false; // Either Finish() or Abandon() has been called.
bool IsFixedLength() const {
return user_key_len_ > 0;
}
// No copying allowed
PlainTableBuilder(const PlainTableBuilder&) = delete;
void operator=(const PlainTableBuilder&) = delete;

View File

@ -23,32 +23,42 @@ Status PlainTableFactory::NewTableReader(const Options& options,
return PlainTableReader::Open(options, soptions, icomp, std::move(file),
file_size, table, bloom_bits_per_key_,
hash_table_ratio_, index_sparseness_,
huge_page_tlb_size_);
huge_page_tlb_size_, full_scan_mode_);
}
TableBuilder* PlainTableFactory::NewTableBuilder(
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, CompressionType compression_type) const {
return new PlainTableBuilder(options, file, user_key_len_);
return new PlainTableBuilder(options, file, user_key_len_, encoding_type_,
index_sparseness_);
}
extern TableFactory* NewPlainTableFactory(uint32_t user_key_len,
int bloom_bits_per_key,
double hash_table_ratio,
size_t index_sparseness,
size_t huge_page_tlb_size) {
size_t huge_page_tlb_size,
EncodingType encoding_type) {
return new PlainTableFactory(user_key_len, bloom_bits_per_key,
hash_table_ratio, index_sparseness,
huge_page_tlb_size);
huge_page_tlb_size, encoding_type);
}
extern TableFactory* NewTotalOrderPlainTableFactory(uint32_t user_key_len,
int bloom_bits_per_key,
size_t index_sparseness,
size_t huge_page_tlb_size) {
size_t huge_page_tlb_size,
bool full_scan_mode) {
return new PlainTableFactory(user_key_len, bloom_bits_per_key, 0,
index_sparseness, huge_page_tlb_size);
index_sparseness, huge_page_tlb_size, kPlain,
full_scan_mode);
}
const std::string PlainTablePropertyNames::kPrefixExtractorName =
"rocksdb.prefix.extractor.name";
const std::string PlainTablePropertyNames::kEncodingType =
"rocksdb.plain.table.encoding.type";
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -6,6 +6,7 @@
#ifndef ROCKSDB_LITE
#include <memory>
#include <string>
#include <stdint.h>
#include "rocksdb/options.h"
@ -27,20 +28,103 @@ class TableBuilder;
// parameter of the factory class. Output file format:
// +-------------+-----------------+
// | version | user_key_length |
// +------------++------------------------------+ <= key1 offset
// | [key_size] | key1 | value_size | |
// +------------++------------+-----------------+ <= key1 offset
// | encoded key1 | value_size | |
// +------------+-------------+-------------+ |
// | value1 |
// | |
// +----------------------------------------+---+ <= key2 offset
// | [key_size] | key2 | value_size | |
// +--------------------------+-------------+---+ <= key2 offset
// | encoded key2 | value_size | |
// +------------+-------------+-------------+ |
// | value2 |
// | |
// | ...... |
// +-----------------+--------------------------+
// If user_key_length = kPlainTableVariableLength, it means the key is variable
// length, there will be an extra field for key size encoded before every key.
//
// When the key encoding type is kPlain. Key part is encoded as:
// +------------+--------------------+
// | [key_size] | internal key |
// +------------+--------------------+
// for the case of user_key_len = kPlainTableVariableLength case,
// and simply:
// +----------------------+
// | internal key |
// +----------------------+
// for user_key_len != kPlainTableVariableLength case.
//
// If key encoding type is kPrefix. Keys are encoding in this format.
// There are three ways to encode a key:
// (1) Full Key
// +---------------+---------------+-------------------+
// | Full Key Flag | Full Key Size | Full Internal Key |
// +---------------+---------------+-------------------+
// which simply encodes a full key
//
// (2) A key shared the same prefix as the previous key, which is encoded as
// format of (1).
// +-------------+-------------+-------------+-------------+------------+
// | Prefix Flag | Prefix Size | Suffix Flag | Suffix Size | Key Suffix |
// +-------------+-------------+-------------+-------------+------------+
// where key is the suffix part of the key, including the internal bytes.
// the actual key will be constructed by concatenating prefix part of the
// previous key, with the suffix part of the key here, with sizes given here.
//
// (3) A key shared the same prefix as the previous key, which is encoded as
// the format of (2).
// +-----------------+-----------------+------------------------+
// | Key Suffix Flag | Key Suffix Size | Suffix of Internal Key |
// +-----------------+-----------------+------------------------+
// The key will be constructed by concatenating previous key's prefix (which is
// also a prefix which the last key encoded in the format of (1)) and the
// key given here.
//
// For example, we for following keys (prefix and suffix are separated by
// spaces):
// 0000 0001
// 0000 00021
// 0000 0002
// 00011 00
// 0002 0001
// Will be encoded like this:
// FK 8 00000001
// PF 4 SF 5 00021
// SF 4 0002
// FK 7 0001100
// FK 8 00020001
// (where FK means full key flag, PF means prefix flag and SF means suffix flag)
//
// All those "key flag + key size" shown above are in this format:
// The 8 bits of the first byte:
// +----+----+----+----+----+----+----+----+
// | Type | Size |
// +----+----+----+----+----+----+----+----+
// Type indicates: full key, prefix, or suffix.
// The last 6 bits are for size. If the size bits are not all 1, it means the
// size of the key. Otherwise, varint32 is read after this byte. This varint
// value + 0x3F (the value of all 1) will be the key size.
//
// For example, full key with length 16 will be encoded as (binary):
// 00 010000
// (00 means full key)
// and a prefix with 100 bytes will be encoded as:
// 01 111111 00100101
// (63) (37)
// (01 means key suffix)
//
// All the internal keys above (including kPlain and kPrefix) are encoded in
// this format:
// There are two types:
// (1) normal internal key format
// +----------- ...... -------------+----+---+---+---+---+---+---+---+
// | user key |type| sequence ID |
// +----------- ..... --------------+----+---+---+---+---+---+---+---+
// (2) Special case for keys whose sequence ID is 0 and is value type
// +----------- ...... -------------+----+
// | user key |0x80|
// +----------- ..... --------------+----+
// To save 7 bytes for the special case where sequence ID = 0.
//
//
class PlainTableFactory : public TableFactory {
public:
~PlainTableFactory() {}
@ -63,12 +147,16 @@ class PlainTableFactory : public TableFactory {
int bloom_bits_per_key = 0,
double hash_table_ratio = 0.75,
size_t index_sparseness = 16,
size_t huge_page_tlb_size = 0)
size_t huge_page_tlb_size = 0,
EncodingType encoding_type = kPlain,
bool full_scan_mode = false)
: user_key_len_(user_key_len),
bloom_bits_per_key_(bloom_bits_per_key),
hash_table_ratio_(hash_table_ratio),
index_sparseness_(index_sparseness),
huge_page_tlb_size_(huge_page_tlb_size) {}
huge_page_tlb_size_(huge_page_tlb_size),
encoding_type_(encoding_type),
full_scan_mode_(full_scan_mode) {}
const char* Name() const override { return "PlainTable"; }
Status NewTableReader(const Options& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
@ -88,6 +176,8 @@ class PlainTableFactory : public TableFactory {
double hash_table_ratio_;
size_t index_sparseness_;
size_t huge_page_tlb_size_;
EncodingType encoding_type_;
bool full_scan_mode_;
};
} // namespace rocksdb

View File

@ -0,0 +1,323 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include "table/plain_table_key_coding.h"
#include "table/plain_table_factory.h"
#include "db/dbformat.h"
namespace rocksdb {
namespace {
enum EntryType : unsigned char {
kFullKey = 0,
kPrefixFromPreviousKey = 1,
kKeySuffix = 2,
};
// Control byte:
// First two bits indicate type of entry
// Other bytes are inlined sizes. If all bits are 1 (0x03F), overflow bytes
// are used. key_size-0x3F will be encoded as a variint32 after this bytes.
const unsigned char kSizeInlineLimit = 0x3F;
// Return 0 for error
size_t EncodeSize(EntryType type, uint32_t key_size, char* out_buffer) {
out_buffer[0] = type << 6;
if (key_size < 0x3F) {
// size inlined
out_buffer[0] |= static_cast<char>(key_size);
return 1;
} else {
out_buffer[0] |= kSizeInlineLimit;
char* ptr = EncodeVarint32(out_buffer + 1, key_size - kSizeInlineLimit);
return ptr - out_buffer;
}
}
// Return position after the size byte(s). nullptr means error
const char* DecodeSize(const char* offset, const char* limit,
EntryType* entry_type, size_t* key_size) {
assert(offset < limit);
*entry_type = static_cast<EntryType>(
(static_cast<unsigned char>(offset[0]) & ~kSizeInlineLimit) >> 6);
char inline_key_size = offset[0] & kSizeInlineLimit;
if (inline_key_size < kSizeInlineLimit) {
*key_size = inline_key_size;
return offset + 1;
} else {
uint32_t extra_size;
const char* ptr = GetVarint32Ptr(offset + 1, limit, &extra_size);
if (ptr == nullptr) {
return nullptr;
}
*key_size = kSizeInlineLimit + extra_size;
return ptr;
}
}
} // namespace
Status PlainTableKeyEncoder::AppendKey(const Slice& key, WritableFile* file,
uint64_t* offset, char* meta_bytes_buf,
size_t* meta_bytes_buf_size) {
ParsedInternalKey parsed_key;
if (!ParseInternalKey(key, &parsed_key)) {
return Status::Corruption(Slice());
}
Slice key_to_write = key; // Portion of internal key to write out.
size_t user_key_size = fixed_user_key_len_;
if (encoding_type_ == kPlain) {
if (fixed_user_key_len_ == kPlainTableVariableLength) {
user_key_size = key.size() - 8;
// Write key length
char key_size_buf[5]; // tmp buffer for key size as varint32
char* ptr = EncodeVarint32(key_size_buf, user_key_size);
assert(ptr <= key_size_buf + sizeof(key_size_buf));
auto len = ptr - key_size_buf;
Status s = file->Append(Slice(key_size_buf, len));
if (!s.ok()) {
return s;
}
*offset += len;
}
} else {
assert(encoding_type_ == kPrefix);
char size_bytes[12];
size_t size_bytes_pos = 0;
user_key_size = key.size() - 8;
Slice prefix =
prefix_extractor_->Transform(Slice(key.data(), user_key_size));
if (key_count_for_prefix == 0 || prefix != pre_prefix_.GetKey() ||
key_count_for_prefix % index_sparseness_ == 0) {
key_count_for_prefix = 1;
pre_prefix_.SetKey(prefix);
size_bytes_pos += EncodeSize(kFullKey, user_key_size, size_bytes);
Status s = file->Append(Slice(size_bytes, size_bytes_pos));
if (!s.ok()) {
return s;
}
*offset += size_bytes_pos;
} else {
key_count_for_prefix++;
if (key_count_for_prefix == 2) {
// For second key within a prefix, need to encode prefix length
size_bytes_pos +=
EncodeSize(kPrefixFromPreviousKey, pre_prefix_.GetKey().size(),
size_bytes + size_bytes_pos);
}
size_t prefix_len = pre_prefix_.GetKey().size();
size_bytes_pos += EncodeSize(kKeySuffix, user_key_size - prefix_len,
size_bytes + size_bytes_pos);
Status s = file->Append(Slice(size_bytes, size_bytes_pos));
if (!s.ok()) {
return s;
}
*offset += size_bytes_pos;
key_to_write = Slice(key.data() + prefix_len, key.size() - prefix_len);
}
}
// Encode full key
// For value size as varint32 (up to 5 bytes).
// If the row is of value type with seqId 0, flush the special flag together
// in this buffer to safe one file append call, which takes 1 byte.
if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) {
Status s =
file->Append(Slice(key_to_write.data(), key_to_write.size() - 8));
if (!s.ok()) {
return s;
}
*offset += key_to_write.size() - 8;
meta_bytes_buf[*meta_bytes_buf_size] = PlainTableFactory::kValueTypeSeqId0;
*meta_bytes_buf_size += 1;
} else {
file->Append(key_to_write);
*offset += key_to_write.size();
}
return Status::OK();
}
namespace {
Status ReadInternalKey(const char* key_ptr, const char* limit,
uint32_t user_key_size, ParsedInternalKey* parsed_key,
size_t* bytes_read, bool* internal_key_valid,
Slice* internal_key) {
if (key_ptr + user_key_size + 1 >= limit) {
return Status::Corruption("Unexpected EOF when reading the next key");
}
if (*(key_ptr + user_key_size) == PlainTableFactory::kValueTypeSeqId0) {
// Special encoding for the row with seqID=0
parsed_key->user_key = Slice(key_ptr, user_key_size);
parsed_key->sequence = 0;
parsed_key->type = kTypeValue;
*bytes_read += user_key_size + 1;
*internal_key_valid = false;
} else {
if (key_ptr + user_key_size + 8 >= limit) {
return Status::Corruption(
"Unexpected EOF when reading internal bytes of the next key");
}
*internal_key_valid = true;
*internal_key = Slice(key_ptr, user_key_size + 8);
if (!ParseInternalKey(*internal_key, parsed_key)) {
return Status::Corruption(
Slice("Incorrect value type found when reading the next key"));
}
*bytes_read += user_key_size + 8;
}
return Status::OK();
}
} // namespace
Status PlainTableKeyDecoder::NextPlainEncodingKey(
const char* start, const char* limit, ParsedInternalKey* parsed_key,
Slice* internal_key, size_t* bytes_read, bool* seekable) {
const char* key_ptr = start;
size_t user_key_size = 0;
if (fixed_user_key_len_ != kPlainTableVariableLength) {
user_key_size = fixed_user_key_len_;
key_ptr = start;
} else {
uint32_t tmp_size = 0;
key_ptr = GetVarint32Ptr(start, limit, &tmp_size);
if (key_ptr == nullptr) {
return Status::Corruption(
"Unexpected EOF when reading the next key's size");
}
user_key_size = static_cast<size_t>(tmp_size);
*bytes_read = key_ptr - start;
}
bool decoded_internal_key_valid;
Slice decoded_internal_key;
Status s =
ReadInternalKey(key_ptr, limit, user_key_size, parsed_key, bytes_read,
&decoded_internal_key_valid, &decoded_internal_key);
if (!s.ok()) {
return s;
}
if (internal_key != nullptr) {
if (decoded_internal_key_valid) {
*internal_key = decoded_internal_key;
} else {
// Need to copy out the internal key
cur_key_.SetInternalKey(*parsed_key);
*internal_key = cur_key_.GetKey();
}
}
return Status::OK();
}
Status PlainTableKeyDecoder::NextPrefixEncodingKey(
const char* start, const char* limit, ParsedInternalKey* parsed_key,
Slice* internal_key, size_t* bytes_read, bool* seekable) {
const char* key_ptr = start;
EntryType entry_type;
bool expect_suffix = false;
do {
size_t size = 0;
bool decoded_internal_key_valid;
const char* pos = DecodeSize(key_ptr, limit, &entry_type, &size);
if (pos == nullptr) {
return Status::Corruption("Unexpected EOF when reading size of the key");
}
*bytes_read += pos - key_ptr;
key_ptr = pos;
switch (entry_type) {
case kFullKey: {
expect_suffix = false;
Slice decoded_internal_key;
Status s =
ReadInternalKey(key_ptr, limit, size, parsed_key, bytes_read,
&decoded_internal_key_valid, &decoded_internal_key);
if (!s.ok()) {
return s;
}
saved_user_key_ = parsed_key->user_key;
if (internal_key != nullptr) {
if (decoded_internal_key_valid) {
*internal_key = decoded_internal_key;
} else {
cur_key_.SetInternalKey(*parsed_key);
*internal_key = cur_key_.GetKey();
}
}
break;
}
case kPrefixFromPreviousKey: {
if (seekable != nullptr) {
*seekable = false;
}
prefix_len_ = size;
assert(prefix_extractor_ == nullptr ||
prefix_extractor_->Transform(saved_user_key_).size() ==
prefix_len_);
// Need read another size flag for suffix
expect_suffix = true;
break;
}
case kKeySuffix: {
expect_suffix = false;
if (seekable != nullptr) {
*seekable = false;
}
assert(prefix_len_ >= 0);
cur_key_.Reserve(prefix_len_ + size);
Slice tmp_slice;
Status s = ReadInternalKey(key_ptr, limit, size, parsed_key, bytes_read,
&decoded_internal_key_valid, &tmp_slice);
if (!s.ok()) {
return s;
}
cur_key_.SetInternalKey(Slice(saved_user_key_.data(), prefix_len_),
*parsed_key);
assert(
prefix_extractor_ == nullptr ||
prefix_extractor_->Transform(ExtractUserKey(cur_key_.GetKey())) ==
Slice(saved_user_key_.data(), prefix_len_));
parsed_key->user_key = ExtractUserKey(cur_key_.GetKey());
if (internal_key != nullptr) {
*internal_key = cur_key_.GetKey();
}
break;
}
default:
return Status::Corruption("Identified size flag.");
}
} while (expect_suffix); // Another round if suffix is expected.
return Status::OK();
}
Status PlainTableKeyDecoder::NextKey(const char* start, const char* limit,
ParsedInternalKey* parsed_key,
Slice* internal_key, size_t* bytes_read,
bool* seekable) {
*bytes_read = 0;
if (seekable != nullptr) {
*seekable = true;
}
if (encoding_type_ == kPlain) {
return NextPlainEncodingKey(start, limit, parsed_key, internal_key,
bytes_read, seekable);
} else {
assert(encoding_type_ == kPrefix);
return NextPrefixEncodingKey(start, limit, parsed_key, internal_key,
bytes_read, seekable);
}
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,97 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#pragma once
#include "rocksdb/slice.h"
#include "db/dbformat.h"
namespace rocksdb {
class WritableFile;
class ParsedInternalKey;
// Helper class to write out a key to an output file
// Actual data format of the key is documented in plain_table_factory.h
class PlainTableKeyEncoder {
public:
explicit PlainTableKeyEncoder(EncodingType encoding_type,
uint32_t user_key_len,
const SliceTransform* prefix_extractor,
size_t index_sparseness)
: encoding_type_((prefix_extractor != nullptr) ? encoding_type : kPlain),
fixed_user_key_len_(user_key_len),
prefix_extractor_(prefix_extractor),
index_sparseness_((index_sparseness > 1) ? index_sparseness : 1),
key_count_for_prefix(0) {}
// key: the key to write out, in the format of internal key.
// file: the output file to write out
// offset: offset in the file. Needs to be updated after appending bytes
// for the key
// meta_bytes_buf: buffer for extra meta bytes
// meta_bytes_buf_size: offset to append extra meta bytes. Will be updated
// if meta_bytes_buf is updated.
Status AppendKey(const Slice& key, WritableFile* file, uint64_t* offset,
char* meta_bytes_buf, size_t* meta_bytes_buf_size);
// Return actual encoding type to be picked
EncodingType GetEncodingType() { return encoding_type_; }
private:
EncodingType encoding_type_;
uint32_t fixed_user_key_len_;
const SliceTransform* prefix_extractor_;
const size_t index_sparseness_;
size_t key_count_for_prefix;
IterKey pre_prefix_;
};
// A helper class to decode keys from input buffer
// Actual data format of the key is documented in plain_table_factory.h
class PlainTableKeyDecoder {
public:
explicit PlainTableKeyDecoder(EncodingType encoding_type,
uint32_t user_key_len,
const SliceTransform* prefix_extractor)
: encoding_type_(encoding_type),
prefix_len_(0),
fixed_user_key_len_(user_key_len),
prefix_extractor_(prefix_extractor),
in_prefix_(false) {}
// Find the next key.
// start: char array where the key starts.
// limit: boundary of the char array
// parsed_key: the output of the result key
// internal_key: if not null, fill with the output of the result key in
// un-parsed format
// bytes_read: how many bytes read from start. Output
// seekable: whether key can be read from this place. Used when building
// indexes. Output.
Status NextKey(const char* start, const char* limit,
ParsedInternalKey* parsed_key, Slice* internal_key,
size_t* bytes_read, bool* seekable = nullptr);
EncodingType encoding_type_;
uint32_t prefix_len_;
uint32_t fixed_user_key_len_;
Slice saved_user_key_;
IterKey cur_key_;
const SliceTransform* prefix_extractor_;
bool in_prefix_;
private:
Status NextPlainEncodingKey(const char* start, const char* limit,
ParsedInternalKey* parsed_key,
Slice* internal_key, size_t* bytes_read,
bool* seekable = nullptr);
Status NextPrefixEncodingKey(const char* start, const char* limit,
ParsedInternalKey* parsed_key,
Slice* internal_key, size_t* bytes_read,
bool* seekable = nullptr);
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -23,6 +23,7 @@
#include "table/meta_blocks.h"
#include "table/two_level_iterator.h"
#include "table/plain_table_factory.h"
#include "table/plain_table_key_coding.h"
#include "util/arena.h"
#include "util/coding.h"
@ -43,6 +44,7 @@ inline uint32_t GetSliceHash(const Slice& s) {
}
inline uint32_t GetBucketIdFromHash(uint32_t hash, uint32_t num_buckets) {
assert(num_buckets >= 0);
return hash % num_buckets;
}
@ -51,7 +53,6 @@ inline uint32_t GetBucketIdFromHash(uint32_t hash, uint32_t num_buckets) {
inline uint32_t GetFixed32Element(const char* base, size_t offset) {
return DecodeFixed32(base + offset * sizeof(uint32_t));
}
} // namespace
// Iterator to iterate IndexedTable
@ -80,10 +81,11 @@ class PlainTableIterator : public Iterator {
private:
PlainTableReader* table_;
PlainTableKeyDecoder decoder_;
bool use_prefix_seek_;
uint32_t offset_;
uint32_t next_offset_;
IterKey key_;
Slice key_;
Slice value_;
Status status_;
// No copying allowed
@ -96,9 +98,11 @@ PlainTableReader::PlainTableReader(const Options& options,
unique_ptr<RandomAccessFile>&& file,
const EnvOptions& storage_options,
const InternalKeyComparator& icomparator,
EncodingType encoding_type,
uint64_t file_size,
const TableProperties* table_properties)
: internal_comparator_(icomparator),
encoding_type_(encoding_type),
data_end_offset_(table_properties->data_size),
user_key_len_(table_properties->fixed_key_len),
prefix_extractor_(options.prefix_extractor.get()),
@ -120,7 +124,7 @@ Status PlainTableReader::Open(const Options& options,
unique_ptr<TableReader>* table_reader,
const int bloom_bits_per_key,
double hash_table_ratio, size_t index_sparseness,
size_t huge_page_tlb_size) {
size_t huge_page_tlb_size, bool full_scan_mode) {
assert(options.allow_mmap_reads);
if (file_size > kMaxFileSize) {
@ -135,17 +139,53 @@ Status PlainTableReader::Open(const Options& options,
}
assert(hash_table_ratio >= 0.0);
std::unique_ptr<PlainTableReader> new_reader(
new PlainTableReader(options, std::move(file), soptions,
internal_comparator, file_size, props));
auto& user_props = props->user_collected_properties;
auto prefix_extractor_in_file =
user_props.find(PlainTablePropertyNames::kPrefixExtractorName);
// -- Populate Index
s = new_reader->PopulateIndex(props, bloom_bits_per_key, hash_table_ratio,
index_sparseness, huge_page_tlb_size);
if (!full_scan_mode && prefix_extractor_in_file != user_props.end()) {
if (!options.prefix_extractor) {
return Status::InvalidArgument(
"Prefix extractor is missing when opening a PlainTable built "
"using a prefix extractor");
} else if (prefix_extractor_in_file->second.compare(
options.prefix_extractor->Name()) != 0) {
return Status::InvalidArgument(
"Prefix extractor given doesn't match the one used to build "
"PlainTable");
}
}
EncodingType encoding_type = kPlain;
auto encoding_type_prop =
user_props.find(PlainTablePropertyNames::kEncodingType);
if (encoding_type_prop != user_props.end()) {
encoding_type = static_cast<EncodingType>(
DecodeFixed32(encoding_type_prop->second.c_str()));
}
std::unique_ptr<PlainTableReader> new_reader(new PlainTableReader(
options, std::move(file), soptions, internal_comparator, encoding_type,
file_size, props));
s = new_reader->MmapDataFile();
if (!s.ok()) {
return s;
}
// -- Populate Index
if (!full_scan_mode) {
s = new_reader->PopulateIndex(props, bloom_bits_per_key, hash_table_ratio,
index_sparseness, huge_page_tlb_size);
if (!s.ok()) {
return s;
}
} else {
// Flag to indicate it is a full scan mode so that none of the indexes
// can be used.
new_reader->index_size_ = kFullScanModeFlag;
}
*table_reader = std::move(new_reader);
return s;
}
@ -156,11 +196,10 @@ void PlainTableReader::SetupForCompaction() {
Iterator* PlainTableReader::NewIterator(const ReadOptions& options,
Arena* arena) {
if (arena == nullptr) {
return new PlainTableIterator(this, options_.prefix_extractor != nullptr);
return new PlainTableIterator(this, prefix_extractor_ != nullptr);
} else {
auto mem = arena->AllocateAligned(sizeof(PlainTableIterator));
return new (mem)
PlainTableIterator(this, options_.prefix_extractor != nullptr);
return new (mem) PlainTableIterator(this, prefix_extractor_ != nullptr);
}
}
@ -234,11 +273,15 @@ Status PlainTableReader::PopulateIndexRecordList(IndexRecordList* record_list,
// are in order.
*num_prefixes = 0;
PlainTableKeyDecoder decoder(encoding_type_, user_key_len_,
options_.prefix_extractor.get());
bool due_index = false;
while (pos < data_end_offset_) {
uint32_t key_offset = pos;
ParsedInternalKey key;
Slice value_slice;
Status s = Next(&pos, &key, &value_slice);
bool seekable = false;
Status s = Next(&decoder, &pos, &key, nullptr, &value_slice, &seekable);
if (!s.ok()) {
return s;
}
@ -256,12 +299,21 @@ Status PlainTableReader::PopulateIndexRecordList(IndexRecordList* record_list,
num_keys_per_prefix = 0;
prev_key_prefix_slice = key_prefix_slice;
prev_key_prefix_hash = GetSliceHash(key_prefix_slice);
due_index = true;
}
if (index_sparseness == 0 ||
num_keys_per_prefix++ % index_sparseness == 0) {
if (due_index) {
if (!seekable) {
return Status::Corruption("Key for a prefix is not seekable");
}
// Add an index key for every kIndexIntervalForSamePrefixKeys keys
record_list->AddRecord(prev_key_prefix_hash, key_offset);
due_index = false;
}
num_keys_per_prefix++;
if (index_sparseness == 0 || num_keys_per_prefix % index_sparseness == 0) {
due_index = true;
}
is_first_record = false;
}
@ -381,6 +433,11 @@ void PlainTableReader::FillIndexes(
index_size_, kSubIndexSize);
}
Status PlainTableReader::MmapDataFile() {
// Get mmapped memory to file_data_.
return file_->Read(0, file_size_, &file_data_, nullptr);
}
Status PlainTableReader::PopulateIndex(TableProperties* props,
int bloom_bits_per_key,
double hash_table_ratio,
@ -395,12 +452,6 @@ Status PlainTableReader::PopulateIndex(TableProperties* props,
"PlainTable requires a prefix extractor enable prefix hash mode.");
}
// Get mmapped memory to file_data_.
Status s = file_->Read(0, file_size_, &file_data_, nullptr);
if (!s.ok()) {
return s;
}
IndexRecordList record_list(kRecordsPerGroup);
// First, read the whole file, for every kIndexIntervalForSamePrefixKeys rows
// for a prefix (starting from the first one), generate a record of (hash,
@ -419,8 +470,8 @@ Status PlainTableReader::PopulateIndex(TableProperties* props,
}
}
s = PopulateIndexRecordList(&record_list, &num_prefixes, bloom_bits_per_key,
index_sparseness);
Status s = PopulateIndexRecordList(&record_list, &num_prefixes,
bloom_bits_per_key, index_sparseness);
if (!s.ok()) {
return s;
}
@ -484,7 +535,11 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix,
uint32_t mid = (high + low) / 2;
uint32_t file_offset = GetFixed32Element(base_ptr, mid);
size_t tmp;
Status s = ReadKey(file_data_.data() + file_offset, &mid_key, &tmp);
Status s = PlainTableKeyDecoder(encoding_type_, user_key_len_,
options_.prefix_extractor.get())
.NextKey(file_data_.data() + file_offset,
file_data_.data() + data_end_offset_, &mid_key,
nullptr, &tmp);
if (!s.ok()) {
return s;
}
@ -509,7 +564,15 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix,
ParsedInternalKey low_key;
size_t tmp;
uint32_t low_key_offset = GetFixed32Element(base_ptr, low);
Status s = ReadKey(file_data_.data() + low_key_offset, &low_key, &tmp);
Status s = PlainTableKeyDecoder(encoding_type_, user_key_len_,
options_.prefix_extractor.get())
.NextKey(file_data_.data() + low_key_offset,
file_data_.data() + data_end_offset_, &low_key,
nullptr, &tmp);
if (!s.ok()) {
return s;
}
if (GetPrefix(low_key) == prefix) {
prefix_matched = true;
*offset = low_key_offset;
@ -533,52 +596,10 @@ Slice PlainTableReader::GetPrefix(const ParsedInternalKey& target) const {
return GetPrefixFromUserKey(target.user_key);
}
Status PlainTableReader::ReadKey(const char* start, ParsedInternalKey* key,
size_t* bytes_read) const {
const char* key_ptr = nullptr;
*bytes_read = 0;
size_t user_key_size = 0;
if (IsFixedLength()) {
user_key_size = user_key_len_;
key_ptr = start;
} else {
uint32_t tmp_size = 0;
key_ptr =
GetVarint32Ptr(start, file_data_.data() + data_end_offset_, &tmp_size);
if (key_ptr == nullptr) {
return Status::Corruption(
"Unexpected EOF when reading the next key's size");
}
user_key_size = (size_t)tmp_size;
*bytes_read = key_ptr - start;
}
if (key_ptr + user_key_size + 1 >= file_data_.data() + data_end_offset_) {
return Status::Corruption("Unexpected EOF when reading the next key");
}
if (*(key_ptr + user_key_size) == PlainTableFactory::kValueTypeSeqId0) {
// Special encoding for the row with seqID=0
key->user_key = Slice(key_ptr, user_key_size);
key->sequence = 0;
key->type = kTypeValue;
*bytes_read += user_key_size + 1;
} else {
if (start + user_key_size + 8 >= file_data_.data() + data_end_offset_) {
return Status::Corruption(
"Unexpected EOF when reading internal bytes of the next key");
}
if (!ParseInternalKey(Slice(key_ptr, user_key_size + 8), key)) {
return Status::Corruption(
Slice("Incorrect value type found when reading the next key"));
}
*bytes_read += user_key_size + 8;
}
return Status::OK();
}
Status PlainTableReader::Next(uint32_t* offset, ParsedInternalKey* key,
Slice* value) const {
Status PlainTableReader::Next(PlainTableKeyDecoder* decoder, uint32_t* offset,
ParsedInternalKey* parsed_key,
Slice* internal_key, Slice* value,
bool* seekable) const {
if (*offset == data_end_offset_) {
*offset = data_end_offset_;
return Status::OK();
@ -590,7 +611,9 @@ Status PlainTableReader::Next(uint32_t* offset, ParsedInternalKey* key,
const char* start = file_data_.data() + *offset;
size_t bytes_for_key;
Status s = ReadKey(start, key, &bytes_for_key);
Status s =
decoder->NextKey(start, file_data_.data() + data_end_offset_, parsed_key,
internal_key, &bytes_for_key, seekable);
if (!s.ok()) {
return s;
}
@ -626,6 +649,11 @@ Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target,
Slice prefix_slice;
uint32_t prefix_hash;
if (IsTotalOrderMode()) {
if (index_size_ == kFullScanModeFlag) {
// Full Scan Mode
status_ =
Status::InvalidArgument("Get() is not allowed in full scan mode.");
}
// Match whole user key for bloom filter check.
if (!MatchBloom(GetSliceHash(GetUserKey(target)))) {
return Status::OK();
@ -655,8 +683,10 @@ Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target,
}
Slice found_value;
PlainTableKeyDecoder decoder(encoding_type_, user_key_len_,
options_.prefix_extractor.get());
while (offset < data_end_offset_) {
Status s = Next(&offset, &found_key, &found_value);
Status s = Next(&decoder, &offset, &found_key, nullptr, &found_value);
if (!s.ok()) {
return s;
}
@ -683,7 +713,10 @@ uint64_t PlainTableReader::ApproximateOffsetOf(const Slice& key) {
PlainTableIterator::PlainTableIterator(PlainTableReader* table,
bool use_prefix_seek)
: table_(table), use_prefix_seek_(use_prefix_seek) {
: table_(table),
decoder_(table_->encoding_type_, table_->user_key_len_,
table_->prefix_extractor_),
use_prefix_seek_(use_prefix_seek) {
next_offset_ = offset_ = table_->data_end_offset_;
}
@ -712,12 +745,21 @@ void PlainTableIterator::SeekToLast() {
void PlainTableIterator::Seek(const Slice& target) {
// If the user doesn't set prefix seek option and we are not able to do a
// total Seek(). assert failure.
if (!use_prefix_seek_ && table_->index_size_ > 1) {
assert(false);
status_ = Status::NotSupported(
"PlainTable cannot issue non-prefix seek unless in total order mode.");
offset_ = next_offset_ = table_->data_end_offset_;
return;
if (!use_prefix_seek_) {
if (table_->index_size_ == PlainTableReader::kFullScanModeFlag) {
// Full Scan Mode.
status_ =
Status::InvalidArgument("Seek() is not allowed in full scan mode.");
offset_ = next_offset_ = table_->data_end_offset_;
return;
} else if (table_->index_size_ > 1) {
assert(false);
status_ = Status::NotSupported(
"PlainTable cannot issue non-prefix seek unless in total order "
"mode.");
offset_ = next_offset_ = table_->data_end_offset_;
return;
}
}
Slice prefix_slice = table_->GetPrefix(target);
@ -762,11 +804,9 @@ void PlainTableIterator::Next() {
if (offset_ < table_->data_end_offset_) {
Slice tmp_slice;
ParsedInternalKey parsed_key;
status_ = table_->Next(&next_offset_, &parsed_key, &value_);
if (status_.ok()) {
// Make a copy in this case. TODO optimize.
key_.SetInternalKey(parsed_key);
} else {
status_ =
table_->Next(&decoder_, &next_offset_, &parsed_key, &key_, &value_);
if (!status_.ok()) {
offset_ = next_offset_ = table_->data_end_offset_;
}
}
@ -778,7 +818,7 @@ void PlainTableIterator::Prev() {
Slice PlainTableIterator::key() const {
assert(Valid());
return key_.GetKey();
return key_;
}
Slice PlainTableIterator::value() const {

View File

@ -33,6 +33,7 @@ struct ReadOptions;
class TableCache;
class TableReader;
class InternalKeyComparator;
class PlainTableKeyDecoder;
using std::unique_ptr;
using std::unordered_map;
@ -53,7 +54,8 @@ class PlainTableReader: public TableReader {
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table,
const int bloom_bits_per_key, double hash_table_ratio,
size_t index_sparseness, size_t huge_page_tlb_size);
size_t index_sparseness, size_t huge_page_tlb_size,
bool full_scan_mode);
Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override;
@ -75,7 +77,8 @@ class PlainTableReader: public TableReader {
PlainTableReader(const Options& options, unique_ptr<RandomAccessFile>&& file,
const EnvOptions& storage_options,
const InternalKeyComparator& internal_comparator,
uint64_t file_size, const TableProperties* table_properties);
EncodingType encoding_type, uint64_t file_size,
const TableProperties* table_properties);
virtual ~PlainTableReader();
protected:
@ -128,6 +131,7 @@ class PlainTableReader: public TableReader {
Status PopulateIndex(TableProperties* props, int bloom_bits_per_key,
double hash_table_ratio, size_t index_sparseness,
size_t huge_page_tlb_size);
Status MmapDataFile();
private:
struct IndexRecord;
@ -143,6 +147,7 @@ class PlainTableReader: public TableReader {
int index_size_ = 0;
char* sub_index_;
const InternalKeyComparator internal_comparator_;
EncodingType encoding_type_;
// represents plain table's current status.
Status status_;
Slice file_data_;
@ -159,6 +164,7 @@ class PlainTableReader: public TableReader {
static const size_t kOffsetLen = sizeof(uint32_t);
static const uint64_t kMaxFileSize = 1u << 31;
static const size_t kRecordsPerGroup = 256;
static const int kFullScanModeFlag = -1;
// Bloom filter is used to rule out non-existent key
bool enable_bloom_;
@ -213,14 +219,17 @@ class PlainTableReader: public TableReader {
const std::vector<uint32_t>& entries_per_bucket,
size_t huge_page_tlb_size);
// Read a plain table key from the position `start`. The read content
// will be written to `key` and the size of read bytes will be populated
// in `bytes_read`.
Status ReadKey(const char* row_ptr, ParsedInternalKey* key,
size_t* bytes_read) const;
// Read the key and value at `offset` to parameters `key` and `value`.
// Read the key and value at `offset` to parameters for keys, the and
// `seekable`.
// On success, `offset` will be updated as the offset for the next key.
Status Next(uint32_t* offset, ParsedInternalKey* key, Slice* value) const;
// `parsed_key` will be key in parsed format.
// if `internal_key` is not empty, it will be filled with key with slice
// format.
// if `seekable` is not null, it will return whether we can directly read
// data using this offset.
Status Next(PlainTableKeyDecoder* decoder, uint32_t* offset,
ParsedInternalKey* parsed_key, Slice* internal_key, Slice* value,
bool* seekable = nullptr) const;
// Get file offset for key target.
// return value prefix_matched is set to true if the offset is confirmed
// for a key with the same prefix as target.

View File

@ -157,9 +157,8 @@ Status SstFileReader::SetTableOptionsByMagicNumber(
} else if (table_magic_number == kPlainTableMagicNumber ||
table_magic_number == kLegacyPlainTableMagicNumber) {
options_.allow_mmap_reads = true;
options_.table_factory = std::make_shared<PlainTableFactory>(
table_properties_->fixed_key_len, 2, 0.8);
options_.prefix_extractor.reset(NewNoopTransform());
options_.table_factory.reset(NewTotalOrderPlainTableFactory(
kPlainTableVariableLength, 0, 1, 0, true));
fprintf(stdout, "Sst file format: plain table\n");
} else {
char error_msg_buffer[80];