Free obsolete memtables outside the dbmutex.
Summary: Large memory allocations and frees are costly and best done outside the db-mutex. The memtables are already allocated outside the db-mutex but they were being freed while holding the db-mutex. This patch frees obsolete memtables outside the db-mutex. Test Plan: make check db_stress Unit tests pass, I am in the process of running stress tests. Reviewers: haobo, igor, emayanke Reviewed By: haobo CC: reconnect.grayhat, leveldb Differential Revision: https://reviews.facebook.net/D14319
This commit is contained in:
parent
3ce3658411
commit
27bbef1180
@ -300,6 +300,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
||||
}
|
||||
|
||||
DBImpl::~DBImpl() {
|
||||
std::vector<MemTable*> to_delete;
|
||||
to_delete.reserve(options_.max_write_buffer_number);
|
||||
|
||||
// Wait for background work to finish
|
||||
if (flush_on_destroy_ && mem_->GetFirstSequenceNumber() != 0) {
|
||||
FlushMemTable(FlushOptions());
|
||||
@ -317,8 +320,14 @@ DBImpl::~DBImpl() {
|
||||
env_->UnlockFile(db_lock_);
|
||||
}
|
||||
|
||||
if (mem_ != nullptr) mem_->Unref();
|
||||
imm_.UnrefAll();
|
||||
if (mem_ != nullptr) {
|
||||
delete mem_->Unref();
|
||||
}
|
||||
|
||||
imm_.UnrefAll(&to_delete);
|
||||
for (MemTable* m: to_delete) {
|
||||
delete m;
|
||||
}
|
||||
LogFlush(options_.info_log);
|
||||
}
|
||||
|
||||
@ -954,7 +963,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
|
||||
// file-systems cause the DB::Open() to fail.
|
||||
break;
|
||||
}
|
||||
mem->Unref();
|
||||
delete mem->Unref();
|
||||
mem = nullptr;
|
||||
}
|
||||
}
|
||||
@ -965,7 +974,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
|
||||
// file-systems cause the DB::Open() to fail.
|
||||
}
|
||||
|
||||
if (mem != nullptr && !external_table) mem->Unref();
|
||||
if (mem != nullptr && !external_table) {
|
||||
delete mem->Unref();
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
@ -2480,9 +2491,14 @@ struct IterState {
|
||||
|
||||
static void CleanupIteratorState(void* arg1, void* arg2) {
|
||||
IterState* state = reinterpret_cast<IterState*>(arg1);
|
||||
std::vector<MemTable*> to_delete;
|
||||
to_delete.reserve(state->mem.size());
|
||||
state->mu->Lock();
|
||||
for (unsigned int i = 0; i < state->mem.size(); i++) {
|
||||
state->mem[i]->Unref();
|
||||
MemTable* m = state->mem[i]->Unref();
|
||||
if (m != nullptr) {
|
||||
to_delete.push_back(m);
|
||||
}
|
||||
}
|
||||
state->version->Unref();
|
||||
// delete only the sst obsolete files
|
||||
@ -2491,6 +2507,9 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
|
||||
state->db->FindObsoleteFiles(deletion_state, false, true);
|
||||
state->mu->Unlock();
|
||||
state->db->PurgeObsoleteFiles(deletion_state);
|
||||
|
||||
// delete obsolete memtables outside the db-mutex
|
||||
for (MemTable* m : to_delete) delete m;
|
||||
delete state;
|
||||
}
|
||||
} // namespace
|
||||
@ -2558,6 +2577,8 @@ Status DBImpl::GetImpl(const ReadOptions& options,
|
||||
|
||||
StopWatch sw(env_, options_.statistics.get(), DB_GET);
|
||||
SequenceNumber snapshot;
|
||||
std::vector<MemTable*> to_delete;
|
||||
to_delete.reserve(options_.max_write_buffer_number);
|
||||
mutex_.Lock();
|
||||
if (options.snapshot != nullptr) {
|
||||
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
|
||||
@ -2600,11 +2621,15 @@ Status DBImpl::GetImpl(const ReadOptions& options,
|
||||
have_stat_update && current->UpdateStats(stats)) {
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
}
|
||||
mem->Unref();
|
||||
imm.UnrefAll();
|
||||
MemTable* m = mem->Unref();
|
||||
imm.UnrefAll(&to_delete);
|
||||
current->Unref();
|
||||
mutex_.Unlock();
|
||||
|
||||
// free up all obsolete memtables outside the mutex
|
||||
delete m;
|
||||
for (MemTable* v: to_delete) delete v;
|
||||
|
||||
LogFlush(options_.info_log);
|
||||
// Note, tickers are atomic now - no lock protection needed any more.
|
||||
RecordTick(options_.statistics.get(), NUMBER_KEYS_READ);
|
||||
@ -2618,6 +2643,9 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
|
||||
|
||||
StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET);
|
||||
SequenceNumber snapshot;
|
||||
std::vector<MemTable*> to_delete;
|
||||
to_delete.reserve(options_.max_write_buffer_number);
|
||||
|
||||
mutex_.Lock();
|
||||
if (options.snapshot != nullptr) {
|
||||
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
|
||||
@ -2679,11 +2707,15 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
|
||||
have_stat_update && current->UpdateStats(stats)) {
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
}
|
||||
mem->Unref();
|
||||
imm.UnrefAll();
|
||||
MemTable* m = mem->Unref();
|
||||
imm.UnrefAll(&to_delete);
|
||||
current->Unref();
|
||||
mutex_.Unlock();
|
||||
|
||||
// free up all obsolete memtables outside the mutex
|
||||
delete m;
|
||||
for (MemTable* v: to_delete) delete v;
|
||||
|
||||
LogFlush(options_.info_log);
|
||||
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS);
|
||||
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys);
|
||||
|
@ -39,16 +39,20 @@ class MemTable {
|
||||
int numlevel = 7,
|
||||
const Options& options = Options());
|
||||
|
||||
~MemTable();
|
||||
|
||||
// Increase reference count.
|
||||
void Ref() { ++refs_; }
|
||||
|
||||
// Drop reference count. Delete if no more references exist.
|
||||
void Unref() {
|
||||
// Drop reference count.
|
||||
// If the refcount goes to zero return this memtable, otherwise return null
|
||||
MemTable* Unref() {
|
||||
--refs_;
|
||||
assert(refs_ >= 0);
|
||||
if (refs_ <= 0) {
|
||||
delete this;
|
||||
return this;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Returns an estimate of the number of bytes of data in use by this
|
||||
@ -129,7 +133,6 @@ class MemTable {
|
||||
void MarkImmutable() { table_->MarkReadOnly(); }
|
||||
|
||||
private:
|
||||
~MemTable(); // Private since only Unref() should be used to delete it
|
||||
friend class MemTableIterator;
|
||||
friend class MemTableBackwardIterator;
|
||||
friend class MemTableList;
|
||||
|
@ -28,10 +28,15 @@ void MemTableList::RefAll() {
|
||||
}
|
||||
}
|
||||
|
||||
// Drop reference count on all underling memtables
|
||||
void MemTableList::UnrefAll() {
|
||||
// Drop reference count on all underling memtables. If the
|
||||
// refcount of an underlying memtable drops to zero, then
|
||||
// return it in to_delete vector.
|
||||
void MemTableList::UnrefAll(std::vector<MemTable*>* to_delete) {
|
||||
for (auto &memtable : memlist_) {
|
||||
memtable->Unref();
|
||||
MemTable* m = memtable->Unref();
|
||||
if (m != nullptr) {
|
||||
to_delete->push_back(m);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -44,8 +44,10 @@ class MemTableList {
|
||||
// Increase reference count on all underling memtables
|
||||
void RefAll();
|
||||
|
||||
// Drop reference count on all underling memtables
|
||||
void UnrefAll();
|
||||
// Drop reference count on all underling memtables. If the refcount
|
||||
// on an underlying memtable drops to zero, then return it in
|
||||
// to_delete vector.
|
||||
void UnrefAll(std::vector<MemTable*>* to_delete);
|
||||
|
||||
// Returns the total number of memtables in the list
|
||||
int size();
|
||||
|
Loading…
x
Reference in New Issue
Block a user