diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 7385b399c..c09e2398e 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -300,6 +300,93 @@ TEST_F(DBCompactionTest, SkipStatsUpdateTest) { ASSERT_GT(env_->random_file_open_counter_.load(), 5); } +TEST_F(DBCompactionTest, TestTableReaderForCompaction) { + Options options; + options = CurrentOptions(options); + options.env = env_; + options.new_table_reader_for_compaction_inputs = true; + options.max_open_files = 100; + options.level0_file_num_compaction_trigger = 3; + DestroyAndReopen(options); + Random rnd(301); + + int num_table_cache_lookup = 0; + int num_new_table_reader = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TableCache::FindTable:0", [&](void* arg) { + assert(arg != nullptr); + bool no_io = *(reinterpret_cast(arg)); + if (!no_io) { + // filter out cases for table properties queries. + num_table_cache_lookup++; + } + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TableCache::GetTableReader:0", + [&](void* arg) { num_new_table_reader++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + for (int k = 0; k < options.level0_file_num_compaction_trigger; ++k) { + ASSERT_OK(Put(Key(k), Key(k))); + ASSERT_OK(Put(Key(10 - k), "bar")); + if (k < options.level0_file_num_compaction_trigger - 1) { + num_table_cache_lookup = 0; + Flush(); + dbfull()->TEST_WaitForCompact(); + // preloading iterator issues one table cache lookup and create + // a new table reader. + ASSERT_EQ(num_table_cache_lookup, 1); + ASSERT_EQ(num_new_table_reader, 1); + + num_table_cache_lookup = 0; + num_new_table_reader = 0; + ASSERT_EQ(Key(k), Get(Key(k))); + // lookup iterator from table cache and no need to create a new one. + ASSERT_EQ(num_table_cache_lookup, 1); + ASSERT_EQ(num_new_table_reader, 0); + } + } + + num_table_cache_lookup = 0; + num_new_table_reader = 0; + Flush(); + dbfull()->TEST_WaitForCompact(); + // Preloading iterator issues one table cache lookup and creates + // a new table reader. One file is created for flush and one for compaction. + // Compaction inputs make no table cache look-up. + ASSERT_EQ(num_table_cache_lookup, 2); + // Create new iterator for: + // (1) 1 for verifying flush results + // (2) 3 for compaction input files + // (3) 1 for verifying compaction results. + ASSERT_EQ(num_new_table_reader, 5); + + num_table_cache_lookup = 0; + num_new_table_reader = 0; + ASSERT_EQ(Key(1), Get(Key(1))); + ASSERT_EQ(num_table_cache_lookup, 1); + ASSERT_EQ(num_new_table_reader, 0); + + num_table_cache_lookup = 0; + num_new_table_reader = 0; + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = 2; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + db_->CompactRange(cro, nullptr, nullptr); + // Only verifying compaction outputs issues one table cache lookup. + ASSERT_EQ(num_table_cache_lookup, 1); + // One for compaction input, one for verifying compaction results. + ASSERT_EQ(num_new_table_reader, 2); + + num_table_cache_lookup = 0; + num_new_table_reader = 0; + ASSERT_EQ(Key(1), Get(Key(1))); + ASSERT_EQ(num_table_cache_lookup, 1); + ASSERT_EQ(num_new_table_reader, 0); + + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +} TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) { for (int tid = 0; tid < 2; ++tid) { diff --git a/db/table_cache.cc b/db/table_cache.cc index 47a4e2d3a..09de6001c 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -21,6 +21,7 @@ #include "util/file_reader_writer.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" +#include "util/sync_point.h" namespace rocksdb { @@ -38,6 +39,11 @@ static void UnrefEntry(void* arg1, void* arg2) { cache->Release(h); } +static void DeleteTableReader(void* arg1, void* arg2) { + TableReader* table_reader = reinterpret_cast(arg1); + delete table_reader; +} + static Slice GetSliceForFileNumber(const uint64_t* file_number) { return Slice(reinterpret_cast(file_number), sizeof(*file_number)); @@ -76,6 +82,33 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) { cache_->Release(handle); } +Status TableCache::GetTableReader( + const EnvOptions& env_options, + const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, + bool advise_random_on_open, bool record_read_stats, + HistogramImpl* file_read_hist, unique_ptr* table_reader) { + std::string fname = + TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId()); + unique_ptr file; + Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options); + RecordTick(ioptions_.statistics, NO_FILE_OPENS); + if (s.ok()) { + if (advise_random_on_open) { + file->Hint(RandomAccessFile::RANDOM); + } + StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS); + std::unique_ptr file_reader( + new RandomAccessFileReader(std::move(file), ioptions_.env, + ioptions_.statistics, record_read_stats, + file_read_hist)); + s = ioptions_.table_factory->NewTableReader( + ioptions_, env_options, internal_comparator, std::move(file_reader), + fd.GetFileSize(), table_reader); + TEST_SYNC_POINT("TableCache::GetTableReader:0"); + } + return s; +} + Status TableCache::FindTable(const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, Cache::Handle** handle, @@ -86,31 +119,17 @@ Status TableCache::FindTable(const EnvOptions& env_options, uint64_t number = fd.GetNumber(); Slice key = GetSliceForFileNumber(&number); *handle = cache_->Lookup(key); + TEST_SYNC_POINT_CALLBACK("TableCache::FindTable:0", + const_cast(&no_io)); + if (*handle == nullptr) { if (no_io) { // Dont do IO and return a not-found status return Status::Incomplete("Table not found in table_cache, no_io is set"); } - std::string fname = - TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId()); - unique_ptr file; unique_ptr table_reader; - s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options); - RecordTick(ioptions_.statistics, NO_FILE_OPENS); - if (s.ok()) { - if (ioptions_.advise_random_on_open) { - file->Hint(RandomAccessFile::RANDOM); - } - StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS); - std::unique_ptr file_reader( - new RandomAccessFileReader( - std::move(file), ioptions_.env, - record_read_stats ? ioptions_.statistics : nullptr, - SST_READ_MICROS, file_read_hist)); - s = ioptions_.table_factory->NewTableReader( - ioptions_, env_options, internal_comparator, std::move(file_reader), - fd.GetFileSize(), &table_reader); - } - + s = GetTableReader(env_options, internal_comparator, fd, + ioptions_.advise_random_on_open, record_read_stats, + file_read_hist, &table_reader); if (!s.ok()) { assert(table_reader == nullptr); RecordTick(ioptions_.statistics, NO_FILE_ERRORS); @@ -136,30 +155,47 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, if (table_reader_ptr != nullptr) { *table_reader_ptr = nullptr; } - TableReader* table_reader = fd.table_reader; + + TableReader* table_reader = nullptr; Cache::Handle* handle = nullptr; - Status s; - if (table_reader == nullptr) { - s = FindTable(env_options, icomparator, fd, &handle, - options.read_tier == kBlockCacheTier, !for_compaction, - file_read_hist); + bool create_new_table_reader = + (for_compaction && ioptions_.new_table_reader_for_compaction_inputs); + if (create_new_table_reader) { + unique_ptr table_reader_unique_ptr; + Status s = GetTableReader(env_options, icomparator, fd, false, false, + nullptr, &table_reader_unique_ptr); if (!s.ok()) { return NewErrorIterator(s, arena); } - table_reader = GetTableReaderFromHandle(handle); + table_reader = table_reader_unique_ptr.release(); + } else { + table_reader = fd.table_reader; + if (table_reader == nullptr) { + Status s = FindTable(env_options, icomparator, fd, &handle, + options.read_tier == kBlockCacheTier, + !for_compaction, file_read_hist); + if (!s.ok()) { + return NewErrorIterator(s, arena); + } + table_reader = GetTableReaderFromHandle(handle); + } } Iterator* result = table_reader->NewIterator(options, arena); - if (handle != nullptr) { + + if (create_new_table_reader) { + assert(handle == nullptr); + result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr); + } else if (handle != nullptr) { result->RegisterCleanup(&UnrefEntry, cache_, handle); } - if (table_reader_ptr != nullptr) { - *table_reader_ptr = table_reader; - } if (for_compaction) { table_reader->SetupForCompaction(); } + if (table_reader_ptr != nullptr) { + *table_reader_ptr = table_reader; + } return result; } diff --git a/db/table_cache.h b/db/table_cache.h index 4474fcc6a..a33d9e791 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -61,6 +61,13 @@ class TableCache { // Evict any entry for the specified file number static void Evict(Cache* cache, uint64_t file_number); + // Build a table reader + Status GetTableReader(const EnvOptions& env_options, + const InternalKeyComparator& internal_comparator, + const FileDescriptor& fd, bool advise_random_on_open, + bool record_read_stats, HistogramImpl* file_read_hist, + unique_ptr* table_reader); + // Find table reader Status FindTable(const EnvOptions& toptions, const InternalKeyComparator& internal_comparator, diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h index 0943fcbec..051f1ff8d 100644 --- a/include/rocksdb/immutable_options.h +++ b/include/rocksdb/immutable_options.h @@ -89,6 +89,8 @@ struct ImmutableCFOptions { Options::AccessHint access_hint_on_compaction_start; + bool new_table_reader_for_compaction_inputs; + int num_levels; bool optimize_filters_for_hits; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 29da0d3ed..47630fe38 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1026,6 +1026,18 @@ struct DBOptions { }; AccessHint access_hint_on_compaction_start; + // If true, always create a new file descriptor and new table reader + // for compaction inputs. Turn this parameter on may introduce extra + // memory usage in the table reader, if it allocates extra memory + // for indexes. This will allow file descriptor prefetch options + // to be set for compaction input files and not to impact file + // descriptors for the same file used by user queries. + // Suggest to enable BlockBasedTableOptions.cache_index_and_filter_blocks + // for this mode if using block-based table. + // + // Default: false + bool new_table_reader_for_compaction_inputs; + // Use adaptive mutex, which spins in the user space before resorting // to kernel. This could reduce context switch when the mutex is not // heavily contended. However, if the mutex is hot, we could end up diff --git a/util/db_test_util.cc b/util/db_test_util.cc index 2be96aca7..d79495573 100644 --- a/util/db_test_util.cc +++ b/util/db_test_util.cc @@ -166,7 +166,7 @@ bool DBTestBase::ChangeFilterOptions() { if (option_config_ == kDefault) { option_config_ = kFilter; } else if (option_config_ == kFilter) { - option_config_ = kFullFilter; + option_config_ = kFullFilterWithNewTableReaderForCompactions; } else { return false; } @@ -228,8 +228,9 @@ Options DBTestBase::CurrentOptions( case kFilter: table_options.filter_policy.reset(NewBloomFilterPolicy(10, true)); break; - case kFullFilter: + case kFullFilterWithNewTableReaderForCompactions: table_options.filter_policy.reset(NewBloomFilterPolicy(10, false)); + options.new_table_reader_for_compaction_inputs = true; break; case kUncompressed: options.compression = kNoCompression; diff --git a/util/db_test_util.h b/util/db_test_util.h index 72c0b1af2..966c58c33 100644 --- a/util/db_test_util.h +++ b/util/db_test_util.h @@ -419,7 +419,7 @@ class DBTestBase : public testing::Test { kHashCuckoo = 8, kMergePut = 9, kFilter = 10, - kFullFilter = 11, + kFullFilterWithNewTableReaderForCompactions = 11, kUncompressed = 12, kNumLevel_3 = 13, kDBLogDir = 14, diff --git a/util/options.cc b/util/options.cc index 9ce8b26c9..a10331caa 100644 --- a/util/options.cc +++ b/util/options.cc @@ -69,6 +69,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) level_compaction_dynamic_level_bytes( options.level_compaction_dynamic_level_bytes), access_hint_on_compaction_start(options.access_hint_on_compaction_start), + new_table_reader_for_compaction_inputs( + options.new_table_reader_for_compaction_inputs), num_levels(options.num_levels), optimize_filters_for_hits(options.optimize_filters_for_hits), listeners(options.listeners), @@ -238,6 +240,7 @@ DBOptions::DBOptions() advise_random_on_open(true), db_write_buffer_size(0), access_hint_on_compaction_start(NORMAL), + new_table_reader_for_compaction_inputs(false), use_adaptive_mutex(false), bytes_per_sync(0), wal_bytes_per_sync(0), @@ -289,6 +292,8 @@ DBOptions::DBOptions(const Options& options) advise_random_on_open(options.advise_random_on_open), db_write_buffer_size(options.db_write_buffer_size), access_hint_on_compaction_start(options.access_hint_on_compaction_start), + new_table_reader_for_compaction_inputs( + options.new_table_reader_for_compaction_inputs), use_adaptive_mutex(options.use_adaptive_mutex), bytes_per_sync(options.bytes_per_sync), wal_bytes_per_sync(options.wal_bytes_per_sync), @@ -364,6 +369,8 @@ void DBOptions::Dump(Logger* log) const { db_write_buffer_size); Warn(log, " Options.access_hint_on_compaction_start: %s", access_hints[access_hint_on_compaction_start]); + Warn(log, " Options.new_table_reader_for_compaction_inputs: %d", + new_table_reader_for_compaction_inputs); Warn(log, " Options.use_adaptive_mutex: %d", use_adaptive_mutex); Warn(log, " Options.rate_limiter: %p", diff --git a/util/options_helper.h b/util/options_helper.h index 79299d79c..869bbdd92 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -82,6 +82,9 @@ static std::unordered_map db_options_type_info = { {"skip_stats_update_on_db_open", {offsetof(struct DBOptions, skip_stats_update_on_db_open), OptionType::kBoolean}}, + {"new_table_reader_for_compaction_inputs", + {offsetof(struct DBOptions, new_table_reader_for_compaction_inputs), + OptionType::kBoolean}}, {"use_adaptive_mutex", {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean}}, {"use_fsync", diff --git a/util/options_test.cc b/util/options_test.cc index fbf428b9f..a59e68d83 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -142,37 +142,38 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { }; std::unordered_map db_options_map = { - {"create_if_missing", "false"}, - {"create_missing_column_families", "true"}, - {"error_if_exists", "false"}, - {"paranoid_checks", "true"}, - {"max_open_files", "32"}, - {"max_total_wal_size", "33"}, - {"disable_data_sync", "false"}, - {"use_fsync", "true"}, - {"db_log_dir", "/db_log_dir"}, - {"wal_dir", "/wal_dir"}, - {"delete_obsolete_files_period_micros", "34"}, - {"max_background_compactions", "35"}, - {"max_background_flushes", "36"}, - {"max_log_file_size", "37"}, - {"log_file_time_to_roll", "38"}, - {"keep_log_file_num", "39"}, - {"max_manifest_file_size", "40"}, - {"table_cache_numshardbits", "41"}, - {"WAL_ttl_seconds", "43"}, - {"WAL_size_limit_MB", "44"}, - {"manifest_preallocation_size", "45"}, - {"allow_os_buffer", "false"}, - {"allow_mmap_reads", "true"}, - {"allow_mmap_writes", "false"}, - {"is_fd_close_on_exec", "true"}, - {"skip_log_error_on_recovery", "false"}, - {"stats_dump_period_sec", "46"}, - {"advise_random_on_open", "true"}, - {"use_adaptive_mutex", "false"}, - {"bytes_per_sync", "47"}, - {"wal_bytes_per_sync", "48"}, + {"create_if_missing", "false"}, + {"create_missing_column_families", "true"}, + {"error_if_exists", "false"}, + {"paranoid_checks", "true"}, + {"max_open_files", "32"}, + {"max_total_wal_size", "33"}, + {"disable_data_sync", "false"}, + {"use_fsync", "true"}, + {"db_log_dir", "/db_log_dir"}, + {"wal_dir", "/wal_dir"}, + {"delete_obsolete_files_period_micros", "34"}, + {"max_background_compactions", "35"}, + {"max_background_flushes", "36"}, + {"max_log_file_size", "37"}, + {"log_file_time_to_roll", "38"}, + {"keep_log_file_num", "39"}, + {"max_manifest_file_size", "40"}, + {"table_cache_numshardbits", "41"}, + {"WAL_ttl_seconds", "43"}, + {"WAL_size_limit_MB", "44"}, + {"manifest_preallocation_size", "45"}, + {"allow_os_buffer", "false"}, + {"allow_mmap_reads", "true"}, + {"allow_mmap_writes", "false"}, + {"is_fd_close_on_exec", "true"}, + {"skip_log_error_on_recovery", "false"}, + {"stats_dump_period_sec", "46"}, + {"advise_random_on_open", "true"}, + {"use_adaptive_mutex", "false"}, + {"new_table_reader_for_compaction_inputs", "true"}, + {"bytes_per_sync", "47"}, + {"wal_bytes_per_sync", "48"}, }; ColumnFamilyOptions base_cf_opt; @@ -277,6 +278,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); + ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast(47)); ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast(48)); }