rocksdb/db/write_controller.h
mrambacher 12f1137355 Add a SystemClock class to capture the time functions of an Env (#7858)
Summary:
Introduces a SystemClock class to RocksDB and uses it.  This class contains the time-related functions of an Env and these functions can be redirected from the Env to the SystemClock.

Many of the places that used an Env (Timer, PerfStepTimer, RepeatableThread, RateLimiter, WriteController) for time-related functions have been changed to use SystemClock instead.  There are likely more places that can be changed, but this is a start to show what can/should be done.  Over time it would be nice to migrate most (if not all) of the uses of the time functions from the Env to the SystemClock.

There are several Env classes that implement these functions.  Most of these have not been converted yet to SystemClock implementations; that will come in a subsequent PR.  It would be good to unify many of the Mock Timer implementations, so that they behave similarly and can be tested similarly (some override Sleep, some use a MockSleep, etc).

Additionally, this change will allow new methods to be introduced to the SystemClock (like https://github.com/facebook/rocksdb/issues/7101 WaitFor) in a consistent manner across a smaller number of classes.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7858

Reviewed By: pdillinger

Differential Revision: D26006406

Pulled By: mrambacher

fbshipit-source-id: ed10a8abbdab7ff2e23d69d85bd25b3e7e899e90
2021-01-25 22:09:11 -08:00

146 lines
4.8 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <stdint.h>
#include <atomic>
#include <memory>
#include "rocksdb/rate_limiter.h"
namespace ROCKSDB_NAMESPACE {
class SystemClock;
class WriteControllerToken;
// WriteController is controlling write stalls in our write code-path. Write
// stalls happen when compaction can't keep up with write rate.
// All of the methods here (including WriteControllerToken's destructors) need
// to be called while holding DB mutex
class WriteController {
public:
explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
: total_stopped_(0),
total_delayed_(0),
total_compaction_pressure_(0),
bytes_left_(0),
last_refill_time_(0),
low_pri_rate_limiter_(
NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
set_max_delayed_write_rate(_delayed_write_rate);
}
~WriteController() = default;
// When an actor (column family) requests a stop token, all writes will be
// stopped until the stop token is released (deleted)
std::unique_ptr<WriteControllerToken> GetStopToken();
// When an actor (column family) requests a delay token, total delay for all
// writes to the DB will be controlled under the delayed write rate. Every
// write needs to call GetDelay() with number of bytes writing to the DB,
// which returns number of microseconds to sleep.
std::unique_ptr<WriteControllerToken> GetDelayToken(
uint64_t delayed_write_rate);
// When an actor (column family) requests a moderate token, compaction
// threads will be increased
std::unique_ptr<WriteControllerToken> GetCompactionPressureToken();
// these three metods are querying the state of the WriteController
bool IsStopped() const;
bool NeedsDelay() const { return total_delayed_.load() > 0; }
bool NeedSpeedupCompaction() const {
return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0;
}
// return how many microseconds the caller needs to sleep after the call
// num_bytes: how many number of bytes to put into the DB.
// Prerequisite: DB mutex held.
uint64_t GetDelay(const std::shared_ptr<SystemClock>& clock,
uint64_t num_bytes);
void set_delayed_write_rate(uint64_t write_rate) {
// avoid divide 0
if (write_rate == 0) {
write_rate = 1u;
} else if (write_rate > max_delayed_write_rate()) {
write_rate = max_delayed_write_rate();
}
delayed_write_rate_ = write_rate;
}
void set_max_delayed_write_rate(uint64_t write_rate) {
// avoid divide 0
if (write_rate == 0) {
write_rate = 1u;
}
max_delayed_write_rate_ = write_rate;
// update delayed_write_rate_ as well
delayed_write_rate_ = write_rate;
}
uint64_t delayed_write_rate() const { return delayed_write_rate_; }
uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
private:
uint64_t NowMicrosMonotonic(const std::shared_ptr<SystemClock>& clock);
friend class WriteControllerToken;
friend class StopWriteToken;
friend class DelayWriteToken;
friend class CompactionPressureToken;
std::atomic<int> total_stopped_;
std::atomic<int> total_delayed_;
std::atomic<int> total_compaction_pressure_;
uint64_t bytes_left_;
uint64_t last_refill_time_;
// write rate set when initialization or by `DBImpl::SetDBOptions`
uint64_t max_delayed_write_rate_;
// current write rate
uint64_t delayed_write_rate_;
std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
};
class WriteControllerToken {
public:
explicit WriteControllerToken(WriteController* controller)
: controller_(controller) {}
virtual ~WriteControllerToken() {}
protected:
WriteController* controller_;
private:
// no copying allowed
WriteControllerToken(const WriteControllerToken&) = delete;
void operator=(const WriteControllerToken&) = delete;
};
class StopWriteToken : public WriteControllerToken {
public:
explicit StopWriteToken(WriteController* controller)
: WriteControllerToken(controller) {}
virtual ~StopWriteToken();
};
class DelayWriteToken : public WriteControllerToken {
public:
explicit DelayWriteToken(WriteController* controller)
: WriteControllerToken(controller) {}
virtual ~DelayWriteToken();
};
class CompactionPressureToken : public WriteControllerToken {
public:
explicit CompactionPressureToken(WriteController* controller)
: WriteControllerToken(controller) {}
virtual ~CompactionPressureToken();
};
} // namespace ROCKSDB_NAMESPACE