Merge branch 'master' into columnfamilies
Conflicts: db/compaction_picker.cc db/compaction_picker.h db/db_impl.cc db/version_set.cc db/version_set.h include/rocksdb/options.h util/options.cc
This commit is contained in:
commit
80a207fc90
@ -270,8 +270,8 @@ void ColumnFamilyData::CreateNewMemtable() {
|
||||
mem_->Ref();
|
||||
}
|
||||
|
||||
Compaction* ColumnFamilyData::PickCompaction() {
|
||||
return compaction_picker_->PickCompaction(current_);
|
||||
Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) {
|
||||
return compaction_picker_->PickCompaction(current_, log_buffer);
|
||||
}
|
||||
|
||||
Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
|
||||
|
@ -147,7 +147,7 @@ class ColumnFamilyData {
|
||||
TableCache* table_cache() const { return table_cache_.get(); }
|
||||
|
||||
// See documentation in compaction_picker.h
|
||||
Compaction* PickCompaction();
|
||||
Compaction* PickCompaction(LogBuffer* log_buffer);
|
||||
Compaction* CompactRange(int input_level, int output_level,
|
||||
const InternalKey* begin, const InternalKey* end,
|
||||
InternalKey** compaction_end);
|
||||
|
@ -364,7 +364,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
|
||||
return c;
|
||||
}
|
||||
|
||||
Compaction* LevelCompactionPicker::PickCompaction(Version* version) {
|
||||
Compaction* LevelCompactionPicker::PickCompaction(Version* version,
|
||||
LogBuffer* log_buffer) {
|
||||
Compaction* c = nullptr;
|
||||
int level = -1;
|
||||
|
||||
@ -541,31 +542,34 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
|
||||
// Universal style of compaction. Pick files that are contiguous in
|
||||
// time-range to compact.
|
||||
//
|
||||
Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
|
||||
Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
|
||||
LogBuffer* log_buffer) {
|
||||
int level = 0;
|
||||
double score = version->compaction_score_[0];
|
||||
|
||||
if ((version->files_[level].size() <
|
||||
(unsigned int)options_->level0_file_num_compaction_trigger)) {
|
||||
Log(logger_, "Universal: nothing to do\n");
|
||||
LogToBuffer(log_buffer, "Universal: nothing to do\n");
|
||||
return nullptr;
|
||||
}
|
||||
Version::FileSummaryStorage tmp;
|
||||
Log(logger_, "Universal: candidate files(%zu): %s\n",
|
||||
version->files_[level].size(), version->LevelFileSummary(&tmp, 0));
|
||||
LogToBuffer(log_buffer, "Universal: candidate files(%zu): %s\n",
|
||||
version->files_[level].size(),
|
||||
version->LevelFileSummary(&tmp, 0));
|
||||
|
||||
// Check for size amplification first.
|
||||
Compaction* c;
|
||||
if ((c = PickCompactionUniversalSizeAmp(version, score)) != nullptr) {
|
||||
Log(logger_, "Universal: compacting for size amp\n");
|
||||
if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) !=
|
||||
nullptr) {
|
||||
LogToBuffer(log_buffer, "Universal: compacting for size amp\n");
|
||||
} else {
|
||||
|
||||
// Size amplification is within limits. Try reducing read
|
||||
// amplification while maintaining file size ratios.
|
||||
unsigned int ratio = options_->compaction_options_universal.size_ratio;
|
||||
|
||||
if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX)) != nullptr) {
|
||||
Log(logger_, "Universal: compacting for size ratio\n");
|
||||
if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX,
|
||||
log_buffer)) != nullptr) {
|
||||
LogToBuffer(log_buffer, "Universal: compacting for size ratio\n");
|
||||
} else {
|
||||
// Size amplification and file size ratios are within configured limits.
|
||||
// If max read amplification is exceeding configured limits, then force
|
||||
@ -573,8 +577,9 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
|
||||
// the number of files to fewer than level0_file_num_compaction_trigger.
|
||||
unsigned int num_files = version->files_[level].size() -
|
||||
options_->level0_file_num_compaction_trigger;
|
||||
if ((c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files)) != nullptr) {
|
||||
Log(logger_, "Universal: compacting for file num\n");
|
||||
if ((c = PickCompactionUniversalReadAmp(
|
||||
version, score, UINT_MAX, num_files, log_buffer)) != nullptr) {
|
||||
LogToBuffer(log_buffer, "Universal: compacting for file num\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -623,7 +628,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
|
||||
//
|
||||
Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||
Version* version, double score, unsigned int ratio,
|
||||
unsigned int max_number_of_files_to_compact) {
|
||||
unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) {
|
||||
int level = 0;
|
||||
|
||||
unsigned int min_merge_width =
|
||||
@ -658,8 +663,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||
candidate_count = 1;
|
||||
break;
|
||||
}
|
||||
Log(logger_, "Universal: file %lu[%d] being compacted, skipping",
|
||||
(unsigned long)f->number, loop);
|
||||
LogToBuffer(log_buffer,
|
||||
"Universal: file %lu[%d] being compacted, skipping",
|
||||
(unsigned long)f->number, loop);
|
||||
f = nullptr;
|
||||
}
|
||||
|
||||
@ -667,8 +673,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||
// first candidate to be compacted.
|
||||
uint64_t candidate_size = f != nullptr? f->file_size : 0;
|
||||
if (f != nullptr) {
|
||||
Log(logger_, "Universal: Possible candidate file %lu[%d].",
|
||||
(unsigned long)f->number, loop);
|
||||
LogToBuffer(log_buffer, "Universal: Possible candidate file %lu[%d].",
|
||||
(unsigned long)f->number, loop);
|
||||
}
|
||||
|
||||
// Check if the suceeding files need compaction.
|
||||
@ -718,9 +724,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||
i < loop + candidate_count && i < file_by_time.size(); i++) {
|
||||
int index = file_by_time[i];
|
||||
FileMetaData* f = version->files_[level][index];
|
||||
Log(logger_, "Universal: Skipping file %lu[%d] with size %lu %d\n",
|
||||
(unsigned long)f->number, i, (unsigned long)f->file_size,
|
||||
f->being_compacted);
|
||||
LogToBuffer(log_buffer,
|
||||
"Universal: Skipping file %lu[%d] with size %lu %d\n",
|
||||
(unsigned long)f->number, i, (unsigned long)f->file_size,
|
||||
f->being_compacted);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -754,8 +761,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||
int index = file_by_time[i];
|
||||
FileMetaData* f = c->input_version_->files_[level][index];
|
||||
c->inputs_[0].push_back(f);
|
||||
Log(logger_, "Universal: Picking file %lu[%d] with size %lu\n",
|
||||
(unsigned long)f->number, i, (unsigned long)f->file_size);
|
||||
LogToBuffer(log_buffer, "Universal: Picking file %lu[%d] with size %lu\n",
|
||||
(unsigned long)f->number, i, (unsigned long)f->file_size);
|
||||
}
|
||||
return c;
|
||||
}
|
||||
@ -767,7 +774,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||
// min_merge_width and max_merge_width).
|
||||
//
|
||||
Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
||||
Version* version, double score) {
|
||||
Version* version, double score, LogBuffer* log_buffer) {
|
||||
int level = 0;
|
||||
|
||||
// percentage flexibilty while reducing size amplification
|
||||
@ -791,17 +798,17 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
||||
start_index = loop; // Consider this as the first candidate.
|
||||
break;
|
||||
}
|
||||
Log(logger_, "Universal: skipping file %lu[%d] compacted %s",
|
||||
(unsigned long)f->number, loop,
|
||||
" cannot be a candidate to reduce size amp.\n");
|
||||
LogToBuffer(log_buffer, "Universal: skipping file %lu[%d] compacted %s",
|
||||
(unsigned long)f->number, loop,
|
||||
" cannot be a candidate to reduce size amp.\n");
|
||||
f = nullptr;
|
||||
}
|
||||
if (f == nullptr) {
|
||||
return nullptr; // no candidate files
|
||||
}
|
||||
|
||||
Log(logger_, "Universal: First candidate file %lu[%d] %s",
|
||||
(unsigned long)f->number, start_index, " to reduce size amp.\n");
|
||||
LogToBuffer(log_buffer, "Universal: First candidate file %lu[%d] %s",
|
||||
(unsigned long)f->number, start_index, " to reduce size amp.\n");
|
||||
|
||||
// keep adding up all the remaining files
|
||||
for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
|
||||
@ -809,7 +816,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
||||
int index = file_by_time[loop];
|
||||
f = version->files_[level][index];
|
||||
if (f->being_compacted) {
|
||||
Log(logger_, "Universal: Possible candidate file %lu[%d] %s.",
|
||||
LogToBuffer(
|
||||
log_buffer, "Universal: Possible candidate file %lu[%d] %s.",
|
||||
(unsigned long)f->number, loop,
|
||||
" is already being compacted. No size amp reduction possible.\n");
|
||||
return nullptr;
|
||||
@ -827,16 +835,18 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
||||
|
||||
// size amplification = percentage of additional size
|
||||
if (candidate_size * 100 < ratio * earliest_file_size) {
|
||||
Log(logger_,
|
||||
"Universal: size amp not needed. newer-files-total-size %lu "
|
||||
"earliest-file-size %lu",
|
||||
(unsigned long)candidate_size, (unsigned long)earliest_file_size);
|
||||
LogToBuffer(log_buffer,
|
||||
"Universal: size amp not needed. newer-files-total-size %lu "
|
||||
"earliest-file-size %lu",
|
||||
(unsigned long)candidate_size,
|
||||
(unsigned long)earliest_file_size);
|
||||
return nullptr;
|
||||
} else {
|
||||
Log(logger_,
|
||||
"Universal: size amp needed. newer-files-total-size %lu "
|
||||
"earliest-file-size %lu",
|
||||
(unsigned long)candidate_size, (unsigned long)earliest_file_size);
|
||||
LogToBuffer(log_buffer,
|
||||
"Universal: size amp needed. newer-files-total-size %lu "
|
||||
"earliest-file-size %lu",
|
||||
(unsigned long)candidate_size,
|
||||
(unsigned long)earliest_file_size);
|
||||
}
|
||||
assert(start_index >= 0 && start_index < file_by_time.size() - 1);
|
||||
|
||||
@ -850,8 +860,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
||||
int index = file_by_time[loop];
|
||||
f = c->input_version_->files_[level][index];
|
||||
c->inputs_[0].push_back(f);
|
||||
Log(logger_, "Universal: size amp picking file %lu[%d] with size %lu",
|
||||
(unsigned long)f->number, index, (unsigned long)f->file_size);
|
||||
LogToBuffer(log_buffer,
|
||||
"Universal: size amp picking file %lu[%d] with size %lu",
|
||||
(unsigned long)f->number, index, (unsigned long)f->file_size);
|
||||
}
|
||||
return c;
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
class LogBuffer;
|
||||
class Compaction;
|
||||
class Version;
|
||||
|
||||
@ -33,7 +34,8 @@ class CompactionPicker {
|
||||
// Returns nullptr if there is no compaction to be done.
|
||||
// Otherwise returns a pointer to a heap-allocated object that
|
||||
// describes the compaction. Caller should delete the result.
|
||||
virtual Compaction* PickCompaction(Version* version) = 0;
|
||||
virtual Compaction* PickCompaction(Version* version,
|
||||
LogBuffer* log_buffer) = 0;
|
||||
|
||||
// Return a compaction object for compacting the range [begin,end] in
|
||||
// the specified level. Returns nullptr if there is nothing in that
|
||||
@ -131,16 +133,19 @@ class UniversalCompactionPicker : public CompactionPicker {
|
||||
UniversalCompactionPicker(const ColumnFamilyOptions* options,
|
||||
const InternalKeyComparator* icmp, Logger* logger)
|
||||
: CompactionPicker(options, icmp, logger) {}
|
||||
virtual Compaction* PickCompaction(Version* version) override;
|
||||
virtual Compaction* PickCompaction(Version* version,
|
||||
LogBuffer* log_buffer) override;
|
||||
|
||||
private:
|
||||
// Pick Universal compaction to limit read amplification
|
||||
Compaction* PickCompactionUniversalReadAmp(Version* version, double score,
|
||||
unsigned int ratio,
|
||||
unsigned int num_files);
|
||||
unsigned int num_files,
|
||||
LogBuffer* log_buffer);
|
||||
|
||||
// Pick Universal compaction to limit space amplification.
|
||||
Compaction* PickCompactionUniversalSizeAmp(Version* version, double score);
|
||||
Compaction* PickCompactionUniversalSizeAmp(Version* version, double score,
|
||||
LogBuffer* log_buffer);
|
||||
};
|
||||
|
||||
class LevelCompactionPicker : public CompactionPicker {
|
||||
@ -148,7 +153,8 @@ class LevelCompactionPicker : public CompactionPicker {
|
||||
LevelCompactionPicker(const ColumnFamilyOptions* options,
|
||||
const InternalKeyComparator* icmp, Logger* logger)
|
||||
: CompactionPicker(options, icmp, logger) {}
|
||||
virtual Compaction* PickCompaction(Version* version) override;
|
||||
virtual Compaction* PickCompaction(Version* version,
|
||||
LogBuffer* log_buffer) override;
|
||||
|
||||
private:
|
||||
// For the specfied level, pick a compaction.
|
||||
|
106
db/db_impl.cc
106
db/db_impl.cc
@ -1932,57 +1932,61 @@ void DBImpl::BackgroundCallCompaction() {
|
||||
DeletionState deletion_state(true);
|
||||
|
||||
MaybeDumpStats();
|
||||
LogBuffer log_buffer(INFO, options_.info_log);
|
||||
{
|
||||
MutexLock l(&mutex_);
|
||||
// Log(options_.info_log, "XXX BG Thread %llx process new work item",
|
||||
// pthread_self());
|
||||
assert(bg_compaction_scheduled_);
|
||||
Status s;
|
||||
if (!shutting_down_.Acquire_Load()) {
|
||||
s = BackgroundCompaction(&madeProgress, deletion_state, &log_buffer);
|
||||
if (!s.ok()) {
|
||||
// Wait a little bit before retrying background compaction in
|
||||
// case this is an environmental problem and we do not want to
|
||||
// chew up resources for failed compactions for the duration of
|
||||
// the problem.
|
||||
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
|
||||
mutex_.Unlock();
|
||||
Log(options_.info_log, "Waiting after background compaction error: %s",
|
||||
s.ToString().c_str());
|
||||
LogFlush(options_.info_log);
|
||||
env_->SleepForMicroseconds(1000000);
|
||||
mutex_.Lock();
|
||||
}
|
||||
}
|
||||
|
||||
MutexLock l(&mutex_);
|
||||
// Log(options_.info_log, "XXX BG Thread %llx process new work item", pthread_self());
|
||||
assert(bg_compaction_scheduled_);
|
||||
Status s;
|
||||
if (!shutting_down_.Acquire_Load()) {
|
||||
s = BackgroundCompaction(&madeProgress, deletion_state);
|
||||
if (!s.ok()) {
|
||||
// Wait a little bit before retrying background compaction in
|
||||
// case this is an environmental problem and we do not want to
|
||||
// chew up resources for failed compactions for the duration of
|
||||
// the problem.
|
||||
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
|
||||
Log(options_.info_log, "Waiting after background compaction error: %s",
|
||||
s.ToString().c_str());
|
||||
// If !s.ok(), this means that Compaction failed. In that case, we want
|
||||
// to delete all obsolete files we might have created and we force
|
||||
// FindObsoleteFiles(). This is because deletion_state does not catch
|
||||
// all created files if compaction failed.
|
||||
FindObsoleteFiles(deletion_state, !s.ok());
|
||||
|
||||
// delete unnecessary files if any, this is done outside the mutex
|
||||
if (deletion_state.HaveSomethingToDelete()) {
|
||||
mutex_.Unlock();
|
||||
LogFlush(options_.info_log);
|
||||
env_->SleepForMicroseconds(1000000);
|
||||
PurgeObsoleteFiles(deletion_state);
|
||||
mutex_.Lock();
|
||||
}
|
||||
|
||||
bg_compaction_scheduled_--;
|
||||
|
||||
MaybeScheduleLogDBDeployStats();
|
||||
|
||||
// Previous compaction may have produced too many files in a level,
|
||||
// So reschedule another compaction if we made progress in the
|
||||
// last compaction.
|
||||
if (madeProgress) {
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
}
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
|
||||
// If !s.ok(), this means that Compaction failed. In that case, we want
|
||||
// to delete all obsolete files we might have created and we force
|
||||
// FindObsoleteFiles(). This is because deletion_state does not catch
|
||||
// all created files if compaction failed.
|
||||
FindObsoleteFiles(deletion_state, !s.ok());
|
||||
|
||||
// delete unnecessary files if any, this is done outside the mutex
|
||||
if (deletion_state.HaveSomethingToDelete()) {
|
||||
mutex_.Unlock();
|
||||
PurgeObsoleteFiles(deletion_state);
|
||||
mutex_.Lock();
|
||||
}
|
||||
|
||||
bg_compaction_scheduled_--;
|
||||
|
||||
MaybeScheduleLogDBDeployStats();
|
||||
|
||||
// Previous compaction may have produced too many files in a level,
|
||||
// So reschedule another compaction if we made progress in the
|
||||
// last compaction.
|
||||
if (madeProgress) {
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
}
|
||||
bg_cv_.SignalAll();
|
||||
|
||||
log_buffer.FlushBufferToLog();
|
||||
}
|
||||
|
||||
Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
DeletionState& deletion_state) {
|
||||
DeletionState& deletion_state,
|
||||
LogBuffer* log_buffer) {
|
||||
*madeProgress = false;
|
||||
mutex_.AssertHeld();
|
||||
|
||||
@ -1999,11 +2003,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
if (!c) {
|
||||
m->done = true;
|
||||
}
|
||||
Log(options_.info_log,
|
||||
LogToBuffer(
|
||||
log_buffer,
|
||||
"Manual compaction from level-%d to level-%d from %s .. %s; will stop "
|
||||
"at %s\n",
|
||||
m->input_level,
|
||||
m->output_level,
|
||||
m->input_level, m->output_level,
|
||||
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
|
||||
(m->end ? m->end->DebugString().c_str() : "(end)"),
|
||||
((m->done || manual_end == nullptr)
|
||||
@ -2013,7 +2017,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
// no need to refcount in iteration since it's always under a mutex
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (!cfd->options()->disable_auto_compactions && !cfd->IsDropped()) {
|
||||
c.reset(cfd->PickCompaction());
|
||||
c.reset(cfd->PickCompaction(log_buffer));
|
||||
if (c != nullptr) {
|
||||
// update statistics
|
||||
MeasureTime(options_.statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION,
|
||||
@ -2027,7 +2031,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
Status status;
|
||||
if (!c) {
|
||||
// Nothing to do
|
||||
Log(options_.info_log, "Compaction nothing to do");
|
||||
LogToBuffer(log_buffer, "Compaction nothing to do");
|
||||
} else if (!is_manual && c->IsTrivialMove()) {
|
||||
// Move file to next level
|
||||
assert(c->num_input_files(0) == 1);
|
||||
@ -2044,7 +2048,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
}
|
||||
|
||||
Version::LevelSummaryStorage tmp;
|
||||
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
|
||||
LogToBuffer(log_buffer, "Moved #%lld to level-%d %lld bytes %s: %s\n",
|
||||
static_cast<unsigned long long>(f->number), c->level() + 1,
|
||||
static_cast<unsigned long long>(f->file_size),
|
||||
status.ToString().c_str(), c->input_version()->LevelSummary(&tmp));
|
||||
@ -2066,8 +2070,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
} else if (shutting_down_.Acquire_Load()) {
|
||||
// Ignore compaction errors found during shutting down
|
||||
} else {
|
||||
Log(options_.info_log,
|
||||
"Compaction error: %s", status.ToString().c_str());
|
||||
Log(WARN, options_.info_log, "Compaction error: %s",
|
||||
status.ToString().c_str());
|
||||
if (options_.paranoid_checks && bg_error_.ok()) {
|
||||
bg_error_ = status;
|
||||
}
|
||||
|
@ -323,7 +323,8 @@ class DBImpl : public DB {
|
||||
static void BGWorkFlush(void* db);
|
||||
void BackgroundCallCompaction();
|
||||
void BackgroundCallFlush();
|
||||
Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state);
|
||||
Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state,
|
||||
LogBuffer* log_buffer);
|
||||
Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state);
|
||||
void CleanupCompaction(CompactionState* compact, Status status);
|
||||
Status DoCompactionWork(CompactionState* compact,
|
||||
|
@ -513,7 +513,7 @@ class Directory {
|
||||
virtual Status Fsync() = 0;
|
||||
};
|
||||
|
||||
enum InfoLogLevel {
|
||||
enum InfoLogLevel : unsigned char {
|
||||
DEBUG = 0,
|
||||
INFO,
|
||||
WARN,
|
||||
@ -526,7 +526,7 @@ enum InfoLogLevel {
|
||||
class Logger {
|
||||
public:
|
||||
enum { DO_NOT_SUPPORT_GET_LOG_FILE_SIZE = -1 };
|
||||
explicit Logger(const InfoLogLevel log_level = InfoLogLevel::ERROR)
|
||||
explicit Logger(const InfoLogLevel log_level = InfoLogLevel::INFO)
|
||||
: log_level_(log_level) {}
|
||||
virtual ~Logger();
|
||||
|
||||
@ -543,10 +543,20 @@ class Logger {
|
||||
if (log_level < log_level_) {
|
||||
return;
|
||||
}
|
||||
char new_format[500];
|
||||
snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
|
||||
kInfoLogLevelNames[log_level], format);
|
||||
Logv(new_format, ap);
|
||||
|
||||
if (log_level == INFO) {
|
||||
// Doesn't print log level if it is INFO level.
|
||||
// This is to avoid unexpected performance regression after we add
|
||||
// the feature of log level. All the logs before we add the feature
|
||||
// are INFO level. We don't want to add extra costs to those existing
|
||||
// logging.
|
||||
Logv(format, ap);
|
||||
} else {
|
||||
char new_format[500];
|
||||
snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
|
||||
kInfoLogLevelNames[log_level], format);
|
||||
Logv(new_format, ap);
|
||||
}
|
||||
}
|
||||
virtual size_t GetLogFileSize() const {
|
||||
return DO_NOT_SUPPORT_GET_LOG_FILE_SIZE;
|
||||
@ -565,6 +575,26 @@ class Logger {
|
||||
InfoLogLevel log_level_;
|
||||
};
|
||||
|
||||
// A class to buffer info log entries and flush them in the end.
|
||||
class LogBuffer {
|
||||
public:
|
||||
// log_level: the log level for all the logs
|
||||
// info_log: logger to write the logs to
|
||||
LogBuffer(const InfoLogLevel log_level, const shared_ptr<Logger>& info_log);
|
||||
~LogBuffer();
|
||||
|
||||
// Add a log entry to the buffer.
|
||||
void AddLogToBuffer(const char* format, ...);
|
||||
|
||||
// Flush all buffered log to the info log.
|
||||
void FlushBufferToLog() const;
|
||||
|
||||
private:
|
||||
struct Rep;
|
||||
Rep* rep_;
|
||||
const InfoLogLevel log_level_;
|
||||
const shared_ptr<Logger>& info_log_;
|
||||
};
|
||||
|
||||
// Identifies a locked file.
|
||||
class FileLock {
|
||||
@ -577,6 +607,9 @@ class FileLock {
|
||||
void operator=(const FileLock&);
|
||||
};
|
||||
|
||||
// Add log to the LogBuffer for a delayed info logging. It can be used when
|
||||
// we want to add some logs inside a mutex.
|
||||
extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...);
|
||||
|
||||
extern void LogFlush(const shared_ptr<Logger>& info_log);
|
||||
|
||||
|
@ -24,6 +24,7 @@ class CompactionFilter;
|
||||
class CompactionFilterFactory;
|
||||
class Comparator;
|
||||
class Env;
|
||||
enum InfoLogLevel : unsigned char;
|
||||
class FilterPolicy;
|
||||
class Logger;
|
||||
class MergeOperator;
|
||||
@ -532,6 +533,8 @@ struct DBOptions {
|
||||
// Default: nullptr
|
||||
shared_ptr<Logger> info_log;
|
||||
|
||||
InfoLogLevel info_log_level;
|
||||
|
||||
// Number of open files that can be used by the DB. You may need to
|
||||
// increase this if your database has a large working set. Value -1 means
|
||||
// files opened are always kept open. You can estimate number of files based
|
||||
|
@ -88,7 +88,7 @@ Status CreateLoggerFromOptions(
|
||||
AutoRollLogger* result = new AutoRollLogger(
|
||||
env, dbname, db_log_dir,
|
||||
options.max_log_file_size,
|
||||
options.log_file_time_to_roll);
|
||||
options.log_file_time_to_roll, options.info_log_level);
|
||||
Status s = result->GetStatus();
|
||||
if (!s.ok()) {
|
||||
delete result;
|
||||
@ -101,7 +101,11 @@ Status CreateLoggerFromOptions(
|
||||
env->CreateDir(dbname); // In case it does not exist
|
||||
env->RenameFile(fname, OldInfoLogFileName(dbname, env->NowMicros(),
|
||||
db_absolute_path, db_log_dir));
|
||||
return env->NewLogger(fname, logger);
|
||||
auto s = env->NewLogger(fname, logger);
|
||||
if (logger->get() != nullptr) {
|
||||
(*logger)->SetInfoLogLevel(options.info_log_level);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,7 @@ class AutoRollLogger : public Logger {
|
||||
AutoRollLogger(Env* env, const std::string& dbname,
|
||||
const std::string& db_log_dir, size_t log_max_size,
|
||||
size_t log_file_time_to_roll,
|
||||
const InfoLogLevel log_level = InfoLogLevel::ERROR)
|
||||
const InfoLogLevel log_level = InfoLogLevel::INFO)
|
||||
: Logger(log_level),
|
||||
dbname_(dbname),
|
||||
db_log_dir_(db_log_dir),
|
||||
|
81
util/env.cc
81
util/env.cc
@ -8,7 +8,11 @@
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "rocksdb/env.h"
|
||||
|
||||
#include <sys/time.h>
|
||||
#include "rocksdb/options.h"
|
||||
#include "util/arena.h"
|
||||
#include "util/autovector.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -27,9 +31,82 @@ WritableFile::~WritableFile() {
|
||||
Logger::~Logger() {
|
||||
}
|
||||
|
||||
// One log entry with its timestamp
|
||||
struct BufferedLog {
|
||||
struct timeval now_tv; // Timestamp of the log
|
||||
char message[1]; // Beginning of log message
|
||||
};
|
||||
|
||||
struct LogBuffer::Rep {
|
||||
Arena arena_;
|
||||
autovector<BufferedLog*> logs_;
|
||||
};
|
||||
|
||||
// Lazily initialize Rep to avoid allocations when new log is added.
|
||||
LogBuffer::LogBuffer(const InfoLogLevel log_level,
|
||||
const shared_ptr<Logger>& info_log)
|
||||
: rep_(nullptr), log_level_(log_level), info_log_(info_log) {}
|
||||
|
||||
LogBuffer::~LogBuffer() { delete rep_; }
|
||||
|
||||
void LogBuffer::AddLogToBuffer(const char* format, ...) {
|
||||
if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
|
||||
// Skip the level because of its level.
|
||||
return;
|
||||
}
|
||||
if (rep_ == nullptr) {
|
||||
rep_ = new Rep();
|
||||
}
|
||||
|
||||
const size_t kLogSizeLimit = 512;
|
||||
char* alloc_mem = rep_->arena_.AllocateAligned(kLogSizeLimit);
|
||||
BufferedLog* buffered_log = new (alloc_mem) BufferedLog();
|
||||
char* p = buffered_log->message;
|
||||
char* limit = alloc_mem + kLogSizeLimit - 1;
|
||||
|
||||
// store the time
|
||||
gettimeofday(&(buffered_log->now_tv), nullptr);
|
||||
|
||||
// Print the message
|
||||
if (p < limit) {
|
||||
va_list ap;
|
||||
va_start(ap, format);
|
||||
p += vsnprintf(p, limit - p, format, ap);
|
||||
va_end(ap);
|
||||
}
|
||||
|
||||
// Add '\0' to the end
|
||||
*p = '\0';
|
||||
|
||||
rep_->logs_.push_back(buffered_log);
|
||||
}
|
||||
|
||||
void LogBuffer::FlushBufferToLog() const {
|
||||
if (rep_ != nullptr) {
|
||||
for (BufferedLog* log : rep_->logs_) {
|
||||
const time_t seconds = log->now_tv.tv_sec;
|
||||
struct tm t;
|
||||
localtime_r(&seconds, &t);
|
||||
Log(log_level_, info_log_,
|
||||
"(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s",
|
||||
t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
|
||||
t.tm_sec, static_cast<int>(log->now_tv.tv_usec), log->message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FileLock::~FileLock() {
|
||||
}
|
||||
|
||||
void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
|
||||
if (log_buffer != nullptr) {
|
||||
va_list ap;
|
||||
va_start(ap, format);
|
||||
log_buffer->AddLogToBuffer(format);
|
||||
va_end(ap);
|
||||
}
|
||||
}
|
||||
|
||||
void LogFlush(Logger *info_log) {
|
||||
if (info_log) {
|
||||
info_log->Flush();
|
||||
@ -40,7 +117,7 @@ void Log(Logger* info_log, const char* format, ...) {
|
||||
if (info_log) {
|
||||
va_list ap;
|
||||
va_start(ap, format);
|
||||
info_log->Logv(format, ap);
|
||||
info_log->Logv(InfoLogLevel::INFO, format, ap);
|
||||
va_end(ap);
|
||||
}
|
||||
}
|
||||
@ -163,7 +240,7 @@ void Log(const shared_ptr<Logger>& info_log, const char* format, ...) {
|
||||
if (info_log) {
|
||||
va_list ap;
|
||||
va_start(ap, format);
|
||||
info_log->Logv(format, ap);
|
||||
info_log->Logv(InfoLogLevel::INFO, format, ap);
|
||||
va_end(ap);
|
||||
}
|
||||
}
|
||||
|
@ -7,6 +7,7 @@
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <unordered_set>
|
||||
@ -191,6 +192,15 @@ bool IsSingleVarint(const std::string& s) {
|
||||
}
|
||||
|
||||
#ifdef OS_LINUX
|
||||
// To make sure the Env::GetUniqueId() related tests work correctly, The files
|
||||
// should be stored in regular storage like "hard disk" or "flash device".
|
||||
// Otherwise we cannot get the correct id.
|
||||
//
|
||||
// The following function act as the replacement of test::TmpDir() that may be
|
||||
// customized by user to be on a storage that doesn't work with GetUniqueId().
|
||||
//
|
||||
// TODO(kailiu) This function still assumes /tmp/<test-dir> reside in regular
|
||||
// storage system.
|
||||
bool IsUniqueIDValid(const std::string& s) {
|
||||
return !s.empty() && !IsSingleVarint(s);
|
||||
}
|
||||
@ -198,11 +208,21 @@ bool IsUniqueIDValid(const std::string& s) {
|
||||
const size_t MAX_ID_SIZE = 100;
|
||||
char temp_id[MAX_ID_SIZE];
|
||||
|
||||
std::string GetOnDiskTestDir() {
|
||||
char base[100];
|
||||
snprintf(base, sizeof(base), "/tmp/rocksdbtest-%d",
|
||||
static_cast<int>(geteuid()));
|
||||
// Directory may already exist
|
||||
Env::Default()->CreateDirIfMissing(base);
|
||||
|
||||
return base;
|
||||
}
|
||||
|
||||
// Only works in linux platforms
|
||||
TEST(EnvPosixTest, RandomAccessUniqueID) {
|
||||
// Create file.
|
||||
const EnvOptions soptions;
|
||||
std::string fname = test::TmpDir() + "/" + "testfile";
|
||||
std::string fname = GetOnDiskTestDir() + "/" + "testfile";
|
||||
unique_ptr<WritableFile> wfile;
|
||||
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
|
||||
|
||||
@ -261,7 +281,7 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) {
|
||||
// Create the files
|
||||
std::vector<std::string> fnames;
|
||||
for (int i = 0; i < 1000; ++i) {
|
||||
fnames.push_back(test::TmpDir() + "/" + "testfile" + std::to_string(i));
|
||||
fnames.push_back(GetOnDiskTestDir() + "/" + "testfile" + std::to_string(i));
|
||||
|
||||
// Create file.
|
||||
unique_ptr<WritableFile> wfile;
|
||||
@ -294,7 +314,8 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) {
|
||||
// Only works in linux platforms
|
||||
TEST(EnvPosixTest, RandomAccessUniqueIDDeletes) {
|
||||
const EnvOptions soptions;
|
||||
std::string fname = test::TmpDir() + "/" + "testfile";
|
||||
|
||||
std::string fname = GetOnDiskTestDir() + "/" + "testfile";
|
||||
|
||||
// Check that after file is deleted we don't get same ID again in a new file.
|
||||
std::unordered_set<std::string> ids;
|
||||
|
@ -148,6 +148,7 @@ DBOptions::DBOptions()
|
||||
paranoid_checks(false),
|
||||
env(Env::Default()),
|
||||
info_log(nullptr),
|
||||
info_log_level(INFO),
|
||||
max_open_files(1000),
|
||||
statistics(nullptr),
|
||||
disableDataSync(false),
|
||||
@ -185,6 +186,7 @@ DBOptions::DBOptions(const Options& options)
|
||||
paranoid_checks(options.paranoid_checks),
|
||||
env(options.env),
|
||||
info_log(options.info_log),
|
||||
info_log_level(options.info_log_level),
|
||||
max_open_files(options.max_open_files),
|
||||
statistics(options.statistics),
|
||||
disableDataSync(options.disableDataSync),
|
||||
|
Loading…
Reference in New Issue
Block a user