From e3584f9c28833ec0530b39540ffd406ee41dbc3a Mon Sep 17 00:00:00 2001 From: "gabor@google.com" Date: Mon, 22 Aug 2011 21:08:51 +0000 Subject: [PATCH] Bugfix for issue 33; reduce lock contention in Get(), parallel benchmarks. - Fix for issue 33 (non-null-terminated result from leveldb_property_value()) - Support for running multiple instances of a benchmark in parallel. - Reduce lock contention on Get(): (1) Do not hold the lock while searching memtables. (2) Shard block and table caches 16-ways. Benchmark for evaluating this change: $ db_bench --benchmarks=fillseq1,readrandom --threads=$n (fillseq1 is a small hack to make sure fillseq runs once regardless of number of threads specified on the command line). git-svn-id: https://leveldb.googlecode.com/svn/trunk@49 62dab493-f737-651d-591e-8d6aee1b9529 --- db/c.cc | 3 +- db/db_bench.cc | 530 ++++++++++++++++++++++++++++++--------------- db/db_impl.cc | 36 +-- util/cache.cc | 149 ++++++++----- util/cache_test.cc | 39 +++- util/histogram.cc | 11 + util/histogram.h | 1 + 7 files changed, 510 insertions(+), 259 deletions(-) diff --git a/db/c.cc b/db/c.cc index 366dd2d40..038e5c06c 100644 --- a/db/c.cc +++ b/db/c.cc @@ -196,7 +196,8 @@ char* leveldb_property_value( const char* propname) { std::string tmp; if (db->rep->GetProperty(Slice(propname), &tmp)) { - return CopyString(tmp); + // We use strdup() since we expect human readable output. + return strdup(tmp.c_str()); } else { return NULL; } diff --git a/db/db_bench.cc b/db/db_bench.cc index 7b4e41aab..d3ec61bd9 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -14,6 +14,7 @@ #include "port/port.h" #include "util/crc32c.h" #include "util/histogram.h" +#include "util/mutexlock.h" #include "util/random.h" #include "util/testutil.h" @@ -60,6 +61,9 @@ static int FLAGS_num = 1000000; // Number of read operations to do. If negative, do FLAGS_num reads. static int FLAGS_reads = -1; +// Number of concurrent threads to run. +static int FLAGS_threads = 1; + // Size of each value static int FLAGS_value_size = 100; @@ -91,8 +95,9 @@ static const char* FLAGS_db = "/tmp/dbbench"; namespace leveldb { -// Helper for quickly generating random data. namespace { + +// Helper for quickly generating random data. class RandomGenerator { private: std::string data_; @@ -136,6 +141,152 @@ static Slice TrimSpace(Slice s) { return Slice(s.data() + start, limit - start); } +static void AppendWithSpace(std::string* str, Slice msg) { + if (msg.empty()) return; + if (!str->empty()) { + str->push_back(' '); + } + str->append(msg.data(), msg.size()); +} + +class Stats { + private: + double start_; + double finish_; + double seconds_; + int done_; + int next_report_; + int64_t bytes_; + double last_op_finish_; + Histogram hist_; + std::string message_; + + public: + Stats() { Start(); } + + void Start() { + next_report_ = 100; + last_op_finish_ = start_; + hist_.Clear(); + done_ = 0; + bytes_ = 0; + seconds_ = 0; + start_ = Env::Default()->NowMicros(); + finish_ = start_; + message_.clear(); + } + + void Merge(const Stats& other) { + hist_.Merge(other.hist_); + done_ += other.done_; + bytes_ += other.bytes_; + seconds_ += other.seconds_; + if (other.start_ < start_) start_ = other.start_; + if (other.finish_ > finish_) finish_ = other.finish_; + + // Just keep the messages from one thread + if (message_.empty()) message_ = other.message_; + } + + void Stop() { + finish_ = Env::Default()->NowMicros(); + seconds_ = (finish_ - start_) * 1e-6; + } + + void AddMessage(Slice msg) { + AppendWithSpace(&message_, msg); + } + + void FinishedSingleOp() { + if (FLAGS_histogram) { + double now = Env::Default()->NowMicros(); + double micros = now - last_op_finish_; + hist_.Add(micros); + if (micros > 20000) { + fprintf(stderr, "long op: %.1f micros%30s\r", micros, ""); + fflush(stderr); + } + last_op_finish_ = now; + } + + done_++; + if (done_ >= next_report_) { + if (next_report_ < 1000) next_report_ += 100; + else if (next_report_ < 5000) next_report_ += 500; + else if (next_report_ < 10000) next_report_ += 1000; + else if (next_report_ < 50000) next_report_ += 5000; + else if (next_report_ < 100000) next_report_ += 10000; + else if (next_report_ < 500000) next_report_ += 50000; + else next_report_ += 100000; + fprintf(stderr, "... finished %d ops%30s\r", done_, ""); + fflush(stderr); + } + } + + void AddBytes(int64_t n) { + bytes_ += n; + } + + void Report(const Slice& name) { + // Pretend at least one op was done in case we are running a benchmark + // that does not call FinishedSingleOp(). + if (done_ < 1) done_ = 1; + + std::string extra; + if (bytes_ > 0) { + // Rate is computed on actual elapsed time, not the sum of per-thread + // elapsed times. + double elapsed = (finish_ - start_) * 1e-6; + char rate[100]; + snprintf(rate, sizeof(rate), "%6.1f MB/s", + (bytes_ / 1048576.0) / elapsed); + extra = rate; + } + AppendWithSpace(&extra, message_); + + fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n", + name.ToString().c_str(), + seconds_ * 1e6 / done_, + (extra.empty() ? "" : " "), + extra.c_str()); + if (FLAGS_histogram) { + fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); + } + fflush(stdout); + } +}; + +// State shared by all concurrent executions of the same benchmark. +struct SharedState { + port::Mutex mu; + port::CondVar cv; + int total; + + // Each thread goes through the following states: + // (1) initializing + // (2) waiting for others to be initialized + // (3) running + // (4) done + + int num_initialized; + int num_done; + bool start; + + SharedState() : cv(&mu) { } +}; + +// Per-thread state for concurrent executions of the same benchmark. +struct ThreadState { + int tid; // 0..n-1 when running in n threads + Random rand; // Has different seeds for different threads + Stats stats; + + ThreadState(int index) + : tid(index), + rand(1000 + index) { + } +}; + } class Benchmark { @@ -143,20 +294,11 @@ class Benchmark { Cache* cache_; DB* db_; int num_; + int value_size_; + int entries_per_batch_; + WriteOptions write_options_; int reads_; int heap_counter_; - double start_; - double last_op_finish_; - int64_t bytes_; - std::string message_; - std::string post_message_; - Histogram hist_; - RandomGenerator gen_; - Random rand_; - - // State kept for progress messages - int done_; - int next_report_; // When to report next void PrintHeader() { const int kKeySize = 16; @@ -232,94 +374,15 @@ class Benchmark { #endif } - void Start() { - start_ = Env::Default()->NowMicros() * 1e-6; - bytes_ = 0; - message_.clear(); - last_op_finish_ = start_; - hist_.Clear(); - done_ = 0; - next_report_ = 100; - } - - void FinishedSingleOp() { - if (FLAGS_histogram) { - double now = Env::Default()->NowMicros() * 1e-6; - double micros = (now - last_op_finish_) * 1e6; - hist_.Add(micros); - if (micros > 20000) { - fprintf(stderr, "long op: %.1f micros%30s\r", micros, ""); - fflush(stderr); - } - last_op_finish_ = now; - } - - done_++; - if (done_ >= next_report_) { - if (next_report_ < 1000) next_report_ += 100; - else if (next_report_ < 5000) next_report_ += 500; - else if (next_report_ < 10000) next_report_ += 1000; - else if (next_report_ < 50000) next_report_ += 5000; - else if (next_report_ < 100000) next_report_ += 10000; - else if (next_report_ < 500000) next_report_ += 50000; - else next_report_ += 100000; - fprintf(stderr, "... finished %d ops%30s\r", done_, ""); - fflush(stderr); - } - } - - void Stop(const Slice& name) { - double finish = Env::Default()->NowMicros() * 1e-6; - - // Pretend at least one op was done in case we are running a benchmark - // that does nto call FinishedSingleOp(). - if (done_ < 1) done_ = 1; - - if (bytes_ > 0) { - char rate[100]; - snprintf(rate, sizeof(rate), "%6.1f MB/s", - (bytes_ / 1048576.0) / (finish - start_)); - if (!message_.empty()) { - message_ = std::string(rate) + " " + message_; - } else { - message_ = rate; - } - } - - fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n", - name.ToString().c_str(), - (finish - start_) * 1e6 / done_, - (message_.empty() ? "" : " "), - message_.c_str()); - if (FLAGS_histogram) { - fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); - } - fflush(stdout); - - if (!post_message_.empty()) { - fprintf(stdout, "\n%s\n", post_message_.c_str()); - post_message_.clear(); - } - } - public: - enum Order { - SEQUENTIAL, - RANDOM - }; - enum DBState { - FRESH, - EXISTING - }; - Benchmark() : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL), db_(NULL), num_(FLAGS_num), + value_size_(FLAGS_value_size), + entries_per_batch_(1), reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads), - heap_counter_(0), - bytes_(0), - rand_(301) { + heap_counter_(0) { std::vector files; Env::Default()->GetChildren(FLAGS_db, &files); for (int i = 0; i < files.size(); i++) { @@ -353,98 +416,203 @@ class Benchmark { benchmarks = sep + 1; } - Start(); + // Reset parameters that may be overriddden bwlow + num_ = FLAGS_num; + reads_ = num_; + value_size_ = FLAGS_value_size; + entries_per_batch_ = 1; + write_options_ = WriteOptions(); + + void (Benchmark::*method)(ThreadState*) = NULL; + bool fresh_db = false; - WriteOptions write_options; - bool known = true; if (name == Slice("fillseq")) { - Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size, 1); + fresh_db = true; + method = &Benchmark::WriteSeq; } else if (name == Slice("fillbatch")) { - Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size, 1000); + fresh_db = true; + entries_per_batch_ = 1000; + method = &Benchmark::WriteSeq; } else if (name == Slice("fillrandom")) { - Write(write_options, RANDOM, FRESH, num_, FLAGS_value_size, 1); + fresh_db = true; + method = &Benchmark::WriteRandom; } else if (name == Slice("overwrite")) { - Write(write_options, RANDOM, EXISTING, num_, FLAGS_value_size, 1); + fresh_db = false; + method = &Benchmark::WriteRandom; } else if (name == Slice("fillsync")) { - write_options.sync = true; - Write(write_options, RANDOM, FRESH, num_ / 1000, FLAGS_value_size, 1); + fresh_db = true; + num_ /= 1000; + write_options_.sync = true; + method = &Benchmark::WriteRandom; } else if (name == Slice("fill100K")) { - Write(write_options, RANDOM, FRESH, num_ / 1000, 100 * 1000, 1); + fresh_db = true; + num_ /= 1000; + value_size_ = 100 * 1000; + method = &Benchmark::WriteRandom; } else if (name == Slice("readseq")) { - ReadSequential(); + method = &Benchmark::ReadSequential; } else if (name == Slice("readreverse")) { - ReadReverse(); + method = &Benchmark::ReadReverse; } else if (name == Slice("readrandom")) { - ReadRandom(); + method = &Benchmark::ReadRandom; } else if (name == Slice("readhot")) { - ReadHot(); + method = &Benchmark::ReadHot; } else if (name == Slice("readrandomsmall")) { - int n = reads_; reads_ /= 1000; - ReadRandom(); - reads_ = n; + method = &Benchmark::ReadRandom; } else if (name == Slice("compact")) { - Compact(); + method = &Benchmark::Compact; } else if (name == Slice("crc32c")) { - Crc32c(4096, "(4K per op)"); + method = &Benchmark::Crc32c; } else if (name == Slice("acquireload")) { - AcquireLoad(); + method = &Benchmark::AcquireLoad; } else if (name == Slice("snappycomp")) { - SnappyCompress(); + method = &Benchmark::SnappyCompress; } else if (name == Slice("snappyuncomp")) { - SnappyUncompress(); + method = &Benchmark::SnappyUncompress; } else if (name == Slice("heapprofile")) { HeapProfile(); } else if (name == Slice("stats")) { PrintStats(); } else { - known = false; if (name != Slice()) { // No error message for empty name fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str()); } } - if (known) { - Stop(name); + + if (fresh_db) { + if (FLAGS_use_existing_db) { + fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n", + name.ToString().c_str()); + method = NULL; + } else { + delete db_; + db_ = NULL; + DestroyDB(FLAGS_db, Options()); + Open(); + } + } + + if (method != NULL) { + RunBenchmark(name, method); } } } private: - void Crc32c(int size, const char* label) { + struct ThreadArg { + Benchmark* bm; + SharedState* shared; + ThreadState* thread; + void (Benchmark::*method)(ThreadState*); + }; + + static void ThreadBody(void* v) { + ThreadArg* arg = reinterpret_cast(v); + SharedState* shared = arg->shared; + ThreadState* thread = arg->thread; + { + MutexLock l(&shared->mu); + shared->num_initialized++; + if (shared->num_initialized >= shared->total) { + shared->cv.SignalAll(); + } + while (!shared->start) { + shared->cv.Wait(); + } + } + + thread->stats.Start(); + (arg->bm->*(arg->method))(thread); + thread->stats.Stop(); + + { + MutexLock l(&shared->mu); + shared->num_done++; + if (shared->num_done >= shared->total) { + shared->cv.SignalAll(); + } + } + } + + void RunBenchmark(Slice name, void (Benchmark::*method)(ThreadState*)) { + const int n = FLAGS_threads; + SharedState shared; + shared.total = n; + shared.num_initialized = 0; + shared.num_done = 0; + shared.start = false; + + ThreadArg* arg = new ThreadArg[n]; + for (int i = 0; i < n; i++) { + arg[i].bm = this; + arg[i].method = method; + arg[i].shared = &shared; + arg[i].thread = new ThreadState(i); + Env::Default()->StartThread(ThreadBody, &arg[i]); + } + + shared.mu.Lock(); + while (shared.num_initialized < n) { + shared.cv.Wait(); + } + + shared.start = true; + shared.cv.SignalAll(); + while (shared.num_done < n) { + shared.cv.Wait(); + } + shared.mu.Unlock(); + + for (int i = 1; i < n; i++) { + arg[0].thread->stats.Merge(arg[i].thread->stats); + } + arg[0].thread->stats.Report(name); + + for (int i = 0; i < n; i++) { + delete arg[i].thread; + } + delete[] arg; + } + + void Crc32c(ThreadState* thread) { // Checksum about 500MB of data total + const int size = 4096; + const char* label = "(4K per op)"; std::string data(size, 'x'); int64_t bytes = 0; uint32_t crc = 0; while (bytes < 500 * 1048576) { crc = crc32c::Value(data.data(), size); - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); bytes += size; } // Print so result is not dead fprintf(stderr, "... crc=0x%x\r", static_cast(crc)); - bytes_ = bytes; - message_ = label; + thread->stats.AddBytes(bytes); + thread->stats.AddMessage(label); } - void AcquireLoad() { + void AcquireLoad(ThreadState* thread) { int dummy; port::AtomicPointer ap(&dummy); int count = 0; void *ptr = NULL; - message_ = "(each op is 1000 loads)"; + thread->stats.AddMessage("(each op is 1000 loads)"); while (count < 100000) { for (int i = 0; i < 1000; i++) { ptr = ap.Acquire_Load(); } count++; - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } if (ptr == NULL) exit(1); // Disable unused variable warning. } - void SnappyCompress() { - Slice input = gen_.Generate(Options().block_size); + void SnappyCompress(ThreadState* thread) { + RandomGenerator gen; + Slice input = gen.Generate(Options().block_size); int64_t bytes = 0; int64_t produced = 0; bool ok = true; @@ -453,22 +621,23 @@ class Benchmark { ok = port::Snappy_Compress(input.data(), input.size(), &compressed); produced += compressed.size(); bytes += input.size(); - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } if (!ok) { - message_ = "(snappy failure)"; + thread->stats.AddMessage("(snappy failure)"); } else { char buf[100]; snprintf(buf, sizeof(buf), "(output: %.1f%%)", (produced * 100.0) / bytes); - message_ = buf; - bytes_ = bytes; + thread->stats.AddMessage(buf); + thread->stats.AddBytes(bytes); } } - void SnappyUncompress() { - Slice input = gen_.Generate(Options().block_size); + void SnappyUncompress(ThreadState* thread) { + RandomGenerator gen; + Slice input = gen.Generate(Options().block_size); std::string compressed; bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed); int64_t bytes = 0; @@ -477,14 +646,14 @@ class Benchmark { ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), uncompressed); bytes += input.size(); - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } delete[] uncompressed; if (!ok) { - message_ = "(snappy failure)"; + thread->stats.AddMessage("(snappy failure)"); } else { - bytes_ = bytes; + thread->stats.AddBytes(bytes); } } @@ -501,95 +670,97 @@ class Benchmark { } } - void Write(const WriteOptions& options, Order order, DBState state, - int num_entries, int value_size, int entries_per_batch) { - if (state == FRESH) { - if (FLAGS_use_existing_db) { - message_ = "skipping (--use_existing_db is true)"; - return; - } - delete db_; - db_ = NULL; - DestroyDB(FLAGS_db, Options()); - Open(); - Start(); // Do not count time taken to destroy/open - } + void WriteSeq(ThreadState* thread) { + DoWrite(thread, true); + } - if (num_entries != num_) { + void WriteRandom(ThreadState* thread) { + DoWrite(thread, false); + } + + void DoWrite(ThreadState* thread, bool seq) { + if (num_ != FLAGS_num) { char msg[100]; - snprintf(msg, sizeof(msg), "(%d ops)", num_entries); - message_ = msg; + snprintf(msg, sizeof(msg), "(%d ops)", num_); + thread->stats.AddMessage(msg); } + RandomGenerator gen; WriteBatch batch; Status s; std::string val; - for (int i = 0; i < num_entries; i += entries_per_batch) { + int64_t bytes = 0; + for (int i = 0; i < num_; i += entries_per_batch_) { batch.Clear(); - for (int j = 0; j < entries_per_batch; j++) { - const int k = (order == SEQUENTIAL) ? i+j : (rand_.Next() % FLAGS_num); + for (int j = 0; j < entries_per_batch_; j++) { + const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num); char key[100]; snprintf(key, sizeof(key), "%016d", k); - batch.Put(key, gen_.Generate(value_size)); - bytes_ += value_size + strlen(key); - FinishedSingleOp(); + batch.Put(key, gen.Generate(value_size_)); + bytes += value_size_ + strlen(key); + thread->stats.FinishedSingleOp(); } - s = db_->Write(options, &batch); + s = db_->Write(write_options_, &batch); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); } } + thread->stats.AddBytes(bytes); } - void ReadSequential() { + void ReadSequential(ThreadState* thread) { Iterator* iter = db_->NewIterator(ReadOptions()); int i = 0; + int64_t bytes = 0; for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { - bytes_ += iter->key().size() + iter->value().size(); - FinishedSingleOp(); + bytes += iter->key().size() + iter->value().size(); + thread->stats.FinishedSingleOp(); ++i; } delete iter; + thread->stats.AddBytes(bytes); } - void ReadReverse() { + void ReadReverse(ThreadState* thread) { Iterator* iter = db_->NewIterator(ReadOptions()); int i = 0; + int64_t bytes = 0; for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { - bytes_ += iter->key().size() + iter->value().size(); - FinishedSingleOp(); + bytes += iter->key().size() + iter->value().size(); + thread->stats.FinishedSingleOp(); ++i; } delete iter; + thread->stats.AddBytes(bytes); } - void ReadRandom() { + void ReadRandom(ThreadState* thread) { ReadOptions options; std::string value; for (int i = 0; i < reads_; i++) { char key[100]; - const int k = rand_.Next() % FLAGS_num; + const int k = thread->rand.Next() % FLAGS_num; snprintf(key, sizeof(key), "%016d", k); db_->Get(options, key, &value); - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } } - void ReadHot() { + void ReadHot(ThreadState* thread) { ReadOptions options; std::string value; const int range = (FLAGS_num + 99) / 100; for (int i = 0; i < reads_; i++) { char key[100]; - const int k = rand_.Next() % range; + const int k = thread->rand.Next() % range; snprintf(key, sizeof(key), "%016d", k); db_->Get(options, key, &value); - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } } - void Compact() { + void Compact(ThreadState* thread) { DBImpl* dbi = reinterpret_cast(db_); dbi->TEST_CompactMemTable(); int max_level_with_files = 1; @@ -609,10 +780,9 @@ class Benchmark { void PrintStats() { std::string stats; if (!db_->GetProperty("leveldb.stats", &stats)) { - message_ = "(failed)"; - } else { - post_message_ = stats; + stats = "(failed)"; } + fprintf(stdout, "\n%s\n", stats.c_str()); } static void WriteToFile(void* arg, const char* buf, int n) { @@ -625,13 +795,13 @@ class Benchmark { WritableFile* file; Status s = Env::Default()->NewWritableFile(fname, &file); if (!s.ok()) { - message_ = s.ToString(); + fprintf(stderr, "%s\n", s.ToString().c_str()); return; } bool ok = port::GetHeapProfile(WriteToFile, file); delete file; if (!ok) { - message_ = "not supported"; + fprintf(stderr, "heap profiling not supported\n"); Env::Default()->DeleteFile(fname); } } @@ -661,6 +831,8 @@ int main(int argc, char** argv) { FLAGS_num = n; } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) { FLAGS_reads = n; + } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) { + FLAGS_threads = n; } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) { FLAGS_value_size = n; } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { diff --git a/db/db_impl.cc b/db/db_impl.cc index fff4eaf85..c4c6a614b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -989,27 +989,37 @@ Status DBImpl::Get(const ReadOptions& options, snapshot = versions_->LastSequence(); } - // First look in the memtable, then in the immutable memtable (if any). - LookupKey lkey(key, snapshot); - if (mem_->Get(lkey, value, &s)) { - return s; - } - if (imm_ != NULL && imm_->Get(lkey, value, &s)) { - return s; - } - - // Not in memtable(s); try live files in level order + MemTable* mem = mem_; + MemTable* imm = imm_; Version* current = versions_->current(); + mem->Ref(); + if (imm != NULL) imm->Ref(); current->Ref(); + + bool have_stat_update = false; Version::GetStats stats; - { // Unlock while reading from files + + // Unlock while reading from files and memtables + { mutex_.Unlock(); - s = current->Get(options, lkey, value, &stats); + // First look in the memtable, then in the immutable memtable (if any). + LookupKey lkey(key, snapshot); + if (mem_->Get(lkey, value, &s)) { + // Done + } else if (imm_ != NULL && imm_->Get(lkey, value, &s)) { + // Done + } else { + s = current->Get(options, lkey, value, &stats); + have_stat_update = true; + } mutex_.Lock(); } - if (current->UpdateStats(stats)) { + + if (have_stat_update && current->UpdateStats(stats)) { MaybeScheduleCompaction(); } + mem->Unref(); + if (imm != NULL) imm->Unref(); current->Unref(); return s; } diff --git a/util/cache.cc b/util/cache.cc index 5cff3ddce..ce99f08c2 100644 --- a/util/cache.cc +++ b/util/cache.cc @@ -30,7 +30,8 @@ struct LRUHandle { LRUHandle* prev; size_t charge; // TODO(opt): Only allow uint32_t? size_t key_length; - size_t refs; // TODO(opt): Pack with "key_length"? + uint32_t refs; + uint32_t hash; // Hash of key(); used for fast sharding and comparisons char key_data[1]; // Beginning of key Slice key() const { @@ -54,12 +55,12 @@ class HandleTable { HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); } ~HandleTable() { delete[] list_; } - LRUHandle* Lookup(LRUHandle* h) { - return *FindPointer(h); + LRUHandle* Lookup(const Slice& key, uint32_t hash) { + return *FindPointer(key, hash); } LRUHandle* Insert(LRUHandle* h) { - LRUHandle** ptr = FindPointer(h); + LRUHandle** ptr = FindPointer(h->key(), h->hash); LRUHandle* old = *ptr; h->next_hash = (old == NULL ? NULL : old->next_hash); *ptr = h; @@ -74,8 +75,8 @@ class HandleTable { return old; } - LRUHandle* Remove(LRUHandle* h) { - LRUHandle** ptr = FindPointer(h); + LRUHandle* Remove(const Slice& key, uint32_t hash) { + LRUHandle** ptr = FindPointer(key, hash); LRUHandle* result = *ptr; if (result != NULL) { *ptr = result->next_hash; @@ -92,13 +93,12 @@ class HandleTable { LRUHandle** list_; // Return a pointer to slot that points to a cache entry that - // matches *h. If there is no such cache entry, return a pointer to - // the trailing slot in the corresponding linked list. - LRUHandle** FindPointer(LRUHandle* h) { - Slice key = h->key(); - uint32_t hash = Hash(key.data(), key.size(), 0); + // matches key/hash. If there is no such cache entry, return a + // pointer to the trailing slot in the corresponding linked list. + LRUHandle** FindPointer(const Slice& key, uint32_t hash) { LRUHandle** ptr = &list_[hash & (length_ - 1)]; - while (*ptr != NULL && key != (*ptr)->key()) { + while (*ptr != NULL && + ((*ptr)->hash != hash || key != (*ptr)->key())) { ptr = &(*ptr)->next_hash; } return ptr; @@ -117,7 +117,7 @@ class HandleTable { while (h != NULL) { LRUHandle* next = h->next_hash; Slice key = h->key(); - uint32_t hash = Hash(key.data(), key.size(), 0); + uint32_t hash = h->hash; LRUHandle** ptr = &new_list[hash & (new_length - 1)]; h->next_hash = *ptr; *ptr = h; @@ -132,26 +132,30 @@ class HandleTable { } }; -class LRUCache : public Cache { +// A single shard of sharded cache. +class LRUCache { public: - explicit LRUCache(size_t capacity); - virtual ~LRUCache(); + LRUCache(); + ~LRUCache(); - virtual Handle* Insert(const Slice& key, void* value, size_t charge, - void (*deleter)(const Slice& key, void* value)); - virtual Handle* Lookup(const Slice& key); - virtual void Release(Handle* handle); - virtual void* Value(Handle* handle); - virtual void Erase(const Slice& key); - virtual uint64_t NewId(); + // Separate from constructor so caller can easily make an array of LRUCache + void SetCapacity(size_t capacity) { capacity_ = capacity; } + + // Like Cache methods, but with an extra "hash" parameter. + Cache::Handle* Insert(const Slice& key, uint32_t hash, + void* value, size_t charge, + void (*deleter)(const Slice& key, void* value)); + Cache::Handle* Lookup(const Slice& key, uint32_t hash); + void Release(Cache::Handle* handle); + void Erase(const Slice& key, uint32_t hash); private: void LRU_Remove(LRUHandle* e); void LRU_Append(LRUHandle* e); void Unref(LRUHandle* e); - // Constructor parameters - const size_t capacity_; + // Initialized before use. + size_t capacity_; // mutex_ protects the following state. port::Mutex mutex_; @@ -165,9 +169,8 @@ class LRUCache : public Cache { HandleTable table_; }; -LRUCache::LRUCache(size_t capacity) - : capacity_(capacity), - usage_(0), +LRUCache::LRUCache() + : usage_(0), last_id_(0) { // Make empty circular linked list lru_.next = &lru_; @@ -206,32 +209,25 @@ void LRUCache::LRU_Append(LRUHandle* e) { e->next->prev = e; } -Cache::Handle* LRUCache::Lookup(const Slice& key) { +Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) { MutexLock l(&mutex_); - - LRUHandle dummy; - dummy.next = &dummy; - dummy.value = const_cast(&key); - LRUHandle* e = table_.Lookup(&dummy); + LRUHandle* e = table_.Lookup(key, hash); if (e != NULL) { e->refs++; LRU_Remove(e); LRU_Append(e); } - return reinterpret_cast(e); + return reinterpret_cast(e); } -void* LRUCache::Value(Handle* handle) { - return reinterpret_cast(handle)->value; -} - -void LRUCache::Release(Handle* handle) { +void LRUCache::Release(Cache::Handle* handle) { MutexLock l(&mutex_); Unref(reinterpret_cast(handle)); } -Cache::Handle* LRUCache::Insert(const Slice& key, void* value, size_t charge, - void (*deleter)(const Slice& key, void* value)) { +Cache::Handle* LRUCache::Insert( + const Slice& key, uint32_t hash, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value)) { MutexLock l(&mutex_); LRUHandle* e = reinterpret_cast( @@ -240,6 +236,7 @@ Cache::Handle* LRUCache::Insert(const Slice& key, void* value, size_t charge, e->deleter = deleter; e->charge = charge; e->key_length = key.size(); + e->hash = hash; e->refs = 2; // One from LRUCache, one for the returned handle memcpy(e->key_data, key.data(), key.size()); LRU_Append(e); @@ -254,35 +251,77 @@ Cache::Handle* LRUCache::Insert(const Slice& key, void* value, size_t charge, while (usage_ > capacity_ && lru_.next != &lru_) { LRUHandle* old = lru_.next; LRU_Remove(old); - table_.Remove(old); + table_.Remove(old->key(), old->hash); Unref(old); } - return reinterpret_cast(e); + return reinterpret_cast(e); } -void LRUCache::Erase(const Slice& key) { +void LRUCache::Erase(const Slice& key, uint32_t hash) { MutexLock l(&mutex_); - - LRUHandle dummy; - dummy.next = &dummy; - dummy.value = const_cast(&key); - LRUHandle* e = table_.Remove(&dummy); + LRUHandle* e = table_.Remove(key, hash); if (e != NULL) { LRU_Remove(e); Unref(e); } } -uint64_t LRUCache::NewId() { - MutexLock l(&mutex_); - return ++(last_id_); -} +static const int kNumShardBits = 4; +static const int kNumShards = 1 << kNumShardBits; + +class ShardedLRUCache : public Cache { + private: + LRUCache shard_[kNumShards]; + port::Mutex id_mutex_; + uint64_t last_id_; + + static inline uint32_t HashSlice(const Slice& s) { + return Hash(s.data(), s.size(), 0); + } + + static uint32_t Shard(uint32_t hash) { + return hash >> (32 - kNumShardBits); + } + + public: + explicit ShardedLRUCache(size_t capacity) { + const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards; + for (int s = 0; s < kNumShards; s++) { + shard_[s].SetCapacity(per_shard); + } + } + virtual ~ShardedLRUCache() { } + virtual Handle* Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value)) { + const uint32_t hash = HashSlice(key); + return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter); + } + virtual Handle* Lookup(const Slice& key) { + const uint32_t hash = HashSlice(key); + return shard_[Shard(hash)].Lookup(key, hash); + } + virtual void Release(Handle* handle) { + LRUHandle* h = reinterpret_cast(handle); + shard_[Shard(h->hash)].Release(handle); + } + virtual void Erase(const Slice& key) { + const uint32_t hash = HashSlice(key); + shard_[Shard(hash)].Erase(key, hash); + } + virtual void* Value(Handle* handle) { + return reinterpret_cast(handle)->value; + } + virtual uint64_t NewId() { + MutexLock l(&id_mutex_); + return ++(last_id_); + } +}; } // end anonymous namespace Cache* NewLRUCache(size_t capacity) { - return new LRUCache(capacity); + return new ShardedLRUCache(capacity); } } diff --git a/util/cache_test.cc b/util/cache_test.cc index dbab9881a..8a7f1c4c6 100644 --- a/util/cache_test.cc +++ b/util/cache_test.cc @@ -32,7 +32,7 @@ class CacheTest { current_->deleted_values_.push_back(DecodeValue(v)); } - static const int kCacheSize = 100; + static const int kCacheSize = 1000; std::vector deleted_keys_; std::vector deleted_values_; Cache* cache_; @@ -137,23 +137,40 @@ TEST(CacheTest, EvictionPolicy) { Insert(200, 201); // Frequently used entry must be kept around - for (int i = 0; i < kCacheSize; i++) { + for (int i = 0; i < kCacheSize + 100; i++) { Insert(1000+i, 2000+i); ASSERT_EQ(2000+i, Lookup(1000+i)); ASSERT_EQ(101, Lookup(100)); } ASSERT_EQ(101, Lookup(100)); - ASSERT_EQ(2, deleted_keys_.size()); - ASSERT_EQ(200, deleted_keys_[0]); - ASSERT_EQ(201, deleted_values_[0]); + ASSERT_EQ(-1, Lookup(200)); } -TEST(CacheTest, HeavyEntry) { - Insert(100, 101); - Insert(200, 201, kCacheSize); - ASSERT_EQ(1, deleted_keys_.size()); - ASSERT_EQ(100, deleted_keys_[0]); - ASSERT_EQ(101, deleted_values_[0]); +TEST(CacheTest, HeavyEntries) { + // Add a bunch of light and heavy entries and then count the combined + // size of items still in the cache, which must be approximately the + // same as the total capacity. + const int kLight = 1; + const int kHeavy = 10; + int added = 0; + int index = 0; + while (added < 2*kCacheSize) { + const int weight = (index & 1) ? kLight : kHeavy; + Insert(index, 1000+index, weight); + added += weight; + index++; + } + + int cached_weight = 0; + for (int i = 0; i < index; i++) { + const int weight = (i & 1 ? kLight : kHeavy); + int r = Lookup(i); + if (r >= 0) { + cached_weight += weight; + ASSERT_EQ(1000+i, r); + } + } + ASSERT_LE(cached_weight, kCacheSize + kCacheSize/10); } TEST(CacheTest, NewId) { diff --git a/util/histogram.cc b/util/histogram.cc index c5178ef4a..12ec3cfcd 100644 --- a/util/histogram.cc +++ b/util/histogram.cc @@ -55,6 +55,17 @@ void Histogram::Add(double value) { sum_squares_ += (value * value); } +void Histogram::Merge(const Histogram& other) { + if (other.min_ < min_) min_ = other.min_; + if (other.max_ > max_) max_ = other.max_; + num_ += other.num_; + sum_ += other.sum_; + sum_squares_ += other.sum_squares_; + for (int b = 0; b < kNumBuckets; b++) { + buckets_[b] += other.buckets_[b]; + } +} + double Histogram::Median() const { return Percentile(50.0); } diff --git a/util/histogram.h b/util/histogram.h index f72f122ec..32484c079 100644 --- a/util/histogram.h +++ b/util/histogram.h @@ -16,6 +16,7 @@ class Histogram { void Clear(); void Add(double value); + void Merge(const Histogram& other); std::string ToString() const;