diff --git a/Makefile b/Makefile index 3b0ddffee..ea41f025a 100644 --- a/Makefile +++ b/Makefile @@ -2074,6 +2074,9 @@ endif .cc.o: $(AM_V_CC)$(CXX) $(CXXFLAGS) -c $< -o $@ $(COVERAGEFLAGS) +.cpp.o: + $(AM_V_CC)$(CXX) $(CXXFLAGS) -c $< -o $@ $(COVERAGEFLAGS) + .c.o: $(AM_V_CC)$(CC) $(CFLAGS) -c $< -o $@ endif @@ -2082,7 +2085,7 @@ endif # --------------------------------------------------------------------------- all_sources = $(LIB_SOURCES) $(MAIN_SOURCES) $(MOCK_LIB_SOURCES) $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(TEST_LIB_SOURCES) $(ANALYZER_LIB_SOURCES) $(STRESS_LIB_SOURCES) -DEPFILES = $(all_sources:.cc=.cc.d) +DEPFILES = $(all_sources:.cc=.cc.d) $(FOLLY_SOURCES:.cpp=.cpp.d) # Add proper dependency support so changing a .h file forces a .cc file to # rebuild. @@ -2093,6 +2096,10 @@ DEPFILES = $(all_sources:.cc=.cc.d) @$(CXX) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) \ -MM -MT'$@' -MT'$(<:.cc=.o)' "$<" -o '$@' +%.cpp.d: %.cpp + @$(CXX) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) \ + -MM -MT'$@' -MT'$(<:.cpp=.o)' "$<" -o '$@' + ifeq ($(HAVE_POWER8),1) DEPFILES_C = $(LIB_SOURCES_C:.c=.c.d) DEPFILES_ASM = $(LIB_SOURCES_ASM:.S=.S.d) diff --git a/cache/lru_cache.h b/cache/lru_cache.h index 6313c69db..1d9c8935c 100644 --- a/cache/lru_cache.h +++ b/cache/lru_cache.h @@ -133,7 +133,6 @@ struct LRUHandle { // Caclculate the memory usage by metadata inline size_t CalcTotalCharge( CacheMetadataChargePolicy metadata_charge_policy) { - assert(key_length); size_t meta_charge = 0; if (metadata_charge_policy == kFullChargeCacheMetadata) { #ifdef ROCKSDB_MALLOC_USABLE_SIZE diff --git a/db/c.cc b/db/c.cc index 76007e917..34523f596 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2032,6 +2032,17 @@ void rocksdb_block_based_options_set_index_type( options->rep.index_type = static_cast(v); } +void rocksdb_block_based_options_set_data_block_index_type( + rocksdb_block_based_table_options_t* options, int v) { + options->rep.data_block_index_type = + static_cast(v); +} + +void rocksdb_block_based_options_set_data_block_hash_ratio( + rocksdb_block_based_table_options_t* options, double v) { + options->rep.data_block_hash_table_util_ratio = v; +} + void rocksdb_block_based_options_set_hash_index_allow_collision( rocksdb_block_based_table_options_t* options, unsigned char v) { options->rep.hash_index_allow_collision = v; diff --git a/db/c_test.c b/db/c_test.c index e851aad53..dc62b59ef 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -487,8 +487,11 @@ int main(int argc, char** argv) { rocksdb_options_set_paranoid_checks(options, 1); rocksdb_options_set_max_open_files(options, 10); rocksdb_options_set_base_background_compactions(options, 1); + table_options = rocksdb_block_based_options_create(); rocksdb_block_based_options_set_block_cache(table_options, cache); + rocksdb_block_based_options_set_data_block_index_type(table_options, 1); + rocksdb_block_based_options_set_data_block_hash_ratio(table_options, 0.75); rocksdb_options_set_block_based_table_factory(options, table_options); rocksdb_options_set_compression(options, rocksdb_no_compression); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index fe97e08be..67a81259d 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1109,6 +1109,8 @@ class DBImpl : public DB { bool read_only = false, bool error_if_log_file_exist = false, bool error_if_data_exists_in_logs = false); + virtual bool OwnTablesAndLogs() const { return true; } + private: friend class DB; friend class ErrorHandler; diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 1fa288406..d6053e8be 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -385,6 +385,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { w->Close(); } + bool own_files = OwnTablesAndLogs(); std::unordered_set files_to_del; for (const auto& candidate_file : candidate_files) { const std::string& to_delete = candidate_file.file_name; @@ -484,6 +485,12 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { } #endif // !ROCKSDB_LITE + // If I do not own these files, e.g. secondary instance with max_open_files + // = -1, then no need to delete or schedule delete these files since they + // will be removed by their owner, e.g. the primary instance. + if (!own_files) { + continue; + } Status file_deletion_status; if (schedule_only) { InstrumentedMutexLock guard_lock(&mutex_); @@ -495,7 +502,6 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { { // After purging obsolete files, remove them from files_grabbed_for_purge_. - // Use a temporary vector to perform bulk deletion via swap. InstrumentedMutexLock guard_lock(&mutex_); autovector to_be_removed; for (auto fn : files_grabbed_for_purge_) { diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 8eac41ded..a113019a9 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -11,6 +11,7 @@ #include "db/merge_context.h" #include "logging/auto_roll_logger.h" #include "monitoring/perf_context_imp.h" +#include "util/cast_util.h" namespace rocksdb { @@ -497,45 +498,61 @@ Status DBImplSecondary::TryCatchUpWithPrimary() { // read the manifest and apply new changes to the secondary instance std::unordered_set cfds_changed; JobContext job_context(0, true /*create_superversion*/); - InstrumentedMutexLock lock_guard(&mutex_); - s = static_cast(versions_.get()) - ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed); + { + InstrumentedMutexLock lock_guard(&mutex_); + s = static_cast_with_check(versions_.get()) + ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed); - ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, - static_cast(versions_->LastSequence())); - for (ColumnFamilyData* cfd : cfds_changed) { - if (cfd->IsDropped()) { - ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n", - cfd->GetName().c_str()); - continue; + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, + static_cast(versions_->LastSequence())); + for (ColumnFamilyData* cfd : cfds_changed) { + if (cfd->IsDropped()) { + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n", + cfd->GetName().c_str()); + continue; + } + VersionStorageInfo::LevelSummaryStorage tmp; + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, + "[%s] Level summary: %s\n", cfd->GetName().c_str(), + cfd->current()->storage_info()->LevelSummary(&tmp)); } - VersionStorageInfo::LevelSummaryStorage tmp; - ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] Level summary: %s\n", - cfd->GetName().c_str(), - cfd->current()->storage_info()->LevelSummary(&tmp)); - } - // list wal_dir to discover new WALs and apply new changes to the secondary - // instance - if (s.ok()) { - s = FindAndRecoverLogFiles(&cfds_changed, &job_context); - } - if (s.IsPathNotFound()) { - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Secondary tries to read WAL, but WAL file(s) have already " - "been purged by primary."); - s = Status::OK(); - } - if (s.ok()) { - for (auto cfd : cfds_changed) { - cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(), - &job_context.memtables_to_free); - auto& sv_context = job_context.superversion_contexts.back(); - cfd->InstallSuperVersion(&sv_context, &mutex_); - sv_context.NewSuperVersion(); + // list wal_dir to discover new WALs and apply new changes to the secondary + // instance + if (s.ok()) { + s = FindAndRecoverLogFiles(&cfds_changed, &job_context); + } + if (s.IsPathNotFound()) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Secondary tries to read WAL, but WAL file(s) have already " + "been purged by primary."); + s = Status::OK(); + } + if (s.ok()) { + for (auto cfd : cfds_changed) { + cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(), + &job_context.memtables_to_free); + auto& sv_context = job_context.superversion_contexts.back(); + cfd->InstallSuperVersion(&sv_context, &mutex_); + sv_context.NewSuperVersion(); + } } - job_context.Clean(); } + job_context.Clean(); + + // Cleanup unused, obsolete files. + JobContext purge_files_job_context(0); + { + InstrumentedMutexLock lock_guard(&mutex_); + // Currently, secondary instance does not own the database files, thus it + // is unnecessary for the secondary to force full scan. + FindObsoleteFiles(&purge_files_job_context, /*force=*/false); + } + if (purge_files_job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(purge_files_job_context); + } + purge_files_job_context.Clean(); return s; } diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index ca853e258..99887f55b 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -172,6 +172,24 @@ class DBImplSecondary : public DBImpl { return Status::NotSupported("Not supported operation in secondary mode."); } + using DBImpl::SetDBOptions; + Status SetDBOptions(const std::unordered_map& + /*options_map*/) override { + // Currently not supported because changing certain options may cause + // flush/compaction. + return Status::NotSupported("Not supported operation in secondary mode."); + } + + using DBImpl::SetOptions; + Status SetOptions( + ColumnFamilyHandle* /*cfd*/, + const std::unordered_map& /*options_map*/) + override { + // Currently not supported because changing certain options may cause + // flush/compaction and/or write to MANIFEST. + return Status::NotSupported("Not supported operation in secondary mode."); + } + using DBImpl::SyncWAL; Status SyncWAL() override { return Status::NotSupported("Not supported operation in secondary mode."); @@ -269,6 +287,14 @@ class DBImplSecondary : public DBImpl { return s; } + bool OwnTablesAndLogs() const override { + // Currently, the secondary instance does not own the database files. It + // simply opens the files of the primary instance and tracks their file + // descriptors until they become obsolete. In the future, the secondary may + // create links to database files. OwnTablesAndLogs will return true then. + return false; + } + private: friend class DB; diff --git a/db/db_impl/db_secondary_test.cc b/db/db_impl/db_secondary_test.cc index 6caff005e..aabe61b41 100644 --- a/db/db_impl/db_secondary_test.cc +++ b/db/db_impl/db_secondary_test.cc @@ -195,6 +195,90 @@ TEST_F(DBSecondaryTest, OpenAsSecondary) { verify_db_func("new_foo_value", "new_bar_value"); } +namespace { +class TraceFileEnv : public EnvWrapper { + public: + explicit TraceFileEnv(Env* target) : EnvWrapper(target) {} + Status NewRandomAccessFile(const std::string& f, + std::unique_ptr* r, + const EnvOptions& env_options) override { + class TracedRandomAccessFile : public RandomAccessFile { + public: + TracedRandomAccessFile(std::unique_ptr&& target, + std::atomic& counter) + : target_(std::move(target)), files_closed_(counter) {} + ~TracedRandomAccessFile() override { + files_closed_.fetch_add(1, std::memory_order_relaxed); + } + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + return target_->Read(offset, n, result, scratch); + } + + private: + std::unique_ptr target_; + std::atomic& files_closed_; + }; + Status s = target()->NewRandomAccessFile(f, r, env_options); + if (s.ok()) { + r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_)); + } + return s; + } + + int files_closed() const { + return files_closed_.load(std::memory_order_relaxed); + } + + private: + std::atomic files_closed_{0}; +}; +} // namespace + +TEST_F(DBSecondaryTest, SecondaryCloseFiles) { + Options options; + options.env = env_; + options.max_open_files = 1; + options.disable_auto_compactions = true; + Reopen(options); + Options options1; + std::unique_ptr traced_env(new TraceFileEnv(env_)); + options1.env = traced_env.get(); + OpenSecondary(options1); + + static const auto verify_db = [&]() { + std::unique_ptr iter1(dbfull()->NewIterator(ReadOptions())); + std::unique_ptr iter2(db_secondary_->NewIterator(ReadOptions())); + for (iter1->SeekToFirst(), iter2->SeekToFirst(); + iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) { + ASSERT_EQ(iter1->key(), iter2->key()); + ASSERT_EQ(iter1->value(), iter2->value()); + } + ASSERT_FALSE(iter1->Valid()); + ASSERT_FALSE(iter2->Valid()); + }; + + ASSERT_OK(Put("a", "value")); + ASSERT_OK(Put("c", "value")); + ASSERT_OK(Flush()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(); + + ASSERT_OK(Put("b", "value")); + ASSERT_OK(Put("d", "value")); + ASSERT_OK(Flush()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(); + + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ASSERT_EQ(2, static_cast(traced_env.get())->files_closed()); + + Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}}); + ASSERT_TRUE(s.IsNotSupported()); + CloseSecondary(); +} + TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) { Options options; options.env = env_; diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index ba5408508..909e9888e 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -706,6 +706,14 @@ enum { }; extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_index_type( rocksdb_block_based_table_options_t*, int); // uses one of the above enums +enum { + rocksdb_block_based_table_data_block_index_type_binary_search = 0, + rocksdb_block_based_table_data_block_index_type_binary_search_and_hash = 1, +}; +extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_data_block_index_type( + rocksdb_block_based_table_options_t*, int); // uses one of the above enums +extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_data_block_hash_ratio( + rocksdb_block_based_table_options_t* options, double v); extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_hash_index_allow_collision( rocksdb_block_based_table_options_t*, unsigned char); diff --git a/tools/block_cache_analyzer/block_cache_trace_analyzer.cc b/tools/block_cache_analyzer/block_cache_trace_analyzer.cc index e6b6a2c05..73234e83a 100644 --- a/tools/block_cache_analyzer/block_cache_trace_analyzer.cc +++ b/tools/block_cache_analyzer/block_cache_trace_analyzer.cc @@ -577,10 +577,11 @@ void BlockCacheTraceAnalyzer::WriteSkewness( pairs.push_back(itr); } // Sort in descending order. - sort( - pairs.begin(), pairs.end(), - [=](std::pair& a, - std::pair& b) { return b.second < a.second; }); + sort(pairs.begin(), pairs.end(), + [=](const std::pair& a, + const std::pair& b) { + return b.second < a.second; + }); size_t prev_start_index = 0; for (auto const& percent : percent_buckets) {