From 6594fef7ef6448af199f24de3e711c0453d87089 Mon Sep 17 00:00:00 2001 From: Mayank Agarwal Date: Tue, 19 Mar 2013 14:39:28 -0700 Subject: [PATCH] Exit and Join the background compaction threads while running rocksdb tests Summary: The background compaction threads are never exitted and therefore caused memory-leaks while running rpcksdb tests. Have changed the PosixEnv destructor to exit and join them and changed the tests likewise The memory leaked has reduced from 320 bytes to 64 bytes in all the tests. The 64 bytes is relating to pthread_exit, but still have to figure out why. The stack-trace right now with table_test.cc = 64 bytes in 1 blocks are possibly lost in loss record 4 of 5 at 0x475D8C: malloc (jemalloc.c:914) by 0x400D69E: _dl_map_object_deps (dl-deps.c:505) by 0x4013393: dl_open_worker (dl-open.c:263) by 0x400F015: _dl_catch_error (dl-error.c:178) by 0x4013B2B: _dl_open (dl-open.c:569) by 0x5D3E913: do_dlopen (dl-libc.c:86) by 0x400F015: _dl_catch_error (dl-error.c:178) by 0x5D3E9D6: __libc_dlopen_mode (dl-libc.c:47) by 0x5048BF3: pthread_cancel_init (unwind-forcedunwind.c:53) by 0x5048DC9: _Unwind_ForcedUnwind (unwind-forcedunwind.c:126) by 0x5046D9F: __pthread_unwind (unwind.c:130) by 0x50413A4: pthread_exit (pthreadP.h:289) Test Plan: make all check Reviewers: dhruba, sheki, haobo Reviewed By: dhruba CC: leveldb, chip Differential Revision: https://reviews.facebook.net/D9573 --- util/env_posix.cc | 47 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index 7cdf66cec..957c5b105 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -600,9 +600,9 @@ class PosixFileLock : public FileLock { class PosixEnv : public Env { public: PosixEnv(); - virtual ~PosixEnv() { - fprintf(stderr, "Destroying Env::Default()\n"); - exit(1); + + virtual ~PosixEnv(){ + WaitForBGThreads(); } void SetFD_CLOEXEC(int fd, const EnvOptions* options) { @@ -804,6 +804,8 @@ class PosixEnv : public Env { virtual void Schedule(void (*function)(void*), void* arg); + virtual void WaitForBGThreads(); + virtual void StartThread(void (*function)(void* arg), void* arg); virtual Status GetTestDirectory(std::string* result) { @@ -973,22 +975,43 @@ class PosixEnv : public Env { // Entry per Schedule() call struct BGItem { void* arg; void (*function)(void*); }; typedef std::deque BGQueue; + int queue_size_; // number of items in BGQueue + bool exit_all_threads_; BGQueue queue_; + std::vector threads_to_join_; }; PosixEnv::PosixEnv() : checkedDiskForMmap_(false), forceMmapOff(false), page_size_(getpagesize()), started_bgthread_(0), - num_threads_(1) { + num_threads_(1), + queue_size_(0), + exit_all_threads_(false) { PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); bgthread_.resize(num_threads_); } +// Signal and Join all background threads started by calls to Schedule +void PosixEnv::WaitForBGThreads() { + PthreadCall("lock", pthread_mutex_lock(&mu_)); + assert(! exit_all_threads_); + exit_all_threads_ = true; + PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + for (unsigned int i = 0; i < threads_to_join_.size(); i++) { + pthread_join(threads_to_join_[i], nullptr); + } +} + void PosixEnv::Schedule(void (*function)(void*), void* arg) { PthreadCall("lock", pthread_mutex_lock(&mu_)); + if (exit_all_threads_) { + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + return; + } // Start background thread if necessary for (; started_bgthread_ < num_threads_; started_bgthread_++) { PthreadCall( @@ -997,6 +1020,7 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg) { nullptr, &PosixEnv::BGThreadWrapper, this)); + threads_to_join_.push_back(bgthread_[started_bgthread_]); fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]); } @@ -1015,10 +1039,13 @@ void PosixEnv::BGThread() { while (true) { // Wait until there is an item that is ready to run PthreadCall("lock", pthread_mutex_lock(&mu_)); - while (queue_.empty()) { + while (queue_.empty() && !exit_all_threads_) { PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); } - + if (exit_all_threads_) { // mechanism to let BG threads exit safely + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + break; + } void (*function)(void*) = queue_.front().function; void* arg = queue_.front().arg; queue_.pop_front(); @@ -1048,17 +1075,15 @@ void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { state->arg = arg; PthreadCall("start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); + threads_to_join_.push_back(t); } } // namespace -static pthread_once_t once = PTHREAD_ONCE_INIT; -static Env* default_env; -static void InitDefaultEnv() { default_env = new PosixEnv; } +static PosixEnv default_env; Env* Env::Default() { - pthread_once(&once, InitDefaultEnv); - return default_env; + return &default_env; } } // namespace leveldb