use CAS when returning SuperVersion to ThreadLocal

Summary:
Add a check at the end of GetImpl to release SuperVersion if it becomes
obsolete. Also do Scrape() inside InstallSuperVersion so it happens more
frequent.

Test Plan:
make all check
running asan_check now

Reviewers: igor, haobo, sdong, dhruba

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16641
This commit is contained in:
Lei Jin 2014-03-07 14:43:22 -08:00
parent ebe2527f9a
commit e5fa4944fc
6 changed files with 112 additions and 25 deletions

View File

@ -63,6 +63,10 @@
namespace rocksdb { namespace rocksdb {
int DBImpl::SuperVersion::dummy = 0;
void* const DBImpl::SuperVersion::kSVInUse = &DBImpl::SuperVersion::dummy;
void* const DBImpl::SuperVersion::kSVObsolete = nullptr;
void DumpLeveldbBuildVersion(Logger * log); void DumpLeveldbBuildVersion(Logger * log);
// Information kept for every waiting writer // Information kept for every waiting writer
@ -1327,10 +1331,6 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
if (s.ok()) { if (s.ok()) {
InstallSuperVersion(deletion_state); InstallSuperVersion(deletion_state);
// Reset SuperVersions cached in thread local storage
if (options_.allow_thread_local) {
ResetThreadLocalSuperVersions(&deletion_state);
}
if (madeProgress) { if (madeProgress) {
*madeProgress = 1; *madeProgress = 1;
} }
@ -2874,6 +2874,10 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) {
SuperVersion* old_superversion = InstallSuperVersion(new_superversion); SuperVersion* old_superversion = InstallSuperVersion(new_superversion);
deletion_state.new_superversion = nullptr; deletion_state.new_superversion = nullptr;
deletion_state.superversions_to_free.push_back(old_superversion); deletion_state.superversions_to_free.push_back(old_superversion);
// Reset SuperVersions cached in thread local storage
if (options_.allow_thread_local) {
ResetThreadLocalSuperVersions(&deletion_state);
}
} }
DBImpl::SuperVersion* DBImpl::InstallSuperVersion( DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
@ -2896,9 +2900,12 @@ DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
void DBImpl::ResetThreadLocalSuperVersions(DeletionState* deletion_state) { void DBImpl::ResetThreadLocalSuperVersions(DeletionState* deletion_state) {
mutex_.AssertHeld(); mutex_.AssertHeld();
autovector<void*> sv_ptrs; autovector<void*> sv_ptrs;
local_sv_->Scrape(&sv_ptrs); local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
for (auto ptr : sv_ptrs) { for (auto ptr : sv_ptrs) {
assert(ptr); assert(ptr);
if (ptr == SuperVersion::kSVInUse) {
continue;
}
auto sv = static_cast<SuperVersion*>(ptr); auto sv = static_cast<SuperVersion*>(ptr);
if (static_cast<SuperVersion*>(ptr)->Unref()) { if (static_cast<SuperVersion*>(ptr)->Unref()) {
sv->Cleanup(); sv->Cleanup();
@ -2936,10 +2943,17 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// is being used while a new SuperVersion is installed, the cached // is being used while a new SuperVersion is installed, the cached
// SuperVersion can become stale. It will eventually get refreshed either // SuperVersion can become stale. It will eventually get refreshed either
// on the next GetImpl() call or next SuperVersion installation. // on the next GetImpl() call or next SuperVersion installation.
sv = static_cast<SuperVersion*>(local_sv_->Swap(nullptr)); void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
if (!sv || sv->version_number != // Invariant:
super_version_number_.load(std::memory_order_relaxed)) { // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_UPDATES); // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
// should only keep kSVInUse during a GetImpl.
assert(ptr != SuperVersion::kSVInUse);
sv = static_cast<SuperVersion*>(ptr);
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != super_version_number_.load(
std::memory_order_relaxed)) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES);
SuperVersion* sv_to_delete = nullptr; SuperVersion* sv_to_delete = nullptr;
if (sv && sv->Unref()) { if (sv && sv->Unref()) {
@ -2999,11 +3013,25 @@ Status DBImpl::GetImpl(const ReadOptions& options,
mutex_.Unlock(); mutex_.Unlock();
} }
// Release SuperVersion bool unref_sv = true;
if (LIKELY(options_.allow_thread_local)) { if (LIKELY(options_.allow_thread_local)) {
// Put the SuperVersion back // Put the SuperVersion back
local_sv_->Reset(static_cast<void*>(sv)); void* expected = SuperVersion::kSVInUse;
if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
// When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
// storage has not been altered and no Scrape has happend. The
// SuperVersion is still current.
unref_sv = false;
} else { } else {
// ThreadLocal scrape happened in the process of this GetImpl call (after
// thread local Swap() at the beginning and before CompareAndSwap()).
// This means the SuperVersion it holds is obsolete.
assert(expected == SuperVersion::kSVObsolete);
}
}
if (unref_sv) {
// Release SuperVersion
bool delete_sv = false; bool delete_sv = false;
if (sv->Unref()) { if (sv->Unref()) {
mutex_.Lock(); mutex_.Lock();
@ -3014,6 +3042,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
if (delete_sv) { if (delete_sv) {
delete sv; delete sv;
} }
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_RELEASES);
} }
// Note, tickers are atomic now - no lock protection needed any more. // Note, tickers are atomic now - no lock protection needed any more.

View File

@ -174,9 +174,22 @@ class DBImpl : public DB {
void Cleanup(); void Cleanup();
void Init(MemTable* new_mem, MemTableListVersion* new_imm, void Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current); Version* new_current);
// The value of dummy is not actually used. kSVInUse takes its address as a
// mark in the thread local storage to indicate the SuperVersion is in use
// by thread. This way, the value of kSVInUse is guaranteed to have no
// conflict with SuperVersion object address and portable on different
// platform.
static int dummy;
static void* const kSVInUse;
static void* const kSVObsolete;
}; };
static void SuperVersionUnrefHandle(void* ptr) { static void SuperVersionUnrefHandle(void* ptr) {
// UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
// destroyed. When former happens, the thread shouldn't see kSVInUse.
// When latter happens, we are in ~DBImpl(), no get should happen as well.
assert(ptr != SuperVersion::kSVInUse);
DBImpl::SuperVersion* sv = static_cast<DBImpl::SuperVersion*>(ptr); DBImpl::SuperVersion* sv = static_cast<DBImpl::SuperVersion*>(ptr);
if (sv->Unref()) { if (sv->Unref()) {
sv->db->mutex_.Lock(); sv->db->mutex_.Lock();

View File

@ -122,7 +122,8 @@ enum Tickers {
// Number of table's properties loaded directly from file, without creating // Number of table's properties loaded directly from file, without creating
// table reader object. // table reader object.
NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
NUMBER_SUPERVERSION_UPDATES, NUMBER_SUPERVERSION_ACQUIRES,
NUMBER_SUPERVERSION_RELEASES,
TICKER_ENUM_MAX TICKER_ENUM_MAX
}; };
@ -178,7 +179,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"}, {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"},
{NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
"rocksdb.number.direct.load.table.properties"}, "rocksdb.number.direct.load.table.properties"},
{NUMBER_SUPERVERSION_UPDATES, "rocksdb.number.superversion_updates"}, {NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"},
{NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"},
}; };
/** /**

View File

@ -138,12 +138,25 @@ void* ThreadLocalPtr::StaticMeta::Swap(uint32_t id, void* ptr) {
return tls->entries[id].ptr.exchange(ptr, std::memory_order_relaxed); return tls->entries[id].ptr.exchange(ptr, std::memory_order_relaxed);
} }
void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector<void*>* ptrs) { bool ThreadLocalPtr::StaticMeta::CompareAndSwap(uint32_t id, void* ptr,
void*& expected) {
auto* tls = GetThreadLocal();
if (UNLIKELY(id >= tls->entries.size())) {
// Need mutex to protect entries access within ReclaimId
MutexLock l(&mutex_);
tls->entries.resize(id + 1);
}
return tls->entries[id].ptr.compare_exchange_strong(expected, ptr,
std::memory_order_relaxed, std::memory_order_relaxed);
}
void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector<void*>* ptrs,
void* const replacement) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
for (ThreadData* t = head_.next; t != &head_; t = t->next) { for (ThreadData* t = head_.next; t != &head_; t = t->next) {
if (id < t->entries.size()) { if (id < t->entries.size()) {
void* ptr = void* ptr =
t->entries[id].ptr.exchange(nullptr, std::memory_order_relaxed); t->entries[id].ptr.exchange(replacement, std::memory_order_relaxed);
if (ptr != nullptr) { if (ptr != nullptr) {
ptrs->push_back(ptr); ptrs->push_back(ptr);
} }
@ -225,8 +238,12 @@ void* ThreadLocalPtr::Swap(void* ptr) {
return StaticMeta::Instance()->Swap(id_, ptr); return StaticMeta::Instance()->Swap(id_, ptr);
} }
void ThreadLocalPtr::Scrape(autovector<void*>* ptrs) { bool ThreadLocalPtr::CompareAndSwap(void* ptr, void*& expected) {
StaticMeta::Instance()->Scrape(id_, ptrs); return StaticMeta::Instance()->CompareAndSwap(id_, ptr, expected);
}
void ThreadLocalPtr::Scrape(autovector<void*>* ptrs, void* const replacement) {
StaticMeta::Instance()->Scrape(id_, ptrs, replacement);
} }
} // namespace rocksdb } // namespace rocksdb

View File

@ -47,9 +47,15 @@ class ThreadLocalPtr {
// Atomically swap the supplied ptr and return the previous value // Atomically swap the supplied ptr and return the previous value
void* Swap(void* ptr); void* Swap(void* ptr);
// Return non-nullptr data for all existing threads and reset them // Atomically compare the stored value with expected. Set the new
// to nullptr // pointer value to thread local only if the comparision is true.
void Scrape(autovector<void*>* ptrs); // Otherwise, expected returns the stored value.
// Return true on success, false on failure
bool CompareAndSwap(void* ptr, void*& expected);
// Reset all thread local data to replacement, and return non-nullptr
// data for all existing threads
void Scrape(autovector<void*>* ptrs, void* const replacement);
protected: protected:
struct Entry { struct Entry {
@ -99,8 +105,12 @@ class ThreadLocalPtr {
void Reset(uint32_t id, void* ptr); void Reset(uint32_t id, void* ptr);
// Atomically swap the supplied ptr and return the previous value // Atomically swap the supplied ptr and return the previous value
void* Swap(uint32_t id, void* ptr); void* Swap(uint32_t id, void* ptr);
// Return data for all existing threads and return them to nullptr // Atomically compare and swap the provided value only if it equals
void Scrape(uint32_t id, autovector<void*>* ptrs); // to expected value.
bool CompareAndSwap(uint32_t id, void* ptr, void*& expected);
// Reset all thread local data to replacement, and return non-nullptr
// data for all existing threads
void Scrape(uint32_t id, autovector<void*>* ptrs, void* const replacement);
// Register the UnrefHandler for id // Register the UnrefHandler for id
void SetHandler(uint32_t id, UnrefHandler handler); void SetHandler(uint32_t id, UnrefHandler handler);

View File

@ -435,8 +435,8 @@ TEST(ThreadLocalTest, Scrape) {
// Scrape all thread local data. No unref at thread // Scrape all thread local data. No unref at thread
// exit or ThreadLocalPtr destruction // exit or ThreadLocalPtr destruction
autovector<void*> ptrs; autovector<void*> ptrs;
p.tls1.Scrape(&ptrs); p.tls1.Scrape(&ptrs, nullptr);
p.tls2->Scrape(&ptrs); p.tls2->Scrape(&ptrs, nullptr);
delete p.tls2; delete p.tls2;
// Signal to exit // Signal to exit
mu.Lock(); mu.Lock();
@ -449,6 +449,22 @@ TEST(ThreadLocalTest, Scrape) {
} }
} }
TEST(ThreadLocalTest, CompareAndSwap) {
ThreadLocalPtr tls;
ASSERT_TRUE(tls.Swap(reinterpret_cast<void*>(1)) == nullptr);
void* expected = reinterpret_cast<void*>(1);
// Swap in 2
ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
expected = reinterpret_cast<void*>(100);
// Fail Swap, still 2
ASSERT_TRUE(!tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
ASSERT_EQ(expected, reinterpret_cast<void*>(2));
// Swap in 3
expected = reinterpret_cast<void*>(2);
ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(3), expected));
ASSERT_EQ(tls.Get(), reinterpret_cast<void*>(3));
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {