97f6319e22
This reverts commit 12f1137355
.
145 lines
4.7 KiB
C++
145 lines
4.7 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#pragma once
|
|
|
|
#include <stdint.h>
|
|
|
|
#include <atomic>
|
|
#include <memory>
|
|
#include "rocksdb/rate_limiter.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class Env;
|
|
class WriteControllerToken;
|
|
|
|
// WriteController is controlling write stalls in our write code-path. Write
|
|
// stalls happen when compaction can't keep up with write rate.
|
|
// All of the methods here (including WriteControllerToken's destructors) need
|
|
// to be called while holding DB mutex
|
|
class WriteController {
|
|
public:
|
|
explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
|
|
int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
|
|
: total_stopped_(0),
|
|
total_delayed_(0),
|
|
total_compaction_pressure_(0),
|
|
bytes_left_(0),
|
|
last_refill_time_(0),
|
|
low_pri_rate_limiter_(
|
|
NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
|
|
set_max_delayed_write_rate(_delayed_write_rate);
|
|
}
|
|
~WriteController() = default;
|
|
|
|
// When an actor (column family) requests a stop token, all writes will be
|
|
// stopped until the stop token is released (deleted)
|
|
std::unique_ptr<WriteControllerToken> GetStopToken();
|
|
// When an actor (column family) requests a delay token, total delay for all
|
|
// writes to the DB will be controlled under the delayed write rate. Every
|
|
// write needs to call GetDelay() with number of bytes writing to the DB,
|
|
// which returns number of microseconds to sleep.
|
|
std::unique_ptr<WriteControllerToken> GetDelayToken(
|
|
uint64_t delayed_write_rate);
|
|
// When an actor (column family) requests a moderate token, compaction
|
|
// threads will be increased
|
|
std::unique_ptr<WriteControllerToken> GetCompactionPressureToken();
|
|
|
|
// these three metods are querying the state of the WriteController
|
|
bool IsStopped() const;
|
|
bool NeedsDelay() const { return total_delayed_.load() > 0; }
|
|
bool NeedSpeedupCompaction() const {
|
|
return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0;
|
|
}
|
|
// return how many microseconds the caller needs to sleep after the call
|
|
// num_bytes: how many number of bytes to put into the DB.
|
|
// Prerequisite: DB mutex held.
|
|
uint64_t GetDelay(Env* env, uint64_t num_bytes);
|
|
void set_delayed_write_rate(uint64_t write_rate) {
|
|
// avoid divide 0
|
|
if (write_rate == 0) {
|
|
write_rate = 1u;
|
|
} else if (write_rate > max_delayed_write_rate()) {
|
|
write_rate = max_delayed_write_rate();
|
|
}
|
|
delayed_write_rate_ = write_rate;
|
|
}
|
|
|
|
void set_max_delayed_write_rate(uint64_t write_rate) {
|
|
// avoid divide 0
|
|
if (write_rate == 0) {
|
|
write_rate = 1u;
|
|
}
|
|
max_delayed_write_rate_ = write_rate;
|
|
// update delayed_write_rate_ as well
|
|
delayed_write_rate_ = write_rate;
|
|
}
|
|
|
|
uint64_t delayed_write_rate() const { return delayed_write_rate_; }
|
|
|
|
uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
|
|
|
|
RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
|
|
|
|
private:
|
|
uint64_t NowMicrosMonotonic(Env* env);
|
|
|
|
friend class WriteControllerToken;
|
|
friend class StopWriteToken;
|
|
friend class DelayWriteToken;
|
|
friend class CompactionPressureToken;
|
|
|
|
std::atomic<int> total_stopped_;
|
|
std::atomic<int> total_delayed_;
|
|
std::atomic<int> total_compaction_pressure_;
|
|
uint64_t bytes_left_;
|
|
uint64_t last_refill_time_;
|
|
// write rate set when initialization or by `DBImpl::SetDBOptions`
|
|
uint64_t max_delayed_write_rate_;
|
|
// current write rate
|
|
uint64_t delayed_write_rate_;
|
|
|
|
std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
|
|
};
|
|
|
|
class WriteControllerToken {
|
|
public:
|
|
explicit WriteControllerToken(WriteController* controller)
|
|
: controller_(controller) {}
|
|
virtual ~WriteControllerToken() {}
|
|
|
|
protected:
|
|
WriteController* controller_;
|
|
|
|
private:
|
|
// no copying allowed
|
|
WriteControllerToken(const WriteControllerToken&) = delete;
|
|
void operator=(const WriteControllerToken&) = delete;
|
|
};
|
|
|
|
class StopWriteToken : public WriteControllerToken {
|
|
public:
|
|
explicit StopWriteToken(WriteController* controller)
|
|
: WriteControllerToken(controller) {}
|
|
virtual ~StopWriteToken();
|
|
};
|
|
|
|
class DelayWriteToken : public WriteControllerToken {
|
|
public:
|
|
explicit DelayWriteToken(WriteController* controller)
|
|
: WriteControllerToken(controller) {}
|
|
virtual ~DelayWriteToken();
|
|
};
|
|
|
|
class CompactionPressureToken : public WriteControllerToken {
|
|
public:
|
|
explicit CompactionPressureToken(WriteController* controller)
|
|
: WriteControllerToken(controller) {}
|
|
virtual ~CompactionPressureToken();
|
|
};
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|