Merge branch 'master' into performance
Conflicts: db/db_impl.h
This commit is contained in:
commit
ac92420fc5
39
HISTORY.md
Normal file
@ -0,0 +1,39 @@
# RocksDB Change Log

## 2.7.0 (01/28/2014)

### Public API changes

* Renamed `StackableDB::GetRawDB()` to `StackableDB::GetBaseDB()`.
* Changed `WriteBatch::Data()` to `const std::string& Data() const`.
* Renamed class `TableStats` to `TableProperties`.
* Deleted class `PrefixHashRepFactory`. Please use `NewHashSkipListRepFactory()` instead.
* Supported multi-threaded `EnableFileDeletions()` and `DisableFileDeletions()`.
* Added `DB::GetOptions()`.
* Added `DB::GetDbIdentity()`.

### New Features

* Added [BackupableDB](https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F).
* Implemented [TailingIterator](https://github.com/facebook/rocksdb/wiki/Tailing-Iterator), a special type of iterator that
  doesn't create a snapshot (so it can be used to read newly inserted data)
  and is optimized for sequential reads.
* Added a property block for tables, which allows (1) a table to store
  its metadata and (2) end users to collect and store properties they
  are interested in.
* Enabled caching of index and filter blocks in the block cache (turned off by default).
* Added error reporting for manual compaction.
* Supported additional Linux platform flavors and Mac OS.
* Put with `SliceParts`: a variant of `Put()` that gathers output like `writev(2)`.
* Bug fixes and code refactoring for compatibility with the upcoming Column
  Family feature.

### Performance Improvements

* Large benchmark performance improvements from multiple efforts. For example, read-only QPS increased from about 530k in the 2.6 release to 1.1 million in 2.7 [1].
* Sped up the way RocksDB deletes obsolete files: it no longer lists the whole directory under a lock, which gives a decrease in p99.
* Use a raw pointer instead of a shared pointer for statistics: [5b825d](https://github.com/facebook/rocksdb/commit/5b825d6964e26ec3b4bb6faa708ebb1787f1d7bd). This gave a huge increase in performance, because shared pointers are slow.
* Optimized locking for `Get()`: [1fdb3f](https://github.com/facebook/rocksdb/commit/1fdb3f7dc60e96394e3e5b69a46ede5d67fb976c), a 1.5x QPS increase for some workloads.
* Cache speedup: [e8d40c3](https://github.com/facebook/rocksdb/commit/e8d40c31b3cca0c3e1ae9abe9b9003b1288026a9)
* Implemented autovector, which allocates the first N elements on the stack. Most vectors in RocksDB are small, and we never want to allocate heap objects while holding a mutex: [c01676e4](https://github.com/facebook/rocksdb/commit/c01676e46d3be08c3c140361ef1f5884f47d3b3c)
* Moved many malloc, memcpy, and I/O calls outside of locks.
|
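The autovector entry in the change log above describes a container that keeps its first N elements in inline storage and only touches the heap once that space is exhausted. The following is a minimal sketch of that idea, not the code in `util/autovector.h`: the class name `SmallVector`, its members, and the default of 8 inline slots are all assumptions made for illustration, and it requires `T` to be default-constructible and copy-assignable.

```cpp
#include <array>
#include <cstddef>
#include <vector>

// Hypothetical sketch of the "first N elements inline" idea. This is NOT
// RocksDB's autovector; every name here is made up for illustration.
// The inline buffer lives on the stack whenever the SmallVector object itself
// is a local variable; elements beyond N spill into a heap-backed std::vector.
template <typename T, std::size_t N = 8>
class SmallVector {
 public:
  void push_back(const T& value) {
    if (size_ < N) {
      inline_[size_] = value;       // no heap allocation on this path
    } else {
      overflow_.push_back(value);   // may allocate
    }
    ++size_;
  }

  T& operator[](std::size_t i) {
    return i < N ? inline_[i] : overflow_[i - N];
  }
  const T& operator[](std::size_t i) const {
    return i < N ? inline_[i] : overflow_[i - N];
  }

  std::size_t size() const { return size_; }
  bool empty() const { return size_ == 0; }

  // True while every element still fits in the inline buffer.
  bool only_inline() const { return size_ <= N; }

 private:
  std::size_t size_ = 0;
  std::array<T, N> inline_{};   // value-initialized inline storage
  std::vector<T> overflow_;     // used only after N elements
};
```

With `SmallVector<int, 2>`, the first two `push_back` calls stay in the inline buffer and the third one falls through to the `std::vector`. That is the property the change log cares about: short lists built while a mutex is held do not need to call the allocator.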
31
INSTALL.md
@ -17,15 +17,42 @@ libraries. You are on your own.
|
||||
|
||||
## Supported platforms
|
||||
|
||||
* **Linux**
|
||||
* **Linux - Ubuntu**
|
||||
* Upgrade your gcc to version at least 4.7 to get C++11 support.
|
||||
* Install gflags. First, try: `sudo apt-get install libgflags-dev`.
|
||||
* Install gflags. First, try: `sudo apt-get install libgflags-dev`
|
||||
If this doesn't work and you're using Ubuntu, here's a nice tutorial:
|
||||
(http://askubuntu.com/questions/312173/installing-gflags-12-04)
|
||||
* Install snappy. This is usually as easy as:
|
||||
`sudo apt-get install libsnappy-dev`.
|
||||
* Install zlib. Try: `sudo apt-get install zlib1g-dev`.
|
||||
* Install bzip2: `sudo apt-get install libbz2-dev`.
|
||||
* **Linux - CentOS**
|
||||
* Upgrade your gcc to version at least 4.7 to get C++11 support:
|
||||
`yum install gcc47-c++`
|
||||
* Install gflags:
|
||||
|
||||
wget https://gflags.googlecode.com/files/gflags-2.0-no-svn-files.tar.gz
|
||||
tar -xzvf gflags-2.0-no-svn-files.tar.gz
|
||||
cd gflags-2.0
|
||||
./configure && make && sudo make install
|
||||
|
||||
* Install snappy:
|
||||
|
||||
wget https://snappy.googlecode.com/files/snappy-1.1.1.tar.gz
|
||||
tar -xzvf snappy-1.1.1.tar.gz
|
||||
cd snappy-1.1.1
|
||||
./configure && make && sudo make install
|
||||
|
||||
* Install zlib:
|
||||
|
||||
sudo yum install zlib
|
||||
sudo yum install zlib-devel
|
||||
|
||||
* Install bzip2:
|
||||
|
||||
sudo yum install bzip2
|
||||
sudo yum install bzip2-devel
|
||||
|
||||
* **OS X**:
|
||||
* Install latest C++ compiler that supports C++ 11:
|
||||
* Update Xcode: run `xcode-select --install` (or install it from the Xcode app's settings).
|
||||
|
4
Makefile
@ -96,7 +96,9 @@ BENCHMARKS = db_bench_sqlite3 db_bench_tree_db table_reader_bench
|
||||
|
||||
# The library name is configurable since we are maintaining libraries of both
|
||||
# debug/release mode.
|
||||
LIBNAME = librocksdb
|
||||
ifeq ($(LIBNAME),)
|
||||
LIBNAME=librocksdb
|
||||
endif
|
||||
LIBRARY = ${LIBNAME}.a
|
||||
MEMENVLIBRARY = libmemenv.a
|
||||
|
||||
|
@ -5,13 +5,18 @@
|
||||
# of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
set -e
|
||||
if [ -z "$GIT" ]
|
||||
then
|
||||
GIT="git"
|
||||
fi
|
||||
|
||||
# Print out the colored progress info so that it can be brainlessly
|
||||
# distinguished by users.
|
||||
function title() {
|
||||
echo -e "\033[1;32m$*\033[0m"
|
||||
}
|
||||
|
||||
usage="Create new rocksdb version and prepare it for the release process\n"
|
||||
usage="Create new RocksDB version and prepare it for the release process\n"
|
||||
usage+="USAGE: ./make_new_version.sh <version>"
|
||||
|
||||
# -- Pre-check
|
||||
@ -27,35 +32,13 @@ if [ $GIT_BRANCH != "master" ]; then
|
||||
echo "Error: Current branch is '$GIT_BRANCH', Please switch to master branch."
|
||||
fi
|
||||
|
||||
# --Step 1: cutting new tag
|
||||
title "Adding new tag for this release ..."
|
||||
git tag -a "$ROCKSDB_VERSION.fb" -m "Rocksdb $ROCKSDB_VERSION"
|
||||
TAG="$ROCKSDB_VERSION.fb"
|
||||
$GIT tag -a "$TAG" -m "RocksDB $ROCKSDB_VERSION"
|
||||
|
||||
# Setting up the proxy for remote repo access
|
||||
export http_proxy=http://172.31.255.99:8080
|
||||
export https_proxy="$http_proxy";
|
||||
|
||||
title "Pushing new tag to remote repo ..."
|
||||
proxycmd.sh git push origin --tags
|
||||
$GIT push origin --tags
|
||||
|
||||
# --Step 2: Update README.fb
|
||||
title "Updating the latest version info in README.fb ..."
|
||||
sed -i "s/Latest release is [0-9]\+.[0-9]\+.fb/Latest release is $ROCKSDB_VERSION.fb/" README.fb
|
||||
git commit README.fb -m "update the latest version in README.fb to $ROCKSDB_VERSION"
|
||||
proxycmd.sh git push
|
||||
|
||||
# --Step 3: Prepare this repo for 3rd release
|
||||
title "Cleaning up repo ..."
|
||||
make clean
|
||||
git clean -fxd
|
||||
|
||||
title "Generating the build info ..."
|
||||
# Comment out the call of `build_detect_version` so that the SHA number and build date of this
|
||||
# release will remain constant. Otherwise every time we run "make" util/build_version.cc will be
|
||||
# overridden.
|
||||
sed -i 's/^\$PWD\/build_tools\/build_detect_version$//' build_tools/build_detect_platform
|
||||
|
||||
# Generate util/build_version.cc
|
||||
build_tools/build_detect_version
|
||||
|
||||
title "Done!"
|
||||
title "Tag $TAG is pushed to github; if you want to delete it, please run"
|
||||
title "git tag -d $TAG && git push origin :refs/tags/$TAG"
|
||||
|
361
db/db_impl.cc
@ -265,7 +265,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
||||
mutex_(options.use_adaptive_mutex),
|
||||
shutting_down_(nullptr),
|
||||
bg_cv_(&mutex_),
|
||||
mem_rep_factory_(options_.memtable_factory.get()),
|
||||
mem_(new MemTable(internal_comparator_, options_)),
|
||||
imm_(options_.min_write_buffer_number_to_merge),
|
||||
logfile_number_(0),
|
||||
@ -283,15 +282,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
||||
purge_wal_files_last_run_(0),
|
||||
last_stats_dump_time_microsec_(0),
|
||||
default_interval_to_delete_obsolete_WAL_(600),
|
||||
stall_level0_slowdown_(0),
|
||||
stall_memtable_compaction_(0),
|
||||
stall_level0_num_files_(0),
|
||||
stall_level0_slowdown_count_(0),
|
||||
stall_memtable_compaction_count_(0),
|
||||
stall_level0_num_files_count_(0),
|
||||
started_at_(options.env->NowMicros()),
|
||||
flush_on_destroy_(false),
|
||||
stats_(options.num_levels),
|
||||
internal_stats_(options.num_levels, options.env,
|
||||
options.statistics.get()),
|
||||
delayed_writes_(0),
|
||||
storage_options_(options),
|
||||
bg_work_gate_closed_(false),
|
||||
@ -299,13 +292,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
||||
mem_->Ref();
|
||||
env_->GetAbsolutePath(dbname, &db_absolute_path_);
|
||||
|
||||
stall_leveln_slowdown_.resize(options.num_levels);
|
||||
stall_leveln_slowdown_count_.resize(options.num_levels);
|
||||
for (int i = 0; i < options.num_levels; ++i) {
|
||||
stall_leveln_slowdown_[i] = 0;
|
||||
stall_leveln_slowdown_count_[i] = 0;
|
||||
}
|
||||
|
||||
// Reserve ten files or so for other uses and give the rest to TableCache.
|
||||
// Give a large number for setting of "infinite" open files.
|
||||
const int table_cache_size =
|
||||
@ -1146,11 +1132,11 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
|
||||
meta.smallest_seqno, meta.largest_seqno);
|
||||
}
|
||||
|
||||
CompactionStats stats;
|
||||
InternalStats::CompactionStats stats;
|
||||
stats.micros = env_->NowMicros() - start_micros;
|
||||
stats.bytes_written = meta.file_size;
|
||||
stats.files_out_levelnp1 = 1;
|
||||
stats_[level].Add(stats);
|
||||
internal_stats_.AddCompactionStats(level, stats);
|
||||
RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size);
|
||||
return s;
|
||||
}
|
||||
@ -1234,10 +1220,10 @@ Status DBImpl::WriteLevel0Table(autovector<MemTable*>& mems, VersionEdit* edit,
|
||||
meta.smallest_seqno, meta.largest_seqno);
|
||||
}
|
||||
|
||||
CompactionStats stats;
|
||||
InternalStats::CompactionStats stats;
|
||||
stats.micros = env_->NowMicros() - start_micros;
|
||||
stats.bytes_written = meta.file_size;
|
||||
stats_[level].Add(stats);
|
||||
internal_stats_.AddCompactionStats(level, stats);
|
||||
RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size);
|
||||
return s;
|
||||
}
|
||||
@ -1249,8 +1235,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
|
||||
|
||||
if (!imm_.IsFlushPending()) {
|
||||
Log(options_.info_log, "FlushMemTableToOutputFile already in progress");
|
||||
Status s = Status::IOError("FlushMemTableToOutputFile already in progress");
|
||||
return s;
|
||||
return Status::IOError("FlushMemTableToOutputFile already in progress");
|
||||
}
|
||||
|
||||
// Save the contents of the earliest memtable as a new Table
|
||||
@ -1259,8 +1244,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
|
||||
imm_.PickMemtablesToFlush(&mems);
|
||||
if (mems.empty()) {
|
||||
Log(options_.info_log, "Nothing in memstore to flush");
|
||||
Status s = Status::IOError("Nothing in memstore to flush");
|
||||
return s;
|
||||
return Status::IOError("Nothing in memstore to flush");
|
||||
}
|
||||
|
||||
// record the logfile_number_ before we release the mutex
|
||||
@ -1940,6 +1924,13 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
*madeProgress = false;
|
||||
mutex_.AssertHeld();
|
||||
|
||||
bool is_manual = (manual_compaction_ != nullptr) &&
|
||||
(manual_compaction_->in_progress == false);
|
||||
if (is_manual) {
|
||||
// another thread cannot pick up the same work
|
||||
manual_compaction_->in_progress = true;
|
||||
}
|
||||
|
||||
// TODO: remove memtable flush from formal compaction
|
||||
while (imm_.IsFlushPending()) {
|
||||
Log(options_.info_log,
|
||||
@ -1948,19 +1939,22 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
options_.max_background_compactions - bg_compaction_scheduled_);
|
||||
Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
|
||||
if (!stat.ok()) {
|
||||
if (is_manual) {
|
||||
manual_compaction_->status = stat;
|
||||
manual_compaction_->done = true;
|
||||
manual_compaction_->in_progress = false;
|
||||
manual_compaction_ = nullptr;
|
||||
}
|
||||
return stat;
|
||||
}
|
||||
}
|
||||
|
||||
unique_ptr<Compaction> c;
|
||||
bool is_manual = (manual_compaction_ != nullptr) &&
|
||||
(manual_compaction_->in_progress == false);
|
||||
InternalKey manual_end_storage;
|
||||
InternalKey* manual_end = &manual_end_storage;
|
||||
if (is_manual) {
|
||||
ManualCompaction* m = manual_compaction_;
|
||||
assert(!m->in_progress);
|
||||
m->in_progress = true; // another thread cannot pick up the same work
|
||||
assert(m->in_progress);
|
||||
c.reset(versions_->CompactRange(
|
||||
m->input_level, m->output_level, m->begin, m->end, &manual_end));
|
||||
if (!c) {
|
||||
@ -2617,7 +2611,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
||||
if (!options_.disableDataSync) {
|
||||
db_directory_->Fsync();
|
||||
}
|
||||
CompactionStats stats;
|
||||
|
||||
InternalStats::CompactionStats stats;
|
||||
stats.micros = env_->NowMicros() - start_micros - imm_micros;
|
||||
MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros);
|
||||
stats.files_in_leveln = compact->compaction->num_input_files(0);
|
||||
@ -2651,7 +2646,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
||||
|
||||
LogFlush(options_.info_log);
|
||||
mutex_.Lock();
|
||||
stats_[compact->compaction->output_level()].Add(stats);
|
||||
internal_stats_.AddCompactionStats(compact->compaction->output_level(),
|
||||
stats);
|
||||
|
||||
// if there were any unused file number (mostly in case of
|
||||
// compaction error), free up the entry from pending_outputs
|
||||
@ -3362,8 +3358,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
|
||||
delayed = sw.ElapsedMicros();
|
||||
}
|
||||
RecordTick(options_.statistics.get(), STALL_L0_SLOWDOWN_MICROS, delayed);
|
||||
stall_level0_slowdown_ += delayed;
|
||||
stall_level0_slowdown_count_++;
|
||||
internal_stats_.RecordWriteStall(InternalStats::LEVEL0_SLOWDOWN, delayed);
|
||||
allow_delay = false; // Do not delay a single write more than once
|
||||
mutex_.Lock();
|
||||
delayed_writes_++;
|
||||
@ -3388,8 +3383,8 @@ Status DBImpl::MakeRoomForWrite(bool force,
|
||||
}
|
||||
RecordTick(options_.statistics.get(),
|
||||
STALL_MEMTABLE_COMPACTION_MICROS, stall);
|
||||
stall_memtable_compaction_ += stall;
|
||||
stall_memtable_compaction_count_++;
|
||||
internal_stats_.RecordWriteStall(InternalStats::MEMTABLE_COMPACTION,
|
||||
stall);
|
||||
} else if (versions_->current()->NumLevelFiles(0) >=
|
||||
options_.level0_stop_writes_trigger) {
|
||||
// There are too many level-0 files.
|
||||
@ -3403,8 +3398,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
|
||||
stall = sw.ElapsedMicros();
|
||||
}
|
||||
RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall);
|
||||
stall_level0_num_files_ += stall;
|
||||
stall_level0_num_files_count_++;
|
||||
internal_stats_.RecordWriteStall(InternalStats::LEVEL0_NUM_FILES, stall);
|
||||
} else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 &&
|
||||
(score = versions_->current()->MaxCompactionScore()) >
|
||||
options_.hard_rate_limit) {
|
||||
@ -3418,8 +3412,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
|
||||
env_->SleepForMicroseconds(1000);
|
||||
delayed = sw.ElapsedMicros();
|
||||
}
|
||||
stall_leveln_slowdown_[max_level] += delayed;
|
||||
stall_leveln_slowdown_count_[max_level]++;
|
||||
internal_stats_.RecordLevelNSlowdown(max_level, delayed);
|
||||
// Make sure the following value doesn't round to zero.
|
||||
uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1);
|
||||
rate_limit_delay_millis += rate_limit;
|
||||
@ -3516,297 +3509,9 @@ const Options& DBImpl::GetOptions() const {
|
||||
|
||||
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
|
||||
value->clear();
|
||||
|
||||
MutexLock l(&mutex_);
|
||||
Version* current = versions_->current();
|
||||
Slice in = property;
|
||||
Slice prefix("rocksdb.");
|
||||
if (!in.starts_with(prefix)) return false;
|
||||
in.remove_prefix(prefix.size());
|
||||
|
||||
if (in.starts_with("num-files-at-level")) {
|
||||
in.remove_prefix(strlen("num-files-at-level"));
|
||||
uint64_t level;
|
||||
bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
|
||||
if (!ok || (int)level >= NumberLevels()) {
|
||||
return false;
|
||||
} else {
|
||||
char buf[100];
|
||||
snprintf(buf, sizeof(buf), "%d",
|
||||
current->NumLevelFiles(static_cast<int>(level)));
|
||||
*value = buf;
|
||||
return true;
|
||||
}
|
||||
} else if (in == "levelstats") {
|
||||
char buf[1000];
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Level Files Size(MB)\n"
|
||||
"--------------------\n");
|
||||
value->append(buf);
|
||||
|
||||
for (int level = 0; level < NumberLevels(); level++) {
|
||||
snprintf(buf, sizeof(buf),
|
||||
"%3d %8d %8.0f\n",
|
||||
level,
|
||||
current->NumLevelFiles(level),
|
||||
current->NumLevelBytes(level) / 1048576.0);
|
||||
value->append(buf);
|
||||
}
|
||||
return true;
|
||||
|
||||
} else if (in == "stats") {
|
||||
char buf[1000];
|
||||
|
||||
uint64_t wal_bytes = 0;
|
||||
uint64_t wal_synced = 0;
|
||||
uint64_t user_bytes_written = 0;
|
||||
uint64_t write_other = 0;
|
||||
uint64_t write_self = 0;
|
||||
uint64_t write_with_wal = 0;
|
||||
uint64_t total_bytes_written = 0;
|
||||
uint64_t total_bytes_read = 0;
|
||||
uint64_t micros_up = env_->NowMicros() - started_at_;
|
||||
// Add "+1" to make sure seconds_up is > 0 and avoid NaN later
|
||||
double seconds_up = (micros_up + 1) / 1000000.0;
|
||||
uint64_t total_slowdown = 0;
|
||||
uint64_t total_slowdown_count = 0;
|
||||
uint64_t interval_bytes_written = 0;
|
||||
uint64_t interval_bytes_read = 0;
|
||||
uint64_t interval_bytes_new = 0;
|
||||
double interval_seconds_up = 0;
|
||||
|
||||
Statistics* s = options_.statistics.get();
|
||||
if (s) {
|
||||
wal_bytes = s->getTickerCount(WAL_FILE_BYTES);
|
||||
wal_synced = s->getTickerCount(WAL_FILE_SYNCED);
|
||||
user_bytes_written = s->getTickerCount(BYTES_WRITTEN);
|
||||
write_other = s->getTickerCount(WRITE_DONE_BY_OTHER);
|
||||
write_self = s->getTickerCount(WRITE_DONE_BY_SELF);
|
||||
write_with_wal = s->getTickerCount(WRITE_WITH_WAL);
|
||||
}
|
||||
|
||||
// Pardon the long line but I think it is easier to read this way.
|
||||
snprintf(buf, sizeof(buf),
|
||||
" Compactions\n"
|
||||
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n"
|
||||
"------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
|
||||
);
|
||||
value->append(buf);
|
||||
for (int level = 0; level < current->NumberLevels(); level++) {
|
||||
int files = current->NumLevelFiles(level);
|
||||
if (stats_[level].micros > 0 || files > 0) {
|
||||
int64_t bytes_read = stats_[level].bytes_readn +
|
||||
stats_[level].bytes_readnp1;
|
||||
int64_t bytes_new = stats_[level].bytes_written -
|
||||
stats_[level].bytes_readnp1;
|
||||
double amplify = (stats_[level].bytes_readn == 0)
|
||||
? 0.0
|
||||
: (stats_[level].bytes_written +
|
||||
stats_[level].bytes_readnp1 +
|
||||
stats_[level].bytes_readn) /
|
||||
(double) stats_[level].bytes_readn;
|
||||
|
||||
total_bytes_read += bytes_read;
|
||||
total_bytes_written += stats_[level].bytes_written;
|
||||
|
||||
uint64_t stalls = level == 0 ?
|
||||
(stall_level0_slowdown_count_ +
|
||||
stall_level0_num_files_count_ +
|
||||
stall_memtable_compaction_count_) :
|
||||
stall_leveln_slowdown_count_[level];
|
||||
|
||||
double stall_us = level == 0 ?
|
||||
(stall_level0_slowdown_ +
|
||||
stall_level0_num_files_ +
|
||||
stall_memtable_compaction_) :
|
||||
stall_leveln_slowdown_[level];
|
||||
|
||||
snprintf(
|
||||
buf, sizeof(buf),
|
||||
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f %9lu\n",
|
||||
level,
|
||||
files,
|
||||
current->NumLevelBytes(level) / 1048576.0,
|
||||
current->NumLevelBytes(level) /
|
||||
versions_->MaxBytesForLevel(level),
|
||||
stats_[level].micros / 1e6,
|
||||
bytes_read / 1048576.0,
|
||||
stats_[level].bytes_written / 1048576.0,
|
||||
stats_[level].bytes_readn / 1048576.0,
|
||||
stats_[level].bytes_readnp1 / 1048576.0,
|
||||
bytes_new / 1048576.0,
|
||||
amplify,
|
||||
// +1 to avoid division by 0
|
||||
(bytes_read / 1048576.0) / ((stats_[level].micros+1) / 1000000.0),
|
||||
(stats_[level].bytes_written / 1048576.0) /
|
||||
((stats_[level].micros+1) / 1000000.0),
|
||||
stats_[level].files_in_leveln,
|
||||
stats_[level].files_in_levelnp1,
|
||||
stats_[level].files_out_levelnp1,
|
||||
stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1,
|
||||
stats_[level].count,
|
||||
(int) ((double) stats_[level].micros /
|
||||
1000.0 /
|
||||
(stats_[level].count + 1)),
|
||||
(double) stall_us / 1000.0 / (stalls + 1),
|
||||
stall_us / 1000000.0,
|
||||
(unsigned long) stalls);
|
||||
|
||||
total_slowdown += stall_leveln_slowdown_[level];
|
||||
total_slowdown_count += stall_leveln_slowdown_count_[level];
|
||||
value->append(buf);
|
||||
}
|
||||
}
|
||||
|
||||
interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_;
|
||||
interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_;
|
||||
interval_bytes_written =
|
||||
total_bytes_written - last_stats_.compaction_bytes_written_;
|
||||
interval_seconds_up = seconds_up - last_stats_.seconds_up_;
|
||||
|
||||
snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n",
|
||||
seconds_up, interval_seconds_up);
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Writes cumulative: %llu total, %llu batches, "
|
||||
"%.1f per batch, %.2f ingest GB\n",
|
||||
(unsigned long long) (write_other + write_self),
|
||||
(unsigned long long) write_self,
|
||||
(write_other + write_self) / (double) (write_self + 1),
|
||||
user_bytes_written / (1048576.0 * 1024));
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"WAL cumulative: %llu WAL writes, %llu WAL syncs, "
|
||||
"%.2f writes per sync, %.2f GB written\n",
|
||||
(unsigned long long) write_with_wal,
|
||||
(unsigned long long ) wal_synced,
|
||||
write_with_wal / (double) (wal_synced + 1),
|
||||
wal_bytes / (1048576.0 * 1024));
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Compaction IO cumulative (GB): "
|
||||
"%.2f new, %.2f read, %.2f write, %.2f read+write\n",
|
||||
user_bytes_written / (1048576.0 * 1024),
|
||||
total_bytes_read / (1048576.0 * 1024),
|
||||
total_bytes_written / (1048576.0 * 1024),
|
||||
(total_bytes_read + total_bytes_written) / (1048576.0 * 1024));
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Compaction IO cumulative (MB/sec): "
|
||||
"%.1f new, %.1f read, %.1f write, %.1f read+write\n",
|
||||
user_bytes_written / 1048576.0 / seconds_up,
|
||||
total_bytes_read / 1048576.0 / seconds_up,
|
||||
total_bytes_written / 1048576.0 / seconds_up,
|
||||
(total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up);
|
||||
value->append(buf);
|
||||
|
||||
// +1 to avoid divide by 0 and NaN
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Amplification cumulative: %.1f write, %.1f compaction\n",
|
||||
(double) (total_bytes_written + wal_bytes)
|
||||
/ (user_bytes_written + 1),
|
||||
(double) (total_bytes_written + total_bytes_read + wal_bytes)
|
||||
/ (user_bytes_written + 1));
|
||||
value->append(buf);
|
||||
|
||||
uint64_t interval_write_other = write_other - last_stats_.write_other_;
|
||||
uint64_t interval_write_self = write_self - last_stats_.write_self_;
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Writes interval: %llu total, %llu batches, "
|
||||
"%.1f per batch, %.1f ingest MB\n",
|
||||
(unsigned long long) (interval_write_other + interval_write_self),
|
||||
(unsigned long long) interval_write_self,
|
||||
(double) (interval_write_other + interval_write_self)
|
||||
/ (interval_write_self + 1),
|
||||
(user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0);
|
||||
value->append(buf);
|
||||
|
||||
uint64_t interval_write_with_wal =
|
||||
write_with_wal - last_stats_.write_with_wal_;
|
||||
|
||||
uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_;
|
||||
uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_;
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"WAL interval: %llu WAL writes, %llu WAL syncs, "
|
||||
"%.2f writes per sync, %.2f MB written\n",
|
||||
(unsigned long long) interval_write_with_wal,
|
||||
(unsigned long long ) interval_wal_synced,
|
||||
interval_write_with_wal / (double) (interval_wal_synced + 1),
|
||||
interval_wal_bytes / (1048576.0 * 1024));
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Compaction IO interval (MB): "
|
||||
"%.2f new, %.2f read, %.2f write, %.2f read+write\n",
|
||||
interval_bytes_new / 1048576.0,
|
||||
interval_bytes_read/ 1048576.0,
|
||||
interval_bytes_written / 1048576.0,
|
||||
(interval_bytes_read + interval_bytes_written) / 1048576.0);
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Compaction IO interval (MB/sec): "
|
||||
"%.1f new, %.1f read, %.1f write, %.1f read+write\n",
|
||||
interval_bytes_new / 1048576.0 / interval_seconds_up,
|
||||
interval_bytes_read / 1048576.0 / interval_seconds_up,
|
||||
interval_bytes_written / 1048576.0 / interval_seconds_up,
|
||||
(interval_bytes_read + interval_bytes_written)
|
||||
/ 1048576.0 / interval_seconds_up);
|
||||
value->append(buf);
|
||||
|
||||
// +1 to avoid divide by 0 and NaN
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Amplification interval: %.1f write, %.1f compaction\n",
|
||||
(double) (interval_bytes_written + wal_bytes)
|
||||
/ (interval_bytes_new + 1),
|
||||
(double) (interval_bytes_written + interval_bytes_read + wal_bytes)
|
||||
/ (interval_bytes_new + 1));
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, "
|
||||
"%.3f memtable_compaction, %.3f leveln_slowdown\n",
|
||||
stall_level0_slowdown_ / 1000000.0,
|
||||
stall_level0_num_files_ / 1000000.0,
|
||||
stall_memtable_compaction_ / 1000000.0,
|
||||
total_slowdown / 1000000.0);
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Stalls(count): %lu level0_slowdown, %lu level0_numfiles, "
|
||||
"%lu memtable_compaction, %lu leveln_slowdown\n",
|
||||
(unsigned long) stall_level0_slowdown_count_,
|
||||
(unsigned long) stall_level0_num_files_count_,
|
||||
(unsigned long) stall_memtable_compaction_count_,
|
||||
(unsigned long) total_slowdown_count);
|
||||
value->append(buf);
|
||||
|
||||
last_stats_.compaction_bytes_read_ = total_bytes_read;
|
||||
last_stats_.compaction_bytes_written_ = total_bytes_written;
|
||||
last_stats_.ingest_bytes_ = user_bytes_written;
|
||||
last_stats_.seconds_up_ = seconds_up;
|
||||
last_stats_.wal_bytes_ = wal_bytes;
|
||||
last_stats_.wal_synced_ = wal_synced;
|
||||
last_stats_.write_with_wal_ = write_with_wal;
|
||||
last_stats_.write_other_ = write_other;
|
||||
last_stats_.write_self_ = write_self;
|
||||
|
||||
return true;
|
||||
} else if (in == "sstables") {
|
||||
*value = versions_->current()->DebugString();
|
||||
return true;
|
||||
} else if (in == "num-immutable-mem-table") {
|
||||
*value = std::to_string(imm_.size());
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
return internal_stats_.GetProperty(property, value, versions_.get(),
|
||||
imm_.size());
|
||||
}
|
||||
|
||||
void DBImpl::GetApproximateSizes(
|
||||
|
83
db/db_impl.h
@ -26,6 +26,7 @@
|
||||
#include "rocksdb/transaction_log.h"
|
||||
#include "util/autovector.h"
|
||||
#include "util/stats_logger.h"
|
||||
#include "db/internal_stats.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -386,7 +387,6 @@ class DBImpl : public DB {
|
||||
port::Mutex mutex_;
|
||||
port::AtomicPointer shutting_down_;
|
||||
port::CondVar bg_cv_; // Signalled when background work finishes
|
||||
MemTableRepFactory* mem_rep_factory_;
|
||||
MemTable* mem_;
|
||||
MemTableList imm_; // Memtable that are not changing
|
||||
uint64_t logfile_number_;
|
||||
@ -468,88 +468,9 @@ class DBImpl : public DB {
|
||||
// enabled and archive size_limit is disabled.
|
||||
uint64_t default_interval_to_delete_obsolete_WAL_;
|
||||
|
||||
// These count the number of microseconds for which MakeRoomForWrite stalls.
|
||||
uint64_t stall_level0_slowdown_;
|
||||
uint64_t stall_memtable_compaction_;
|
||||
uint64_t stall_level0_num_files_;
|
||||
std::vector<uint64_t> stall_leveln_slowdown_;
|
||||
uint64_t stall_level0_slowdown_count_;
|
||||
uint64_t stall_memtable_compaction_count_;
|
||||
uint64_t stall_level0_num_files_count_;
|
||||
std::vector<uint64_t> stall_leveln_slowdown_count_;
|
||||
|
||||
// Time at which this instance was started.
|
||||
const uint64_t started_at_;
|
||||
|
||||
bool flush_on_destroy_; // Used when disableWAL is true.
|
||||
|
||||
// Per level compaction stats. stats_[level] stores the stats for
|
||||
// compactions that produced data for the specified "level".
|
||||
struct CompactionStats {
|
||||
uint64_t micros;
|
||||
|
||||
// Bytes read from level N during compaction between levels N and N+1
|
||||
int64_t bytes_readn;
|
||||
|
||||
// Bytes read from level N+1 during compaction between levels N and N+1
|
||||
int64_t bytes_readnp1;
|
||||
|
||||
// Total bytes written during compaction between levels N and N+1
|
||||
int64_t bytes_written;
|
||||
|
||||
// Files read from level N during compaction between levels N and N+1
|
||||
int files_in_leveln;
|
||||
|
||||
// Files read from level N+1 during compaction between levels N and N+1
|
||||
int files_in_levelnp1;
|
||||
|
||||
// Files written during compaction between levels N and N+1
|
||||
int files_out_levelnp1;
|
||||
|
||||
// Number of compactions done
|
||||
int count;
|
||||
|
||||
CompactionStats() : micros(0), bytes_readn(0), bytes_readnp1(0),
|
||||
bytes_written(0), files_in_leveln(0),
|
||||
files_in_levelnp1(0), files_out_levelnp1(0),
|
||||
count(0) { }
|
||||
|
||||
void Add(const CompactionStats& c) {
|
||||
this->micros += c.micros;
|
||||
this->bytes_readn += c.bytes_readn;
|
||||
this->bytes_readnp1 += c.bytes_readnp1;
|
||||
this->bytes_written += c.bytes_written;
|
||||
this->files_in_leveln += c.files_in_leveln;
|
||||
this->files_in_levelnp1 += c.files_in_levelnp1;
|
||||
this->files_out_levelnp1 += c.files_out_levelnp1;
|
||||
this->count += 1;
|
||||
}
|
||||
};
|
||||
|
||||
std::vector<CompactionStats> stats_;
|
||||
|
||||
// Used to compute per-interval statistics
|
||||
struct StatsSnapshot {
|
||||
uint64_t compaction_bytes_read_; // Bytes read by compaction
|
||||
uint64_t compaction_bytes_written_; // Bytes written by compaction
|
||||
uint64_t ingest_bytes_; // Bytes written by user
|
||||
uint64_t wal_bytes_; // Bytes written to WAL
|
||||
uint64_t wal_synced_; // Number of times WAL is synced
|
||||
uint64_t write_with_wal_; // Number of writes that request WAL
|
||||
// These count the number of writes processed by the calling thread or
|
||||
// another thread.
|
||||
uint64_t write_other_;
|
||||
uint64_t write_self_;
|
||||
double seconds_up_;
|
||||
|
||||
StatsSnapshot() : compaction_bytes_read_(0), compaction_bytes_written_(0),
|
||||
ingest_bytes_(0), wal_bytes_(0), wal_synced_(0),
|
||||
write_with_wal_(0), write_other_(0), write_self_(0),
|
||||
seconds_up_(0) {}
|
||||
};
|
||||
|
||||
// Counters from the previous time per-interval stats were computed
|
||||
StatsSnapshot last_stats_;
|
||||
InternalStats internal_stats_;
|
||||
|
||||
static const int KEEP_LOG_FILE_NUM = 1000;
|
||||
std::string db_absolute_path_;
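The `StatsSnapshot` / `last_stats_` pair in this hunk supports per-interval reporting: each report subtracts the counters remembered from the previous report, then stores the current counters as the new baseline. A minimal sketch of that pattern follows. The names `Counters`, `IntervalReport`, and `IntervalTracker` are hypothetical and only a few of the snapshot fields are modeled.

```cpp
#include <cstdint>

// Hypothetical, trimmed-down counters; the real StatsSnapshot has more fields.
struct Counters {
  uint64_t wal_bytes = 0;
  uint64_t ingest_bytes = 0;
  double seconds_up = 0;
};

// Deltas between two consecutive reports.
struct IntervalReport {
  uint64_t wal_bytes;
  uint64_t ingest_bytes;
  double seconds;
  double ingest_mb_per_sec;
};

class IntervalTracker {
 public:
  // Computes deltas since the previous call, then keeps `now` as the baseline
  // for the next call.
  IntervalReport Report(const Counters& now) {
    IntervalReport r;
    r.wal_bytes = now.wal_bytes - last_.wal_bytes;
    r.ingest_bytes = now.ingest_bytes - last_.ingest_bytes;
    r.seconds = now.seconds_up - last_.seconds_up;
    // Guard the rate against a zero-length interval.
    r.ingest_mb_per_sec =
        r.seconds > 0 ? (r.ingest_bytes / 1048576.0) / r.seconds : 0.0;
    last_ = now;
    return r;
  }

 private:
  Counters last_;  // Counters as of the previous report (all zero initially).
};
```

Because the baseline is updated on every call, two callers sharing one tracker would each see only part of an interval. The sketch does no locking and assumes a single reporting thread.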
|
||||
|
298
db/internal_stats.cc
Normal file
@ -0,0 +1,298 @@
|
||||
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "db/internal_stats.h"
|
||||
|
||||
#include <vector>
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
bool InternalStats::GetProperty(const Slice& property, std::string* value,
|
||||
VersionSet* version_set, int immsize) {
|
||||
Version* current = version_set->current();
|
||||
Slice in = property;
|
||||
Slice prefix("rocksdb.");
|
||||
if (!in.starts_with(prefix)) return false;
|
||||
in.remove_prefix(prefix.size());
|
||||
|
||||
if (in.starts_with("num-files-at-level")) {
|
||||
in.remove_prefix(strlen("num-files-at-level"));
|
||||
uint64_t level;
|
||||
bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
|
||||
if (!ok || (int)level >= number_levels_) {
|
||||
return false;
|
||||
} else {
|
||||
char buf[100];
|
||||
snprintf(buf, sizeof(buf), "%d",
|
||||
current->NumLevelFiles(static_cast<int>(level)));
|
||||
*value = buf;
|
||||
return true;
|
||||
}
|
||||
} else if (in == "levelstats") {
|
||||
char buf[1000];
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Level Files Size(MB)\n"
|
||||
"--------------------\n");
|
||||
value->append(buf);
|
||||
|
||||
for (int level = 0; level < number_levels_; level++) {
|
||||
snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level,
|
||||
current->NumLevelFiles(level),
|
||||
current->NumLevelBytes(level) / 1048576.0);
|
||||
value->append(buf);
|
||||
}
|
||||
return true;
|
||||
|
||||
} else if (in == "stats") {
|
||||
char buf[1000];
|
||||
|
||||
uint64_t wal_bytes = 0;
|
||||
uint64_t wal_synced = 0;
|
||||
uint64_t user_bytes_written = 0;
|
||||
uint64_t write_other = 0;
|
||||
uint64_t write_self = 0;
|
||||
uint64_t write_with_wal = 0;
|
||||
uint64_t total_bytes_written = 0;
|
||||
uint64_t total_bytes_read = 0;
|
||||
uint64_t micros_up = env_->NowMicros() - started_at_;
|
||||
// Add "+1" to make sure seconds_up is > 0 and avoid NaN later
|
||||
double seconds_up = (micros_up + 1) / 1000000.0;
|
||||
uint64_t total_slowdown = 0;
|
||||
uint64_t total_slowdown_count = 0;
|
||||
uint64_t interval_bytes_written = 0;
|
||||
uint64_t interval_bytes_read = 0;
|
||||
uint64_t interval_bytes_new = 0;
|
||||
double interval_seconds_up = 0;
|
||||
|
||||
if (statistics_) {
|
||||
wal_bytes = statistics_->getTickerCount(WAL_FILE_BYTES);
|
||||
wal_synced = statistics_->getTickerCount(WAL_FILE_SYNCED);
|
||||
user_bytes_written = statistics_->getTickerCount(BYTES_WRITTEN);
|
||||
write_other = statistics_->getTickerCount(WRITE_DONE_BY_OTHER);
|
||||
write_self = statistics_->getTickerCount(WRITE_DONE_BY_SELF);
|
||||
write_with_wal = statistics_->getTickerCount(WRITE_WITH_WAL);
|
||||
}
|
||||
|
||||
// Pardon the long line but I think it is easier to read this way.
|
||||
snprintf(buf, sizeof(buf),
|
||||
" Compactions\n"
|
||||
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n"
|
||||
"------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
|
||||
);
|
||||
value->append(buf);
|
||||
for (int level = 0; level < number_levels_; level++) {
|
||||
int files = current->NumLevelFiles(level);
|
||||
if (compaction_stats_[level].micros > 0 || files > 0) {
|
||||
int64_t bytes_read = compaction_stats_[level].bytes_readn +
|
||||
compaction_stats_[level].bytes_readnp1;
|
||||
int64_t bytes_new = compaction_stats_[level].bytes_written -
|
||||
compaction_stats_[level].bytes_readnp1;
|
||||
double amplify = (compaction_stats_[level].bytes_readn == 0)
|
||||
? 0.0
|
||||
: (compaction_stats_[level].bytes_written +
|
||||
compaction_stats_[level].bytes_readnp1 +
|
||||
compaction_stats_[level].bytes_readn) /
|
||||
(double)compaction_stats_[level].bytes_readn;
|
||||
|
||||
total_bytes_read += bytes_read;
|
||||
total_bytes_written += compaction_stats_[level].bytes_written;
|
||||
|
||||
uint64_t stalls = level == 0 ? (stall_counts_[LEVEL0_SLOWDOWN] +
|
||||
stall_counts_[LEVEL0_NUM_FILES] +
|
||||
stall_counts_[MEMTABLE_COMPACTION])
|
||||
: stall_leveln_slowdown_count_[level];
|
||||
|
||||
double stall_us = level == 0 ? (stall_micros_[LEVEL0_SLOWDOWN] +
|
||||
stall_micros_[LEVEL0_NUM_FILES] +
|
||||
stall_micros_[MEMTABLE_COMPACTION])
|
||||
: stall_leveln_slowdown_[level];
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f "
|
||||
"%10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f "
|
||||
"%9lu\n",
|
||||
level, files, current->NumLevelBytes(level) / 1048576.0,
|
||||
current->NumLevelBytes(level) /
|
||||
version_set->MaxBytesForLevel(level),
|
||||
compaction_stats_[level].micros / 1e6, bytes_read / 1048576.0,
|
||||
compaction_stats_[level].bytes_written / 1048576.0,
|
||||
compaction_stats_[level].bytes_readn / 1048576.0,
|
||||
compaction_stats_[level].bytes_readnp1 / 1048576.0,
|
||||
bytes_new / 1048576.0, amplify,
|
||||
// +1 to avoid division by 0
|
||||
(bytes_read / 1048576.0) /
|
||||
((compaction_stats_[level].micros + 1) / 1000000.0),
|
||||
(compaction_stats_[level].bytes_written / 1048576.0) /
|
||||
((compaction_stats_[level].micros + 1) / 1000000.0),
|
||||
compaction_stats_[level].files_in_leveln,
|
||||
compaction_stats_[level].files_in_levelnp1,
|
||||
compaction_stats_[level].files_out_levelnp1,
|
||||
compaction_stats_[level].files_out_levelnp1 -
|
||||
compaction_stats_[level].files_in_levelnp1,
|
||||
compaction_stats_[level].count,
|
||||
(int)((double)compaction_stats_[level].micros / 1000.0 /
|
||||
(compaction_stats_[level].count + 1)),
|
||||
(double)stall_us / 1000.0 / (stalls + 1), stall_us / 1000000.0,
|
||||
(unsigned long)stalls);
|
||||
total_slowdown += stall_leveln_slowdown_[level];
|
||||
total_slowdown_count += stall_leveln_slowdown_count_[level];
|
||||
value->append(buf);
|
||||
}
|
||||
}
|
||||
|
||||
interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_;
|
||||
interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_;
|
||||
interval_bytes_written =
|
||||
total_bytes_written - last_stats_.compaction_bytes_written_;
|
||||
interval_seconds_up = seconds_up - last_stats_.seconds_up_;
|
||||
|
||||
snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n",
|
||||
seconds_up, interval_seconds_up);
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Writes cumulative: %llu total, %llu batches, "
|
||||
"%.1f per batch, %.2f ingest GB\n",
|
||||
(unsigned long long)(write_other + write_self),
|
||||
(unsigned long long)write_self,
|
||||
(write_other + write_self) / (double)(write_self + 1),
|
||||
user_bytes_written / (1048576.0 * 1024));
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"WAL cumulative: %llu WAL writes, %llu WAL syncs, "
|
||||
"%.2f writes per sync, %.2f GB written\n",
|
||||
(unsigned long long)write_with_wal, (unsigned long long)wal_synced,
|
||||
write_with_wal / (double)(wal_synced + 1),
|
||||
wal_bytes / (1048576.0 * 1024));
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Compaction IO cumulative (GB): "
|
||||
"%.2f new, %.2f read, %.2f write, %.2f read+write\n",
|
||||
user_bytes_written / (1048576.0 * 1024),
|
||||
total_bytes_read / (1048576.0 * 1024),
|
||||
total_bytes_written / (1048576.0 * 1024),
|
||||
(total_bytes_read + total_bytes_written) / (1048576.0 * 1024));
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Compaction IO cumulative (MB/sec): "
|
||||
"%.1f new, %.1f read, %.1f write, %.1f read+write\n",
|
||||
user_bytes_written / 1048576.0 / seconds_up,
|
||||
total_bytes_read / 1048576.0 / seconds_up,
|
||||
total_bytes_written / 1048576.0 / seconds_up,
|
||||
(total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up);
|
||||
value->append(buf);
|
||||
|
||||
// +1 to avoid divide by 0 and NaN
|
||||
snprintf(
|
||||
buf, sizeof(buf),
|
||||
"Amplification cumulative: %.1f write, %.1f compaction\n",
|
||||
(double)(total_bytes_written + wal_bytes) / (user_bytes_written + 1),
|
||||
(double)(total_bytes_written + total_bytes_read + wal_bytes) /
|
||||
(user_bytes_written + 1));
|
||||
value->append(buf);
|
||||
|
||||
uint64_t interval_write_other = write_other - last_stats_.write_other_;
|
||||
uint64_t interval_write_self = write_self - last_stats_.write_self_;
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Writes interval: %llu total, %llu batches, "
|
||||
"%.1f per batch, %.1f ingest MB\n",
|
||||
(unsigned long long)(interval_write_other + interval_write_self),
|
||||
(unsigned long long)interval_write_self,
|
||||
(double)(interval_write_other + interval_write_self) /
|
||||
(interval_write_self + 1),
|
||||
(user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0);
|
||||
value->append(buf);
|
||||
|
||||
uint64_t interval_write_with_wal =
|
||||
write_with_wal - last_stats_.write_with_wal_;
|
||||
|
||||
uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_;
|
||||
uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_;
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"WAL interval: %llu WAL writes, %llu WAL syncs, "
|
||||
"%.2f writes per sync, %.2f MB written\n",
|
||||
(unsigned long long)interval_write_with_wal,
|
||||
(unsigned long long)interval_wal_synced,
|
||||
interval_write_with_wal / (double)(interval_wal_synced + 1),
|
||||
interval_wal_bytes / 1048576.0);
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Compaction IO interval (MB): "
|
||||
"%.2f new, %.2f read, %.2f write, %.2f read+write\n",
|
||||
interval_bytes_new / 1048576.0, interval_bytes_read / 1048576.0,
|
||||
interval_bytes_written / 1048576.0,
|
||||
(interval_bytes_read + interval_bytes_written) / 1048576.0);
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Compaction IO interval (MB/sec): "
|
||||
"%.1f new, %.1f read, %.1f write, %.1f read+write\n",
|
||||
interval_bytes_new / 1048576.0 / interval_seconds_up,
|
||||
interval_bytes_read / 1048576.0 / interval_seconds_up,
|
||||
interval_bytes_written / 1048576.0 / interval_seconds_up,
|
||||
(interval_bytes_read + interval_bytes_written) / 1048576.0 /
|
||||
interval_seconds_up);
|
||||
value->append(buf);
|
||||
|
||||
// +1 to avoid divide by 0 and NaN
|
||||
snprintf(
|
||||
buf, sizeof(buf),
|
||||
"Amplification interval: %.1f write, %.1f compaction\n",
|
||||
(double)(interval_bytes_written + interval_wal_bytes) / (interval_bytes_new + 1),
(double)(interval_bytes_written + interval_bytes_read + interval_wal_bytes) /
|
||||
(interval_bytes_new + 1));
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, "
|
||||
"%.3f memtable_compaction, %.3f leveln_slowdown\n",
|
||||
stall_micros_[LEVEL0_SLOWDOWN] / 1000000.0,
|
||||
stall_micros_[LEVEL0_NUM_FILES] / 1000000.0,
|
||||
stall_micros_[MEMTABLE_COMPACTION] / 1000000.0,
|
||||
total_slowdown / 1000000.0);
|
||||
value->append(buf);
|
||||
|
||||
snprintf(buf, sizeof(buf),
|
||||
"Stalls(count): %lu level0_slowdown, %lu level0_numfiles, "
|
||||
"%lu memtable_compaction, %lu leveln_slowdown\n",
|
||||
(unsigned long)stall_counts_[LEVEL0_SLOWDOWN],
|
||||
(unsigned long)stall_counts_[LEVEL0_NUM_FILES],
|
||||
(unsigned long)stall_counts_[MEMTABLE_COMPACTION],
|
||||
(unsigned long)total_slowdown_count);
|
||||
value->append(buf);
|
||||
|
||||
last_stats_.compaction_bytes_read_ = total_bytes_read;
|
||||
last_stats_.compaction_bytes_written_ = total_bytes_written;
|
||||
last_stats_.ingest_bytes_ = user_bytes_written;
|
||||
last_stats_.seconds_up_ = seconds_up;
|
||||
last_stats_.wal_bytes_ = wal_bytes;
|
||||
last_stats_.wal_synced_ = wal_synced;
|
||||
last_stats_.write_with_wal_ = write_with_wal;
|
||||
last_stats_.write_other_ = write_other;
|
||||
last_stats_.write_self_ = write_self;
|
||||
|
||||
return true;
|
||||
} else if (in == "sstables") {
|
||||
*value = current->DebugString();
|
||||
return true;
|
||||
} else if (in == "num-immutable-mem-table") {
|
||||
*value = std::to_string(immsize);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
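`InternalStats::GetProperty()` above resolves a property name by stripping the `rocksdb.` prefix, matching the remaining name, and, for `num-files-at-level<N>`, consuming a decimal level number that must end the string. The sketch below mirrors that parsing with plain `std::string`. `ConsumePrefix`, `ConsumeDecimal`, and `ParseNumFilesAtLevel` are hypothetical stand-ins, not RocksDB functions, and overflow of very long digit strings is not handled.

```cpp
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helper: removes `prefix` from the front of `in` if present.
static bool ConsumePrefix(std::string* in, const std::string& prefix) {
  if (in->compare(0, prefix.size(), prefix) != 0) return false;
  in->erase(0, prefix.size());
  return true;
}

// Hypothetical stand-in for ConsumeDecimalNumber: parses leading digits
// and removes them from `in`. Fails if `in` does not start with a digit.
static bool ConsumeDecimal(std::string* in, uint64_t* value) {
  std::size_t i = 0;
  uint64_t v = 0;
  while (i < in->size() &&
         std::isdigit(static_cast<unsigned char>((*in)[i]))) {
    v = v * 10 + static_cast<uint64_t>((*in)[i] - '0');
    ++i;
  }
  if (i == 0) return false;
  in->erase(0, i);
  *value = v;
  return true;
}

// Returns true and sets *level when `property` has the form
// "rocksdb.num-files-at-level<N>" with 0 <= N < num_levels.
bool ParseNumFilesAtLevel(const std::string& property, int num_levels,
                          int* level) {
  std::string in = property;
  if (!ConsumePrefix(&in, "rocksdb.")) return false;
  if (!ConsumePrefix(&in, "num-files-at-level")) return false;
  uint64_t n = 0;
  // The number must be the last thing in the property name.
  if (!ConsumeDecimal(&in, &n) || !in.empty()) return false;
  if (num_levels <= 0 || n >= static_cast<uint64_t>(num_levels)) return false;
  *level = static_cast<int>(n);
  return true;
}
```

The range check here compares in unsigned 64-bit space before narrowing, a small deviation from the `(int)level` cast in the diff that avoids truncating a large input into a valid-looking level.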
|
149
db/internal_stats.h
Normal file
@ -0,0 +1,149 @@
|
||||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
//
|
||||
|
||||
#pragma once
|
||||
#include "rocksdb/statistics.h"
|
||||
#include "util/statistics.h"
|
||||
#include "db/version_set.h"
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
|
||||
namespace rocksdb {
|
||||
class InternalStats {
|
||||
public:
|
||||
enum WriteStallType {
|
||||
LEVEL0_SLOWDOWN,
|
||||
MEMTABLE_COMPACTION,
|
||||
LEVEL0_NUM_FILES,
|
||||
WRITE_STALLS_ENUM_MAX,
|
||||
};
|
||||
|
||||
InternalStats(int num_levels, Env* env, Statistics* statistics)
|
||||
: compaction_stats_(num_levels),
|
||||
stall_micros_(WRITE_STALLS_ENUM_MAX, 0),
|
||||
stall_counts_(WRITE_STALLS_ENUM_MAX, 0),
|
||||
stall_leveln_slowdown_(num_levels, 0),
|
||||
stall_leveln_slowdown_count_(num_levels, 0),
|
||||
number_levels_(num_levels),
|
||||
statistics_(statistics),
|
||||
env_(env),
|
||||
started_at_(env->NowMicros()) {}
|
||||
|
||||
// Per level compaction stats. compaction_stats_[level] stores the stats for
|
||||
// compactions that produced data for the specified "level".
|
||||
struct CompactionStats {
|
||||
uint64_t micros;
|
||||
|
||||
// Bytes read from level N during compaction between levels N and N+1
|
||||
int64_t bytes_readn;
|
||||
|
||||
// Bytes read from level N+1 during compaction between levels N and N+1
|
||||
int64_t bytes_readnp1;
|
||||
|
||||
// Total bytes written during compaction between levels N and N+1
|
||||
int64_t bytes_written;
|
||||
|
||||
// Files read from level N during compaction between levels N and N+1
|
||||
int files_in_leveln;
|
||||
|
||||
// Files read from level N+1 during compaction between levels N and N+1
|
||||
int files_in_levelnp1;
|
||||
|
||||
// Files written during compaction between levels N and N+1
|
||||
int files_out_levelnp1;
|
||||
|
||||
// Number of compactions done
|
||||
int count;
|
||||
|
||||
CompactionStats()
|
||||
: micros(0),
|
||||
bytes_readn(0),
|
||||
bytes_readnp1(0),
|
||||
bytes_written(0),
|
||||
files_in_leveln(0),
|
||||
files_in_levelnp1(0),
|
||||
files_out_levelnp1(0),
|
||||
count(0) {}
|
||||
|
||||
void Add(const CompactionStats& c) {
|
||||
this->micros += c.micros;
|
||||
this->bytes_readn += c.bytes_readn;
|
||||
this->bytes_readnp1 += c.bytes_readnp1;
|
||||
this->bytes_written += c.bytes_written;
|
||||
this->files_in_leveln += c.files_in_leveln;
|
||||
this->files_in_levelnp1 += c.files_in_levelnp1;
|
||||
this->files_out_levelnp1 += c.files_out_levelnp1;
|
||||
this->count += 1;
|
||||
}
|
||||
};
|
||||
|
||||
void AddCompactionStats(int level, const CompactionStats& stats) {
|
||||
compaction_stats_[level].Add(stats);
|
||||
}
|
||||
|
||||
void RecordWriteStall(WriteStallType write_stall_type, uint64_t micros) {
|
||||
stall_micros_[write_stall_type] += micros;
|
||||
stall_counts_[write_stall_type]++;
|
||||
}
|
||||
|
||||
void RecordLevelNSlowdown(int level, uint64_t micros) {
|
||||
stall_leveln_slowdown_[level] += micros;
|
||||
stall_leveln_slowdown_count_[level]++;
|
||||
}
|
||||
|
||||
bool GetProperty(const Slice& property, std::string* value,
|
||||
VersionSet* version_set, int immsize);
|
||||
|
||||
private:
|
||||
std::vector<CompactionStats> compaction_stats_;
|
||||
|
||||
// Used to compute per-interval statistics
|
||||
struct StatsSnapshot {
|
||||
uint64_t compaction_bytes_read_; // Bytes read by compaction
|
||||
uint64_t compaction_bytes_written_; // Bytes written by compaction
|
||||
uint64_t ingest_bytes_; // Bytes written by user
|
||||
uint64_t wal_bytes_; // Bytes written to WAL
|
||||
uint64_t wal_synced_; // Number of times WAL is synced
|
||||
uint64_t write_with_wal_; // Number of writes that request WAL
|
||||
// These count the number of writes processed by the calling thread or
|
||||
// another thread.
|
||||
uint64_t write_other_;
|
||||
uint64_t write_self_;
|
||||
double seconds_up_;
|
||||
|
||||
StatsSnapshot()
|
||||
: compaction_bytes_read_(0),
|
||||
compaction_bytes_written_(0),
|
||||
ingest_bytes_(0),
|
||||
wal_bytes_(0),
|
||||
wal_synced_(0),
|
||||
write_with_wal_(0),
|
||||
write_other_(0),
|
||||
write_self_(0),
|
||||
seconds_up_(0) {}
|
||||
};
|
||||
|
||||
// Counters from the previous time per-interval stats were computed
|
||||
StatsSnapshot last_stats_;
|
||||
|
||||
// These count the number of microseconds for which MakeRoomForWrite stalls.
|
||||
std::vector<uint64_t> stall_micros_;
|
||||
std::vector<uint64_t> stall_counts_;
|
||||
std::vector<uint64_t> stall_leveln_slowdown_;
|
||||
std::vector<uint64_t> stall_leveln_slowdown_count_;
|
||||
|
||||
int number_levels_;
|
||||
Statistics* statistics_;
|
||||
Env* env_;
|
||||
uint64_t started_at_;
|
||||
};
|
||||
|
||||
} // namespace rocksdb
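The per-level `RW-Amplify` and `Wnew(MB)` columns printed by `GetProperty("rocksdb.stats")` are derived from three `CompactionStats` byte counters. The following sketch isolates those two formulas as they appear in the diff. `LevelIo`, `RwAmplify`, and `BytesNew` are hypothetical names, and `LevelIo` models only the fields involved.

```cpp
#include <cstdint>

// Hypothetical subset of CompactionStats.
struct LevelIo {
  int64_t bytes_readn = 0;    // bytes read from level N
  int64_t bytes_readnp1 = 0;  // bytes read from level N+1
  int64_t bytes_written = 0;  // bytes written by the compaction
};

// RW-Amplify: all bytes read and written per byte read from level N.
// Reported as 0 when nothing was read from level N.
double RwAmplify(const LevelIo& s) {
  if (s.bytes_readn == 0) return 0.0;
  return static_cast<double>(s.bytes_written + s.bytes_readnp1 +
                             s.bytes_readn) /
         static_cast<double>(s.bytes_readn);
}

// Wnew: bytes written beyond what was merely rewritten from level N+1.
int64_t BytesNew(const LevelIo& s) {
  return s.bytes_written - s.bytes_readnp1;
}
```

For example, a compaction that reads 100 bytes from level N and 300 bytes from level N+1 and writes 400 bytes yields an amplification of (400 + 300 + 100) / 100 = 8 and 100 new bytes.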
|
@ -1574,8 +1574,10 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
|
||||
}
|
||||
}
|
||||
|
||||
// find offset in manifest file where this version is stored.
|
||||
new_manifest_file_size = descriptor_log_->file()->GetFileSize();
|
||||
if (s.ok()) {
|
||||
// find offset in manifest file where this version is stored.
|
||||
new_manifest_file_size = descriptor_log_->file()->GetFileSize();
|
||||
}
|
||||
|
||||
LogFlush(options_->info_log);
|
||||
mu->Lock();
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include <set>
|
||||
#include <vector>
|
||||
#include <deque>
|
||||
#include <atomic>
|
||||
#include "db/dbformat.h"
|
||||
#include "db/version_edit.h"
|
||||
#include "port/port.h"
|
||||
|
@ -68,8 +68,6 @@ struct BackupableDBOptions {
|
||||
destroy_old_data(_destroy_old_data) { }
|
||||
};
|
||||
|
||||
class BackupEngine;
|
||||
|
||||
typedef uint32_t BackupID;
|
||||
|
||||
struct BackupInfo {
|
||||
@ -82,6 +80,29 @@ struct BackupInfo {
|
||||
: backup_id(_backup_id), timestamp(_timestamp), size(_size) {}
|
||||
};
|
||||
|
||||
// Please see the documentation in BackupableDB and RestoreBackupableDB
|
||||
class BackupEngine {
|
||||
public:
|
||||
virtual ~BackupEngine() {}
|
||||
|
||||
static BackupEngine* NewBackupEngine(Env* db_env,
|
||||
const BackupableDBOptions& options);
|
||||
|
||||
virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false) = 0;
|
||||
virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0;
|
||||
virtual Status DeleteBackup(BackupID backup_id) = 0;
|
||||
virtual void StopBackup() = 0;
|
||||
|
||||
virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info) = 0;
|
||||
virtual Status RestoreDBFromBackup(BackupID backup_id,
|
||||
const std::string& db_dir,
|
||||
const std::string& wal_dir) = 0;
|
||||
virtual Status RestoreDBFromLatestBackup(const std::string& db_dir,
|
||||
const std::string& wal_dir) = 0;
|
||||
|
||||
virtual void DeleteBackupsNewerThan(uint64_t sequence_number) = 0;
|
||||
};
|
||||
|
||||
// Stack your DB with BackupableDB to be able to backup the DB
|
||||
class BackupableDB : public StackableDB {
|
||||
public:
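This hunk turns `BackupEngine` into a pure-virtual interface with a static `NewBackupEngine()` factory, while the concrete logic moves to `BackupEngineImpl` in the `.cc` file. A minimal sketch of that interface-plus-factory layout is shown below. The names `Engine`, `EngineImpl`, `New`, and `Describe` are hypothetical, and the sketch returns `std::unique_ptr` where the diff returns a raw pointer.

```cpp
#include <memory>
#include <string>

// Public header: callers see only virtual methods and a factory function.
class Engine {
 public:
  virtual ~Engine() {}

  // Factory: the only way for callers to obtain an instance.
  static std::unique_ptr<Engine> New(const std::string& dir);

  virtual std::string Describe() const = 0;
};

// Implementation file: the concrete class is not visible to callers.
namespace {
class EngineImpl : public Engine {
 public:
  explicit EngineImpl(const std::string& dir) : dir_(dir) {}
  std::string Describe() const override { return "engine@" + dir_; }

 private:
  std::string dir_;
};
}  // namespace

std::unique_ptr<Engine> Engine::New(const std::string& dir) {
  return std::unique_ptr<Engine>(new EngineImpl(dir));
}
```

With this layout, private members and helper types of the implementation can change without touching the public header.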
|
||||
|
@ -14,25 +14,23 @@ std::shared_ptr<Statistics> CreateDBStatistics() {
|
||||
return std::make_shared<StatisticsImpl>();
|
||||
}
|
||||
|
||||
StatisticsImpl::StatisticsImpl()
|
||||
: tickers_(TICKER_ENUM_MAX),
|
||||
histograms_(HISTOGRAM_ENUM_MAX) {}
|
||||
StatisticsImpl::StatisticsImpl() {}
|
||||
|
||||
StatisticsImpl::~StatisticsImpl() {}
|
||||
|
||||
long StatisticsImpl::getTickerCount(Tickers tickerType) {
|
||||
assert(tickerType < TICKER_ENUM_MAX);
|
||||
return tickers_[tickerType];
|
||||
return tickers_[tickerType].value;
|
||||
}
|
||||
|
||||
void StatisticsImpl::setTickerCount(Tickers tickerType, uint64_t count) {
|
||||
assert(tickerType < TICKER_ENUM_MAX);
|
||||
tickers_[tickerType] = count;
|
||||
tickers_[tickerType].value = count;
|
||||
}
|
||||
|
||||
void StatisticsImpl::recordTick(Tickers tickerType, uint64_t count) {
|
||||
assert(tickerType < TICKER_ENUM_MAX);
|
||||
tickers_[tickerType] += count;
|
||||
tickers_[tickerType].value += count;
|
||||
}
|
||||
|
||||
void StatisticsImpl::measureTime(Histograms histogramType, uint64_t value) {
|
||||
|
@ -28,8 +28,18 @@ class StatisticsImpl : public Statistics {
|
||||
HistogramData* const data);
|
||||
|
||||
private:
|
||||
std::vector<std::atomic_uint_fast64_t> tickers_;
|
||||
std::vector<HistogramImpl> histograms_;
|
||||
struct Ticker {
|
||||
Ticker() : value(uint_fast64_t()) {}
|
||||
|
||||
std::atomic_uint_fast64_t value;
|
||||
// Pad the structure so that its size is 64 bytes. A plain array of
// std::atomic_uint_fast64_t results in huge performance degradation
// due to false sharing.
|
||||
char padding[64 - sizeof(std::atomic_uint_fast64_t)];
|
||||
};
|
||||
|
||||
Ticker tickers_[TICKER_ENUM_MAX] __attribute__((aligned(64)));
|
||||
HistogramImpl histograms_[HISTOGRAM_ENUM_MAX] __attribute__((aligned(64)));
|
||||
};
|
||||
|
||||
// Utility functions
|
||||
|
@ -26,11 +26,11 @@
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
// -------- BackupEngine class ---------
|
||||
class BackupEngine {
|
||||
// -------- BackupEngineImpl class ---------
|
||||
class BackupEngineImpl : public BackupEngine {
|
||||
public:
|
||||
BackupEngine(Env* db_env, const BackupableDBOptions& options);
|
||||
~BackupEngine();
|
||||
BackupEngineImpl(Env* db_env, const BackupableDBOptions& options);
|
||||
~BackupEngineImpl();
|
||||
Status CreateNewBackup(DB* db, bool flush_before_backup = false);
|
||||
Status PurgeOldBackups(uint32_t num_backups_to_keep);
|
||||
Status DeleteBackup(BackupID backup_id);
|
||||
@ -188,7 +188,13 @@ class BackupEngine {
|
||||
static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL; // 5MB
|
||||
};
|
||||
|
||||
BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
|
||||
BackupEngine* BackupEngine::NewBackupEngine(
|
||||
Env* db_env, const BackupableDBOptions& options) {
|
||||
return new BackupEngineImpl(db_env, options);
|
||||
}
|
||||
|
||||
BackupEngineImpl::BackupEngineImpl(Env* db_env,
|
||||
const BackupableDBOptions& options)
|
||||
: stop_backup_(false),
|
||||
options_(options),
|
||||
db_env_(db_env),
|
||||
@ -271,11 +277,9 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
|
||||
latest_backup_id_);
|
||||
}
|
||||
|
||||
BackupEngine::~BackupEngine() {
|
||||
LogFlush(options_.info_log);
|
||||
}
|
||||
BackupEngineImpl::~BackupEngineImpl() { LogFlush(options_.info_log); }
|
||||
|
||||
void BackupEngine::DeleteBackupsNewerThan(uint64_t sequence_number) {
|
||||
void BackupEngineImpl::DeleteBackupsNewerThan(uint64_t sequence_number) {
|
||||
for (auto backup : backups_) {
|
||||
if (backup.second.GetSequenceNumber() > sequence_number) {
|
||||
Log(options_.info_log,
|
||||
@ -295,7 +299,7 @@ void BackupEngine::DeleteBackupsNewerThan(uint64_t sequence_number) {
|
||||
GarbageCollection(false);
|
||||
}
|
||||
|
||||
Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
|
||||
Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
|
||||
Status s;
|
||||
std::vector<std::string> live_files;
|
||||
VectorLogPtr live_wal_files;
|
||||
@ -405,7 +409,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
|
||||
return s;
|
||||
}
|
||||
|
||||
Status BackupEngine::PurgeOldBackups(uint32_t num_backups_to_keep) {
|
||||
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
|
||||
Log(options_.info_log, "Purging old backups, keeping %u",
|
||||
num_backups_to_keep);
|
||||
while (num_backups_to_keep < backups_.size()) {
|
||||
@ -418,7 +422,7 @@ Status BackupEngine::PurgeOldBackups(uint32_t num_backups_to_keep) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BackupEngine::DeleteBackup(BackupID backup_id) {
|
||||
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
|
||||
Log(options_.info_log, "Deleting backup %u", backup_id);
|
||||
auto backup = backups_.find(backup_id);
|
||||
if (backup == backups_.end()) {
|
||||
@ -431,7 +435,7 @@ Status BackupEngine::DeleteBackup(BackupID backup_id) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void BackupEngine::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
|
||||
void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
|
||||
backup_info->reserve(backups_.size());
|
||||
for (auto& backup : backups_) {
|
||||
if (!backup.second.Empty()) {
|
||||
@ -441,9 +445,9 @@ void BackupEngine::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
|
||||
}
|
||||
}
|
||||
|
||||
Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
|
||||
const std::string &db_dir,
|
||||
const std::string &wal_dir) {
|
||||
Status BackupEngineImpl::RestoreDBFromBackup(BackupID backup_id,
|
||||
const std::string& db_dir,
|
||||
const std::string& wal_dir) {
|
||||
auto backup_itr = backups_.find(backup_id);
|
||||
if (backup_itr == backups_.end()) {
|
||||
return Status::NotFound("Backup not found");
|
||||
@ -517,7 +521,7 @@ Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
|
||||
}
|
||||
|
||||
// the latest-backup file holds an ASCII representation of the latest backup id
|
||||
Status BackupEngine::GetLatestBackupFileContents(uint32_t* latest_backup) {
|
||||
Status BackupEngineImpl::GetLatestBackupFileContents(uint32_t* latest_backup) {
|
||||
Status s;
|
||||
unique_ptr<SequentialFile> file;
|
||||
s = backup_env_->NewSequentialFile(GetLatestBackupFile(),
|
||||
@ -547,7 +551,7 @@ Status BackupEngine::GetLatestBackupFileContents(uint32_t* latest_backup) {
|
||||
// writing 4 bytes to the file is atomic alright, but we should *never*
|
||||
// do something like 1. delete file, 2. write new file
|
||||
// We write to a tmp file and then atomically rename
|
||||
Status BackupEngine::PutLatestBackupFileContents(uint32_t latest_backup) {
|
||||
Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
|
||||
Status s;
|
||||
unique_ptr<WritableFile> file;
|
||||
EnvOptions env_options;
|
||||
@ -577,14 +581,11 @@ Status BackupEngine::PutLatestBackupFileContents(uint32_t latest_backup) {
|
||||
return s;
|
||||
}
|
||||
|
||||
Status BackupEngine::CopyFile(const std::string& src,
|
||||
const std::string& dst,
|
||||
Env* src_env,
|
||||
Env* dst_env,
|
||||
bool sync,
|
||||
uint64_t* size,
|
||||
uint32_t* checksum_value,
|
||||
uint64_t size_limit) {
|
||||
Status BackupEngineImpl::CopyFile(const std::string& src,
|
||||
const std::string& dst, Env* src_env,
|
||||
Env* dst_env, bool sync, uint64_t* size,
|
||||
uint32_t* checksum_value,
|
||||
uint64_t size_limit) {
|
||||
Status s;
|
||||
unique_ptr<WritableFile> dst_file;
|
||||
unique_ptr<SequentialFile> src_file;
|
||||
@ -644,12 +645,10 @@ Status BackupEngine::CopyFile(const std::string& src,
|
||||
}
|
||||
|
||||
// src_fname will always start with "/"
|
||||
Status BackupEngine::BackupFile(BackupID backup_id,
|
||||
BackupMeta* backup,
|
||||
bool shared,
|
||||
const std::string& src_dir,
|
||||
const std::string& src_fname,
|
||||
uint64_t size_limit) {
|
||||
Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
|
||||
bool shared, const std::string& src_dir,
|
||||
const std::string& src_fname,
|
||||
uint64_t size_limit) {
|
||||
|
||||
assert(src_fname.size() > 0 && src_fname[0] == '/');
|
||||
std::string dst_relative = src_fname.substr(1);
|
||||
@ -697,10 +696,9 @@ Status BackupEngine::BackupFile(BackupID backup_id,
|
||||
return s;
|
||||
}
|
||||
|
||||
Status BackupEngine::CalculateChecksum(const std::string& src,
|
||||
Env* src_env,
|
||||
uint64_t size_limit,
|
||||
uint32_t* checksum_value) {
|
||||
Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
|
||||
uint64_t size_limit,
|
||||
uint32_t* checksum_value) {
|
||||
*checksum_value = 0;
|
||||
if (size_limit == 0) {
|
||||
size_limit = std::numeric_limits<uint64_t>::max();
|
||||
@ -737,7 +735,7 @@ Status BackupEngine::CalculateChecksum(const std::string& src,
|
||||
return s;
|
||||
}
|
||||
|
||||
void BackupEngine::GarbageCollection(bool full_scan) {
|
||||
void BackupEngineImpl::GarbageCollection(bool full_scan) {
|
||||
Log(options_.info_log, "Starting garbage collection");
|
||||
std::vector<std::string> to_delete;
|
||||
for (auto& itr : backuped_file_infos_) {
|
||||
@ -817,7 +815,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
|
||||
|
||||
// ------- BackupMeta class --------
|
||||
|
||||
Status BackupEngine::BackupMeta::AddFile(const FileInfo& file_info) {
|
||||
Status BackupEngineImpl::BackupMeta::AddFile(const FileInfo& file_info) {
|
||||
size_ += file_info.size;
|
||||
files_.push_back(file_info.filename);
|
||||
|
||||
@ -840,7 +838,7 @@ Status BackupEngine::BackupMeta::AddFile(const FileInfo& file_info) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void BackupEngine::BackupMeta::Delete() {
|
||||
void BackupEngineImpl::BackupMeta::Delete() {
|
||||
for (const auto& file : files_) {
|
||||
auto itr = file_infos_->find(file);
|
||||
assert(itr != file_infos_->end());
|
||||
@ -860,7 +858,8 @@ void BackupEngine::BackupMeta::Delete() {
|
||||
// <file2> <crc32(literal string)> <crc32_value>
|
||||
// ...
|
||||
// TODO: maybe add checksum?
|
||||
Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
|
||||
Status BackupEngineImpl::BackupMeta::LoadFromFile(
|
||||
const std::string& backup_dir) {
|
||||
assert(Empty());
|
||||
Status s;
|
||||
unique_ptr<SequentialFile> backup_meta_file;
|
||||
@ -927,7 +926,7 @@ Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
|
||||
return s;
|
||||
}
|
||||
|
||||
Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
|
||||
Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
|
||||
Status s;
|
||||
unique_ptr<WritableFile> backup_meta_file;
|
||||
EnvOptions env_options;
|
||||
@ -969,7 +968,8 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
|
||||
// --- BackupableDB methods --------
|
||||
|
||||
BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options)
|
||||
: StackableDB(db), backup_engine_(new BackupEngine(db->GetEnv(), options)) {
|
||||
: StackableDB(db),
|
||||
backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) {
|
||||
if (options.share_table_files) {
|
||||
backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber());
|
||||
}
|
||||
@ -1003,7 +1003,7 @@ void BackupableDB::StopBackup() {
|
||||
|
||||
RestoreBackupableDB::RestoreBackupableDB(Env* db_env,
|
||||
const BackupableDBOptions& options)
|
||||
: backup_engine_(new BackupEngine(db_env, options)) {}
|
||||
: backup_engine_(new BackupEngineImpl(db_env, options)) {}
|
||||
|
||||
RestoreBackupableDB::~RestoreBackupableDB() {
|
||||
delete backup_engine_;
|
||||
|
@ -250,17 +250,15 @@ class FileManager : public EnvWrapper {
|
||||
return s;
|
||||
}
|
||||
|
||||
std::vector<int64_t> positions;
|
||||
auto pos = metadata.find(" crc32 ");
|
||||
auto pos = metadata.find("private");
|
||||
if (pos == std::string::npos) {
|
||||
return Status::Corruption("private file is expected");
|
||||
}
|
||||
pos = metadata.find(" crc32 ", pos + 6);
|
||||
if (pos == std::string::npos) {
|
||||
return Status::Corruption("checksum not found");
|
||||
}
|
||||
do {
|
||||
positions.push_back(pos);
|
||||
pos = metadata.find(" crc32 ", pos + 6);
|
||||
} while (pos != std::string::npos);
|
||||
|
||||
pos = positions[rnd_.Next() % positions.size()];
|
||||
if (metadata.size() < pos + 7) {
|
||||
return Status::Corruption("bad CRC32 checksum value");
|
||||
}
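The test helper above first locates the `private` marker in the backup metadata and only then collects the offsets of every ` crc32 ` token, so that a randomly chosen checksum always belongs to a private file. A self-contained sketch of that scan follows. `FindAllAfter` is a hypothetical helper name and the sample metadata in the usage note is made up for illustration.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Returns the offsets of every `needle` that occurs after the first `marker`.
// An empty result means the marker or the needle was not found.
std::vector<std::size_t> FindAllAfter(const std::string& text,
                                      const std::string& marker,
                                      const std::string& needle) {
  std::vector<std::size_t> positions;
  if (needle.empty()) return positions;  // avoid an endless loop

  std::size_t pos = text.find(marker);
  if (pos == std::string::npos) return positions;

  // Start searching just past the marker.
  pos = text.find(needle, pos + marker.size());
  while (pos != std::string::npos) {
    positions.push_back(pos);
    pos = text.find(needle, pos + needle.size());
  }
  return positions;
}
```

Given metadata with one shared file line followed by two private file lines, `FindAllAfter(meta, "private", " crc32 ")` returns two offsets and skips the checksum of the shared file.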
|
||||
|