Add inline comments to flush job (#4464)
Summary: It also renames InstallMemtableFlushResults to MaybeInstallMemtableFlushResults to clarify its contract. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4464 Differential Revision: D10224918 Pulled By: maysamyabandeh fbshipit-source-id: 04e3f2d8542002cb9f8010cb436f5152751b3cbe
This commit is contained in:
parent
1fb6805527
commit
21b51dfec4
@ -40,7 +40,7 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
|
|||||||
SyncPoint::GetInstance()->LoadDependency(
|
SyncPoint::GetInstance()->LoadDependency(
|
||||||
{{"VersionSet::LogAndApply:WriteManifest",
|
{{"VersionSet::LogAndApply:WriteManifest",
|
||||||
"DBFlushTest::FlushWhileWritingManifest:1"},
|
"DBFlushTest::FlushWhileWritingManifest:1"},
|
||||||
{"MemTableList::InstallMemtableFlushResults:InProgress",
|
{"MemTableList::TryInstallMemtableFlushResults:InProgress",
|
||||||
"VersionSet::LogAndApply:WriteManifestDone"}});
|
"VersionSet::LogAndApply:WriteManifestDone"}});
|
||||||
SyncPoint::GetInstance()->EnableProcessing();
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
@ -229,7 +229,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
|
|||||||
} else {
|
} else {
|
||||||
TEST_SYNC_POINT("FlushJob::InstallResults");
|
TEST_SYNC_POINT("FlushJob::InstallResults");
|
||||||
// Replace immutable memtable with the generated Table
|
// Replace immutable memtable with the generated Table
|
||||||
s = cfd_->imm()->InstallMemtableFlushResults(
|
s = cfd_->imm()->TryInstallMemtableFlushResults(
|
||||||
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
|
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
|
||||||
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
|
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
|
||||||
log_buffer_);
|
log_buffer_);
|
||||||
|
@ -320,8 +320,9 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
|||||||
imm_flush_needed.store(true, std::memory_order_release);
|
imm_flush_needed.store(true, std::memory_order_release);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record a successful flush in the manifest file
|
// Try record a successful flush in the manifest file. It might just return
|
||||||
Status MemTableList::InstallMemtableFlushResults(
|
// Status::OK letting a concurrent flush to do actual the recording..
|
||||||
|
Status MemTableList::TryInstallMemtableFlushResults(
|
||||||
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
|
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
|
||||||
const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
|
const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
|
||||||
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
|
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
|
||||||
@ -331,7 +332,9 @@ Status MemTableList::InstallMemtableFlushResults(
|
|||||||
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
|
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
|
||||||
mu->AssertHeld();
|
mu->AssertHeld();
|
||||||
|
|
||||||
// flush was successful
|
// Flush was successful
|
||||||
|
// Record the status on the memtable object. Either this call or a call by a
|
||||||
|
// concurrent flush thread will read the status and write it to manifest.
|
||||||
for (size_t i = 0; i < mems.size(); ++i) {
|
for (size_t i = 0; i < mems.size(); ++i) {
|
||||||
// All the edits are associated with the first memtable of this batch.
|
// All the edits are associated with the first memtable of this batch.
|
||||||
assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);
|
assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);
|
||||||
@ -343,7 +346,7 @@ Status MemTableList::InstallMemtableFlushResults(
|
|||||||
// if some other thread is already committing, then return
|
// if some other thread is already committing, then return
|
||||||
Status s;
|
Status s;
|
||||||
if (commit_in_progress_) {
|
if (commit_in_progress_) {
|
||||||
TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress");
|
TEST_SYNC_POINT("MemTableList::TryInstallMemtableFlushResults:InProgress");
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -354,11 +357,16 @@ Status MemTableList::InstallMemtableFlushResults(
|
|||||||
// while the current thread is writing manifest where mutex is released.
|
// while the current thread is writing manifest where mutex is released.
|
||||||
while (s.ok()) {
|
while (s.ok()) {
|
||||||
auto& memlist = current_->memlist_;
|
auto& memlist = current_->memlist_;
|
||||||
|
// The back is the oldest; if flush_completed_ is not set to it, it means
|
||||||
|
// that we were assigned a more recent memtable. The memtables' flushes must
|
||||||
|
// be recorded in manifest in order. A concurrent flush thread, who is
|
||||||
|
// assigned to flush the oldest memtable, will later wake up and does all
|
||||||
|
// the pending writes to manifest, in order.
|
||||||
if (memlist.empty() || !memlist.back()->flush_completed_) {
|
if (memlist.empty() || !memlist.back()->flush_completed_) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// scan all memtables from the earliest, and commit those
|
// scan all memtables from the earliest, and commit those
|
||||||
// (in that order) that have finished flushing. Memetables
|
// (in that order) that have finished flushing. Memtables
|
||||||
// are always committed in the order that they were created.
|
// are always committed in the order that they were created.
|
||||||
uint64_t batch_file_number = 0;
|
uint64_t batch_file_number = 0;
|
||||||
size_t batch_count = 0;
|
size_t batch_count = 0;
|
||||||
@ -381,6 +389,7 @@ Status MemTableList::InstallMemtableFlushResults(
|
|||||||
batch_count++;
|
batch_count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(myabandeh): Not sure how batch_count could be 0 here.
|
||||||
if (batch_count > 0) {
|
if (batch_count > 0) {
|
||||||
if (vset->db_options()->allow_2pc) {
|
if (vset->db_options()->allow_2pc) {
|
||||||
assert(edit_list.size() > 0);
|
assert(edit_list.size() > 0);
|
||||||
@ -406,7 +415,7 @@ Status MemTableList::InstallMemtableFlushResults(
|
|||||||
// The reason is as follows (refer to
|
// The reason is as follows (refer to
|
||||||
// ColumnFamilyTest.FlushAndDropRaceCondition).
|
// ColumnFamilyTest.FlushAndDropRaceCondition).
|
||||||
// If the column family is dropped, then according to LogAndApply, its
|
// If the column family is dropped, then according to LogAndApply, its
|
||||||
// corrresponding flush operation is NOT written to the MANIFEST. This
|
// corresponding flush operation is NOT written to the MANIFEST. This
|
||||||
// means the DB is not aware of the L0 files generated from the flush.
|
// means the DB is not aware of the L0 files generated from the flush.
|
||||||
// By committing the new state, we remove the memtable from the memtable
|
// By committing the new state, we remove the memtable from the memtable
|
||||||
// list. Creating an iterator on this column family will not be able to
|
// list. Creating an iterator on this column family will not be able to
|
||||||
|
@ -208,8 +208,9 @@ class MemTableList {
|
|||||||
void RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
void RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
||||||
uint64_t file_number);
|
uint64_t file_number);
|
||||||
|
|
||||||
// Commit a successful flush in the manifest file
|
// Try commit a successful flush in the manifest file. It might just return
|
||||||
Status InstallMemtableFlushResults(
|
// Status::OK letting a concurrent flush to do the actual the recording.
|
||||||
|
Status TryInstallMemtableFlushResults(
|
||||||
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
|
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
|
||||||
const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker,
|
const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker,
|
||||||
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
|
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
|
||||||
|
@ -47,7 +47,7 @@ class MemTableListTest : public testing::Test {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calls MemTableList::InstallMemtableFlushResults() and sets up all
|
// Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
|
||||||
// structures needed to call this function.
|
// structures needed to call this function.
|
||||||
Status Mock_InstallMemtableFlushResults(
|
Status Mock_InstallMemtableFlushResults(
|
||||||
MemTableList* list, const MutableCFOptions& mutable_cf_options,
|
MemTableList* list, const MutableCFOptions& mutable_cf_options,
|
||||||
@ -83,7 +83,7 @@ class MemTableListTest : public testing::Test {
|
|||||||
InstrumentedMutex mutex;
|
InstrumentedMutex mutex;
|
||||||
InstrumentedMutexLock l(&mutex);
|
InstrumentedMutexLock l(&mutex);
|
||||||
LogsWithPrepTracker dummy_prep_tracker;
|
LogsWithPrepTracker dummy_prep_tracker;
|
||||||
return list->InstallMemtableFlushResults(
|
return list->TryInstallMemtableFlushResults(
|
||||||
cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, 1,
|
cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, 1,
|
||||||
to_delete, nullptr, &log_buffer);
|
to_delete, nullptr, &log_buffer);
|
||||||
}
|
}
|
||||||
@ -560,7 +560,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
|||||||
// Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains
|
// Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains
|
||||||
// tables[3].
|
// tables[3].
|
||||||
// Current implementation will only commit memtables in the order they were
|
// Current implementation will only commit memtables in the order they were
|
||||||
// created. So InstallMemtableFlushResults will install the first 3 tables
|
// created. So TryInstallMemtableFlushResults will install the first 3 tables
|
||||||
// in to_flush and stop when it encounters a table not yet flushed.
|
// in to_flush and stop when it encounters a table not yet flushed.
|
||||||
ASSERT_EQ(2, list.NumNotFlushed());
|
ASSERT_EQ(2, list.NumNotFlushed());
|
||||||
int num_in_history = std::min(3, max_write_buffer_number_to_maintain);
|
int num_in_history = std::min(3, max_write_buffer_number_to_maintain);
|
||||||
@ -585,7 +585,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
|||||||
ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
|
ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
|
||||||
|
|
||||||
for (const auto& m : to_delete) {
|
for (const auto& m : to_delete) {
|
||||||
// Refcount should be 0 after calling InstallMemtableFlushResults.
|
// Refcount should be 0 after calling TryInstallMemtableFlushResults.
|
||||||
// Verify this, by Ref'ing then UnRef'ing:
|
// Verify this, by Ref'ing then UnRef'ing:
|
||||||
m->Ref();
|
m->Ref();
|
||||||
ASSERT_EQ(m, m->Unref());
|
ASSERT_EQ(m, m->Unref());
|
||||||
@ -598,7 +598,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
|
|||||||
ASSERT_EQ(to_delete_size, to_delete.size());
|
ASSERT_EQ(to_delete_size, to_delete.size());
|
||||||
|
|
||||||
for (const auto& m : to_delete) {
|
for (const auto& m : to_delete) {
|
||||||
// Refcount should be 0 after calling InstallMemtableFlushResults.
|
// Refcount should be 0 after calling TryInstallMemtableFlushResults.
|
||||||
// Verify this, by Ref'ing then UnRef'ing:
|
// Verify this, by Ref'ing then UnRef'ing:
|
||||||
m->Ref();
|
m->Ref();
|
||||||
ASSERT_EQ(m, m->Unref());
|
ASSERT_EQ(m, m->Unref());
|
||||||
|
@ -70,7 +70,7 @@ static OperationStageInfo global_op_stage_table[] = {
|
|||||||
{ThreadStatus::STAGE_MEMTABLE_ROLLBACK,
|
{ThreadStatus::STAGE_MEMTABLE_ROLLBACK,
|
||||||
"MemTableList::RollbackMemtableFlush"},
|
"MemTableList::RollbackMemtableFlush"},
|
||||||
{ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS,
|
{ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS,
|
||||||
"MemTableList::InstallMemtableFlushResults"},
|
"MemTableList::TryInstallMemtableFlushResults"},
|
||||||
};
|
};
|
||||||
|
|
||||||
// The structure that describes a state.
|
// The structure that describes a state.
|
||||||
|
Loading…
Reference in New Issue
Block a user