CompactRange() to return status

Summary: as title

Test Plan:
make all check
What else tests shall I cover?

Reviewers: igor, haobo

CC:

Differential Revision: https://reviews.facebook.net/D15339
This commit is contained in:
Lei Jin 2014-01-22 12:46:24 -08:00
parent 81c9cc9b3b
commit aba2acb5ec
6 changed files with 57 additions and 39 deletions

View File

@ -1288,11 +1288,16 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
return s;
}
void DBImpl::CompactRange(const Slice* begin,
const Slice* end,
bool reduce_level,
int target_level) {
FlushMemTable(FlushOptions());
Status DBImpl::CompactRange(const Slice* begin,
const Slice* end,
bool reduce_level,
int target_level) {
Status s = FlushMemTable(FlushOptions());
if (!s.ok()) {
LogFlush(options_.info_log);
return s;
}
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
@ -1308,16 +1313,22 @@ void DBImpl::CompactRange(const Slice* begin,
// bottom-most level, the output level will be the same as input one
if (options_.compaction_style == kCompactionStyleUniversal ||
level == max_level_with_files) {
RunManualCompaction(level, level, begin, end);
s = RunManualCompaction(level, level, begin, end);
} else {
RunManualCompaction(level, level + 1, begin, end);
s = RunManualCompaction(level, level + 1, begin, end);
}
if (!s.ok()) {
LogFlush(options_.info_log);
return s;
}
}
if (reduce_level) {
ReFitLevel(max_level_with_files, target_level);
s = ReFitLevel(max_level_with_files, target_level);
}
LogFlush(options_.info_log);
return s;
}
// return the same level if it cannot be moved
@ -1336,7 +1347,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) {
return minimum_level;
}
void DBImpl::ReFitLevel(int level, int target_level) {
Status DBImpl::ReFitLevel(int level, int target_level) {
assert(level < NumberLevels());
SuperVersion* superversion_to_free = nullptr;
@ -1350,7 +1361,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
mutex_.Unlock();
Log(options_.info_log, "ReFitLevel: another thread is refitting");
delete new_superversion;
return;
return Status::NotSupported("another thread is refitting");
}
refitting_level_ = true;
@ -1371,6 +1382,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
assert(to_level <= level);
Status status;
if (to_level < level) {
Log(options_.info_log, "Before refitting:\n%s",
versions_->current()->DebugString().data());
@ -1384,7 +1396,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
Log(options_.info_log, "Apply version edit:\n%s",
edit.DebugString().data());
auto status = versions_->LogAndApply(&edit, &mutex_);
status = versions_->LogAndApply(&edit, &mutex_);
superversion_to_free = InstallSuperVersion(new_superversion);
new_superversion = nullptr;
@ -1402,6 +1414,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
mutex_.Unlock();
delete superversion_to_free;
delete new_superversion;
return status;
}
int DBImpl::NumberLevels() {
@ -1614,10 +1627,10 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path,
return status;
}
void DBImpl::RunManualCompaction(int input_level,
int output_level,
const Slice* begin,
const Slice* end) {
Status DBImpl::RunManualCompaction(int input_level,
int output_level,
const Slice* begin,
const Slice* end) {
assert(input_level >= 0);
InternalKey begin_storage, end_storage;
@ -1684,15 +1697,16 @@ void DBImpl::RunManualCompaction(int input_level,
assert(!manual.in_progress);
assert(bg_manual_only_ > 0);
--bg_manual_only_;
return manual.status;
}
void DBImpl::TEST_CompactRange(int level,
const Slice* begin,
const Slice* end) {
Status DBImpl::TEST_CompactRange(int level,
const Slice* begin,
const Slice* end) {
int output_level = (options_.compaction_style == kCompactionStyleUniversal)
? level
: level + 1;
RunManualCompaction(level, output_level, begin, end);
return RunManualCompaction(level, output_level, begin, end);
}
Status DBImpl::FlushMemTable(const FlushOptions& options) {
@ -1991,6 +2005,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (is_manual) {
ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->status = status;
m->done = true;
}
// For universal compaction:

View File

@ -64,8 +64,8 @@ class DBImpl : public DB {
virtual void ReleaseSnapshot(const Snapshot* snapshot);
virtual bool GetProperty(const Slice& property, std::string* value);
virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes);
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1);
virtual Status CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1);
virtual int NumberLevels();
virtual int MaxMemCompactionLevel();
virtual int Level0StopWriteTrigger();
@ -90,17 +90,17 @@ class DBImpl : public DB {
virtual Status GetDbIdentity(std::string& identity);
void RunManualCompaction(int input_level,
int output_level,
const Slice* begin,
const Slice* end);
Status RunManualCompaction(int input_level,
int output_level,
const Slice* begin,
const Slice* end);
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin, *end]
void TEST_CompactRange(int level,
const Slice* begin,
const Slice* end);
Status TEST_CompactRange(int level,
const Slice* begin,
const Slice* end);
// Force current memtable contents to be flushed.
Status TEST_FlushMemTable();
@ -359,7 +359,7 @@ class DBImpl : public DB {
// Move the files in the input level to the target level.
// If target_level < 0, automatically calculate the minimum level that could
// hold the data set.
void ReFitLevel(int level, int target_level = -1);
Status ReFitLevel(int level, int target_level = -1);
// Returns the current SuperVersion number.
uint64_t CurrentVersionNumber() const;
@ -430,6 +430,7 @@ class DBImpl : public DB {
int input_level;
int output_level;
bool done;
Status status;
bool in_progress; // compaction request being processed?
const InternalKey* begin; // nullptr means beginning of key range
const InternalKey* end; // nullptr means end of key range

View File

@ -49,8 +49,9 @@ public:
virtual Status Write(const WriteOptions& options, WriteBatch* updates) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1) {
virtual Status CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status DisableFileDeletions() {
return Status::NotSupported("Not supported operation in read only mode.");

View File

@ -4556,8 +4556,9 @@ class ModelDB: public DB {
sizes[i] = 0;
}
}
virtual void CompactRange(const Slice* start, const Slice* end,
bool reduce_level, int target_level) {
virtual Status CompactRange(const Slice* start, const Slice* end,
bool reduce_level, int target_level) {
return Status::NotSupported("Not supported operation.");
}
virtual int NumberLevels()

View File

@ -215,9 +215,9 @@ class DB {
// hosting all the files. In this case, client could set reduce_level
// to true, to move the files back to the minimum level capable of holding
// the data set or a given level (specified by non-negative target_level).
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) = 0;
virtual Status CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) = 0;
// Number of levels used for this DB.
virtual int NumberLevels() = 0;

View File

@ -85,9 +85,9 @@ class StackableDB : public DB {
return db_->GetApproximateSizes(r, n, sizes);
}
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) override {
virtual Status CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) override {
return db_->CompactRange(begin, end, reduce_level, target_level);
}