diff --git a/Makefile b/Makefile index be7758de9..6fa2864eb 100644 --- a/Makefile +++ b/Makefile @@ -72,6 +72,7 @@ TESTS = \ merge_test \ redis_test \ reduce_levels_test \ + plain_table_db_test \ simple_table_db_test \ skiplist_test \ stringappend_test \ @@ -90,6 +91,7 @@ TOOLS = \ db_repl_stress \ blob_store_bench + PROGRAMS = db_bench signal_test $(TESTS) $(TOOLS) BENCHMARKS = db_bench_sqlite3 db_bench_tree_db table_reader_bench @@ -260,11 +262,14 @@ crc32c_test: util/crc32c_test.o $(LIBOBJECTS) $(TESTHARNESS) db_test: db/db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +plain_table_db_test: db/plain_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) db/plain_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + simple_table_db_test: db/simple_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/simple_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) table_reader_bench: table/table_reader_bench.o $(LIBOBJECTS) $(TESTHARNESS) - $(CXX) table/table_reader_bench.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + $(CXX) table/table_reader_bench.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) -pg perf_context_test: db/perf_context_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/perf_context_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) diff --git a/db/plain_table_db_test.cc b/db/plain_table_db_test.cc new file mode 100644 index 000000000..3697b4c45 --- /dev/null +++ b/db/plain_table_db_test.cc @@ -0,0 +1,332 @@ +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/filter_policy.h" +#include "db/db_impl.h" +#include "db/filename.h" +#include "db/version_set.h" +#include "db/write_batch_internal.h" +#include "db/db_statistics.h" +#include "rocksdb/cache.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/env.h" +#include "rocksdb/table.h" +#include "rocksdb/plain_table_factory.h" +#include "util/hash.h" +#include "util/logging.h" +#include "util/mutexlock.h" +#include "util/testharness.h" +#include "util/testutil.h" +#include "utilities/merge_operators.h" + +using std::unique_ptr; + +namespace rocksdb { + +class PlainTableDBTest { +protected: +public: + std::string dbname_; + Env* env_; + DB* db_; + + Options last_options_; + + PlainTableDBTest() : + env_(Env::Default()) { + dbname_ = test::TmpDir() + "/plain_table_db_test"; + ASSERT_OK(DestroyDB(dbname_, Options())); + db_ = nullptr; + Reopen(); + } + + ~PlainTableDBTest() { + delete db_; + ASSERT_OK(DestroyDB(dbname_, Options())); + } + + // Return the current option configuration. + Options CurrentOptions() { + Options options; + options.table_factory.reset(new PlainTableFactory(16, 8)); + options.allow_mmap_reads = true; + return options; + } + + DBImpl* dbfull() { + return reinterpret_cast(db_); + } + + void Reopen(Options* options = nullptr) { + ASSERT_OK(TryReopen(options)); + } + + void Close() { + delete db_; + db_ = nullptr; + } + + void DestroyAndReopen(Options* options = nullptr) { + //Destroy using last options + Destroy(&last_options_); + ASSERT_OK(TryReopen(options)); + } + + void Destroy(Options* options) { + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, *options)); + } + + Status PureReopen(Options* options, DB** db) { + return DB::Open(*options, dbname_, db); + } + + Status TryReopen(Options* options = nullptr) { + delete db_; + db_ = nullptr; + Options opts; + if (options != nullptr) { + opts = *options; + } else { + opts = CurrentOptions(); + opts.create_if_missing = true; + } + last_options_ = opts; + + return DB::Open(opts, dbname_, &db_); + } + + Status Put(const Slice& k, const Slice& v) { + return db_->Put(WriteOptions(), k, v); + } + + Status Delete(const std::string& k) { + return db_->Delete(WriteOptions(), k); + } + + std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) { + ReadOptions options; + options.snapshot = snapshot; + std::string result; + Status s = db_->Get(options, k, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } + + + int NumTableFilesAtLevel(int level) { + std::string property; + ASSERT_TRUE( + db_->GetProperty("rocksdb.num-files-at-level" + NumberToString(level), + &property)); + return atoi(property.c_str()); + } + + // Return spread of files per level + std::string FilesPerLevel() { + std::string result; + int last_non_zero_offset = 0; + for (int level = 0; level < db_->NumberLevels(); level++) { + int f = NumTableFilesAtLevel(level); + char buf[100]; + snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f); + result += buf; + if (f > 0) { + last_non_zero_offset = result.size(); + } + } + result.resize(last_non_zero_offset); + return result; + } + + std::string IterStatus(Iterator* iter) { + std::string result; + if (iter->Valid()) { + result = iter->key().ToString() + "->" + iter->value().ToString(); + } else { + result = "(invalid)"; + } + return result; + } +}; + +TEST(PlainTableDBTest, Empty) { + ASSERT_TRUE(db_ != nullptr); + ASSERT_EQ("NOT_FOUND", Get("0000000000000foo")); +} + +TEST(PlainTableDBTest, ReadWrite) { + ASSERT_OK(Put("1000000000000foo", "v1")); + ASSERT_EQ("v1", Get("1000000000000foo")); + ASSERT_OK(Put("0000000000000bar", "v2")); + ASSERT_OK(Put("1000000000000foo", "v3")); + ASSERT_EQ("v3", Get("1000000000000foo")); + ASSERT_EQ("v2", Get("0000000000000bar")); +} + +TEST(PlainTableDBTest, Flush) { + ASSERT_OK(Put("1000000000000foo", "v1")); + ASSERT_OK(Put("0000000000000bar", "v2")); + ASSERT_OK(Put("1000000000000foo", "v3")); + dbfull()->TEST_FlushMemTable(); + ASSERT_EQ("v3", Get("1000000000000foo")); + ASSERT_EQ("v2", Get("0000000000000bar")); +} + +TEST(PlainTableDBTest, Iterator) { + ASSERT_OK(Put("1000000000foo002", "v_2")); + ASSERT_OK(Put("0000000000000bar", "random")); + ASSERT_OK(Put("1000000000foo001", "v1")); + ASSERT_OK(Put("3000000000000bar", "bar_v")); + ASSERT_OK(Put("1000000000foo003", "v__3")); + ASSERT_OK(Put("1000000000foo004", "v__4")); + ASSERT_OK(Put("1000000000foo005", "v__5")); + ASSERT_OK(Put("1000000000foo007", "v__7")); + ASSERT_OK(Put("1000000000foo008", "v__8")); + dbfull()->TEST_FlushMemTable(); + ASSERT_EQ("v1", Get("1000000000foo001")); + ASSERT_EQ("v__3", Get("1000000000foo003")); + ReadOptions ro; + Iterator* iter = dbfull()->NewIterator(ro); + iter->Seek("1000000000foo001"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("1000000000foo001", iter->key().ToString()); + ASSERT_EQ("v1", iter->value().ToString()); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("1000000000foo002", iter->key().ToString()); + ASSERT_EQ("v_2", iter->value().ToString()); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("1000000000foo003", iter->key().ToString()); + ASSERT_EQ("v__3", iter->value().ToString()); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("1000000000foo004", iter->key().ToString()); + ASSERT_EQ("v__4", iter->value().ToString()); + + iter->Seek("3000000000000bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("3000000000000bar", iter->key().ToString()); + ASSERT_EQ("bar_v", iter->value().ToString()); + + iter->Seek("1000000000foo000"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("1000000000foo001", iter->key().ToString()); + ASSERT_EQ("v1", iter->value().ToString()); + + iter->Seek("1000000000foo005"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("1000000000foo005", iter->key().ToString()); + ASSERT_EQ("v__5", iter->value().ToString()); + + iter->Seek("1000000000foo006"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("1000000000foo007", iter->key().ToString()); + ASSERT_EQ("v__7", iter->value().ToString()); + + iter->Seek("1000000000foo008"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("1000000000foo008", iter->key().ToString()); + ASSERT_EQ("v__8", iter->value().ToString()); + + iter->Seek("1000000000foo009"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("3000000000000bar", iter->key().ToString()); + + + delete iter; +} + +TEST(PlainTableDBTest, Flush2) { + ASSERT_OK(Put("0000000000000bar", "b")); + ASSERT_OK(Put("1000000000000foo", "v1")); + dbfull()->TEST_FlushMemTable(); + + ASSERT_OK(Put("1000000000000foo", "v2")); + dbfull()->TEST_FlushMemTable(); + ASSERT_EQ("v2", Get("1000000000000foo")); + + ASSERT_OK(Put("0000000000000eee", "v3")); + dbfull()->TEST_FlushMemTable(); + ASSERT_EQ("v3", Get("0000000000000eee")); + + ASSERT_OK(Delete("0000000000000bar")); + dbfull()->TEST_FlushMemTable(); + ASSERT_EQ("NOT_FOUND", Get("0000000000000bar")); + + ASSERT_OK(Put("0000000000000eee", "v5")); + dbfull()->TEST_FlushMemTable(); + ASSERT_EQ("v5", Get("0000000000000eee")); +} + +static std::string Key(int i) { + char buf[100]; + snprintf(buf, sizeof(buf), "key_______%06d", i); + return std::string(buf); +} + +static std::string RandomString(Random* rnd, int len) { + std::string r; + test::RandomString(rnd, len, &r); + return r; +} + +TEST(PlainTableDBTest, CompactionTrigger) { + Options options = CurrentOptions(); + options.write_buffer_size = 100 << 10; //100KB + options.num_levels = 3; + options.max_mem_compaction_level = 0; + options.level0_file_num_compaction_trigger = 3; + Reopen(&options); + + Random rnd(301); + + for (int num = 0; num < options.level0_file_num_compaction_trigger - 1; + num++) { + std::vector values; + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(Put(Key(i), values[i])); + } + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(NumTableFilesAtLevel(0), num + 1); + } + + //generate one more file in level-0, and should trigger level-0 compaction + std::vector values; + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(Put(Key(i), values[i])); + } + dbfull()->TEST_WaitForCompact(); + + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + ASSERT_EQ(NumTableFilesAtLevel(1), 1); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + return rocksdb::test::RunAllTests(); +} diff --git a/include/rocksdb/plain_table_factory.h b/include/rocksdb/plain_table_factory.h new file mode 100644 index 000000000..f8a0cb9a9 --- /dev/null +++ b/include/rocksdb/plain_table_factory.h @@ -0,0 +1,69 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include + +#include "rocksdb/options.h" +#include "rocksdb/table.h" + +namespace rocksdb { + +struct Options; +struct EnvOptions; + +using std::unique_ptr; +class Status; +class RandomAccessFile; +class WritableFile; +class Table; +class TableBuilder; + +// IndexedTable requires fixed length key, configured as a constructor +// parameter of the factory class. Output file format: +// +--------------------------------------------+ <= key1 offset +// | key1 | value_size (4 bytes) | | +// +----------------------------------------+ | +// | value1 | +// | | +// +----------------------------------------+---+ <= key2 offset +// | key2 | value_size (4 bytes) | | +// +----------------------------------------+ | +// | value2 | +// | | +// | ...... | +// +-----------------+--------------------------+ <= index_block_offset +// | key1 | key1 offset (8 bytes) | +// +-----------------+--------------------------+ +// | key2 | key2 offset (8 bytes) | +// +-----------------+--------------------------+ +// | key3 | key3 offset (8 bytes) | +// +-----------------+--------------------------+ +// | ...... | +// +-----------------+------------+-------------+ +class PlainTableFactory: public TableFactory { +public: + ~PlainTableFactory() { + } + PlainTableFactory(int user_key_size, int key_prefix_len) : + user_key_size_(user_key_size), key_prefix_len_(key_prefix_len) { + } + const char* Name() const override { + return "PlainTable"; + } + Status GetTableReader(const Options& options, const EnvOptions& soptions, + unique_ptr && file, + uint64_t file_size, + unique_ptr* table) const override; + + TableBuilder* GetTableBuilder(const Options& options, WritableFile* file, + CompressionType compression_type) const + override; +private: + int user_key_size_; + int key_prefix_len_; +}; + +} // namespace rocksdb diff --git a/table/plain_table_builder.cc b/table/plain_table_builder.cc new file mode 100644 index 000000000..ed0b4d988 --- /dev/null +++ b/table/plain_table_builder.cc @@ -0,0 +1,77 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "table/plain_table_builder.h" + +#include +#include + +#include "rocksdb/comparator.h" +#include "rocksdb/env.h" +#include "rocksdb/filter_policy.h" +#include "rocksdb/options.h" +#include "table/block_builder.h" +#include "table/filter_block.h" +#include "table/format.h" +#include "util/coding.h" +#include "util/crc32c.h" +#include "util/stop_watch.h" + +namespace rocksdb { + +PlainTableBuilder::PlainTableBuilder(const Options& options, + WritableFile* file, + int user_key_size, int key_prefix_len) : + options_(options), file_(file), user_key_size_(user_key_size), + key_prefix_len_(key_prefix_len) { +} + +PlainTableBuilder::~PlainTableBuilder() { +} + +Status PlainTableBuilder::ChangeOptions(const Options& options) { + return Status::OK(); +} + +void PlainTableBuilder::Add(const Slice& key, const Slice& value) { + assert((int) key.size() == GetInternalKeyLength()); + + // Write key-value pair + file_->Append(key); + offset_ += GetInternalKeyLength(); + + std::string size; + int value_size = value.size(); + PutFixed32(&size, value_size); + Slice sizeSlice(size); + file_->Append(sizeSlice); + file_->Append(value); + offset_ += value_size + 4; + + num_entries_++; +} + +Status PlainTableBuilder::status() const { + return Status::OK(); +} + +Status PlainTableBuilder::Finish() { + assert(!closed_); + closed_ = true; + return Status::OK(); +} + +void PlainTableBuilder::Abandon() { + closed_ = true; +} + +uint64_t PlainTableBuilder::NumEntries() const { + return num_entries_; +} + +uint64_t PlainTableBuilder::FileSize() const { + return offset_; +} + +} // namespace rocksdb diff --git a/table/plain_table_builder.h b/table/plain_table_builder.h new file mode 100644 index 000000000..b48552efc --- /dev/null +++ b/table/plain_table_builder.h @@ -0,0 +1,91 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// IndexedTable is a simple table format for UNIT TEST ONLY. It is not built +// as production quality. + +#pragma once +#include +#include "rocksdb/options.h" +#include "rocksdb/status.h" +#include "rocksdb/table.h" +#include "rocksdb/table_properties.h" + +namespace rocksdb { + +class BlockBuilder; +class BlockHandle; +class WritableFile; +class TableBuilder; + +class PlainTableBuilder: public TableBuilder { +public: + // Create a builder that will store the contents of the table it is + // building in *file. Does not close the file. It is up to the + // caller to close the file after calling Finish(). The output file + // will be part of level specified by 'level'. A value of -1 means + // that the caller does not know which level the output file will reside. + PlainTableBuilder(const Options& options, WritableFile* file, + int user_key_size, int key_prefix_len); + + // REQUIRES: Either Finish() or Abandon() has been called. + ~PlainTableBuilder(); + + // Change the options used by this builder. Note: only some of the + // option fields can be changed after construction. If a field is + // not allowed to change dynamically and its value in the structure + // passed to the constructor is different from its value in the + // structure passed to this method, this method will return an error + // without changing any fields. + Status ChangeOptions(const Options& options); + + // Add key,value to the table being constructed. + // REQUIRES: key is after any previously added key according to comparator. + // REQUIRES: Finish(), Abandon() have not been called + void Add(const Slice& key, const Slice& value) override; + + // Return non-ok iff some error has been detected. + Status status() const override; + + // Finish building the table. Stops using the file passed to the + // constructor after this function returns. + // REQUIRES: Finish(), Abandon() have not been called + Status Finish() override; + + // Indicate that the contents of this builder should be abandoned. Stops + // using the file passed to the constructor after this function returns. + // If the caller is not going to call Finish(), it must call Abandon() + // before destroying this builder. + // REQUIRES: Finish(), Abandon() have not been called + void Abandon() override; + + // Number of calls to Add() so far. + uint64_t NumEntries() const override; + + // Size of the file generated so far. If invoked after a successful + // Finish() call, returns the size of the final generated file. + uint64_t FileSize() const override; + +private: + Options options_; + WritableFile* file_; + uint64_t offset_ = 0; + Status status_; + uint64_t num_entries_ = 0; + + const size_t user_key_size_; + const size_t key_prefix_len_; + bool closed_ = false; // Either Finish() or Abandon() has been called. + + int GetInternalKeyLength() { + return user_key_size_ + 8; + } + + // No copying allowed + PlainTableBuilder(const PlainTableBuilder&) = delete; + void operator=(const PlainTableBuilder&) = delete; +}; + +} // namespace rocksdb + diff --git a/table/plain_table_factory.cc b/table/plain_table_factory.cc new file mode 100644 index 000000000..10393501d --- /dev/null +++ b/table/plain_table_factory.cc @@ -0,0 +1,31 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "rocksdb/plain_table_factory.h" + +#include +#include +#include "table/plain_table_builder.h" +#include "table/plain_table_reader.h" +#include "port/port.h" + +namespace rocksdb { + +Status PlainTableFactory::GetTableReader(const Options& options, + const EnvOptions& soptions, + unique_ptr && file, + uint64_t file_size, + unique_ptr* table) + const { + return PlainTableReader::Open(options, soptions, std::move(file), file_size, + table, user_key_size_, key_prefix_len_); +} + +TableBuilder* PlainTableFactory::GetTableBuilder( + const Options& options, WritableFile* file, + CompressionType compression_type) const { + return new PlainTableBuilder(options, file, user_key_size_, + key_prefix_len_); +} +} // namespace rocksdb diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc new file mode 100644 index 000000000..5577c4eca --- /dev/null +++ b/table/plain_table_reader.cc @@ -0,0 +1,358 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "table/plain_table_reader.h" + +#include + +#include "db/dbformat.h" + +#include "rocksdb/cache.h" +#include "rocksdb/comparator.h" +#include "rocksdb/env.h" +#include "rocksdb/filter_policy.h" +#include "rocksdb/options.h" +#include "rocksdb/statistics.h" + +#include "table/block.h" +#include "table/filter_block.h" +#include "table/format.h" +#include "table/two_level_iterator.h" + +#include "util/coding.h" +#include "util/hash.h" +#include "util/histogram.h" +#include "util/perf_context_imp.h" +#include "util/stop_watch.h" + +namespace std { +template<> +struct hash { +public: + std::size_t operator()(rocksdb::Slice const& s) const { + return rocksdb::Hash(s.data(), s.size(), 397); + } +}; +} + +namespace rocksdb { + +PlainTableReader::PlainTableReader(const EnvOptions& storage_options, + uint64_t file_size, int user_key_size, + int key_prefix_len) : + soptions_(storage_options), file_size_(file_size), + user_key_size_(user_key_size), key_prefix_len_(key_prefix_len) { + hash_table_ = nullptr; +} + +PlainTableReader::~PlainTableReader() { + if (hash_table_ != nullptr) { + delete[] hash_table_; + } +} + +Status PlainTableReader::Open(const Options& options, + const EnvOptions& soptions, + unique_ptr && file, + uint64_t file_size, + unique_ptr* table_reader, + const int user_key_size, + const int key_prefix_len) { + assert(options.allow_mmap_reads); + + PlainTableReader* t = new PlainTableReader(soptions, file_size, + user_key_size, + key_prefix_len); + t->file_ = std::move(file); + t->options_ = options; + Status s = t->PopulateIndex(file_size); + if (!s.ok()) { + delete t; + return s; + } + table_reader->reset(t); + return s; +} + +void PlainTableReader::SetupForCompaction() { +} + +bool PlainTableReader::PrefixMayMatch(const Slice& internal_prefix) { + return true; +} + +Iterator* PlainTableReader::NewIterator(const ReadOptions& options) { + return new PlainTableIterator(this); +} + +Status PlainTableReader::PopulateIndex(uint64_t file_size) { + Slice key_slice; + Slice key_prefix_slice; + Slice key_suffix_slice; + Slice value_slice; + Slice tmp_slice; + Slice prev_key_prefix_slice; + uint64_t pos = 0; + uint64_t data_offset_for_cur_prefix = 0; + int count_prefix = 0; + bool first = true; + std::string prefix_sub_index; + HistogramImpl keys_per_prefix_hist; + std::unordered_map tmp_index; + + while (pos < file_size) { + uint64_t key_offset = pos; + pos = Next(pos, &key_slice, &value_slice, &tmp_slice); + key_prefix_slice = Slice(key_slice.data(), key_prefix_len_); + + if (first || prev_key_prefix_slice != key_prefix_slice) { + if (!first) { + if (count_prefix < 8 || key_prefix_len_ == user_key_size_) { + tmp_index[prev_key_prefix_slice] = data_offset_for_cur_prefix; + } else { + tmp_index[prev_key_prefix_slice] = sub_index_.length() + | kSubIndexMask; + PutFixed32(&sub_index_, (count_prefix - 1) / 8 + 1); + sub_index_.append(prefix_sub_index); + } + prefix_sub_index.clear(); + data_offset_for_cur_prefix = key_offset; + keys_per_prefix_hist.Add(count_prefix); + } + prev_key_prefix_slice = key_prefix_slice; + count_prefix = 1; + } else { + count_prefix++; + } + if (key_prefix_len_ < user_key_size_ && count_prefix % 8 == 1) { + prefix_sub_index.append(key_slice.data() + key_prefix_len_, + user_key_size_ - key_prefix_len_); + PutFixed64(&prefix_sub_index, key_offset); + } + + first = false; + } + keys_per_prefix_hist.Add(count_prefix); + if (count_prefix <= 2 || key_prefix_len_ == user_key_size_) { + tmp_index[prev_key_prefix_slice] = data_offset_for_cur_prefix; + } else { + tmp_index[prev_key_prefix_slice] = sub_index_.length() | kSubIndexMask; + PutFixed32(&sub_index_, (count_prefix - 1) / 8 + 1); + sub_index_.append(prefix_sub_index); + } + + if (hash_table_ != nullptr) { + delete[] hash_table_; + } + // Make the hash table 3/5 full + hash_table_size_ = tmp_index.size() * 1.66; + hash_table_ = new char[GetHashTableRecordLen() * hash_table_size_]; + for (int i = 0; i < hash_table_size_; i++) { + memcpy(GetHashTableBucketPtr(i) + key_prefix_len_, &file_size_, + kOffsetLen); + } + + for (auto it = tmp_index.begin(); it != tmp_index.end(); ++it) { + int bucket = GetHashTableBucket(it->first); + uint64_t* hash_value; + while (true) { + GetHashValue(bucket, &hash_value); + if (*hash_value == file_size_) { + break; + } + bucket = (bucket + 1) % hash_table_size_; + } + + char* bucket_ptr = GetHashTableBucketPtr(bucket); + memcpy(bucket_ptr, it->first.data(), key_prefix_len_); + memcpy(bucket_ptr + key_prefix_len_, &it->second, kOffsetLen); + } + + Log(options_.info_log, "Number of prefixes: %d, suffix_map length %ld", + hash_table_size_, sub_index_.length()); + Log(options_.info_log, "Number of Keys per prefix Histogram: %s", + keys_per_prefix_hist.ToString().c_str()); + + return Status::OK(); +} + +inline int PlainTableReader::GetHashTableBucket(Slice key) { + return rocksdb::Hash(key.data(), key_prefix_len_, 397) % hash_table_size_; +} + +inline void PlainTableReader::GetHashValue(int bucket, uint64_t** ret_value) { + *ret_value = (uint64_t*) (GetHashTableBucketPtr(bucket) + key_prefix_len_); +} + +Status PlainTableReader::GetOffset(const Slice& target, uint64_t* offset) { + Status s; + + int bucket = GetHashTableBucket(target); + uint64_t* found_value; + Slice hash_key; + while (true) { + GetHashValue(bucket, &found_value); + if (*found_value == file_size_) { + break; + } + GetHashKey(bucket, &hash_key); + if (target.starts_with(hash_key)) { + break; + } + bucket = (bucket + 1) % hash_table_size_; + } + + if (*found_value == file_size_ || (*found_value & kSubIndexMask) == 0) { + *offset = *found_value; + return Status::OK(); + } + + uint32_t low = 0; + uint64_t prefix_index_offset = *found_value ^ kSubIndexMask; + uint32_t high = DecodeFixed32(sub_index_.data() + prefix_index_offset); + uint64_t base_offset = prefix_index_offset + 4; + char* mid_key_str = new char[target.size()]; + memcpy(mid_key_str, target.data(), target.size()); + Slice mid_key = Slice(mid_key_str, target.size()); + + // The key is between (low, high). Do a binary search between it. + while (high - low > 1) { + uint32_t mid = (high + low) / 2; + const char* base = sub_index_.data() + base_offset + + (user_key_size_ - key_prefix_len_ + kOffsetLen) * mid; + memcpy(mid_key_str + key_prefix_len_, base, + user_key_size_ - key_prefix_len_); + + int cmp_result = options_.comparator->Compare(target, mid_key); + if (cmp_result > 0) { + low = mid; + } else { + if (cmp_result == 0) { + // Happen to have found the exact key or target is smaller than the + // first key after base_offset. + *offset = DecodeFixed64(base + user_key_size_ - key_prefix_len_); + delete[] mid_key_str; + return s; + } else { + high = mid; + } + } + } + + const char* base = sub_index_.data() + base_offset + + (user_key_size_ - key_prefix_len_ + kOffsetLen) * low; + *offset = DecodeFixed64(base + user_key_size_ - key_prefix_len_); + + delete[] mid_key_str; + return s; +} + +uint64_t PlainTableReader::Next(uint64_t offset, Slice* key, Slice* value, + Slice* tmp_slice) { + if (offset >= file_size_) { + return file_size_; + } + int internal_key_size = GetInternalKeyLength(); + + Status s = file_->Read(offset, internal_key_size, key, nullptr); + offset += internal_key_size; + + s = file_->Read(offset, 4, tmp_slice, nullptr); + offset += 4; + uint32_t value_size = DecodeFixed32(tmp_slice->data()); + + s = file_->Read(offset, value_size, value, nullptr); + offset += value_size; + + return offset; +} + +Status PlainTableReader::Get( + const ReadOptions& ro, const Slice& target, void* arg, + bool (*saver)(void*, const Slice&, const Slice&, bool), + void (*mark_key_may_exist)(void*)) { + uint64_t offset; + Status s = GetOffset(target, &offset); + if (!s.ok()) { + return s; + } + Slice found_key; + Slice found_value; + Slice tmp_slice; + while (offset < file_size_) { + offset = Next(offset, &found_key, &found_value, &tmp_slice); + if (options_.comparator->Compare(found_key, target) >= 0 + && !(*saver)(arg, found_key, found_value, true)) { + break; + } + } + return s; +} + +bool PlainTableReader::TEST_KeyInCache(const ReadOptions& options, + const Slice& key) { + return false; +} + +uint64_t PlainTableReader::ApproximateOffsetOf(const Slice& key) { + return 0; +} + +PlainTableIterator::PlainTableIterator(PlainTableReader* table) : + table_(table) { + SeekToFirst(); +} + +PlainTableIterator::~PlainTableIterator() { +} + +bool PlainTableIterator::Valid() const { + return offset_ < table_->file_size_ && offset_ >= 0; +} + +void PlainTableIterator::SeekToFirst() { + next_offset_ = 0; + Next(); +} + +void PlainTableIterator::SeekToLast() { + assert(false); +} + +void PlainTableIterator::Seek(const Slice& target) { + Status s = table_->GetOffset(target, &next_offset_); + if (!s.ok()) { + status_ = s; + } + if (next_offset_ < table_->file_size_) { + for (Next(); + Valid() && table_->options_.comparator->Compare(key(), target) < 0; + Next()) { + } + } +} + +void PlainTableIterator::Next() { + offset_ = next_offset_; + Slice tmp_slice; + next_offset_ = table_->Next(next_offset_, &key_, &value_, &tmp_slice); +} + +void PlainTableIterator::Prev() { + assert(false); +} + +Slice PlainTableIterator::key() const { + return key_; +} + +Slice PlainTableIterator::value() const { + return value_; +} + +Status PlainTableIterator::status() const { + return status_; +} + +} // namespace rocksdb diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h new file mode 100644 index 000000000..44b545833 --- /dev/null +++ b/table/plain_table_reader.h @@ -0,0 +1,168 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include +#include +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/table.h" + +namespace rocksdb { + +class Block; +class BlockHandle; +class Footer; +struct Options; +class RandomAccessFile; +struct ReadOptions; +class TableCache; +class TableReader; + +using std::unique_ptr; +using std::unordered_map; + +// Based on following output file format: +// +--------------------------------------------+ <= key1_data_offset +// | key1 | value_size (4 bytes) | | +// +----------------------------------------+ | +// | value1 | +// | | +// +----------------------------------------+---+ <= key2_data_offset +// | key2 | value_size (4 bytes) | | +// +----------------------------------------+ | +// | value2 | +// | | +// | ...... | +// +-----------------+--------------------------+ <= index_block_offset +// | key1 | key1 offset (8 bytes) | +// +-----------------+--------------------------+ <= key2_index_offset +// | key2 | key2 offset (8 bytes) | +// +-----------------+--------------------------+ <= key3_index_offset +// | key3 | key3 offset (8 bytes) | +// +-----------------+--------------------------+ <= key4_index_offset +// | ...... | +// +-----------------+------------+-------------+ +// When opening the output file, IndexedTableReader creates a hash table +// from key prefixes to offset of the output file. IndexedTable will decide +// whether it points to the data offset of the first key with the key prefix +// or the offset of it. If there are too many keys share this prefix, it will +// create a binary search-able index from the suffix to offset on disk. +// +// The implementation of IndexedTableReader requires output file is mmaped +class PlainTableReader: public TableReader { +public: + static Status Open(const Options& options, const EnvOptions& soptions, + unique_ptr && file, uint64_t file_size, + unique_ptr* table, const int user_key_size, + const int key_prefix_len); + + bool PrefixMayMatch(const Slice& internal_prefix); + + Iterator* NewIterator(const ReadOptions&); + + Status Get( + const ReadOptions&, const Slice& key, void* arg, + bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool), + void (*mark_key_may_exist)(void*) = nullptr); + + uint64_t ApproximateOffsetOf(const Slice& key); + + bool TEST_KeyInCache(const ReadOptions& options, const Slice& key); + + void SetupForCompaction(); + + TableProperties& GetTableProperties() { + return tbl_props; + } + + PlainTableReader(const EnvOptions& storage_options, uint64_t file_size, + int user_key_size, int key_prefix_len); + ~PlainTableReader(); + +private: + char* hash_table_; + int hash_table_size_; + std::string sub_index_; + + Options options_; + const EnvOptions& soptions_; + Status status_; + unique_ptr file_; + + uint64_t file_size_; + const size_t user_key_size_; + const size_t key_prefix_len_; + + TableProperties tbl_props; + + static const size_t kNumInternalBytes = 8; + static const uint64_t kSubIndexMask = 0x8000000000000000; + static const size_t kOffsetLen = sizeof(uint64_t); + + inline int GetHashTableBucket(Slice key); + inline size_t GetInternalKeyLength() { + return user_key_size_ + kNumInternalBytes; + } + inline size_t GetHashTableRecordLen() { + return key_prefix_len_ + kOffsetLen; + } + inline char* GetHashTableBucketPtr(int bucket) { + return hash_table_ + GetHashTableRecordLen() * bucket; + } + inline void GetHashKey(int bucket, Slice* slice) { + *slice = Slice(GetHashTableBucketPtr(bucket), key_prefix_len_); + } + inline void GetHashValue(int bucket, uint64_t** ret_value); + + friend class TableCache; + friend class PlainTableIterator; + + Status PopulateIndex(uint64_t file_size); + uint64_t Next(uint64_t offset, Slice* key, Slice* value, Slice* tmp_slice); + Status GetOffset(const Slice& target, uint64_t* offset); + + // No copying allowed + explicit PlainTableReader(const TableReader&) = delete; + void operator=(const TableReader&) = delete; +}; + +// Iterator to iterate IndexedTable +class PlainTableIterator: public Iterator { +public: + explicit PlainTableIterator(PlainTableReader* table); + ~PlainTableIterator(); + + bool Valid() const; + + void SeekToFirst(); + + void SeekToLast(); + + void Seek(const Slice& target); + + void Next(); + + void Prev(); + + Slice key() const; + + Slice value() const; + + Status status() const; + +private: + PlainTableReader* table_; + uint64_t offset_; + uint64_t next_offset_; + Slice key_; + Slice value_; + Status status_; + // No copying allowed + PlainTableIterator(const PlainTableIterator&) = delete; + void operator=(const Iterator&) = delete; +}; + +} // namespace rocksdb diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index e7b6b0b7a..8d3fd2412 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -6,12 +6,14 @@ #include #include "rocksdb/db.h" +#include "rocksdb/slice_transform.h" #include "rocksdb/table.h" #include "rocksdb/slice_transform.h" #include "db/db_impl.h" #include "db/dbformat.h" #include "port/atomic_pointer.h" #include "table/block_based_table_factory.h" +#include "rocksdb/plain_table_factory.h" #include "util/histogram.h" #include "util/testharness.h" #include "util/testutil.h" @@ -218,6 +220,8 @@ DEFINE_bool(iterator, false, "For test iterator"); DEFINE_bool(through_db, false, "If enable, a DB instance will be created and " "the query will be against DB. Otherwise, will be directly against " "a table reader."); +DEFINE_bool(plain_table, false, "Use PlainTable"); + int main(int argc, char** argv) { google::SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + @@ -230,10 +234,18 @@ int main(int argc, char** argv) { options.prefix_extractor = rocksdb::NewFixedPrefixTransform( FLAGS_prefix_len); } - options.SetUpDefaultFlushBlockPolicyFactory(); rocksdb::ReadOptions ro; rocksdb::EnvOptions env_options; options.create_if_missing = true; + options.compression = rocksdb::CompressionType::kNoCompression; + + if (FLAGS_plain_table) { + options.allow_mmap_reads = true; + env_options.use_mmap_reads = true; + tf = new rocksdb::PlainTableFactory(16, FLAGS_prefix_len); + } else { + tf = new rocksdb::BlockBasedTableFactory(); + } options.table_factory = std::shared_ptr(tf); TableReaderBenchmark(options, env_options, ro, FLAGS_num_keys1, diff --git a/util/env_posix.cc b/util/env_posix.cc index 356008225..c6995b30c 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -306,7 +306,9 @@ class PosixMmapReadableFile: public RandomAccessFile { assert(options.use_mmap_reads); assert(options.use_os_buffer); } - virtual ~PosixMmapReadableFile() { munmap(mmapped_region_, length_); } + virtual ~PosixMmapReadableFile() { + assert(munmap(mmapped_region_, length_) == 0); + } virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const {