diff --git a/db/db_impl.cc b/db/db_impl.cc index fa35581ae..5a2f0de4a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -44,6 +44,7 @@ #include "rocksdb/table.h" #include "port/port.h" #include "table/block.h" +#include "table/block_based_table_factory.h" #include "table/merger.h" #include "table/two_level_iterator.h" #include "util/auto_roll_logger.h" @@ -198,10 +199,6 @@ Options SanitizeOptions(const std::string& dbname, std::make_shared() ); - if (!result.flush_block_policy_factory) { - result.SetUpDefaultFlushBlockPolicyFactory(); - } - return result; } diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index 145165da1..6f405b28a 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -12,6 +12,7 @@ #include "db/table_properties_collector.h" #include "rocksdb/table_properties.h" #include "rocksdb/table.h" +#include "table/block_based_table_factory.h" #include "util/coding.h" #include "util/testharness.h" #include "util/testutil.h" @@ -83,13 +84,10 @@ class DumbLogger : public Logger { // Utilities test functions void MakeBuilder( - Options options, + const Options& options, std::unique_ptr* writable, std::unique_ptr* builder) { writable->reset(new FakeWritableFile); - if (!options.flush_block_policy_factory) { - options.SetUpDefaultFlushBlockPolicyFactory(); - } builder->reset( options.table_factory->GetTableBuilder(options, writable->get(), options.compression)); @@ -99,6 +97,7 @@ void OpenTable( const Options& options, const std::string& contents, std::unique_ptr* table_reader) { + std::unique_ptr file(new FakeRandomeAccessFile(contents)); auto s = options.table_factory->GetTableReader( options, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 5554e716f..0cc33be68 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -32,7 +32,6 @@ class CompactionFilterFactory; class Comparator; class Env; class FilterPolicy; -class FlushBlockPolicyFactory; class Logger; class MergeOperator; class Snapshot; @@ -491,13 +490,6 @@ struct Options { // from the database, because otherwise the read can be very slow. Options* PrepareForBulkLoad(); - // Set up the default flush-block policy factory. By default, we'll use - // `FlushBlockBySizePolicyFactory` as the policy factory. - // Note: Please call this method after block_size and block_size_deviation - // is set. - // REQUIRES: flush_block_policy_factory is not set. - Options* SetUpDefaultFlushBlockPolicyFactory(); - // Disable automatic compactions. Manual compactions can still // be issued on this database. bool disable_auto_compactions; @@ -632,13 +624,6 @@ struct Options { // Number of locks used for inplace update // Default: 10000, if inplace_update_support = true, else 0. size_t inplace_update_num_locks; - - // Creates the instances of flush block policy. - // A flush-block policy provides a configurable way to determine when to - // flush a block in the block based tables, - // Default: nullptr. User can call FlushBlockBySizePolicyFactory() to set - // up default policy factory (`FlushBlockBySizePolicyFactory`). - std::shared_ptr flush_block_policy_factory; }; // diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index de0098a24..a5bf216dc 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -93,13 +93,15 @@ struct BlockBasedTableBuilder::Rep { char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize]; size_t compressed_cache_key_prefix_size; - BlockHandle pending_handle; // Handle to add to index block std::string compressed_output; std::unique_ptr flush_block_policy; - Rep(const Options& opt, WritableFile* f, CompressionType compression_type) + Rep(const Options& opt, + WritableFile* f, + FlushBlockPolicyFactory* flush_block_policy_factory, + CompressionType compression_type) : options(opt), file(f), data_block(options), @@ -108,17 +110,19 @@ struct BlockBasedTableBuilder::Rep { index_block(1 /* block_restart_interval */, options.comparator), compression_type(compression_type), filter_block(opt.filter_policy == nullptr ? nullptr - : new FilterBlockBuilder(opt)) { - assert(options.flush_block_policy_factory); - auto factory = options.flush_block_policy_factory; - flush_block_policy.reset(factory->NewFlushBlockPolicy(data_block)); + : new FilterBlockBuilder(opt)), + flush_block_policy( + flush_block_policy_factory->NewFlushBlockPolicy(data_block)) { } }; -BlockBasedTableBuilder::BlockBasedTableBuilder(const Options& options, - WritableFile* file, - CompressionType compression_type) - : rep_(new Rep(options, file, compression_type)) { +BlockBasedTableBuilder::BlockBasedTableBuilder( + const Options& options, + WritableFile* file, + FlushBlockPolicyFactory* flush_block_policy_factory, + CompressionType compression_type) + : rep_(new Rep(options, + file, flush_block_policy_factory, compression_type)) { if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); } diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 87307a0cb..517f8e785 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -9,6 +9,7 @@ #pragma once #include +#include "rocksdb/flush_block_policy.h" #include "rocksdb/options.h" #include "rocksdb/status.h" #include "rocksdb/table.h" @@ -25,7 +26,9 @@ class BlockBasedTableBuilder : public TableBuilder { // Create a builder that will store the contents of the table it is // building in *file. Does not close the file. It is up to the // caller to close the file after calling Finish(). - BlockBasedTableBuilder(const Options& options, WritableFile* file, + BlockBasedTableBuilder(const Options& options, + WritableFile* file, + FlushBlockPolicyFactory* flush_block_policy_factory, CompressionType compression_type); // REQUIRES: Either Finish() or Abandon() has been called. diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 0944c7f56..43734ea71 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -29,6 +29,36 @@ Status BlockBasedTableFactory::GetTableReader( TableBuilder* BlockBasedTableFactory::GetTableBuilder( const Options& options, WritableFile* file, CompressionType compression_type) const { - return new BlockBasedTableBuilder(options, file, compression_type); + auto flush_block_policy_factory = flush_block_policy_factory_.get(); + + // if flush block policy factory is not set, we'll create the default one + // from the options. + // + // NOTE: we cannot pre-cache the "default block policy factory" because + // `FlushBlockBySizePolicyFactory` takes `options.block_size` and + // `options.block_size_deviation` as parameters, which may be different + // every time. + if (flush_block_policy_factory == nullptr) { + flush_block_policy_factory = + new FlushBlockBySizePolicyFactory(options.block_size, + options.block_size_deviation); + } + + auto table_builder = new BlockBasedTableBuilder( + options, + file, + flush_block_policy_factory, + compression_type); + + // Delete flush_block_policy_factory only when it's just created from the + // options. + // We can safely delete flush_block_policy_factory since it will only be used + // during the construction of `BlockBasedTableBuilder`. + if (flush_block_policy_factory != flush_block_policy_factory_.get()) { + delete flush_block_policy_factory; + } + + return table_builder; } + } // namespace rocksdb diff --git a/table/block_based_table_factory.h b/table/block_based_table_factory.h index 677979f4e..d6ead29a0 100644 --- a/table/block_based_table_factory.h +++ b/table/block_based_table_factory.h @@ -11,6 +11,8 @@ #include #include +#include "rocksdb/flush_block_policy.h" +#include "rocksdb/options.h" #include "rocksdb/table.h" namespace rocksdb { @@ -29,13 +31,23 @@ class BlockBasedTableBuilder; class BlockBasedTableFactory: public TableFactory { public: + // @flush_block_policy_factory creates the instances of flush block policy. + // which provides a configurable way to determine when to flush a block in + // the block based tables. If not set, table builder will use the default + // block flush policy, which cut blocks by block size (please refer to + // `FlushBlockBySizePolicy`). + BlockBasedTableFactory( + FlushBlockPolicyFactory* flush_block_policy_factory = nullptr) : + flush_block_policy_factory_(flush_block_policy_factory) { + } + ~BlockBasedTableFactory() { } - BlockBasedTableFactory() { - } + const char* Name() const override { return "BlockBasedTable"; } + Status GetTableReader(const Options& options, const EnvOptions& soptions, unique_ptr && file, uint64_t file_size, @@ -44,6 +56,9 @@ public: TableBuilder* GetTableBuilder(const Options& options, WritableFile* file, CompressionType compression_type) const override; + + private: + std::unique_ptr flush_block_policy_factory_; }; } // namespace rocksdb diff --git a/table/table_test.cc b/table/table_test.cc index c1f11cd18..e93e9bcec 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -21,6 +21,7 @@ #include "rocksdb/iterator.h" #include "rocksdb/memtablerep.h" #include "table/block_based_table_builder.h" +#include "table/block_based_table_factory.h" #include "table/block_based_table_reader.h" #include "table/block_builder.h" #include "table/block.h" @@ -44,12 +45,6 @@ static std::string Reverse(const Slice& key) { return rev; } -static Options GetDefaultOptions() { - Options options; - options.SetUpDefaultFlushBlockPolicyFactory(); - return options; -} - class ReverseKeyComparator : public Comparator { public: virtual const char* Name() const { @@ -257,7 +252,12 @@ class BlockBasedTableConstructor: public Constructor { virtual Status FinishImpl(const Options& options, const KVMap& data) { Reset(); sink_.reset(new StringSink()); - BlockBasedTableBuilder builder(options, sink_.get(), options.compression); + BlockBasedTableBuilder builder( + options, + sink_.get(), + new FlushBlockBySizePolicyFactory( + options.block_size, options.block_size_deviation), + options.compression); for (KVMap::const_iterator it = data.begin(); it != data.end(); @@ -430,7 +430,7 @@ class DBConstructor: public Constructor { void NewDB() { std::string name = test::TmpDir() + "/table_testdb"; - Options options = GetDefaultOptions(); + Options options; options.comparator = comparator_; Status status = DestroyDB(name, options); ASSERT_TRUE(status.ok()) << status.ToString(); @@ -449,7 +449,7 @@ class DBConstructor: public Constructor { static bool SnappyCompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Snappy_Compress(GetDefaultOptions().compression_opts, + return port::Snappy_Compress(Options().compression_opts, in.data(), in.size(), &out); } @@ -457,7 +457,7 @@ static bool SnappyCompressionSupported() { static bool ZlibCompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Zlib_Compress(GetDefaultOptions().compression_opts, + return port::Zlib_Compress(Options().compression_opts, in.data(), in.size(), &out); } @@ -466,7 +466,7 @@ static bool ZlibCompressionSupported() { static bool BZip2CompressionSupported() { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::BZip2_Compress(GetDefaultOptions().compression_opts, + return port::BZip2_Compress(Options().compression_opts, in.data(), in.size(), &out); } @@ -487,7 +487,7 @@ struct TestArgs { }; -static std::vector Generate_Arg_List() { +static std::vector GenerateArgList() { std::vector ret; TestType test_type[4] = {TABLE_TEST, BLOCK_TEST, MEMTABLE_TEST, DB_TEST}; int test_type_len = 4; @@ -536,14 +536,13 @@ class Harness { void Init(const TestArgs& args) { delete constructor_; constructor_ = nullptr; - options_ = GetDefaultOptions(); + options_ = Options(); options_.block_restart_interval = args.restart_interval; options_.compression = args.compression; // Use shorter block size for tests to exercise block boundary // conditions more. options_.block_size = 256; - options_.SetUpDefaultFlushBlockPolicyFactory(); if (args.reverse_compare) { options_.comparator = &reverse_key_comparator; } @@ -737,13 +736,13 @@ class Harness { DB* db() const { return constructor_->db(); } private: - Options options_ = GetDefaultOptions(); + Options options_ = Options(); Constructor* constructor_; }; // Test the empty key TEST(Harness, SimpleEmptyKey) { - std::vector args = Generate_Arg_List(); + std::vector args = GenerateArgList(); for (unsigned int i = 0; i < args.size(); i++) { Init(args[i]); Random rnd(test::RandomSeed() + 1); @@ -753,7 +752,7 @@ TEST(Harness, SimpleEmptyKey) { } TEST(Harness, SimpleSingle) { - std::vector args = Generate_Arg_List(); + std::vector args = GenerateArgList(); for (unsigned int i = 0; i < args.size(); i++) { Init(args[i]); Random rnd(test::RandomSeed() + 2); @@ -763,7 +762,7 @@ TEST(Harness, SimpleSingle) { } TEST(Harness, SimpleMulti) { - std::vector args = Generate_Arg_List(); + std::vector args = GenerateArgList(); for (unsigned int i = 0; i < args.size(); i++) { Init(args[i]); Random rnd(test::RandomSeed() + 3); @@ -775,7 +774,7 @@ TEST(Harness, SimpleMulti) { } TEST(Harness, SimpleSpecialKey) { - std::vector args = Generate_Arg_List(); + std::vector args = GenerateArgList(); for (unsigned int i = 0; i < args.size(); i++) { Init(args[i]); Random rnd(test::RandomSeed() + 4); @@ -814,7 +813,7 @@ TEST(TableTest, BasicTableProperties) { std::vector keys; KVMap kvmap; - Options options = GetDefaultOptions(); + Options options; options.compression = kNoCompression; options.block_restart_interval = 1; @@ -848,7 +847,7 @@ TEST(TableTest, FilterPolicyNameProperties) { c.Add("a1", "val1"); std::vector keys; KVMap kvmap; - Options options = GetDefaultOptions(); + Options options; std::unique_ptr filter_policy( NewBloomFilterPolicy(10) ); @@ -891,7 +890,7 @@ TEST(TableTest, IndexSizeStat) { std::vector ks; KVMap kvmap; - Options options = GetDefaultOptions(); + Options options; options.compression = kNoCompression; options.block_restart_interval = 1; @@ -910,11 +909,6 @@ TEST(TableTest, NumBlockStat) { options.compression = kNoCompression; options.block_restart_interval = 1; options.block_size = 1000; - options.SetUpDefaultFlushBlockPolicyFactory(); - - // Block Size changed, need to set up a new flush policy to reflect the - // change. - options.SetUpDefaultFlushBlockPolicyFactory(); for (int i = 0; i < 10; ++i) { // the key/val are slightly smaller than block size, so that each block @@ -979,7 +973,7 @@ class BlockCacheProperties { TEST(TableTest, BlockCacheTest) { // -- Table construction - Options options = GetDefaultOptions(); + Options options; options.create_if_missing = true; options.statistics = CreateDBStatistics(); options.block_cache = NewLRUCache(1024); @@ -1117,9 +1111,8 @@ TEST(TableTest, ApproximateOffsetOfPlain) { c.Add("k07", std::string(100000, 'x')); std::vector keys; KVMap kvmap; - Options options = GetDefaultOptions(); + Options options; options.block_size = 1024; - options.SetUpDefaultFlushBlockPolicyFactory(); options.compression = kNoCompression; c.Finish(options, &keys, &kvmap); @@ -1147,9 +1140,8 @@ static void Do_Compression_Test(CompressionType comp) { c.Add("k04", test::CompressibleString(&rnd, 0.25, 10000, &tmp)); std::vector keys; KVMap kvmap; - Options options = GetDefaultOptions(); + Options options; options.block_size = 1024; - options.SetUpDefaultFlushBlockPolicyFactory(); options.compression = comp; c.Finish(options, &keys, &kvmap); @@ -1190,9 +1182,8 @@ TEST(TableTest, BlockCacheLeak) { // in the cache. This test checks whether the Table actually makes use of the // unique ID from the file. - Options opt = GetDefaultOptions(); + Options opt; opt.block_size = 1024; - opt.SetUpDefaultFlushBlockPolicyFactory(); opt.compression = kNoCompression; opt.block_cache = NewLRUCache(16*1024*1024); // big enough so we don't ever // lose cached values. @@ -1225,7 +1216,7 @@ TEST(TableTest, BlockCacheLeak) { } TEST(Harness, Randomized) { - std::vector args = Generate_Arg_List(); + std::vector args = GenerateArgList(); for (unsigned int i = 0; i < args.size(); i++) { Init(args[i]); Random rnd(test::RandomSeed() + 5); @@ -1277,7 +1268,7 @@ TEST(MemTableTest, Simple) { MemTable* memtable = new MemTable(cmp, table_factory); memtable->Ref(); WriteBatch batch; - Options options = GetDefaultOptions(); + Options options; WriteBatchInternal::SetSequence(&batch, 100); batch.Put(std::string("k1"), std::string("v1")); batch.Put(std::string("k2"), std::string("v2")); diff --git a/util/options.cc b/util/options.cc index 4fbba08a7..7fa7586e3 100644 --- a/util/options.cc +++ b/util/options.cc @@ -16,7 +16,6 @@ #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" -#include "rocksdb/flush_block_policy.h" #include "rocksdb/merge_operator.h" #include "table/block_based_table_factory.h" @@ -290,8 +289,6 @@ Options::Dump(Logger* log) const inplace_update_support); Log(log, " Options.inplace_update_num_locks: %zd", inplace_update_num_locks); - Log(log, " Options.flush_block_policy_factory: %s", - flush_block_policy_factory ? flush_block_policy_factory->Name() : ""); } // Options::Dump // @@ -331,11 +328,4 @@ Options::PrepareForBulkLoad() return this; } -Options* Options::SetUpDefaultFlushBlockPolicyFactory() { - flush_block_policy_factory = - std::make_shared( - block_size, block_size_deviation); - return this; -} - } // namespace rocksdb