DB::Flush() Do not wait for background threads when there is nothing in mem table
Summary: When we have multiple column families, users can issue Flush() on every column families to make sure everything is flushes, even if some of them might be empty. By skipping the waiting for empty cases, it can be greatly speed up. Still wait for people's comments before writing unit tests for it. Test Plan: Will write a unit test to make sure it is correct. Reviewers: ljin, yhchiang, igor Reviewed By: igor Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D22953
This commit is contained in:
parent
a2bb7c3c33
commit
011241bb99
@ -375,7 +375,7 @@ DBImpl::~DBImpl() {
|
||||
mutex_.Lock();
|
||||
if (flush_on_destroy_) {
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (cfd->mem()->GetFirstSequenceNumber() != 0) {
|
||||
if (!cfd->mem()->IsEmpty()) {
|
||||
cfd->Ref();
|
||||
mutex_.Unlock();
|
||||
FlushMemTable(cfd, FlushOptions());
|
||||
@ -1905,6 +1905,12 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
{
|
||||
WriteContext context;
|
||||
MutexLock guard_lock(&mutex_);
|
||||
|
||||
if (cfd->imm()->size() == 0 && cfd->mem()->IsEmpty()) {
|
||||
// Nothing to flush
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
s = BeginWrite(&w, 0);
|
||||
assert(s.ok() && !w.done); // No timeout and nobody should do our job
|
||||
|
||||
|
@ -2535,6 +2535,49 @@ class SleepingBackgroundTask {
|
||||
bool done_with_sleep_;
|
||||
};
|
||||
|
||||
TEST(DBTest, FlushEmptyColumnFamily) {
|
||||
// Block flush thread and disable compaction thread
|
||||
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||
env_->SetBackgroundThreads(1, Env::LOW);
|
||||
SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
SleepingBackgroundTask sleeping_task_high;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high,
|
||||
Env::Priority::HIGH);
|
||||
|
||||
Options options = CurrentOptions();
|
||||
// disable compaction
|
||||
options.disable_auto_compactions = true;
|
||||
WriteOptions writeOpt = WriteOptions();
|
||||
writeOpt.disableWAL = true;
|
||||
options.max_write_buffer_number = 2;
|
||||
options.min_write_buffer_number_to_merge = 1;
|
||||
CreateAndReopenWithCF({"pikachu"}, &options);
|
||||
|
||||
// Compaction can still go through even if no thread can flush the
|
||||
// mem table.
|
||||
ASSERT_OK(Flush(0));
|
||||
ASSERT_OK(Flush(1));
|
||||
|
||||
// Insert can go through
|
||||
ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
|
||||
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
|
||||
|
||||
ASSERT_EQ("v1", Get(0, "foo"));
|
||||
ASSERT_EQ("v1", Get(1, "bar"));
|
||||
|
||||
sleeping_task_high.WakeUp();
|
||||
sleeping_task_high.WaitUntilDone();
|
||||
|
||||
// Flush can still go through.
|
||||
ASSERT_OK(Flush(0));
|
||||
ASSERT_OK(Flush(1));
|
||||
|
||||
sleeping_task_low.WakeUp();
|
||||
sleeping_task_low.WaitUntilDone();
|
||||
}
|
||||
|
||||
TEST(DBTest, GetProperty) {
|
||||
// Set sizes to both background thread pool to be 1 and block them.
|
||||
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||
|
@ -414,7 +414,7 @@ static bool SaveValue(void* arg, const char* entry) {
|
||||
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
||||
MergeContext& merge_context, const Options& options) {
|
||||
// The sequence number is updated synchronously in version_set.h
|
||||
if (first_seqno_ == 0) {
|
||||
if (IsEmpty()) {
|
||||
// Avoiding recording stats for speed.
|
||||
return false;
|
||||
}
|
||||
|
@ -137,6 +137,9 @@ class MemTable {
|
||||
// Returns the edits area that is needed for flushing the memtable
|
||||
VersionEdit* GetEdits() { return &edit_; }
|
||||
|
||||
// Returns if there is no entry inserted to the mem table.
|
||||
bool IsEmpty() const { return first_seqno_ == 0; }
|
||||
|
||||
// Returns the sequence number of the first element that was inserted
|
||||
// into the memtable
|
||||
SequenceNumber GetFirstSequenceNumber() { return first_seqno_; }
|
||||
|
Loading…
Reference in New Issue
Block a user