Finer report I/O stats about Flush and Compaction.
Summary: This diff allows the I/O stats about Flush and Compaction to be reported in a more accurate way. Instead of measuring the size of a file, it measure I/O cost in per read / write basis. Test Plan: make all check Reviewers: sdong, igor, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19383
This commit is contained in:
parent
a1df6c1fc8
commit
90a6aca48e
@ -65,6 +65,7 @@
|
||||
#include "util/log_buffer.h"
|
||||
#include "util/mutexlock.h"
|
||||
#include "util/perf_context_imp.h"
|
||||
#include "util/iostats_context_imp.h"
|
||||
#include "util/stop_watch.h"
|
||||
#include "util/sync_point.h"
|
||||
|
||||
@ -1604,6 +1605,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
|
||||
// true, mark DB read-only
|
||||
bg_error_ = s;
|
||||
}
|
||||
RecordFlushIOStats();
|
||||
return s;
|
||||
}
|
||||
|
||||
@ -1920,11 +1922,28 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
|
||||
}
|
||||
}
|
||||
|
||||
void DBImpl::RecordFlushIOStats() {
|
||||
RecordTick(options_.statistics.get(), FLUSH_WRITE_BYTES,
|
||||
iostats_context.bytes_written);
|
||||
IOSTATS_RESET(bytes_written);
|
||||
}
|
||||
|
||||
void DBImpl::RecordCompactionIOStats() {
|
||||
RecordTick(options_.statistics.get(), COMPACT_READ_BYTES,
|
||||
IOSTATS(bytes_read));
|
||||
IOSTATS_RESET(bytes_read);
|
||||
RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES,
|
||||
IOSTATS(bytes_written));
|
||||
IOSTATS_RESET(bytes_written);
|
||||
}
|
||||
|
||||
void DBImpl::BGWorkFlush(void* db) {
|
||||
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
|
||||
reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
|
||||
}
|
||||
|
||||
void DBImpl::BGWorkCompaction(void* db) {
|
||||
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
|
||||
reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction();
|
||||
}
|
||||
|
||||
@ -2024,6 +2043,7 @@ void DBImpl::BackgroundCallFlush() {
|
||||
// that case, all DB variables will be dealloacated and referencing them
|
||||
// will cause trouble.
|
||||
}
|
||||
RecordFlushIOStats();
|
||||
}
|
||||
|
||||
void DBImpl::BackgroundCallCompaction() {
|
||||
@ -2559,6 +2579,7 @@ Status DBImpl::ProcessKeyValueCompaction(
|
||||
|
||||
while (input->Valid() && !shutting_down_.Acquire_Load() &&
|
||||
!cfd->IsDropped()) {
|
||||
RecordCompactionIOStats();
|
||||
// FLUSH preempts compaction
|
||||
// TODO(icanadi) this currently only checks if flush is necessary on
|
||||
// compacting column family. we should also check if flush is necessary on
|
||||
@ -2817,6 +2838,8 @@ Status DBImpl::ProcessKeyValueCompaction(
|
||||
}
|
||||
}
|
||||
|
||||
RecordCompactionIOStats();
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
@ -3124,22 +3147,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
||||
|
||||
for (int i = 0; i < compact->compaction->num_input_files(0); i++) {
|
||||
stats.bytes_readn += compact->compaction->input(0, i)->fd.GetFileSize();
|
||||
RecordTick(options_.statistics.get(), COMPACT_READ_BYTES,
|
||||
compact->compaction->input(0, i)->fd.GetFileSize());
|
||||
}
|
||||
|
||||
for (int i = 0; i < compact->compaction->num_input_files(1); i++) {
|
||||
stats.bytes_readnp1 += compact->compaction->input(1, i)->fd.GetFileSize();
|
||||
RecordTick(options_.statistics.get(), COMPACT_READ_BYTES,
|
||||
compact->compaction->input(1, i)->fd.GetFileSize());
|
||||
}
|
||||
|
||||
for (int i = 0; i < num_output_files; i++) {
|
||||
stats.bytes_written += compact->outputs[i].file_size;
|
||||
RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES,
|
||||
compact->outputs[i].file_size);
|
||||
}
|
||||
|
||||
RecordCompactionIOStats();
|
||||
|
||||
LogFlush(options_.info_log);
|
||||
mutex_.Lock();
|
||||
cfd->internal_stats()->AddCompactionStats(compact->compaction->output_level(),
|
||||
|
@ -359,6 +359,9 @@ class DBImpl : public DB {
|
||||
// Wait for memtable flushed
|
||||
Status WaitForFlushMemTable(ColumnFamilyData* cfd);
|
||||
|
||||
void RecordFlushIOStats();
|
||||
void RecordCompactionIOStats();
|
||||
|
||||
void MaybeScheduleLogDBDeployStats();
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
34
include/rocksdb/iostats_context.h
Normal file
34
include/rocksdb/iostats_context.h
Normal file
@ -0,0 +1,34 @@
|
||||
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#ifndef INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_
|
||||
#define INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_
|
||||
|
||||
#include <stdint.h>
|
||||
#include <string>
|
||||
|
||||
// A thread local context for gathering io-stats efficiently and transparently.
|
||||
namespace rocksdb {
|
||||
|
||||
struct IOStatsContext {
|
||||
// reset all io-stats counter to zero
|
||||
void Reset();
|
||||
|
||||
std::string ToString() const;
|
||||
|
||||
// the thread pool id
|
||||
uint64_t thread_pool_id;
|
||||
|
||||
// number of bytes that has been written.
|
||||
uint64_t bytes_written;
|
||||
// number of bytes that has been read.
|
||||
uint64_t bytes_read;
|
||||
};
|
||||
|
||||
extern __thread IOStatsContext iostats_context;
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_
|
@ -119,6 +119,7 @@ enum Tickers {
|
||||
WRITE_WITH_WAL, // Number of Write calls that request WAL
|
||||
COMPACT_READ_BYTES, // Bytes read during compaction
|
||||
COMPACT_WRITE_BYTES, // Bytes written during compaction
|
||||
FLUSH_WRITE_BYTES, // Bytes written during flush
|
||||
|
||||
// Number of table's properties loaded directly from file, without creating
|
||||
// table reader object.
|
||||
@ -179,6 +180,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
|
||||
{WRITE_DONE_BY_OTHER, "rocksdb.write.other"},
|
||||
{WRITE_TIMEDOUT, "rocksdb.write.timedout"},
|
||||
{WRITE_WITH_WAL, "rocksdb.write.wal"},
|
||||
{FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"},
|
||||
{COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"},
|
||||
{COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"},
|
||||
{NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
|
||||
|
@ -40,6 +40,7 @@
|
||||
#include "util/logging.h"
|
||||
#include "util/posix_logger.h"
|
||||
#include "util/random.h"
|
||||
#include "util/iostats_context_imp.h"
|
||||
#include <signal.h>
|
||||
|
||||
// Get nano time for mach systems
|
||||
@ -178,6 +179,7 @@ class PosixSequentialFile: public SequentialFile {
|
||||
do {
|
||||
r = fread_unlocked(scratch, 1, n, file_);
|
||||
} while (r == 0 && ferror(file_) && errno == EINTR);
|
||||
IOSTATS_ADD(bytes_read, r);
|
||||
*result = Slice(scratch, r);
|
||||
if (r < n) {
|
||||
if (feof(file_)) {
|
||||
@ -241,6 +243,7 @@ class PosixRandomAccessFile: public RandomAccessFile {
|
||||
do {
|
||||
r = pread(fd_, scratch, n, static_cast<off_t>(offset));
|
||||
} while (r < 0 && errno == EINTR);
|
||||
IOSTATS_ADD_IF_POSITIVE(bytes_read, r);
|
||||
*result = Slice(scratch, (r < 0) ? 0 : r);
|
||||
if (r < 0) {
|
||||
// An error: return a non-ok status
|
||||
@ -488,6 +491,7 @@ class PosixMmapFile : public WritableFile {
|
||||
|
||||
size_t n = (left <= avail) ? left : avail;
|
||||
memcpy(dst_, src, n);
|
||||
IOSTATS_ADD(bytes_written, n);
|
||||
dst_ += n;
|
||||
src += n;
|
||||
left -= n;
|
||||
@ -694,6 +698,7 @@ class PosixWritableFile : public WritableFile {
|
||||
}
|
||||
return IOError(filename_, errno);
|
||||
}
|
||||
IOSTATS_ADD(bytes_written, done);
|
||||
TEST_KILL_RANDOM(rocksdb_kill_odds);
|
||||
|
||||
left -= done;
|
||||
@ -744,6 +749,7 @@ class PosixWritableFile : public WritableFile {
|
||||
}
|
||||
return IOError(filename_, errno);
|
||||
}
|
||||
IOSTATS_ADD(bytes_written, done);
|
||||
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
|
||||
left -= done;
|
||||
src += done;
|
||||
@ -877,6 +883,7 @@ class PosixRandomRWFile : public RandomRWFile {
|
||||
}
|
||||
return IOError(filename_, errno);
|
||||
}
|
||||
IOSTATS_ADD(bytes_written, done);
|
||||
|
||||
left -= done;
|
||||
src += done;
|
||||
@ -890,6 +897,7 @@ class PosixRandomRWFile : public RandomRWFile {
|
||||
char* scratch) const {
|
||||
Status s;
|
||||
ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
|
||||
IOSTATS_ADD_IF_POSITIVE(bytes_read, r);
|
||||
*result = Slice(scratch, (r < 0) ? 0 : r);
|
||||
if (r < 0) {
|
||||
s = IOError(filename_, errno);
|
||||
|
30
util/iostats_context.cc
Normal file
30
util/iostats_context.cc
Normal file
@ -0,0 +1,30 @@
|
||||
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#include <sstream>
|
||||
#include "rocksdb/env.h"
|
||||
#include "util/iostats_context_imp.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
__thread IOStatsContext iostats_context;
|
||||
|
||||
void IOStatsContext::Reset() {
|
||||
thread_pool_id = Env::Priority::TOTAL;
|
||||
bytes_read = 0;
|
||||
bytes_written = 0;
|
||||
}
|
||||
|
||||
#define OUTPUT(counter) #counter << " = " << counter << ", "
|
||||
|
||||
std::string IOStatsContext::ToString() const {
|
||||
std::ostringstream ss;
|
||||
ss << OUTPUT(thread_pool_id)
|
||||
<< OUTPUT(bytes_read)
|
||||
<< OUTPUT(bytes_written);
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
32
util/iostats_context_imp.h
Normal file
32
util/iostats_context_imp.h
Normal file
@ -0,0 +1,32 @@
|
||||
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
#pragma once
|
||||
#include "rocksdb/iostats_context.h"
|
||||
|
||||
// increment a specific counter by the specified value
|
||||
#define IOSTATS_ADD(metric, value) \
|
||||
(iostats_context.metric += value)
|
||||
|
||||
// Increase metric value only when it is positive
|
||||
#define IOSTATS_ADD_IF_POSITIVE(metric, value) \
|
||||
if (value > 0) { IOSTATS_ADD(metric, value); }
|
||||
|
||||
// reset a specific counter to zero
|
||||
#define IOSTATS_RESET(metric) \
|
||||
(iostats_context.metric = 0)
|
||||
|
||||
// reset all counters to zero
|
||||
#define IOSTATS_RESET_ALL() \
|
||||
(iostats_context.Reset())
|
||||
|
||||
#define IOSTATS_SET_THREAD_POOL_ID(value) \
|
||||
(iostats_context.thread_pool_id = value)
|
||||
|
||||
#define IOSTATS_THREAD_POOL_ID() \
|
||||
(iostats_context.thread_pool_id)
|
||||
|
||||
#define IOSTATS(metric) \
|
||||
(iostats_context.metric)
|
Loading…
Reference in New Issue
Block a user