Improve scalability of DB::GetSnapshot()
Summary: Now DB::GetSnapshot() doesn't scale to more column families, as it needs to go through all the column families to find whether snapshot is supported. This patch optimizes it. Test Plan: Add unit tests to cover negative cases. make all check Reviewers: yhchiang, rven, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D30093
This commit is contained in:
parent
ee95cae9a4
commit
d7a486668c
@ -731,6 +731,27 @@ TEST(ColumnFamilyTest, DifferentWriteBufferSizes) {
|
|||||||
Close();
|
Close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(ColumnFamilyTest, MemtableNotSupportSnapshot) {
|
||||||
|
Open();
|
||||||
|
auto* s1 = dbfull()->GetSnapshot();
|
||||||
|
ASSERT_TRUE(s1 != nullptr);
|
||||||
|
dbfull()->ReleaseSnapshot(s1);
|
||||||
|
|
||||||
|
// Add a column family that doesn't support snapshot
|
||||||
|
ColumnFamilyOptions first;
|
||||||
|
first.memtable_factory.reset(NewHashCuckooRepFactory(1024 * 1024));
|
||||||
|
CreateColumnFamilies({"first"}, {first});
|
||||||
|
auto* s2 = dbfull()->GetSnapshot();
|
||||||
|
ASSERT_TRUE(s2 == nullptr);
|
||||||
|
|
||||||
|
// Add a column family that supports snapshot. Snapshot stays not supported.
|
||||||
|
ColumnFamilyOptions second;
|
||||||
|
CreateColumnFamilies({"second"}, {second});
|
||||||
|
auto* s3 = dbfull()->GetSnapshot();
|
||||||
|
ASSERT_TRUE(s3 == nullptr);
|
||||||
|
Close();
|
||||||
|
}
|
||||||
|
|
||||||
TEST(ColumnFamilyTest, DifferentMergeOperators) {
|
TEST(ColumnFamilyTest, DifferentMergeOperators) {
|
||||||
Open();
|
Open();
|
||||||
CreateColumnFamilies({"first", "second"});
|
CreateColumnFamilies({"first", "second"});
|
||||||
|
@ -202,6 +202,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
|
|||||||
default_cf_handle_(nullptr),
|
default_cf_handle_(nullptr),
|
||||||
total_log_size_(0),
|
total_log_size_(0),
|
||||||
max_total_in_memory_state_(0),
|
max_total_in_memory_state_(0),
|
||||||
|
is_snapshot_supported_(true),
|
||||||
write_buffer_(options.db_write_buffer_size),
|
write_buffer_(options.db_write_buffer_size),
|
||||||
tmp_batch_(),
|
tmp_batch_(),
|
||||||
bg_schedule_needed_(false),
|
bg_schedule_needed_(false),
|
||||||
@ -1305,8 +1306,8 @@ Status DBImpl::CompactFilesImpl(
|
|||||||
CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(),
|
CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(),
|
||||||
env_options_, versions_.get(), &shutting_down_,
|
env_options_, versions_.get(), &shutting_down_,
|
||||||
&log_buffer, db_directory_.get(), stats_,
|
&log_buffer, db_directory_.get(), stats_,
|
||||||
&snapshots_, IsSnapshotSupported(), table_cache_,
|
&snapshots_, is_snapshot_supported_,
|
||||||
std::move(yield_callback));
|
table_cache_, std::move(yield_callback));
|
||||||
compaction_job.Prepare();
|
compaction_job.Prepare();
|
||||||
|
|
||||||
mutex_.Unlock();
|
mutex_.Unlock();
|
||||||
@ -2090,7 +2091,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
|
|||||||
CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(),
|
CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(),
|
||||||
env_options_, versions_.get(), &shutting_down_,
|
env_options_, versions_.get(), &shutting_down_,
|
||||||
log_buffer, db_directory_.get(), stats_,
|
log_buffer, db_directory_.get(), stats_,
|
||||||
&snapshots_, IsSnapshotSupported(),
|
&snapshots_, is_snapshot_supported_,
|
||||||
table_cache_, std::move(yield_callback));
|
table_cache_, std::move(yield_callback));
|
||||||
compaction_job.Prepare();
|
compaction_job.Prepare();
|
||||||
mutex_.Unlock();
|
mutex_.Unlock();
|
||||||
@ -2494,6 +2495,11 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
|
|||||||
assert(cfd != nullptr);
|
assert(cfd != nullptr);
|
||||||
delete InstallSuperVersion(
|
delete InstallSuperVersion(
|
||||||
cfd, nullptr, *cfd->GetLatestMutableCFOptions());
|
cfd, nullptr, *cfd->GetLatestMutableCFOptions());
|
||||||
|
|
||||||
|
if (!cfd->mem()->IsSnapshotSupported()) {
|
||||||
|
is_snapshot_supported_ = false;
|
||||||
|
}
|
||||||
|
|
||||||
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
|
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
|
||||||
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
||||||
"Created column family [%s] (ID %u)",
|
"Created column family [%s] (ID %u)",
|
||||||
@ -2520,6 +2526,8 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
|
|||||||
return Status::InvalidArgument("Can't drop default column family");
|
return Status::InvalidArgument("Can't drop default column family");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();
|
||||||
|
|
||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
edit.DropColumnFamily();
|
edit.DropColumnFamily();
|
||||||
edit.SetColumnFamily(cfd->GetID());
|
edit.SetColumnFamily(cfd->GetID());
|
||||||
@ -2539,6 +2547,19 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
|
|||||||
&edit, &mutex_);
|
&edit, &mutex_);
|
||||||
write_thread_.ExitWriteThread(&w, &w, s);
|
write_thread_.ExitWriteThread(&w, &w, s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!cf_support_snapshot) {
|
||||||
|
// Dropped Column Family doesn't support snapshot. Need to recalculate
|
||||||
|
// is_snapshot_supported_.
|
||||||
|
bool new_is_snapshot_supported = true;
|
||||||
|
for (auto c : *versions_->GetColumnFamilySet()) {
|
||||||
|
if (!c->mem()->IsSnapshotSupported()) {
|
||||||
|
new_is_snapshot_supported = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
is_snapshot_supported_ = new_is_snapshot_supported;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
@ -2712,22 +2733,13 @@ Status DBImpl::NewIterators(
|
|||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DBImpl::IsSnapshotSupported() const {
|
|
||||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
|
||||||
if (!cfd->mem()->IsSnapshotSupported()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
const Snapshot* DBImpl::GetSnapshot() {
|
const Snapshot* DBImpl::GetSnapshot() {
|
||||||
int64_t unix_time = 0;
|
int64_t unix_time = 0;
|
||||||
env_->GetCurrentTime(&unix_time); // Ignore error
|
env_->GetCurrentTime(&unix_time); // Ignore error
|
||||||
|
|
||||||
MutexLock l(&mutex_);
|
MutexLock l(&mutex_);
|
||||||
// returns null if the underlying memtable does not support snapshot.
|
// returns null if the underlying memtable does not support snapshot.
|
||||||
if (!IsSnapshotSupported()) return nullptr;
|
if (!is_snapshot_supported_) return nullptr;
|
||||||
return snapshots_.New(versions_->LastSequence(), unix_time);
|
return snapshots_.New(versions_->LastSequence(), unix_time);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3622,6 +3634,9 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!cfd->mem()->IsSnapshotSupported()) {
|
||||||
|
impl->is_snapshot_supported_ = false;
|
||||||
|
}
|
||||||
if (cfd->ioptions()->merge_operator != nullptr &&
|
if (cfd->ioptions()->merge_operator != nullptr &&
|
||||||
!cfd->mem()->IsMergeOperatorSupported()) {
|
!cfd->mem()->IsMergeOperatorSupported()) {
|
||||||
s = Status::InvalidArgument(
|
s = Status::InvalidArgument(
|
||||||
|
@ -383,13 +383,6 @@ class DBImpl : public DB {
|
|||||||
// dump rocksdb.stats to LOG
|
// dump rocksdb.stats to LOG
|
||||||
void MaybeDumpStats();
|
void MaybeDumpStats();
|
||||||
|
|
||||||
// Return true if the current db supports snapshot. If the current
|
|
||||||
// DB does not support snapshot, then calling GetSnapshot() will always
|
|
||||||
// return nullptr.
|
|
||||||
//
|
|
||||||
// @see GetSnapshot()
|
|
||||||
virtual bool IsSnapshotSupported() const;
|
|
||||||
|
|
||||||
// Return the minimum empty level that could hold the total data in the
|
// Return the minimum empty level that could hold the total data in the
|
||||||
// input level. Return the input level, if such level could not be found.
|
// input level. Return the input level, if such level could not be found.
|
||||||
int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
|
int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
|
||||||
@ -441,6 +434,8 @@ class DBImpl : public DB {
|
|||||||
// some code-paths
|
// some code-paths
|
||||||
bool single_column_family_mode_;
|
bool single_column_family_mode_;
|
||||||
|
|
||||||
|
bool is_snapshot_supported_;
|
||||||
|
|
||||||
std::unique_ptr<Directory> db_directory_;
|
std::unique_ptr<Directory> db_directory_;
|
||||||
|
|
||||||
WriteBuffer write_buffer_;
|
WriteBuffer write_buffer_;
|
||||||
|
@ -1622,6 +1622,11 @@ TEST(DBTest, GetSnapshot) {
|
|||||||
std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
|
std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
|
||||||
ASSERT_OK(Put(1, key, "v1"));
|
ASSERT_OK(Put(1, key, "v1"));
|
||||||
const Snapshot* s1 = db_->GetSnapshot();
|
const Snapshot* s1 = db_->GetSnapshot();
|
||||||
|
if (option_config_ == kHashCuckoo) {
|
||||||
|
// NOt supported case.
|
||||||
|
ASSERT_TRUE(s1 == nullptr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
ASSERT_OK(Put(1, key, "v2"));
|
ASSERT_OK(Put(1, key, "v2"));
|
||||||
ASSERT_EQ("v2", Get(1, key));
|
ASSERT_EQ("v2", Get(1, key));
|
||||||
ASSERT_EQ("v1", Get(1, key, s1));
|
ASSERT_EQ("v1", Get(1, key, s1));
|
||||||
@ -1630,8 +1635,7 @@ TEST(DBTest, GetSnapshot) {
|
|||||||
ASSERT_EQ("v1", Get(1, key, s1));
|
ASSERT_EQ("v1", Get(1, key, s1));
|
||||||
db_->ReleaseSnapshot(s1);
|
db_->ReleaseSnapshot(s1);
|
||||||
}
|
}
|
||||||
// skip as HashCuckooRep does not support snapshot
|
} while (ChangeOptions());
|
||||||
} while (ChangeOptions(kSkipHashCuckoo));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(DBTest, GetSnapshotLink) {
|
TEST(DBTest, GetSnapshotLink) {
|
||||||
|
Loading…
Reference in New Issue
Block a user