rocksdb/util/work_queue.h

149 lines
4.0 KiB
C
Raw Normal View History

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
/*
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
*/
#pragma once
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
namespace ROCKSDB_NAMESPACE {
/// Unbounded thread-safe work queue.
//
// This file is an excerpt from Facebook's zstd repo at
// https://github.com/facebook/zstd/. The relevant file is
// contrib/pzstd/utils/WorkQueue.h.
template <typename T>
class WorkQueue {
// Protects all member variable access
std::mutex mutex_;
std::condition_variable readerCv_;
std::condition_variable writerCv_;
std::condition_variable finishCv_;
std::queue<T> queue_;
bool done_;
std::size_t maxSize_;
// Must have lock to call this function
bool full() const {
if (maxSize_ == 0) {
return false;
}
return queue_.size() >= maxSize_;
}
public:
/**
* Constructs an empty work queue with an optional max size.
* If `maxSize == 0` the queue size is unbounded.
*
* @param maxSize The maximum allowed size of the work queue.
*/
WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
/**
* Push an item onto the work queue. Notify a single thread that work is
* available. If `finish()` has been called, do nothing and return false.
* If `push()` returns false, then `item` has not been copied from.
*
* @param item Item to push onto the queue.
* @returns True upon success, false if `finish()` has been called. An
* item was pushed iff `push()` returns true.
*/
template <typename U>
bool push(U&& item) {
{
std::unique_lock<std::mutex> lock(mutex_);
while (full() && !done_) {
writerCv_.wait(lock);
}
if (done_) {
return false;
}
queue_.push(std::forward<U>(item));
}
readerCv_.notify_one();
return true;
}
/**
* Attempts to pop an item off the work queue. It will block until data is
* available or `finish()` has been called.
*
* @param[out] item If `pop` returns `true`, it contains the popped item.
* If `pop` returns `false`, it is unmodified.
* @returns True upon success. False if the queue is empty and
* `finish()` has been called.
*/
bool pop(T& item) {
{
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty() && !done_) {
readerCv_.wait(lock);
}
if (queue_.empty()) {
assert(done_);
return false;
}
item = queue_.front();
queue_.pop();
}
writerCv_.notify_one();
return true;
}
/**
* Sets the maximum queue size. If `maxSize == 0` then it is unbounded.
*
* @param maxSize The new maximum queue size.
*/
void setMaxSize(std::size_t maxSize) {
{
std::lock_guard<std::mutex> lock(mutex_);
maxSize_ = maxSize;
}
writerCv_.notify_all();
}
/**
* Promise that `push()` won't be called again, so once the queue is empty
* there will never any more work.
*/
void finish() {
{
std::lock_guard<std::mutex> lock(mutex_);
assert(!done_);
done_ = true;
}
readerCv_.notify_all();
writerCv_.notify_all();
finishCv_.notify_all();
}
/// Blocks until `finish()` has been called (but the queue may not be empty).
void waitUntilFinished() {
std::unique_lock<std::mutex> lock(mutex_);
while (!done_) {
finishCv_.wait(lock);
}
}
};
Revamp cache_bench to resemble a real workload (#6629) Summary: I suspect LRUCache could use some optimization, and to support such an effort, a good benchmarking tool is needed. The existing cache_bench was heavily skewed toward insertion and lookup misses, and did not saturate memory with other work. This change should improve those things to better resemble a real workload. (All below using clang compiler, for some consistency, but not necessarily same version and settings.) The real workload is from production MySQL on RocksDB, filtering stacks containing "LRU", "ShardedCache" or "CacheShard." Lookup inclusive: 66% Insert inclusive: 17% Release inclusive: 15% An alternate simulated workload is MySQL running a LinkBench read test: Lookup inclusive: 54% Insert inclusive: 24% Release inclusive: 21% cache_bench default settings, prior to this change: Lookup inclusive: 35.8% Insert inclusive: 63.6% Release inclusive: 0% cache_bench after this change (intended as somewhat "tighter" workload than average production, more like LinkBench): Lookup inclusive: 52% Insert inclusive: 20% Release inclusive: 26% And top exclusive stacks (portion of stack samples as filtered above): Production MySQL: LRUHandleTable::FindPointer: 25.3% rocksdb::operator==: 15.1% <-- Slice == LRUCacheShard::LRU_Remove: 13.8% ShardedCache::Lookup: 8.9% __pthread_mutex_lock: 7.1% LRUCacheShard::LRU_Insert: 6.3% MurmurHash64A: 4.8% <-- Since upgraded to XXH3p ... Old cache_bench: LRUHandleTable::FindPointer: 23.6% __pthread_mutex_lock: 15.0% __pthread_mutex_unlock_usercnt: 11.7% __lll_lock_wait: 8.6% __lll_unlock_wake: 6.8% LRUCacheShard::LRU_Insert: 6.0% ShardedCache::Lookup: 4.4% LRUCacheShard::LRU_Remove: 2.8% ... rocksdb::operator==: 0.2% <-- Slice == ... New cache_bench: LRUHandleTable::FindPointer: 22.8% __pthread_mutex_unlock_usercnt: 14.3% rocksdb::operator==: 10.5% <-- Slice == LRUCacheShard::LRU_Insert: 9.0% __pthread_mutex_lock: 5.9% LRUCacheShard::LRU_Remove: 5.0% ... ShardedCache::Lookup: 2.9% ... So there's a bit more lock contention in the benchmark than in production, but otherwise looks similar enough to me. At least it's a big improvement over the existing code. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6629 Test Plan: No production code changes, ran cache_bench with ASAN Reviewed By: ltamasi Differential Revision: D20824318 Pulled By: pdillinger fbshipit-source-id: 6f8dc5891ead0f87edbed3a615ecd5289d9abe12
2020-04-03 10:24:09 -07:00
} // namespace ROCKSDB_NAMESPACE