PlainTableReader to support non-mmap mode

Summary:
PlainTableReader now only allows mmap-mode. Add the support to non-mmap mode for more flexibility.
Refactor the codes to move all logic of reading data to PlainTableKeyDecoder, and consolidate the calls to Read() call and ReadVarint32() call. Implement the calls for both of mmap and non-mmap case seperately. For non-mmap mode, make copy of keys in several places when we need to move the buffer after reading the keys.

Test Plan: Add the mode of non-mmap case in plain_table_db_test. Run it in valgrind mode too.

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D47187
This commit is contained in:
sdong 2015-09-16 16:57:43 -07:00
parent d746eaad5e
commit df34aea331
12 changed files with 463 additions and 227 deletions

View File

@ -6275,7 +6275,7 @@ TEST_F(DBTest, TableOptionsSanitizeTest) {
options.table_factory.reset(new PlainTableFactory()); options.table_factory.reset(new PlainTableFactory());
options.prefix_extractor.reset(NewNoopTransform()); options.prefix_extractor.reset(NewNoopTransform());
Destroy(options); Destroy(options);
ASSERT_TRUE(TryReopen(options).IsNotSupported()); ASSERT_TRUE(!TryReopen(options).IsNotSupported());
// Test for check of prefix_extractor when hash index is used for // Test for check of prefix_extractor when hash index is used for
// block-based table // block-based table

View File

@ -42,28 +42,33 @@ using std::unique_ptr;
namespace rocksdb { namespace rocksdb {
class PlainTableDBTest : public testing::Test { class PlainTableDBTest : public testing::Test,
public testing::WithParamInterface<bool> {
protected: protected:
private: private:
std::string dbname_; std::string dbname_;
Env* env_; Env* env_;
DB* db_; DB* db_;
bool mmap_mode_;
Options last_options_; Options last_options_;
public: public:
PlainTableDBTest() : env_(Env::Default()) { PlainTableDBTest() : env_(Env::Default()) {}
dbname_ = test::TmpDir() + "/plain_table_db_test";
EXPECT_OK(DestroyDB(dbname_, Options()));
db_ = nullptr;
Reopen();
}
~PlainTableDBTest() { ~PlainTableDBTest() {
delete db_; delete db_;
EXPECT_OK(DestroyDB(dbname_, Options())); EXPECT_OK(DestroyDB(dbname_, Options()));
} }
void SetUp() override {
mmap_mode_ = GetParam();
dbname_ = test::TmpDir() + "/plain_table_db_test";
EXPECT_OK(DestroyDB(dbname_, Options()));
db_ = nullptr;
Reopen();
}
// Return the current option configuration. // Return the current option configuration.
Options CurrentOptions() { Options CurrentOptions() {
Options options; Options options;
@ -82,7 +87,7 @@ class PlainTableDBTest : public testing::Test {
options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true)); options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true));
options.prefix_extractor.reset(NewFixedPrefixTransform(8)); options.prefix_extractor.reset(NewFixedPrefixTransform(8));
options.allow_mmap_reads = true; options.allow_mmap_reads = mmap_mode_;
return options; return options;
} }
@ -187,7 +192,7 @@ class PlainTableDBTest : public testing::Test {
} }
}; };
TEST_F(PlainTableDBTest, Empty) { TEST_P(PlainTableDBTest, Empty) {
ASSERT_TRUE(dbfull() != nullptr); ASSERT_TRUE(dbfull() != nullptr);
ASSERT_EQ("NOT_FOUND", Get("0000000000000foo")); ASSERT_EQ("NOT_FOUND", Get("0000000000000foo"));
} }
@ -208,7 +213,7 @@ class TestPlainTableReader : public PlainTableReader {
: PlainTableReader(ioptions, std::move(file), env_options, icomparator, : PlainTableReader(ioptions, std::move(file), env_options, icomparator,
encoding_type, file_size, table_properties), encoding_type, file_size, table_properties),
expect_bloom_not_match_(expect_bloom_not_match) { expect_bloom_not_match_(expect_bloom_not_match) {
Status s = MmapDataFile(); Status s = MmapDataIfNeeded();
EXPECT_TRUE(s.ok()); EXPECT_TRUE(s.ok());
s = PopulateIndex(const_cast<TableProperties*>(table_properties), s = PopulateIndex(const_cast<TableProperties*>(table_properties),
@ -309,7 +314,7 @@ class TestPlainTableFactory : public PlainTableFactory {
bool* expect_bloom_not_match_; bool* expect_bloom_not_match_;
}; };
TEST_F(PlainTableDBTest, Flush) { TEST_P(PlainTableDBTest, Flush) {
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024; for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
huge_page_tlb_size += 2 * 1024 * 1024) { huge_page_tlb_size += 2 * 1024 * 1024) {
for (EncodingType encoding_type : {kPlain, kPrefix}) { for (EncodingType encoding_type : {kPlain, kPrefix}) {
@ -396,7 +401,7 @@ TEST_F(PlainTableDBTest, Flush) {
} }
} }
TEST_F(PlainTableDBTest, Flush2) { TEST_P(PlainTableDBTest, Flush2) {
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024; for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
huge_page_tlb_size += 2 * 1024 * 1024) { huge_page_tlb_size += 2 * 1024 * 1024) {
for (EncodingType encoding_type : {kPlain, kPrefix}) { for (EncodingType encoding_type : {kPlain, kPrefix}) {
@ -476,7 +481,7 @@ TEST_F(PlainTableDBTest, Flush2) {
} }
} }
TEST_F(PlainTableDBTest, Iterator) { TEST_P(PlainTableDBTest, Iterator) {
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024; for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
huge_page_tlb_size += 2 * 1024 * 1024) { huge_page_tlb_size += 2 * 1024 * 1024) {
for (EncodingType encoding_type : {kPlain, kPrefix}) { for (EncodingType encoding_type : {kPlain, kPrefix}) {
@ -610,7 +615,7 @@ std::string MakeLongKey(size_t length, char c) {
} }
} // namespace } // namespace
TEST_F(PlainTableDBTest, IteratorLargeKeys) { TEST_P(PlainTableDBTest, IteratorLargeKeys) {
Options options = CurrentOptions(); Options options = CurrentOptions();
PlainTableOptions plain_table_options; PlainTableOptions plain_table_options;
@ -660,7 +665,7 @@ std::string MakeLongKeyWithPrefix(size_t length, char c) {
} }
} // namespace } // namespace
TEST_F(PlainTableDBTest, IteratorLargeKeysWithPrefix) { TEST_P(PlainTableDBTest, IteratorLargeKeysWithPrefix) {
Options options = CurrentOptions(); Options options = CurrentOptions();
PlainTableOptions plain_table_options; PlainTableOptions plain_table_options;
@ -702,7 +707,7 @@ TEST_F(PlainTableDBTest, IteratorLargeKeysWithPrefix) {
delete iter; delete iter;
} }
TEST_F(PlainTableDBTest, IteratorReverseSuffixComparator) { TEST_P(PlainTableDBTest, IteratorReverseSuffixComparator) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.create_if_missing = true; options.create_if_missing = true;
// Set only one bucket to force bucket conflict. // Set only one bucket to force bucket conflict.
@ -771,7 +776,7 @@ TEST_F(PlainTableDBTest, IteratorReverseSuffixComparator) {
delete iter; delete iter;
} }
TEST_F(PlainTableDBTest, HashBucketConflict) { TEST_P(PlainTableDBTest, HashBucketConflict) {
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024; for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
huge_page_tlb_size += 2 * 1024 * 1024) { huge_page_tlb_size += 2 * 1024 * 1024) {
for (unsigned char i = 1; i <= 3; i++) { for (unsigned char i = 1; i <= 3; i++) {
@ -864,7 +869,7 @@ TEST_F(PlainTableDBTest, HashBucketConflict) {
} }
} }
TEST_F(PlainTableDBTest, HashBucketConflictReverseSuffixComparator) { TEST_P(PlainTableDBTest, HashBucketConflictReverseSuffixComparator) {
for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024; for (size_t huge_page_tlb_size = 0; huge_page_tlb_size <= 2 * 1024 * 1024;
huge_page_tlb_size += 2 * 1024 * 1024) { huge_page_tlb_size += 2 * 1024 * 1024) {
for (unsigned char i = 1; i <= 3; i++) { for (unsigned char i = 1; i <= 3; i++) {
@ -957,7 +962,7 @@ TEST_F(PlainTableDBTest, HashBucketConflictReverseSuffixComparator) {
} }
} }
TEST_F(PlainTableDBTest, NonExistingKeyToNonEmptyBucket) { TEST_P(PlainTableDBTest, NonExistingKeyToNonEmptyBucket) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.create_if_missing = true; options.create_if_missing = true;
// Set only one bucket to force bucket conflict. // Set only one bucket to force bucket conflict.
@ -1013,7 +1018,7 @@ static std::string RandomString(Random* rnd, int len) {
return r; return r;
} }
TEST_F(PlainTableDBTest, CompactionTrigger) { TEST_P(PlainTableDBTest, CompactionTrigger) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.write_buffer_size = 120 << 10; // 100KB options.write_buffer_size = 120 << 10; // 100KB
options.num_levels = 3; options.num_levels = 3;
@ -1048,7 +1053,7 @@ TEST_F(PlainTableDBTest, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1), 1); ASSERT_EQ(NumTableFilesAtLevel(1), 1);
} }
TEST_F(PlainTableDBTest, AdaptiveTable) { TEST_P(PlainTableDBTest, AdaptiveTable) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.create_if_missing = true; options.create_if_missing = true;
@ -1091,6 +1096,8 @@ TEST_F(PlainTableDBTest, AdaptiveTable) {
ASSERT_NE("v5", Get("3000000000000bar")); ASSERT_NE("v5", Get("3000000000000bar"));
} }
INSTANTIATE_TEST_CASE_P(PlainTableDBTest, PlainTableDBTest, ::testing::Bool());
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -45,10 +45,6 @@ class AdaptiveTableFactory : public TableFactory {
// Sanitizes the specified DB Options. // Sanitizes the specified DB Options.
Status SanitizeOptions(const DBOptions& db_opts, Status SanitizeOptions(const DBOptions& db_opts,
const ColumnFamilyOptions& cf_opts) const override { const ColumnFamilyOptions& cf_opts) const override {
if (db_opts.allow_mmap_reads == false) {
return Status::NotSupported(
"AdaptiveTable with allow_mmap_reads == false is not supported.");
}
return Status::OK(); return Status::OK();
} }

View File

@ -168,10 +168,6 @@ class PlainTableFactory : public TableFactory {
// Sanitizes the specified DB Options. // Sanitizes the specified DB Options.
Status SanitizeOptions(const DBOptions& db_opts, Status SanitizeOptions(const DBOptions& db_opts,
const ColumnFamilyOptions& cf_opts) const override { const ColumnFamilyOptions& cf_opts) const override {
if (db_opts.allow_mmap_reads == false) {
return Status::NotSupported(
"PlainTable with allow_mmap_reads == false is not supported.");
}
return Status::OK(); return Status::OK();
} }

View File

@ -6,20 +6,23 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "table/plain_table_key_coding.h" #include "table/plain_table_key_coding.h"
#include <algorithm>
#include <string>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "table/plain_table_reader.h"
#include "table/plain_table_factory.h" #include "table/plain_table_factory.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
namespace {
enum PlainTableEntryType : unsigned char { enum PlainTableEntryType : unsigned char {
kFullKey = 0, kFullKey = 0,
kPrefixFromPreviousKey = 1, kPrefixFromPreviousKey = 1,
kKeySuffix = 2, kKeySuffix = 2,
}; };
namespace {
// Control byte: // Control byte:
// First two bits indicate type of entry // First two bits indicate type of entry
// Other bytes are inlined sizes. If all bits are 1 (0x03F), overflow bytes // Other bytes are inlined sizes. If all bits are 1 (0x03F), overflow bytes
@ -42,28 +45,40 @@ size_t EncodeSize(PlainTableEntryType type, uint32_t key_size,
return ptr - out_buffer; return ptr - out_buffer;
} }
} }
} // namespace
// Return position after the size byte(s). nullptr means error // Fill bytes_read with number of bytes read.
const char* DecodeSize(const char* offset, const char* limit, inline Status PlainTableKeyDecoder::DecodeSize(uint32_t start_offset,
PlainTableEntryType* entry_type, uint32_t* key_size) { PlainTableEntryType* entry_type,
assert(offset < limit); uint32_t* key_size,
uint32_t* bytes_read) {
Slice next_byte_slice;
bool success = file_reader_.Read(start_offset, 1, &next_byte_slice);
if (!success) {
return file_reader_.status();
}
*entry_type = static_cast<PlainTableEntryType>( *entry_type = static_cast<PlainTableEntryType>(
(static_cast<unsigned char>(offset[0]) & ~kSizeInlineLimit) >> 6); (static_cast<unsigned char>(next_byte_slice[0]) & ~kSizeInlineLimit) >>
char inline_key_size = offset[0] & kSizeInlineLimit; 6);
char inline_key_size = next_byte_slice[0] & kSizeInlineLimit;
if (inline_key_size < kSizeInlineLimit) { if (inline_key_size < kSizeInlineLimit) {
*key_size = inline_key_size; *key_size = inline_key_size;
return offset + 1; *bytes_read = 1;
return Status::OK();
} else { } else {
uint32_t extra_size; uint32_t extra_size;
const char* ptr = GetVarint32Ptr(offset + 1, limit, &extra_size); uint32_t tmp_bytes_read;
if (ptr == nullptr) { success = file_reader_.ReadVarint32(start_offset + 1, &extra_size,
return nullptr; &tmp_bytes_read);
if (!success) {
return file_reader_.status();
} }
assert(tmp_bytes_read > 0);
*key_size = kSizeInlineLimit + extra_size; *key_size = kSizeInlineLimit + extra_size;
return ptr; *bytes_read = tmp_bytes_read + 1;
return Status::OK();
} }
} }
} // namespace
Status PlainTableKeyEncoder::AppendKey(const Slice& key, Status PlainTableKeyEncoder::AppendKey(const Slice& key,
WritableFileWriter* file, WritableFileWriter* file,
@ -149,28 +164,101 @@ Status PlainTableKeyEncoder::AppendKey(const Slice& key,
return Status::OK(); return Status::OK();
} }
namespace { inline bool PlainTableKeyDecoder::FileReader::Read(uint32_t file_offset,
Status ReadInternalKey(const char* key_ptr, const char* limit, uint32_t len, Slice* out) {
uint32_t user_key_size, ParsedInternalKey* parsed_key, if (file_info_->is_mmap_mode) {
size_t* bytes_read, bool* internal_key_valid, assert(file_offset + len <= file_info_->data_end_offset);
Slice* internal_key) { *out = Slice(file_info_->file_data.data() + file_offset, len);
if (key_ptr + user_key_size + 1 >= limit) { return true;
return Status::Corruption("Unexpected EOF when reading the next key"); } else {
return ReadNonMmap(file_offset, len, out);
} }
if (*(key_ptr + user_key_size) == PlainTableFactory::kValueTypeSeqId0) { }
bool PlainTableKeyDecoder::FileReader::ReadNonMmap(uint32_t file_offset,
uint32_t len, Slice* out) {
const uint32_t kPrefetchSize = 256u;
if (file_offset < buf_start_offset_ ||
file_offset + len > buf_start_offset_ + buf_len_) {
// Load buffer
assert(file_offset + len <= file_info_->data_end_offset);
uint32_t size_to_read = std::min(file_info_->data_end_offset - file_offset,
std::max(kPrefetchSize, len));
if (size_to_read > buf_capacity_) {
buf_.reset(new char[size_to_read]);
buf_capacity_ = size_to_read;
buf_len_ = 0;
}
Slice read_result;
Status s = file_info_->file->Read(file_offset, size_to_read, &read_result,
buf_.get());
if (!s.ok()) {
status_ = s;
return false;
}
buf_start_offset_ = file_offset;
buf_len_ = size_to_read;
}
*out = Slice(buf_.get() + (file_offset - buf_start_offset_), len);
return true;
}
inline bool PlainTableKeyDecoder::FileReader::ReadVarint32(
uint32_t offset, uint32_t* out, uint32_t* bytes_read) {
if (file_info_->is_mmap_mode) {
const char* start = file_info_->file_data.data() + offset;
const char* limit =
file_info_->file_data.data() + file_info_->data_end_offset;
const char* key_ptr = GetVarint32Ptr(start, limit, out);
assert(key_ptr != nullptr);
*bytes_read = static_cast<uint32_t>(key_ptr - start);
return true;
} else {
return ReadVarint32NonMmap(offset, out, bytes_read);
}
}
bool PlainTableKeyDecoder::FileReader::ReadVarint32NonMmap(
uint32_t offset, uint32_t* out, uint32_t* bytes_read) {
const char* start;
const char* limit;
const uint32_t kMaxVarInt32Size = 6u;
uint32_t bytes_to_read =
std::min(file_info_->data_end_offset - offset, kMaxVarInt32Size);
Slice bytes;
if (!Read(offset, bytes_to_read, &bytes)) {
return false;
}
start = bytes.data();
limit = bytes.data() + bytes.size();
const char* key_ptr = GetVarint32Ptr(start, limit, out);
*bytes_read =
(key_ptr != nullptr) ? static_cast<uint32_t>(key_ptr - start) : 0;
return true;
}
Status PlainTableKeyDecoder::ReadInternalKey(
uint32_t file_offset, uint32_t user_key_size, ParsedInternalKey* parsed_key,
uint32_t* bytes_read, bool* internal_key_valid, Slice* internal_key) {
Slice tmp_slice;
bool success = file_reader_.Read(file_offset, user_key_size + 1, &tmp_slice);
if (!success) {
return file_reader_.status();
}
if (tmp_slice[user_key_size] == PlainTableFactory::kValueTypeSeqId0) {
// Special encoding for the row with seqID=0 // Special encoding for the row with seqID=0
parsed_key->user_key = Slice(key_ptr, user_key_size); parsed_key->user_key = Slice(tmp_slice.data(), user_key_size);
parsed_key->sequence = 0; parsed_key->sequence = 0;
parsed_key->type = kTypeValue; parsed_key->type = kTypeValue;
*bytes_read += user_key_size + 1; *bytes_read += user_key_size + 1;
*internal_key_valid = false; *internal_key_valid = false;
} else { } else {
if (key_ptr + user_key_size + 8 >= limit) { success = file_reader_.Read(file_offset, user_key_size + 8, internal_key);
return Status::Corruption( if (!success) {
"Unexpected EOF when reading internal bytes of the next key"); return file_reader_.status();
} }
*internal_key_valid = true; *internal_key_valid = true;
*internal_key = Slice(key_ptr, user_key_size + 8);
if (!ParseInternalKey(*internal_key, parsed_key)) { if (!ParseInternalKey(*internal_key, parsed_key)) {
return Status::Corruption( return Status::Corruption(
Slice("Incorrect value type found when reading the next key")); Slice("Incorrect value type found when reading the next key"));
@ -179,36 +267,44 @@ Status ReadInternalKey(const char* key_ptr, const char* limit,
} }
return Status::OK(); return Status::OK();
} }
} // namespace
Status PlainTableKeyDecoder::NextPlainEncodingKey( Status PlainTableKeyDecoder::NextPlainEncodingKey(uint32_t start_offset,
const char* start, const char* limit, ParsedInternalKey* parsed_key, ParsedInternalKey* parsed_key,
Slice* internal_key, size_t* bytes_read, bool* seekable) { Slice* internal_key,
const char* key_ptr = start; uint32_t* bytes_read,
bool* seekable) {
uint32_t user_key_size = 0; uint32_t user_key_size = 0;
Status s;
if (fixed_user_key_len_ != kPlainTableVariableLength) { if (fixed_user_key_len_ != kPlainTableVariableLength) {
user_key_size = fixed_user_key_len_; user_key_size = fixed_user_key_len_;
key_ptr = start;
} else { } else {
uint32_t tmp_size = 0; uint32_t tmp_size = 0;
key_ptr = GetVarint32Ptr(start, limit, &tmp_size); uint32_t tmp_read;
if (key_ptr == nullptr) { bool success =
return Status::Corruption( file_reader_.ReadVarint32(start_offset, &tmp_size, &tmp_read);
"Unexpected EOF when reading the next key's size"); if (!success) {
return file_reader_.status();
} }
assert(tmp_read > 0);
user_key_size = tmp_size; user_key_size = tmp_size;
*bytes_read = key_ptr - start; *bytes_read = tmp_read;
} }
// dummy initial value to avoid compiler complain // dummy initial value to avoid compiler complain
bool decoded_internal_key_valid = true; bool decoded_internal_key_valid = true;
Slice decoded_internal_key; Slice decoded_internal_key;
Status s = s = ReadInternalKey(start_offset + *bytes_read, user_key_size, parsed_key,
ReadInternalKey(key_ptr, limit, user_key_size, parsed_key, bytes_read, bytes_read, &decoded_internal_key_valid,
&decoded_internal_key_valid, &decoded_internal_key); &decoded_internal_key);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
if (internal_key != nullptr) { if (!file_reader_.file_info_->is_mmap_mode) {
cur_key_.SetInternalKey(*parsed_key);
parsed_key->user_key = Slice(cur_key_.GetKey().data(), user_key_size);
if (internal_key != nullptr) {
*internal_key = cur_key_.GetKey();
}
} else if (internal_key != nullptr) {
if (decoded_internal_key_valid) { if (decoded_internal_key_valid) {
*internal_key = decoded_internal_key; *internal_key = decoded_internal_key;
} else { } else {
@ -221,41 +317,55 @@ Status PlainTableKeyDecoder::NextPlainEncodingKey(
} }
Status PlainTableKeyDecoder::NextPrefixEncodingKey( Status PlainTableKeyDecoder::NextPrefixEncodingKey(
const char* start, const char* limit, ParsedInternalKey* parsed_key, uint32_t start_offset, ParsedInternalKey* parsed_key, Slice* internal_key,
Slice* internal_key, size_t* bytes_read, bool* seekable) { uint32_t* bytes_read, bool* seekable) {
const char* key_ptr = start;
PlainTableEntryType entry_type; PlainTableEntryType entry_type;
bool expect_suffix = false; bool expect_suffix = false;
Status s;
do { do {
uint32_t size = 0; uint32_t size = 0;
// dummy initial value to avoid compiler complain // dummy initial value to avoid compiler complain
bool decoded_internal_key_valid = true; bool decoded_internal_key_valid = true;
const char* pos = DecodeSize(key_ptr, limit, &entry_type, &size); uint32_t my_bytes_read = 0;
if (pos == nullptr) { s = DecodeSize(start_offset + *bytes_read, &entry_type, &size,
&my_bytes_read);
if (!s.ok()) {
return s;
}
if (my_bytes_read == 0) {
return Status::Corruption("Unexpected EOF when reading size of the key"); return Status::Corruption("Unexpected EOF when reading size of the key");
} }
*bytes_read += pos - key_ptr; *bytes_read += my_bytes_read;
key_ptr = pos;
switch (entry_type) { switch (entry_type) {
case kFullKey: { case kFullKey: {
expect_suffix = false; expect_suffix = false;
Slice decoded_internal_key; Slice decoded_internal_key;
Status s = s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
ReadInternalKey(key_ptr, limit, size, parsed_key, bytes_read, bytes_read, &decoded_internal_key_valid,
&decoded_internal_key_valid, &decoded_internal_key); &decoded_internal_key);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
saved_user_key_ = parsed_key->user_key; if (!file_reader_.file_info_->is_mmap_mode ||
if (internal_key != nullptr) { (internal_key != nullptr && !decoded_internal_key_valid)) {
if (decoded_internal_key_valid) { // In non-mmap mode, always need to make a copy of keys returned to
*internal_key = decoded_internal_key; // users, because after reading value for the key, the key might
} else { // be invalid.
cur_key_.SetInternalKey(*parsed_key); cur_key_.SetInternalKey(*parsed_key);
saved_user_key_ = cur_key_.GetKey();
if (!file_reader_.file_info_->is_mmap_mode) {
parsed_key->user_key = Slice(cur_key_.GetKey().data(), size);
}
if (internal_key != nullptr) {
*internal_key = cur_key_.GetKey(); *internal_key = cur_key_.GetKey();
} }
} else {
if (internal_key != nullptr) {
*internal_key = decoded_internal_key;
}
saved_user_key_ = parsed_key->user_key;
} }
break; break;
} }
@ -276,20 +386,32 @@ Status PlainTableKeyDecoder::NextPrefixEncodingKey(
if (seekable != nullptr) { if (seekable != nullptr) {
*seekable = false; *seekable = false;
} }
cur_key_.Reserve(prefix_len_ + size);
Slice tmp_slice; Slice tmp_slice;
Status s = ReadInternalKey(key_ptr, limit, size, parsed_key, bytes_read, s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
&decoded_internal_key_valid, &tmp_slice); bytes_read, &decoded_internal_key_valid,
&tmp_slice);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
cur_key_.SetInternalKey(Slice(saved_user_key_.data(), prefix_len_), if (!file_reader_.file_info_->is_mmap_mode) {
*parsed_key); // In non-mmap mode, we need to make a copy of keys returned to
assert( // users, because after reading value for the key, the key might
prefix_extractor_ == nullptr || // be invalid.
prefix_extractor_->Transform(ExtractUserKey(cur_key_.GetKey())) == // saved_user_key_ points to cur_key_. We are making a copy of
Slice(saved_user_key_.data(), prefix_len_)); // the prefix part to another string, and construct the current
// key from the prefix part and the suffix part back to cur_key_.
std::string tmp =
Slice(saved_user_key_.data(), prefix_len_).ToString();
cur_key_.Reserve(prefix_len_ + size);
cur_key_.SetInternalKey(tmp, *parsed_key);
parsed_key->user_key =
Slice(cur_key_.GetKey().data(), prefix_len_ + size);
} else {
cur_key_.Reserve(prefix_len_ + size);
cur_key_.SetInternalKey(Slice(saved_user_key_.data(), prefix_len_),
*parsed_key);
}
parsed_key->user_key = ExtractUserKey(cur_key_.GetKey()); parsed_key->user_key = ExtractUserKey(cur_key_.GetKey());
if (internal_key != nullptr) { if (internal_key != nullptr) {
*internal_key = cur_key_.GetKey(); *internal_key = cur_key_.GetKey();
@ -297,29 +419,61 @@ Status PlainTableKeyDecoder::NextPrefixEncodingKey(
break; break;
} }
default: default:
return Status::Corruption("Identified size flag."); return Status::Corruption("Un-identified size flag.");
} }
} while (expect_suffix); // Another round if suffix is expected. } while (expect_suffix); // Another round if suffix is expected.
return Status::OK(); return Status::OK();
} }
Status PlainTableKeyDecoder::NextKey(const char* start, const char* limit, Status PlainTableKeyDecoder::NextKey(uint32_t start_offset,
ParsedInternalKey* parsed_key, ParsedInternalKey* parsed_key,
Slice* internal_key, size_t* bytes_read, Slice* internal_key, Slice* value,
bool* seekable) { uint32_t* bytes_read, bool* seekable) {
assert(value != nullptr);
Status s = NextKeyNoValue(start_offset, parsed_key, internal_key, bytes_read,
seekable);
if (s.ok()) {
assert(bytes_read != nullptr);
uint32_t value_size;
uint32_t value_size_bytes;
bool success = file_reader_.ReadVarint32(start_offset + *bytes_read,
&value_size, &value_size_bytes);
if (!success) {
return file_reader_.status();
}
if (value_size_bytes == 0) {
return Status::Corruption(
"Unexpected EOF when reading the next value's size.");
}
*bytes_read += value_size_bytes;
success = file_reader_.Read(start_offset + *bytes_read, value_size, value);
if (!success) {
return file_reader_.status();
}
*bytes_read += value_size;
}
return s;
}
Status PlainTableKeyDecoder::NextKeyNoValue(uint32_t start_offset,
ParsedInternalKey* parsed_key,
Slice* internal_key,
uint32_t* bytes_read,
bool* seekable) {
*bytes_read = 0; *bytes_read = 0;
if (seekable != nullptr) { if (seekable != nullptr) {
*seekable = true; *seekable = true;
} }
Status s;
if (encoding_type_ == kPlain) { if (encoding_type_ == kPlain) {
return NextPlainEncodingKey(start, limit, parsed_key, internal_key, return NextPlainEncodingKey(start_offset, parsed_key, internal_key,
bytes_read, seekable); bytes_read, seekable);
} else { } else {
assert(encoding_type_ == kPrefix); assert(encoding_type_ == kPrefix);
return NextPrefixEncodingKey(start, limit, parsed_key, internal_key, return NextPrefixEncodingKey(start_offset, parsed_key, internal_key,
bytes_read, seekable); bytes_read, seekable);
} }
} }
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LIT

View File

@ -13,6 +13,8 @@ namespace rocksdb {
class WritableFile; class WritableFile;
struct ParsedInternalKey; struct ParsedInternalKey;
struct PlainTableReaderFileInfo;
enum PlainTableEntryType : unsigned char;
// Helper class to write out a key to an output file // Helper class to write out a key to an output file
// Actual data format of the key is documented in plain_table_factory.h // Actual data format of the key is documented in plain_table_factory.h
@ -53,10 +55,12 @@ class PlainTableKeyEncoder {
// Actual data format of the key is documented in plain_table_factory.h // Actual data format of the key is documented in plain_table_factory.h
class PlainTableKeyDecoder { class PlainTableKeyDecoder {
public: public:
explicit PlainTableKeyDecoder(EncodingType encoding_type, explicit PlainTableKeyDecoder(const PlainTableReaderFileInfo* file_info,
EncodingType encoding_type,
uint32_t user_key_len, uint32_t user_key_len,
const SliceTransform* prefix_extractor) const SliceTransform* prefix_extractor)
: encoding_type_(encoding_type), : file_reader_(file_info),
encoding_type_(encoding_type),
prefix_len_(0), prefix_len_(0),
fixed_user_key_len_(user_key_len), fixed_user_key_len_(user_key_len),
prefix_extractor_(prefix_extractor), prefix_extractor_(prefix_extractor),
@ -70,9 +74,51 @@ class PlainTableKeyDecoder {
// bytes_read: how many bytes read from start. Output // bytes_read: how many bytes read from start. Output
// seekable: whether key can be read from this place. Used when building // seekable: whether key can be read from this place. Used when building
// indexes. Output. // indexes. Output.
Status NextKey(const char* start, const char* limit, Status NextKey(uint32_t start_offset, ParsedInternalKey* parsed_key,
ParsedInternalKey* parsed_key, Slice* internal_key, Slice* internal_key, Slice* value, uint32_t* bytes_read,
size_t* bytes_read, bool* seekable = nullptr); bool* seekable = nullptr);
Status NextKeyNoValue(uint32_t start_offset, ParsedInternalKey* parsed_key,
Slice* internal_key, uint32_t* bytes_read,
bool* seekable = nullptr);
class FileReader {
public:
explicit FileReader(const PlainTableReaderFileInfo* file_info)
: file_info_(file_info),
buf_start_offset_(0),
buf_len_(0),
buf_capacity_(0) {}
// In mmaped mode, the results point to mmaped area of the file, which
// means it is always valid before closing the file.
// In non-mmap mode, the results point to an internal buffer. If the caller
// makes another read call, the results will not be valid. So callers should
// make a copy when needed.
// If return false, status code is stored in status_.
inline bool Read(uint32_t file_offset, uint32_t len, Slice* output);
// If return false, status code is stored in status_.
bool ReadNonMmap(uint32_t file_offset, uint32_t len, Slice* output);
// *bytes_read = 0 means eof. false means failure and status is saved
// in status_. Not directly returning Status to save copying status
// object to map previous performance of mmap mode.
inline bool ReadVarint32(uint32_t offset, uint32_t* output,
uint32_t* bytes_read);
bool ReadVarint32NonMmap(uint32_t offset, uint32_t* output,
uint32_t* bytes_read);
Status status() const { return status_; }
const PlainTableReaderFileInfo* file_info_;
std::unique_ptr<char[]> buf_;
uint32_t buf_start_offset_;
uint32_t buf_len_;
uint32_t buf_capacity_;
Status status_;
};
FileReader file_reader_;
EncodingType encoding_type_; EncodingType encoding_type_;
uint32_t prefix_len_; uint32_t prefix_len_;
uint32_t fixed_user_key_len_; uint32_t fixed_user_key_len_;
@ -82,14 +128,20 @@ class PlainTableKeyDecoder {
bool in_prefix_; bool in_prefix_;
private: private:
Status NextPlainEncodingKey(const char* start, const char* limit, Status NextPlainEncodingKey(uint32_t start_offset,
ParsedInternalKey* parsed_key, ParsedInternalKey* parsed_key,
Slice* internal_key, size_t* bytes_read, Slice* internal_key, uint32_t* bytes_read,
bool* seekable = nullptr); bool* seekable = nullptr);
Status NextPrefixEncodingKey(const char* start, const char* limit, Status NextPrefixEncodingKey(uint32_t start_offset,
ParsedInternalKey* parsed_key, ParsedInternalKey* parsed_key,
Slice* internal_key, size_t* bytes_read, Slice* internal_key, uint32_t* bytes_read,
bool* seekable = nullptr); bool* seekable = nullptr);
Status ReadInternalKey(uint32_t file_offset, uint32_t user_key_size,
ParsedInternalKey* parsed_key, uint32_t* bytes_read,
bool* internal_key_valid, Slice* internal_key);
inline Status DecodeSize(uint32_t start_offset,
PlainTableEntryType* entry_type, uint32_t* key_size,
uint32_t* bytes_read);
}; };
} // namespace rocksdb } // namespace rocksdb

View File

@ -99,13 +99,13 @@ PlainTableReader::PlainTableReader(const ImmutableCFOptions& ioptions,
: internal_comparator_(icomparator), : internal_comparator_(icomparator),
encoding_type_(encoding_type), encoding_type_(encoding_type),
full_scan_mode_(false), full_scan_mode_(false),
data_end_offset_(static_cast<uint32_t>(table_properties->data_size)),
user_key_len_(static_cast<uint32_t>(table_properties->fixed_key_len)), user_key_len_(static_cast<uint32_t>(table_properties->fixed_key_len)),
prefix_extractor_(ioptions.prefix_extractor), prefix_extractor_(ioptions.prefix_extractor),
enable_bloom_(false), enable_bloom_(false),
bloom_(6, nullptr), bloom_(6, nullptr),
file_info_(std::move(file), storage_options,
static_cast<uint32_t>(table_properties->data_size)),
ioptions_(ioptions), ioptions_(ioptions),
file_(std::move(file)),
file_size_(file_size), file_size_(file_size),
table_properties_(nullptr) {} table_properties_(nullptr) {}
@ -121,7 +121,6 @@ Status PlainTableReader::Open(const ImmutableCFOptions& ioptions,
const int bloom_bits_per_key, const int bloom_bits_per_key,
double hash_table_ratio, size_t index_sparseness, double hash_table_ratio, size_t index_sparseness,
size_t huge_page_tlb_size, bool full_scan_mode) { size_t huge_page_tlb_size, bool full_scan_mode) {
assert(ioptions.allow_mmap_reads);
if (file_size > PlainTableIndex::kMaxFileSize) { if (file_size > PlainTableIndex::kMaxFileSize) {
return Status::NotSupported("File is too large for PlainTableReader!"); return Status::NotSupported("File is too large for PlainTableReader!");
} }
@ -163,7 +162,7 @@ Status PlainTableReader::Open(const ImmutableCFOptions& ioptions,
ioptions, std::move(file), env_options, internal_comparator, ioptions, std::move(file), env_options, internal_comparator,
encoding_type, file_size, props)); encoding_type, file_size, props));
s = new_reader->MmapDataFile(); s = new_reader->MmapDataIfNeeded();
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -204,13 +203,14 @@ Iterator* PlainTableReader::NewIterator(const ReadOptions& options,
Status PlainTableReader::PopulateIndexRecordList( Status PlainTableReader::PopulateIndexRecordList(
PlainTableIndexBuilder* index_builder, vector<uint32_t>* prefix_hashes) { PlainTableIndexBuilder* index_builder, vector<uint32_t>* prefix_hashes) {
Slice prev_key_prefix_slice; Slice prev_key_prefix_slice;
std::string prev_key_prefix_buf;
uint32_t pos = data_start_offset_; uint32_t pos = data_start_offset_;
bool is_first_record = true; bool is_first_record = true;
Slice key_prefix_slice; Slice key_prefix_slice;
PlainTableKeyDecoder decoder(encoding_type_, user_key_len_, PlainTableKeyDecoder decoder(&file_info_, encoding_type_, user_key_len_,
ioptions_.prefix_extractor); ioptions_.prefix_extractor);
while (pos < data_end_offset_) { while (pos < file_info_.data_end_offset) {
uint32_t key_offset = pos; uint32_t key_offset = pos;
ParsedInternalKey key; ParsedInternalKey key;
Slice value_slice; Slice value_slice;
@ -228,7 +228,12 @@ Status PlainTableReader::PopulateIndexRecordList(
if (!is_first_record) { if (!is_first_record) {
prefix_hashes->push_back(GetSliceHash(prev_key_prefix_slice)); prefix_hashes->push_back(GetSliceHash(prev_key_prefix_slice));
} }
prev_key_prefix_slice = key_prefix_slice; if (file_info_.is_mmap_mode) {
prev_key_prefix_slice = key_prefix_slice;
} else {
prev_key_prefix_buf = key_prefix_slice.ToString();
prev_key_prefix_slice = prev_key_prefix_buf;
}
} }
} }
@ -268,9 +273,12 @@ void PlainTableReader::FillBloom(vector<uint32_t>* prefix_hashes) {
} }
} }
Status PlainTableReader::MmapDataFile() { Status PlainTableReader::MmapDataIfNeeded() {
// Get mmapped memory to file_data_. if (file_info_.is_mmap_mode) {
return file_->Read(0, file_size_, &file_data_, nullptr); // Get mmapped memory.
return file_info_.file->Read(0, file_size_, &file_info_.file_data, nullptr);
}
return Status::OK();
} }
Status PlainTableReader::PopulateIndex(TableProperties* props, Status PlainTableReader::PopulateIndex(TableProperties* props,
@ -282,31 +290,37 @@ Status PlainTableReader::PopulateIndex(TableProperties* props,
table_properties_.reset(props); table_properties_.reset(props);
BlockContents bloom_block_contents; BlockContents bloom_block_contents;
auto s = ReadMetaBlock(file_.get(), file_size_, kPlainTableMagicNumber, auto s = ReadMetaBlock(file_info_.file.get(), file_size_,
ioptions_.env, BloomBlockBuilder::kBloomBlock, kPlainTableMagicNumber, ioptions_.env,
&bloom_block_contents); BloomBlockBuilder::kBloomBlock, &bloom_block_contents);
bool index_in_file = s.ok(); bool index_in_file = s.ok();
BlockContents index_block_contents; BlockContents index_block_contents;
s = ReadMetaBlock(file_.get(), file_size_, kPlainTableMagicNumber, s = ReadMetaBlock(
ioptions_.env, PlainTableIndexBuilder::kPlainTableIndexBlock, file_info_.file.get(), file_size_, kPlainTableMagicNumber, ioptions_.env,
&index_block_contents); PlainTableIndexBuilder::kPlainTableIndexBlock, &index_block_contents);
index_in_file &= s.ok(); index_in_file &= s.ok();
Slice* bloom_block; Slice* bloom_block;
if (index_in_file) { if (index_in_file) {
// If bloom_block_contents.allocation is not empty (which will be the case
// for non-mmap mode), it holds the alloated memory for the bloom block.
// It needs to be kept alive to keep `bloom_block` valid.
bloom_block_alloc_ = std::move(bloom_block_contents.allocation);
bloom_block = &bloom_block_contents.data; bloom_block = &bloom_block_contents.data;
} else { } else {
bloom_block = nullptr; bloom_block = nullptr;
} }
// index_in_file == true only if there are kBloomBlock and // index_in_file == true only if there are kBloomBlock and
// kPlainTableIndexBlock // kPlainTableIndexBlock in file
// in file
Slice* index_block; Slice* index_block;
if (index_in_file) { if (index_in_file) {
// If index_block_contents.allocation is not empty (which will be the case
// for non-mmap mode), it holds the alloated memory for the index block.
// It needs to be kept alive to keep `index_block` valid.
index_block_alloc_ = std::move(index_block_contents.allocation);
index_block = &index_block_contents.data; index_block = &index_block_contents.data;
} else { } else {
index_block = nullptr; index_block = nullptr;
@ -401,7 +415,7 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix,
uint32_t prefix_index_offset; uint32_t prefix_index_offset;
auto res = index_.GetOffset(prefix_hash, &prefix_index_offset); auto res = index_.GetOffset(prefix_hash, &prefix_index_offset);
if (res == PlainTableIndex::kNoPrefixForBucket) { if (res == PlainTableIndex::kNoPrefixForBucket) {
*offset = data_end_offset_; *offset = file_info_.data_end_offset;
return Status::OK(); return Status::OK();
} else if (res == PlainTableIndex::kDirectToFile) { } else if (res == PlainTableIndex::kDirectToFile) {
*offset = prefix_index_offset; *offset = prefix_index_offset;
@ -420,16 +434,15 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix,
return Status::Corruption(Slice()); return Status::Corruption(Slice());
} }
PlainTableKeyDecoder decoder(&file_info_, encoding_type_, user_key_len_,
ioptions_.prefix_extractor);
// The key is between [low, high). Do a binary search between it. // The key is between [low, high). Do a binary search between it.
while (high - low > 1) { while (high - low > 1) {
uint32_t mid = (high + low) / 2; uint32_t mid = (high + low) / 2;
uint32_t file_offset = GetFixed32Element(base_ptr, mid); uint32_t file_offset = GetFixed32Element(base_ptr, mid);
size_t tmp; uint32_t tmp;
Status s = PlainTableKeyDecoder(encoding_type_, user_key_len_, Status s = decoder.NextKeyNoValue(file_offset, &mid_key, nullptr, &tmp);
ioptions_.prefix_extractor)
.NextKey(file_data_.data() + file_offset,
file_data_.data() + data_end_offset_, &mid_key,
nullptr, &tmp);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -452,13 +465,9 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix,
// prefix as target. We need to rule out one of them to avoid to go // prefix as target. We need to rule out one of them to avoid to go
// to the wrong prefix. // to the wrong prefix.
ParsedInternalKey low_key; ParsedInternalKey low_key;
size_t tmp; uint32_t tmp;
uint32_t low_key_offset = GetFixed32Element(base_ptr, low); uint32_t low_key_offset = GetFixed32Element(base_ptr, low);
Status s = PlainTableKeyDecoder(encoding_type_, user_key_len_, Status s = decoder.NextKeyNoValue(low_key_offset, &low_key, nullptr, &tmp);
ioptions_.prefix_extractor)
.NextKey(file_data_.data() + low_key_offset,
file_data_.data() + data_end_offset_, &low_key,
nullptr, &tmp);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -473,7 +482,7 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix,
} else { } else {
// target is larger than a key of the last prefix in this bucket // target is larger than a key of the last prefix in this bucket
// but with a different prefix. Key does not exist. // but with a different prefix. Key does not exist.
*offset = data_end_offset_; *offset = file_info_.data_end_offset;
} }
return Status::OK(); return Status::OK();
} }
@ -482,41 +491,26 @@ bool PlainTableReader::MatchBloom(uint32_t hash) const {
return !enable_bloom_ || bloom_.MayContainHash(hash); return !enable_bloom_ || bloom_.MayContainHash(hash);
} }
Status PlainTableReader::Next(PlainTableKeyDecoder* decoder, uint32_t* offset, Status PlainTableReader::Next(PlainTableKeyDecoder* decoder, uint32_t* offset,
ParsedInternalKey* parsed_key, ParsedInternalKey* parsed_key,
Slice* internal_key, Slice* value, Slice* internal_key, Slice* value,
bool* seekable) const { bool* seekable) const {
if (*offset == data_end_offset_) { if (*offset == file_info_.data_end_offset) {
*offset = data_end_offset_; *offset = file_info_.data_end_offset;
return Status::OK(); return Status::OK();
} }
if (*offset > data_end_offset_) { if (*offset > file_info_.data_end_offset) {
return Status::Corruption("Offset is out of file size"); return Status::Corruption("Offset is out of file size");
} }
const char* start = file_data_.data() + *offset; uint32_t bytes_read;
size_t bytes_for_key; Status s = decoder->NextKey(*offset, parsed_key, internal_key, value,
Status s = &bytes_read, seekable);
decoder->NextKey(start, file_data_.data() + data_end_offset_, parsed_key,
internal_key, &bytes_for_key, seekable);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
uint32_t value_size; *offset = *offset + bytes_read;
const char* value_ptr = GetVarint32Ptr(
start + bytes_for_key, file_data_.data() + data_end_offset_, &value_size);
if (value_ptr == nullptr) {
return Status::Corruption(
"Unexpected EOF when reading the next value's size.");
}
*offset = *offset + static_cast<uint32_t>(value_ptr - start) + value_size;
if (*offset > data_end_offset_) {
return Status::Corruption("Unexpected EOF when reading the next value. ");
}
*value = Slice(value_ptr, value_size);
return Status::OK(); return Status::OK();
} }
@ -556,6 +550,7 @@ Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target,
bool prefix_match; bool prefix_match;
Status s = Status s =
GetOffset(target, prefix_slice, prefix_hash, prefix_match, &offset); GetOffset(target, prefix_slice, prefix_hash, prefix_match, &offset);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -565,9 +560,9 @@ Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target,
return Status::Corruption(Slice()); return Status::Corruption(Slice());
} }
Slice found_value; Slice found_value;
PlainTableKeyDecoder decoder(encoding_type_, user_key_len_, PlainTableKeyDecoder decoder(&file_info_, encoding_type_, user_key_len_,
ioptions_.prefix_extractor); ioptions_.prefix_extractor);
while (offset < data_end_offset_) { while (offset < file_info_.data_end_offset) {
s = Next(&decoder, &offset, &found_key, nullptr, &found_value); s = Next(&decoder, &offset, &found_key, nullptr, &found_value);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -598,24 +593,24 @@ uint64_t PlainTableReader::ApproximateOffsetOf(const Slice& key) {
PlainTableIterator::PlainTableIterator(PlainTableReader* table, PlainTableIterator::PlainTableIterator(PlainTableReader* table,
bool use_prefix_seek) bool use_prefix_seek)
: table_(table), : table_(table),
decoder_(table_->encoding_type_, table_->user_key_len_, decoder_(&table_->file_info_, table_->encoding_type_,
table_->prefix_extractor_), table_->user_key_len_, table_->prefix_extractor_),
use_prefix_seek_(use_prefix_seek) { use_prefix_seek_(use_prefix_seek) {
next_offset_ = offset_ = table_->data_end_offset_; next_offset_ = offset_ = table_->file_info_.data_end_offset;
} }
PlainTableIterator::~PlainTableIterator() { PlainTableIterator::~PlainTableIterator() {
} }
bool PlainTableIterator::Valid() const { bool PlainTableIterator::Valid() const {
return offset_ < table_->data_end_offset_ return offset_ < table_->file_info_.data_end_offset &&
&& offset_ >= table_->data_start_offset_; offset_ >= table_->data_start_offset_;
} }
void PlainTableIterator::SeekToFirst() { void PlainTableIterator::SeekToFirst() {
next_offset_ = table_->data_start_offset_; next_offset_ = table_->data_start_offset_;
if (next_offset_ >= table_->data_end_offset_) { if (next_offset_ >= table_->file_info_.data_end_offset) {
next_offset_ = offset_ = table_->data_end_offset_; next_offset_ = offset_ = table_->file_info_.data_end_offset;
} else { } else {
Next(); Next();
} }
@ -633,14 +628,14 @@ void PlainTableIterator::Seek(const Slice& target) {
if (table_->full_scan_mode_) { if (table_->full_scan_mode_) {
status_ = status_ =
Status::InvalidArgument("Seek() is not allowed in full scan mode."); Status::InvalidArgument("Seek() is not allowed in full scan mode.");
offset_ = next_offset_ = table_->data_end_offset_; offset_ = next_offset_ = table_->file_info_.data_end_offset;
return; return;
} else if (table_->GetIndexSize() > 1) { } else if (table_->GetIndexSize() > 1) {
assert(false); assert(false);
status_ = Status::NotSupported( status_ = Status::NotSupported(
"PlainTable cannot issue non-prefix seek unless in total order " "PlainTable cannot issue non-prefix seek unless in total order "
"mode."); "mode.");
offset_ = next_offset_ = table_->data_end_offset_; offset_ = next_offset_ = table_->file_info_.data_end_offset;
return; return;
} }
} }
@ -651,7 +646,7 @@ void PlainTableIterator::Seek(const Slice& target) {
if (!table_->IsTotalOrderMode()) { if (!table_->IsTotalOrderMode()) {
prefix_hash = GetSliceHash(prefix_slice); prefix_hash = GetSliceHash(prefix_slice);
if (!table_->MatchBloom(prefix_hash)) { if (!table_->MatchBloom(prefix_hash)) {
offset_ = next_offset_ = table_->data_end_offset_; offset_ = next_offset_ = table_->file_info_.data_end_offset;
return; return;
} }
} }
@ -659,16 +654,16 @@ void PlainTableIterator::Seek(const Slice& target) {
status_ = table_->GetOffset(target, prefix_slice, prefix_hash, prefix_match, status_ = table_->GetOffset(target, prefix_slice, prefix_hash, prefix_match,
&next_offset_); &next_offset_);
if (!status_.ok()) { if (!status_.ok()) {
offset_ = next_offset_ = table_->data_end_offset_; offset_ = next_offset_ = table_->file_info_.data_end_offset;
return; return;
} }
if (next_offset_ < table_-> data_end_offset_) { if (next_offset_ < table_->file_info_.data_end_offset) {
for (Next(); status_.ok() && Valid(); Next()) { for (Next(); status_.ok() && Valid(); Next()) {
if (!prefix_match) { if (!prefix_match) {
// Need to verify the first key's prefix // Need to verify the first key's prefix
if (table_->GetPrefix(key()) != prefix_slice) { if (table_->GetPrefix(key()) != prefix_slice) {
offset_ = next_offset_ = table_->data_end_offset_; offset_ = next_offset_ = table_->file_info_.data_end_offset;
break; break;
} }
prefix_match = true; prefix_match = true;
@ -678,19 +673,19 @@ void PlainTableIterator::Seek(const Slice& target) {
} }
} }
} else { } else {
offset_ = table_->data_end_offset_; offset_ = table_->file_info_.data_end_offset;
} }
} }
void PlainTableIterator::Next() { void PlainTableIterator::Next() {
offset_ = next_offset_; offset_ = next_offset_;
if (offset_ < table_->data_end_offset_) { if (offset_ < table_->file_info_.data_end_offset) {
Slice tmp_slice; Slice tmp_slice;
ParsedInternalKey parsed_key; ParsedInternalKey parsed_key;
status_ = status_ =
table_->Next(&decoder_, &next_offset_, &parsed_key, &key_, &value_); table_->Next(&decoder_, &next_offset_, &parsed_key, &key_, &value_);
if (!status_.ok()) { if (!status_.ok()) {
offset_ = next_offset_ = table_->data_end_offset_; offset_ = next_offset_ = table_->file_info_.data_end_offset;
} }
} }
} }

View File

@ -44,6 +44,20 @@ using std::unordered_map;
using std::vector; using std::vector;
extern const uint32_t kPlainTableVariableLength; extern const uint32_t kPlainTableVariableLength;
struct PlainTableReaderFileInfo {
bool is_mmap_mode;
Slice file_data;
uint32_t data_end_offset;
unique_ptr<RandomAccessFileReader> file;
PlainTableReaderFileInfo(unique_ptr<RandomAccessFileReader>&& _file,
const EnvOptions& storage_options,
uint32_t _data_size_offset)
: is_mmap_mode(storage_options.use_mmap_reads),
data_end_offset(_data_size_offset),
file(std::move(_file)) {}
};
// Based on following output file format shown in plain_table_factory.h // Based on following output file format shown in plain_table_factory.h
// When opening the output file, IndexedTableReader creates a hash table // When opening the output file, IndexedTableReader creates a hash table
// from key prefixes to offset of the output file. IndexedTable will decide // from key prefixes to offset of the output file. IndexedTable will decide
@ -108,14 +122,13 @@ class PlainTableReader: public TableReader {
double hash_table_ratio, size_t index_sparseness, double hash_table_ratio, size_t index_sparseness,
size_t huge_page_tlb_size); size_t huge_page_tlb_size);
Status MmapDataFile(); Status MmapDataIfNeeded();
private: private:
const InternalKeyComparator internal_comparator_; const InternalKeyComparator internal_comparator_;
EncodingType encoding_type_; EncodingType encoding_type_;
// represents plain table's current status. // represents plain table's current status.
Status status_; Status status_;
Slice file_data_;
PlainTableIndex index_; PlainTableIndex index_;
bool full_scan_mode_; bool full_scan_mode_;
@ -123,7 +136,6 @@ class PlainTableReader: public TableReader {
// data_start_offset_ and data_end_offset_ defines the range of the // data_start_offset_ and data_end_offset_ defines the range of the
// sst file that stores data. // sst file that stores data.
const uint32_t data_start_offset_ = 0; const uint32_t data_start_offset_ = 0;
const uint32_t data_end_offset_;
const uint32_t user_key_len_; const uint32_t user_key_len_;
const SliceTransform* prefix_extractor_; const SliceTransform* prefix_extractor_;
@ -132,10 +144,12 @@ class PlainTableReader: public TableReader {
// Bloom filter is used to rule out non-existent key // Bloom filter is used to rule out non-existent key
bool enable_bloom_; bool enable_bloom_;
DynamicBloom bloom_; DynamicBloom bloom_;
PlainTableReaderFileInfo file_info_;
Arena arena_; Arena arena_;
std::unique_ptr<char[]> index_block_alloc_;
std::unique_ptr<char[]> bloom_block_alloc_;
const ImmutableCFOptions& ioptions_; const ImmutableCFOptions& ioptions_;
unique_ptr<RandomAccessFileReader> file_;
uint64_t file_size_; uint64_t file_size_;
std::shared_ptr<const TableProperties> table_properties_; std::shared_ptr<const TableProperties> table_properties_;

View File

@ -79,20 +79,20 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
+ "/rocksdb_table_reader_benchmark"; + "/rocksdb_table_reader_benchmark";
std::string dbname = test::TmpDir() + "/rocksdb_table_reader_bench_db"; std::string dbname = test::TmpDir() + "/rocksdb_table_reader_bench_db";
WriteOptions wo; WriteOptions wo;
unique_ptr<WritableFile> file;
Env* env = Env::Default(); Env* env = Env::Default();
TableBuilder* tb = nullptr; TableBuilder* tb = nullptr;
DB* db = nullptr; DB* db = nullptr;
Status s; Status s;
const ImmutableCFOptions ioptions(opts); const ImmutableCFOptions ioptions(opts);
unique_ptr<WritableFileWriter> file_writer;
if (!through_db) { if (!through_db) {
unique_ptr<WritableFile> file;
env->NewWritableFile(file_name, &file, env_options); env->NewWritableFile(file_name, &file, env_options);
std::vector<std::unique_ptr<IntTblPropCollectorFactory> > std::vector<std::unique_ptr<IntTblPropCollectorFactory> >
int_tbl_prop_collector_factories; int_tbl_prop_collector_factories;
unique_ptr<WritableFileWriter> file_writer( file_writer.reset(new WritableFileWriter(std::move(file), env_options));
new WritableFileWriter(std::move(file), env_options));
tb = opts.table_factory->NewTableBuilder( tb = opts.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories, TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories,
@ -117,7 +117,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
} }
if (!through_db) { if (!through_db) {
tb->Finish(); tb->Finish();
file->Close(); file_writer->Close();
} else { } else {
db->Flush(FlushOptions()); db->Flush(FlushOptions());
} }
@ -126,6 +126,10 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
if (!through_db) { if (!through_db) {
unique_ptr<RandomAccessFile> raf; unique_ptr<RandomAccessFile> raf;
s = env->NewRandomAccessFile(file_name, &raf, env_options); s = env->NewRandomAccessFile(file_name, &raf, env_options);
if (!s.ok()) {
fprintf(stderr, "Create File Error: %s\n", s.ToString().c_str());
exit(1);
}
uint64_t file_size; uint64_t file_size;
env->GetFileSize(file_name, &file_size); env->GetFileSize(file_name, &file_size);
unique_ptr<RandomAccessFileReader> file_reader( unique_ptr<RandomAccessFileReader> file_reader(
@ -133,6 +137,10 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
s = opts.table_factory->NewTableReader( s = opts.table_factory->NewTableReader(
TableReaderOptions(ioptions, env_options, ikc), std::move(file_reader), TableReaderOptions(ioptions, env_options, ikc), std::move(file_reader),
file_size, &table_reader); file_size, &table_reader);
if (!s.ok()) {
fprintf(stderr, "Open Table Error: %s\n", s.ToString().c_str());
exit(1);
}
} }
Random rnd(301); Random rnd(301);

View File

@ -203,7 +203,7 @@ class BlockConstructor: public Constructor {
// A helper class that converts internal format keys into user keys // A helper class that converts internal format keys into user keys
class KeyConvertingIterator: public Iterator { class KeyConvertingIterator: public Iterator {
public: public:
KeyConvertingIterator(Iterator* iter, bool arena_mode = false) explicit KeyConvertingIterator(Iterator* iter, bool arena_mode = false)
: iter_(iter), arena_mode_(arena_mode) {} : iter_(iter), arena_mode_(arena_mode) {}
virtual ~KeyConvertingIterator() { virtual ~KeyConvertingIterator() {
if (arena_mode_) { if (arena_mode_) {
@ -263,6 +263,7 @@ class TableConstructor: public Constructor {
const InternalKeyComparator& internal_comparator, const InternalKeyComparator& internal_comparator,
const stl_wrappers::KVMap& kv_map) override { const stl_wrappers::KVMap& kv_map) override {
Reset(); Reset();
soptions.use_mmap_reads = ioptions.allow_mmap_reads;
file_writer_.reset(test::GetWritableFileWriter(new test::StringSink())); file_writer_.reset(test::GetWritableFileWriter(new test::StringSink()));
unique_ptr<TableBuilder> builder; unique_ptr<TableBuilder> builder;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>> std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
@ -350,7 +351,7 @@ class TableConstructor: public Constructor {
TableConstructor(); TableConstructor();
static uint64_t cur_uniq_id_; static uint64_t cur_uniq_id_;
const EnvOptions soptions; EnvOptions soptions;
}; };
uint64_t TableConstructor::cur_uniq_id_ = 1; uint64_t TableConstructor::cur_uniq_id_ = 1;
@ -476,6 +477,7 @@ struct TestArgs {
int restart_interval; int restart_interval;
CompressionType compression; CompressionType compression;
uint32_t format_version; uint32_t format_version;
bool use_mmap;
}; };
static std::vector<TestArgs> GenerateArgList() { static std::vector<TestArgs> GenerateArgList() {
@ -521,13 +523,17 @@ static std::vector<TestArgs> GenerateArgList() {
for (auto reverse_compare : reverse_compare_types) { for (auto reverse_compare : reverse_compare_types) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (test_type == PLAIN_TABLE_SEMI_FIXED_PREFIX || if (test_type == PLAIN_TABLE_SEMI_FIXED_PREFIX ||
test_type == PLAIN_TABLE_FULL_STR_PREFIX) { test_type == PLAIN_TABLE_FULL_STR_PREFIX ||
test_type == PLAIN_TABLE_TOTAL_ORDER) {
// Plain table doesn't use restart index or compression. // Plain table doesn't use restart index or compression.
TestArgs one_arg; TestArgs one_arg;
one_arg.type = test_type; one_arg.type = test_type;
one_arg.reverse_compare = reverse_compare; one_arg.reverse_compare = reverse_compare;
one_arg.restart_interval = restart_intervals[0]; one_arg.restart_interval = restart_intervals[0];
one_arg.compression = compression_types[0].first; one_arg.compression = compression_types[0].first;
one_arg.use_mmap = true;
test_args.push_back(one_arg);
one_arg.use_mmap = false;
test_args.push_back(one_arg); test_args.push_back(one_arg);
continue; continue;
} }
@ -541,6 +547,7 @@ static std::vector<TestArgs> GenerateArgList() {
one_arg.restart_interval = restart_interval; one_arg.restart_interval = restart_interval;
one_arg.compression = compression_type.first; one_arg.compression = compression_type.first;
one_arg.format_version = compression_type.second ? 2 : 1; one_arg.format_version = compression_type.second ? 2 : 1;
one_arg.use_mmap = false;
test_args.push_back(one_arg); test_args.push_back(one_arg);
} }
} }
@ -602,6 +609,7 @@ class HarnessTest : public testing::Test {
support_prev_ = true; support_prev_ = true;
only_support_prefix_seek_ = false; only_support_prefix_seek_ = false;
options_.allow_mmap_reads = args.use_mmap;
switch (args.type) { switch (args.type) {
case BLOCK_BASED_TABLE_TEST: case BLOCK_BASED_TABLE_TEST:
table_options_.flush_block_policy_factory.reset( table_options_.flush_block_policy_factory.reset(
@ -619,7 +627,6 @@ class HarnessTest : public testing::Test {
support_prev_ = false; support_prev_ = false;
only_support_prefix_seek_ = true; only_support_prefix_seek_ = true;
options_.prefix_extractor.reset(new FixedOrLessPrefixTransform(2)); options_.prefix_extractor.reset(new FixedOrLessPrefixTransform(2));
options_.allow_mmap_reads = true;
options_.table_factory.reset(NewPlainTableFactory()); options_.table_factory.reset(NewPlainTableFactory());
constructor_ = new TableConstructor(options_.comparator, true); constructor_ = new TableConstructor(options_.comparator, true);
internal_comparator_.reset( internal_comparator_.reset(
@ -629,7 +636,6 @@ class HarnessTest : public testing::Test {
support_prev_ = false; support_prev_ = false;
only_support_prefix_seek_ = true; only_support_prefix_seek_ = true;
options_.prefix_extractor.reset(NewNoopTransform()); options_.prefix_extractor.reset(NewNoopTransform());
options_.allow_mmap_reads = true;
options_.table_factory.reset(NewPlainTableFactory()); options_.table_factory.reset(NewPlainTableFactory());
constructor_ = new TableConstructor(options_.comparator, true); constructor_ = new TableConstructor(options_.comparator, true);
internal_comparator_.reset( internal_comparator_.reset(
@ -639,7 +645,6 @@ class HarnessTest : public testing::Test {
support_prev_ = false; support_prev_ = false;
only_support_prefix_seek_ = false; only_support_prefix_seek_ = false;
options_.prefix_extractor = nullptr; options_.prefix_extractor = nullptr;
options_.allow_mmap_reads = true;
{ {
PlainTableOptions plain_table_options; PlainTableOptions plain_table_options;

View File

@ -99,7 +99,8 @@ bool DBTestBase::ChangeOptions(int skip_mask) {
if ((skip_mask & kSkipPlainTable) && if ((skip_mask & kSkipPlainTable) &&
(option_config_ == kPlainTableAllBytesPrefix || (option_config_ == kPlainTableAllBytesPrefix ||
option_config_ == kPlainTableFirstBytePrefix || option_config_ == kPlainTableFirstBytePrefix ||
option_config_ == kPlainTableCappedPrefix)) { option_config_ == kPlainTableCappedPrefix ||
option_config_ == kPlainTableCappedPrefixNonMmap)) {
continue; continue;
} }
if ((skip_mask & kSkipHashIndex) && if ((skip_mask & kSkipHashIndex) &&
@ -223,6 +224,13 @@ Options DBTestBase::CurrentOptions(
options.max_sequential_skip_in_iterations = 999999; options.max_sequential_skip_in_iterations = 999999;
set_block_based_table_factory = false; set_block_based_table_factory = false;
break; break;
case kPlainTableCappedPrefixNonMmap:
options.table_factory.reset(new PlainTableFactory());
options.prefix_extractor.reset(NewCappedPrefixTransform(8));
options.allow_mmap_reads = false;
options.max_sequential_skip_in_iterations = 999999;
set_block_based_table_factory = false;
break;
case kPlainTableAllBytesPrefix: case kPlainTableAllBytesPrefix:
options.table_factory.reset(new PlainTableFactory()); options.table_factory.reset(new PlainTableFactory());
options.prefix_extractor.reset(NewNoopTransform()); options.prefix_extractor.reset(NewNoopTransform());

View File

@ -418,32 +418,33 @@ class DBTestBase : public testing::Test {
kBlockBasedTableWithWholeKeyHashIndex = 2, kBlockBasedTableWithWholeKeyHashIndex = 2,
kPlainTableFirstBytePrefix = 3, kPlainTableFirstBytePrefix = 3,
kPlainTableCappedPrefix = 4, kPlainTableCappedPrefix = 4,
kPlainTableAllBytesPrefix = 5, kPlainTableCappedPrefixNonMmap = 5,
kVectorRep = 6, kPlainTableAllBytesPrefix = 6,
kHashLinkList = 7, kVectorRep = 7,
kHashCuckoo = 8, kHashLinkList = 8,
kMergePut = 9, kHashCuckoo = 9,
kFilter = 10, kMergePut = 10,
kFullFilterWithNewTableReaderForCompactions = 11, kFilter = 11,
kUncompressed = 12, kFullFilterWithNewTableReaderForCompactions = 12,
kNumLevel_3 = 13, kUncompressed = 13,
kDBLogDir = 14, kNumLevel_3 = 14,
kWalDirAndMmapReads = 15, kDBLogDir = 15,
kManifestFileSize = 16, kWalDirAndMmapReads = 16,
kPerfOptions = 17, kManifestFileSize = 17,
kDeletesFilterFirst = 18, kPerfOptions = 18,
kHashSkipList = 19, kDeletesFilterFirst = 19,
kUniversalCompaction = 20, kHashSkipList = 20,
kUniversalCompactionMultiLevel = 21, kUniversalCompaction = 21,
kCompressedBlockCache = 22, kUniversalCompactionMultiLevel = 22,
kInfiniteMaxOpenFiles = 23, kCompressedBlockCache = 23,
kxxHashChecksum = 24, kInfiniteMaxOpenFiles = 24,
kFIFOCompaction = 25, kxxHashChecksum = 25,
kOptimizeFiltersForHits = 26, kFIFOCompaction = 26,
kRowCache = 27, kOptimizeFiltersForHits = 27,
kLevelSubcompactions = 28, kRowCache = 28,
kUniversalSubcompactions = 29, kLevelSubcompactions = 29,
kEnd = 28 kUniversalSubcompactions = 30,
kEnd = 29
}; };
int option_config_; int option_config_;