Merge branch 'master' into fix-range-deletion-bug

This commit is contained in:
奏之章 2019-12-04 15:39:56 +08:00 committed by GitHub
commit 73cd3da055
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 205 additions and 41 deletions

View File

@ -2074,6 +2074,9 @@ endif
.cc.o:
$(AM_V_CC)$(CXX) $(CXXFLAGS) -c $< -o $@ $(COVERAGEFLAGS)
.cpp.o:
$(AM_V_CC)$(CXX) $(CXXFLAGS) -c $< -o $@ $(COVERAGEFLAGS)
.c.o:
$(AM_V_CC)$(CC) $(CFLAGS) -c $< -o $@
endif
@ -2082,7 +2085,7 @@ endif
# ---------------------------------------------------------------------------
all_sources = $(LIB_SOURCES) $(MAIN_SOURCES) $(MOCK_LIB_SOURCES) $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(TEST_LIB_SOURCES) $(ANALYZER_LIB_SOURCES) $(STRESS_LIB_SOURCES)
DEPFILES = $(all_sources:.cc=.cc.d)
DEPFILES = $(all_sources:.cc=.cc.d) $(FOLLY_SOURCES:.cpp=.cpp.d)
# Add proper dependency support so changing a .h file forces a .cc file to
# rebuild.
@ -2093,6 +2096,10 @@ DEPFILES = $(all_sources:.cc=.cc.d)
@$(CXX) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) \
-MM -MT'$@' -MT'$(<:.cc=.o)' "$<" -o '$@'
%.cpp.d: %.cpp
@$(CXX) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) \
-MM -MT'$@' -MT'$(<:.cpp=.o)' "$<" -o '$@'
ifeq ($(HAVE_POWER8),1)
DEPFILES_C = $(LIB_SOURCES_C:.c=.c.d)
DEPFILES_ASM = $(LIB_SOURCES_ASM:.S=.S.d)

1
cache/lru_cache.h vendored
View File

@ -133,7 +133,6 @@ struct LRUHandle {
// Caclculate the memory usage by metadata
inline size_t CalcTotalCharge(
CacheMetadataChargePolicy metadata_charge_policy) {
assert(key_length);
size_t meta_charge = 0;
if (metadata_charge_policy == kFullChargeCacheMetadata) {
#ifdef ROCKSDB_MALLOC_USABLE_SIZE

11
db/c.cc
View File

@ -2032,6 +2032,17 @@ void rocksdb_block_based_options_set_index_type(
options->rep.index_type = static_cast<BlockBasedTableOptions::IndexType>(v);
}
void rocksdb_block_based_options_set_data_block_index_type(
rocksdb_block_based_table_options_t* options, int v) {
options->rep.data_block_index_type =
static_cast<BlockBasedTableOptions::DataBlockIndexType>(v);
}
void rocksdb_block_based_options_set_data_block_hash_ratio(
rocksdb_block_based_table_options_t* options, double v) {
options->rep.data_block_hash_table_util_ratio = v;
}
void rocksdb_block_based_options_set_hash_index_allow_collision(
rocksdb_block_based_table_options_t* options, unsigned char v) {
options->rep.hash_index_allow_collision = v;

View File

@ -487,8 +487,11 @@ int main(int argc, char** argv) {
rocksdb_options_set_paranoid_checks(options, 1);
rocksdb_options_set_max_open_files(options, 10);
rocksdb_options_set_base_background_compactions(options, 1);
table_options = rocksdb_block_based_options_create();
rocksdb_block_based_options_set_block_cache(table_options, cache);
rocksdb_block_based_options_set_data_block_index_type(table_options, 1);
rocksdb_block_based_options_set_data_block_hash_ratio(table_options, 0.75);
rocksdb_options_set_block_based_table_factory(options, table_options);
rocksdb_options_set_compression(options, rocksdb_no_compression);

View File

@ -1109,6 +1109,8 @@ class DBImpl : public DB {
bool read_only = false, bool error_if_log_file_exist = false,
bool error_if_data_exists_in_logs = false);
virtual bool OwnTablesAndLogs() const { return true; }
private:
friend class DB;
friend class ErrorHandler;

View File

@ -385,6 +385,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
w->Close();
}
bool own_files = OwnTablesAndLogs();
std::unordered_set<uint64_t> files_to_del;
for (const auto& candidate_file : candidate_files) {
const std::string& to_delete = candidate_file.file_name;
@ -484,6 +485,12 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
}
#endif // !ROCKSDB_LITE
// If I do not own these files, e.g. secondary instance with max_open_files
// = -1, then no need to delete or schedule delete these files since they
// will be removed by their owner, e.g. the primary instance.
if (!own_files) {
continue;
}
Status file_deletion_status;
if (schedule_only) {
InstrumentedMutexLock guard_lock(&mutex_);
@ -495,7 +502,6 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
{
// After purging obsolete files, remove them from files_grabbed_for_purge_.
// Use a temporary vector to perform bulk deletion via swap.
InstrumentedMutexLock guard_lock(&mutex_);
autovector<uint64_t> to_be_removed;
for (auto fn : files_grabbed_for_purge_) {

View File

@ -11,6 +11,7 @@
#include "db/merge_context.h"
#include "logging/auto_roll_logger.h"
#include "monitoring/perf_context_imp.h"
#include "util/cast_util.h"
namespace rocksdb {
@ -497,45 +498,61 @@ Status DBImplSecondary::TryCatchUpWithPrimary() {
// read the manifest and apply new changes to the secondary instance
std::unordered_set<ColumnFamilyData*> cfds_changed;
JobContext job_context(0, true /*create_superversion*/);
InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast<ReactiveVersionSet*>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
{
InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
static_cast<uint64_t>(versions_->LastSequence()));
for (ColumnFamilyData* cfd : cfds_changed) {
if (cfd->IsDropped()) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
cfd->GetName().c_str());
continue;
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
static_cast<uint64_t>(versions_->LastSequence()));
for (ColumnFamilyData* cfd : cfds_changed) {
if (cfd->IsDropped()) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
cfd->GetName().c_str());
continue;
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Level summary: %s\n", cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp));
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] Level summary: %s\n",
cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp));
}
// list wal_dir to discover new WALs and apply new changes to the secondary
// instance
if (s.ok()) {
s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
}
if (s.IsPathNotFound()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Secondary tries to read WAL, but WAL file(s) have already "
"been purged by primary.");
s = Status::OK();
}
if (s.ok()) {
for (auto cfd : cfds_changed) {
cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
&job_context.memtables_to_free);
auto& sv_context = job_context.superversion_contexts.back();
cfd->InstallSuperVersion(&sv_context, &mutex_);
sv_context.NewSuperVersion();
// list wal_dir to discover new WALs and apply new changes to the secondary
// instance
if (s.ok()) {
s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
}
if (s.IsPathNotFound()) {
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Secondary tries to read WAL, but WAL file(s) have already "
"been purged by primary.");
s = Status::OK();
}
if (s.ok()) {
for (auto cfd : cfds_changed) {
cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
&job_context.memtables_to_free);
auto& sv_context = job_context.superversion_contexts.back();
cfd->InstallSuperVersion(&sv_context, &mutex_);
sv_context.NewSuperVersion();
}
}
job_context.Clean();
}
job_context.Clean();
// Cleanup unused, obsolete files.
JobContext purge_files_job_context(0);
{
InstrumentedMutexLock lock_guard(&mutex_);
// Currently, secondary instance does not own the database files, thus it
// is unnecessary for the secondary to force full scan.
FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
}
if (purge_files_job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(purge_files_job_context);
}
purge_files_job_context.Clean();
return s;
}

View File

@ -172,6 +172,24 @@ class DBImplSecondary : public DBImpl {
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::SetDBOptions;
Status SetDBOptions(const std::unordered_map<std::string, std::string>&
/*options_map*/) override {
// Currently not supported because changing certain options may cause
// flush/compaction.
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::SetOptions;
Status SetOptions(
ColumnFamilyHandle* /*cfd*/,
const std::unordered_map<std::string, std::string>& /*options_map*/)
override {
// Currently not supported because changing certain options may cause
// flush/compaction and/or write to MANIFEST.
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::SyncWAL;
Status SyncWAL() override {
return Status::NotSupported("Not supported operation in secondary mode.");
@ -269,6 +287,14 @@ class DBImplSecondary : public DBImpl {
return s;
}
bool OwnTablesAndLogs() const override {
// Currently, the secondary instance does not own the database files. It
// simply opens the files of the primary instance and tracks their file
// descriptors until they become obsolete. In the future, the secondary may
// create links to database files. OwnTablesAndLogs will return true then.
return false;
}
private:
friend class DB;

View File

@ -195,6 +195,90 @@ TEST_F(DBSecondaryTest, OpenAsSecondary) {
verify_db_func("new_foo_value", "new_bar_value");
}
namespace {
class TraceFileEnv : public EnvWrapper {
public:
explicit TraceFileEnv(Env* target) : EnvWrapper(target) {}
Status NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const EnvOptions& env_options) override {
class TracedRandomAccessFile : public RandomAccessFile {
public:
TracedRandomAccessFile(std::unique_ptr<RandomAccessFile>&& target,
std::atomic<int>& counter)
: target_(std::move(target)), files_closed_(counter) {}
~TracedRandomAccessFile() override {
files_closed_.fetch_add(1, std::memory_order_relaxed);
}
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
return target_->Read(offset, n, result, scratch);
}
private:
std::unique_ptr<RandomAccessFile> target_;
std::atomic<int>& files_closed_;
};
Status s = target()->NewRandomAccessFile(f, r, env_options);
if (s.ok()) {
r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_));
}
return s;
}
int files_closed() const {
return files_closed_.load(std::memory_order_relaxed);
}
private:
std::atomic<int> files_closed_{0};
};
} // namespace
TEST_F(DBSecondaryTest, SecondaryCloseFiles) {
Options options;
options.env = env_;
options.max_open_files = 1;
options.disable_auto_compactions = true;
Reopen(options);
Options options1;
std::unique_ptr<Env> traced_env(new TraceFileEnv(env_));
options1.env = traced_env.get();
OpenSecondary(options1);
static const auto verify_db = [&]() {
std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(ReadOptions()));
for (iter1->SeekToFirst(), iter2->SeekToFirst();
iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
ASSERT_EQ(iter1->key(), iter2->key());
ASSERT_EQ(iter1->value(), iter2->value());
}
ASSERT_FALSE(iter1->Valid());
ASSERT_FALSE(iter2->Valid());
};
ASSERT_OK(Put("a", "value"));
ASSERT_OK(Put("c", "value"));
ASSERT_OK(Flush());
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
verify_db();
ASSERT_OK(Put("b", "value"));
ASSERT_OK(Put("d", "value"));
ASSERT_OK(Flush());
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
verify_db();
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
ASSERT_EQ(2, static_cast<TraceFileEnv*>(traced_env.get())->files_closed());
Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}});
ASSERT_TRUE(s.IsNotSupported());
CloseSecondary();
}
TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
Options options;
options.env = env_;

View File

@ -706,6 +706,14 @@ enum {
};
extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_index_type(
rocksdb_block_based_table_options_t*, int); // uses one of the above enums
enum {
rocksdb_block_based_table_data_block_index_type_binary_search = 0,
rocksdb_block_based_table_data_block_index_type_binary_search_and_hash = 1,
};
extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_data_block_index_type(
rocksdb_block_based_table_options_t*, int); // uses one of the above enums
extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_data_block_hash_ratio(
rocksdb_block_based_table_options_t* options, double v);
extern ROCKSDB_LIBRARY_API void
rocksdb_block_based_options_set_hash_index_allow_collision(
rocksdb_block_based_table_options_t*, unsigned char);

View File

@ -577,10 +577,11 @@ void BlockCacheTraceAnalyzer::WriteSkewness(
pairs.push_back(itr);
}
// Sort in descending order.
sort(
pairs.begin(), pairs.end(),
[=](std::pair<std::string, uint64_t>& a,
std::pair<std::string, uint64_t>& b) { return b.second < a.second; });
sort(pairs.begin(), pairs.end(),
[=](const std::pair<std::string, uint64_t>& a,
const std::pair<std::string, uint64_t>& b) {
return b.second < a.second;
});
size_t prev_start_index = 0;
for (auto const& percent : percent_buckets) {