Move FIFOCompactionPicker to a separate file (#4724)
Summary: **Summary:** Simplified the code layout by moving FIFOCompactionPicker to a separate file. **Why?:** While trying to add ttl functionality to universal compaction, I found that `FIFOCompactionPicker` class and its impl methods to be interspersed between `LevelCompactionPicker` methods which kind-of made the code a little hard to traverse. So I moved `FIFOCompactionPicker` to a separate compaction_picker_fifo.h/cc file, similar to `UniversalCompactionPicker`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4724 Differential Revision: D13227914 Pulled By: sagar0 fbshipit-source-id: 89471766ea67fa4d87664a41c057dd7df4b3d4e3
This commit is contained in:
parent
8d7bc76f36
commit
70645355ad
@ -471,6 +471,7 @@ set(SOURCES
|
||||
db/compaction_iterator.cc
|
||||
db/compaction_job.cc
|
||||
db/compaction_picker.cc
|
||||
db/compaction_picker_fifo.cc
|
||||
db/compaction_picker_universal.cc
|
||||
db/convenience.cc
|
||||
db/db_filesnapshot.cc
|
||||
|
1
TARGETS
1
TARGETS
@ -91,6 +91,7 @@ cpp_library(
|
||||
"db/compaction_iterator.cc",
|
||||
"db/compaction_job.cc",
|
||||
"db/compaction_picker.cc",
|
||||
"db/compaction_picker_fifo.cc",
|
||||
"db/compaction_picker_universal.cc",
|
||||
"db/convenience.cc",
|
||||
"db/db_filesnapshot.cc",
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include <limits>
|
||||
|
||||
#include "db/compaction_picker.h"
|
||||
#include "db/compaction_picker_fifo.h"
|
||||
#include "db/compaction_picker_universal.h"
|
||||
#include "db/db_impl.h"
|
||||
#include "db/internal_stats.h"
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include "db/column_family.h"
|
||||
#include "monitoring/statistics.h"
|
||||
#include "util/filename.h"
|
||||
@ -36,6 +37,7 @@ uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
} // anonymous namespace
|
||||
|
||||
bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
|
||||
size_t min_files_to_compact,
|
||||
@ -69,7 +71,6 @@ bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
|
||||
}
|
||||
return false;
|
||||
}
|
||||
} // anonymous namespace
|
||||
|
||||
// Determine compression type, based on user options, level of the output
|
||||
// file and whether compression is disabled.
|
||||
@ -1547,217 +1548,4 @@ Compaction* LevelCompactionPicker::PickCompaction(
|
||||
return builder.PickCompaction();
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
bool FIFOCompactionPicker::NeedsCompaction(
|
||||
const VersionStorageInfo* vstorage) const {
|
||||
const int kLevel0 = 0;
|
||||
return vstorage->CompactionScore(kLevel0) >= 1;
|
||||
}
|
||||
|
||||
namespace {
|
||||
uint64_t GetTotalFilesSize(
|
||||
const std::vector<FileMetaData*>& files) {
|
||||
uint64_t total_size = 0;
|
||||
for (const auto& f : files) {
|
||||
total_size += f->fd.file_size;
|
||||
}
|
||||
return total_size;
|
||||
}
|
||||
} // anonymous namespace
|
||||
|
||||
Compaction* FIFOCompactionPicker::PickTTLCompaction(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
|
||||
assert(mutable_cf_options.compaction_options_fifo.ttl > 0);
|
||||
|
||||
const int kLevel0 = 0;
|
||||
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
|
||||
uint64_t total_size = GetTotalFilesSize(level_files);
|
||||
|
||||
int64_t _current_time;
|
||||
auto status = ioptions_.env->GetCurrentTime(&_current_time);
|
||||
if (!status.ok()) {
|
||||
ROCKS_LOG_BUFFER(log_buffer,
|
||||
"[%s] FIFO compaction: Couldn't get current time: %s. "
|
||||
"Not doing compactions based on TTL. ",
|
||||
cf_name.c_str(), status.ToString().c_str());
|
||||
return nullptr;
|
||||
}
|
||||
const uint64_t current_time = static_cast<uint64_t>(_current_time);
|
||||
|
||||
std::vector<CompactionInputFiles> inputs;
|
||||
inputs.emplace_back();
|
||||
inputs[0].level = 0;
|
||||
|
||||
// avoid underflow
|
||||
if (current_time > mutable_cf_options.compaction_options_fifo.ttl) {
|
||||
for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
|
||||
auto f = *ritr;
|
||||
if (f->fd.table_reader != nullptr &&
|
||||
f->fd.table_reader->GetTableProperties() != nullptr) {
|
||||
auto creation_time =
|
||||
f->fd.table_reader->GetTableProperties()->creation_time;
|
||||
if (creation_time == 0 ||
|
||||
creation_time >= (current_time -
|
||||
mutable_cf_options.compaction_options_fifo.ttl)) {
|
||||
break;
|
||||
}
|
||||
total_size -= f->compensated_file_size;
|
||||
inputs[0].files.push_back(f);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return a nullptr and proceed to size-based FIFO compaction if:
|
||||
// 1. there are no files older than ttl OR
|
||||
// 2. there are a few files older than ttl, but deleting them will not bring
|
||||
// the total size to be less than max_table_files_size threshold.
|
||||
if (inputs[0].files.empty() ||
|
||||
total_size >
|
||||
mutable_cf_options.compaction_options_fifo.max_table_files_size) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
for (const auto& f : inputs[0].files) {
|
||||
ROCKS_LOG_BUFFER(log_buffer,
|
||||
"[%s] FIFO compaction: picking file %" PRIu64
|
||||
" with creation time %" PRIu64 " for deletion",
|
||||
cf_name.c_str(), f->fd.GetNumber(),
|
||||
f->fd.table_reader->GetTableProperties()->creation_time);
|
||||
}
|
||||
|
||||
Compaction* c = new Compaction(
|
||||
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
|
||||
kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
|
||||
{}, /* is manual */ false, vstorage->CompactionScore(0),
|
||||
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
|
||||
return c;
|
||||
}
|
||||
|
||||
Compaction* FIFOCompactionPicker::PickSizeCompaction(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
|
||||
const int kLevel0 = 0;
|
||||
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
|
||||
uint64_t total_size = GetTotalFilesSize(level_files);
|
||||
|
||||
if (total_size <=
|
||||
mutable_cf_options.compaction_options_fifo.max_table_files_size ||
|
||||
level_files.size() == 0) {
|
||||
// total size not exceeded
|
||||
if (mutable_cf_options.compaction_options_fifo.allow_compaction &&
|
||||
level_files.size() > 0) {
|
||||
CompactionInputFiles comp_inputs;
|
||||
// try to prevent same files from being compacted multiple times, which
|
||||
// could produce large files that may never TTL-expire. Achieve this by
|
||||
// disallowing compactions with files larger than memtable (inflate its
|
||||
// size by 10% to account for uncompressed L0 files that may have size
|
||||
// slightly greater than memtable size limit).
|
||||
size_t max_compact_bytes_per_del_file =
|
||||
static_cast<size_t>(MultiplyCheckOverflow(
|
||||
static_cast<uint64_t>(mutable_cf_options.write_buffer_size),
|
||||
1.1));
|
||||
if (FindIntraL0Compaction(
|
||||
level_files,
|
||||
mutable_cf_options
|
||||
.level0_file_num_compaction_trigger /* min_files_to_compact */
|
||||
,
|
||||
max_compact_bytes_per_del_file, &comp_inputs)) {
|
||||
Compaction* c = new Compaction(
|
||||
vstorage, ioptions_, mutable_cf_options, {comp_inputs}, 0,
|
||||
16 * 1024 * 1024 /* output file size limit */,
|
||||
0 /* max compaction bytes, not applicable */,
|
||||
0 /* output path ID */, mutable_cf_options.compression,
|
||||
ioptions_.compression_opts, 0 /* max_subcompactions */, {},
|
||||
/* is manual */ false, vstorage->CompactionScore(0),
|
||||
/* is deletion compaction */ false,
|
||||
CompactionReason::kFIFOReduceNumFiles);
|
||||
return c;
|
||||
}
|
||||
}
|
||||
|
||||
ROCKS_LOG_BUFFER(
|
||||
log_buffer,
|
||||
"[%s] FIFO compaction: nothing to do. Total size %" PRIu64
|
||||
", max size %" PRIu64 "\n",
|
||||
cf_name.c_str(), total_size,
|
||||
mutable_cf_options.compaction_options_fifo.max_table_files_size);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (!level0_compactions_in_progress_.empty()) {
|
||||
ROCKS_LOG_BUFFER(
|
||||
log_buffer,
|
||||
"[%s] FIFO compaction: Already executing compaction. No need "
|
||||
"to run parallel compactions since compactions are very fast",
|
||||
cf_name.c_str());
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::vector<CompactionInputFiles> inputs;
|
||||
inputs.emplace_back();
|
||||
inputs[0].level = 0;
|
||||
|
||||
for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
|
||||
auto f = *ritr;
|
||||
total_size -= f->compensated_file_size;
|
||||
inputs[0].files.push_back(f);
|
||||
char tmp_fsize[16];
|
||||
AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
|
||||
ROCKS_LOG_BUFFER(log_buffer,
|
||||
"[%s] FIFO compaction: picking file %" PRIu64
|
||||
" with size %s for deletion",
|
||||
cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
|
||||
if (total_size <=
|
||||
mutable_cf_options.compaction_options_fifo.max_table_files_size) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Compaction* c = new Compaction(
|
||||
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
|
||||
kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
|
||||
{}, /* is manual */ false, vstorage->CompactionScore(0),
|
||||
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
|
||||
return c;
|
||||
}
|
||||
|
||||
Compaction* FIFOCompactionPicker::PickCompaction(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
|
||||
assert(vstorage->num_levels() == 1);
|
||||
|
||||
Compaction* c = nullptr;
|
||||
if (mutable_cf_options.compaction_options_fifo.ttl > 0) {
|
||||
c = PickTTLCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
|
||||
}
|
||||
if (c == nullptr) {
|
||||
c = PickSizeCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
|
||||
}
|
||||
RegisterCompaction(c);
|
||||
return c;
|
||||
}
|
||||
|
||||
Compaction* FIFOCompactionPicker::CompactRange(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, int input_level, int output_level,
|
||||
uint32_t /*output_path_id*/, uint32_t /*max_subcompactions*/,
|
||||
const InternalKey* /*begin*/, const InternalKey* /*end*/,
|
||||
InternalKey** compaction_end, bool* /*manual_conflict*/) {
|
||||
#ifdef NDEBUG
|
||||
(void)input_level;
|
||||
(void)output_level;
|
||||
#endif
|
||||
assert(input_level == 0);
|
||||
assert(output_level == 0);
|
||||
*compaction_end = nullptr;
|
||||
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
|
||||
Compaction* c =
|
||||
PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer);
|
||||
log_buffer.FlushBufferToLog();
|
||||
return c;
|
||||
}
|
||||
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -235,42 +235,6 @@ class LevelCompactionPicker : public CompactionPicker {
|
||||
};
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
class FIFOCompactionPicker : public CompactionPicker {
|
||||
public:
|
||||
FIFOCompactionPicker(const ImmutableCFOptions& ioptions,
|
||||
const InternalKeyComparator* icmp)
|
||||
: CompactionPicker(ioptions, icmp) {}
|
||||
|
||||
virtual Compaction* PickCompaction(const std::string& cf_name,
|
||||
const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* version,
|
||||
LogBuffer* log_buffer) override;
|
||||
|
||||
virtual Compaction* CompactRange(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, int input_level, int output_level,
|
||||
uint32_t output_path_id, uint32_t max_subcompactions,
|
||||
const InternalKey* begin, const InternalKey* end,
|
||||
InternalKey** compaction_end, bool* manual_conflict) override;
|
||||
|
||||
// The maximum allowed output level. Always returns 0.
|
||||
virtual int MaxOutputLevel() const override { return 0; }
|
||||
|
||||
virtual bool NeedsCompaction(
|
||||
const VersionStorageInfo* vstorage) const override;
|
||||
|
||||
private:
|
||||
Compaction* PickTTLCompaction(const std::string& cf_name,
|
||||
const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* version,
|
||||
LogBuffer* log_buffer);
|
||||
|
||||
Compaction* PickSizeCompaction(const std::string& cf_name,
|
||||
const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* version,
|
||||
LogBuffer* log_buffer);
|
||||
};
|
||||
|
||||
class NullCompactionPicker : public CompactionPicker {
|
||||
public:
|
||||
NullCompactionPicker(const ImmutableCFOptions& ioptions,
|
||||
@ -308,6 +272,11 @@ class NullCompactionPicker : public CompactionPicker {
|
||||
};
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
|
||||
size_t min_files_to_compact,
|
||||
uint64_t max_compact_bytes_per_del_file,
|
||||
CompactionInputFiles* comp_inputs);
|
||||
|
||||
CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
|
||||
const VersionStorageInfo* vstorage,
|
||||
const MutableCFOptions& mutable_cf_options,
|
||||
|
235
db/compaction_picker_fifo.cc
Normal file
235
db/compaction_picker_fifo.cc
Normal file
@ -0,0 +1,235 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "db/compaction_picker_fifo.h"
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#ifndef __STDC_FORMAT_MACROS
|
||||
#define __STDC_FORMAT_MACROS
|
||||
#endif
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "db/column_family.h"
|
||||
#include "util/log_buffer.h"
|
||||
#include "util/string_util.h"
|
||||
|
||||
namespace rocksdb {
|
||||
namespace {
|
||||
uint64_t GetTotalFilesSize(const std::vector<FileMetaData*>& files) {
|
||||
uint64_t total_size = 0;
|
||||
for (const auto& f : files) {
|
||||
total_size += f->fd.file_size;
|
||||
}
|
||||
return total_size;
|
||||
}
|
||||
} // anonymous namespace
|
||||
|
||||
bool FIFOCompactionPicker::NeedsCompaction(
|
||||
const VersionStorageInfo* vstorage) const {
|
||||
const int kLevel0 = 0;
|
||||
return vstorage->CompactionScore(kLevel0) >= 1;
|
||||
}
|
||||
|
||||
Compaction* FIFOCompactionPicker::PickTTLCompaction(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
|
||||
assert(mutable_cf_options.compaction_options_fifo.ttl > 0);
|
||||
|
||||
const int kLevel0 = 0;
|
||||
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
|
||||
uint64_t total_size = GetTotalFilesSize(level_files);
|
||||
|
||||
int64_t _current_time;
|
||||
auto status = ioptions_.env->GetCurrentTime(&_current_time);
|
||||
if (!status.ok()) {
|
||||
ROCKS_LOG_BUFFER(log_buffer,
|
||||
"[%s] FIFO compaction: Couldn't get current time: %s. "
|
||||
"Not doing compactions based on TTL. ",
|
||||
cf_name.c_str(), status.ToString().c_str());
|
||||
return nullptr;
|
||||
}
|
||||
const uint64_t current_time = static_cast<uint64_t>(_current_time);
|
||||
|
||||
std::vector<CompactionInputFiles> inputs;
|
||||
inputs.emplace_back();
|
||||
inputs[0].level = 0;
|
||||
|
||||
// avoid underflow
|
||||
if (current_time > mutable_cf_options.compaction_options_fifo.ttl) {
|
||||
for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
|
||||
auto f = *ritr;
|
||||
if (f->fd.table_reader != nullptr &&
|
||||
f->fd.table_reader->GetTableProperties() != nullptr) {
|
||||
auto creation_time =
|
||||
f->fd.table_reader->GetTableProperties()->creation_time;
|
||||
if (creation_time == 0 ||
|
||||
creation_time >= (current_time -
|
||||
mutable_cf_options.compaction_options_fifo.ttl)) {
|
||||
break;
|
||||
}
|
||||
total_size -= f->compensated_file_size;
|
||||
inputs[0].files.push_back(f);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return a nullptr and proceed to size-based FIFO compaction if:
|
||||
// 1. there are no files older than ttl OR
|
||||
// 2. there are a few files older than ttl, but deleting them will not bring
|
||||
// the total size to be less than max_table_files_size threshold.
|
||||
if (inputs[0].files.empty() ||
|
||||
total_size >
|
||||
mutable_cf_options.compaction_options_fifo.max_table_files_size) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
for (const auto& f : inputs[0].files) {
|
||||
ROCKS_LOG_BUFFER(log_buffer,
|
||||
"[%s] FIFO compaction: picking file %" PRIu64
|
||||
" with creation time %" PRIu64 " for deletion",
|
||||
cf_name.c_str(), f->fd.GetNumber(),
|
||||
f->fd.table_reader->GetTableProperties()->creation_time);
|
||||
}
|
||||
|
||||
Compaction* c = new Compaction(
|
||||
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
|
||||
kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
|
||||
{}, /* is manual */ false, vstorage->CompactionScore(0),
|
||||
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
|
||||
return c;
|
||||
}
|
||||
|
||||
Compaction* FIFOCompactionPicker::PickSizeCompaction(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
|
||||
const int kLevel0 = 0;
|
||||
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
|
||||
uint64_t total_size = GetTotalFilesSize(level_files);
|
||||
|
||||
if (total_size <=
|
||||
mutable_cf_options.compaction_options_fifo.max_table_files_size ||
|
||||
level_files.size() == 0) {
|
||||
// total size not exceeded
|
||||
if (mutable_cf_options.compaction_options_fifo.allow_compaction &&
|
||||
level_files.size() > 0) {
|
||||
CompactionInputFiles comp_inputs;
|
||||
// try to prevent same files from being compacted multiple times, which
|
||||
// could produce large files that may never TTL-expire. Achieve this by
|
||||
// disallowing compactions with files larger than memtable (inflate its
|
||||
// size by 10% to account for uncompressed L0 files that may have size
|
||||
// slightly greater than memtable size limit).
|
||||
size_t max_compact_bytes_per_del_file =
|
||||
static_cast<size_t>(MultiplyCheckOverflow(
|
||||
static_cast<uint64_t>(mutable_cf_options.write_buffer_size),
|
||||
1.1));
|
||||
if (FindIntraL0Compaction(
|
||||
level_files,
|
||||
mutable_cf_options
|
||||
.level0_file_num_compaction_trigger /* min_files_to_compact */
|
||||
,
|
||||
max_compact_bytes_per_del_file, &comp_inputs)) {
|
||||
Compaction* c = new Compaction(
|
||||
vstorage, ioptions_, mutable_cf_options, {comp_inputs}, 0,
|
||||
16 * 1024 * 1024 /* output file size limit */,
|
||||
0 /* max compaction bytes, not applicable */,
|
||||
0 /* output path ID */, mutable_cf_options.compression,
|
||||
ioptions_.compression_opts, 0 /* max_subcompactions */, {},
|
||||
/* is manual */ false, vstorage->CompactionScore(0),
|
||||
/* is deletion compaction */ false,
|
||||
CompactionReason::kFIFOReduceNumFiles);
|
||||
return c;
|
||||
}
|
||||
}
|
||||
|
||||
ROCKS_LOG_BUFFER(
|
||||
log_buffer,
|
||||
"[%s] FIFO compaction: nothing to do. Total size %" PRIu64
|
||||
", max size %" PRIu64 "\n",
|
||||
cf_name.c_str(), total_size,
|
||||
mutable_cf_options.compaction_options_fifo.max_table_files_size);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (!level0_compactions_in_progress_.empty()) {
|
||||
ROCKS_LOG_BUFFER(
|
||||
log_buffer,
|
||||
"[%s] FIFO compaction: Already executing compaction. No need "
|
||||
"to run parallel compactions since compactions are very fast",
|
||||
cf_name.c_str());
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::vector<CompactionInputFiles> inputs;
|
||||
inputs.emplace_back();
|
||||
inputs[0].level = 0;
|
||||
|
||||
for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
|
||||
auto f = *ritr;
|
||||
total_size -= f->compensated_file_size;
|
||||
inputs[0].files.push_back(f);
|
||||
char tmp_fsize[16];
|
||||
AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
|
||||
ROCKS_LOG_BUFFER(log_buffer,
|
||||
"[%s] FIFO compaction: picking file %" PRIu64
|
||||
" with size %s for deletion",
|
||||
cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
|
||||
if (total_size <=
|
||||
mutable_cf_options.compaction_options_fifo.max_table_files_size) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Compaction* c = new Compaction(
|
||||
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
|
||||
kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
|
||||
{}, /* is manual */ false, vstorage->CompactionScore(0),
|
||||
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
|
||||
return c;
|
||||
}
|
||||
|
||||
Compaction* FIFOCompactionPicker::PickCompaction(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
|
||||
assert(vstorage->num_levels() == 1);
|
||||
|
||||
Compaction* c = nullptr;
|
||||
if (mutable_cf_options.compaction_options_fifo.ttl > 0) {
|
||||
c = PickTTLCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
|
||||
}
|
||||
if (c == nullptr) {
|
||||
c = PickSizeCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
|
||||
}
|
||||
RegisterCompaction(c);
|
||||
return c;
|
||||
}
|
||||
|
||||
Compaction* FIFOCompactionPicker::CompactRange(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, int input_level, int output_level,
|
||||
uint32_t /*output_path_id*/, uint32_t /*max_subcompactions*/,
|
||||
const InternalKey* /*begin*/, const InternalKey* /*end*/,
|
||||
InternalKey** compaction_end, bool* /*manual_conflict*/) {
|
||||
#ifdef NDEBUG
|
||||
(void)input_level;
|
||||
(void)output_level;
|
||||
#endif
|
||||
assert(input_level == 0);
|
||||
assert(output_level == 0);
|
||||
*compaction_end = nullptr;
|
||||
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
|
||||
Compaction* c =
|
||||
PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer);
|
||||
log_buffer.FlushBufferToLog();
|
||||
return c;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
#endif // !ROCKSDB_LITE
|
52
db/compaction_picker_fifo.h
Normal file
52
db/compaction_picker_fifo.h
Normal file
@ -0,0 +1,52 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#pragma once
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include "db/compaction_picker.h"
|
||||
|
||||
namespace rocksdb {
|
||||
class FIFOCompactionPicker : public CompactionPicker {
|
||||
public:
|
||||
FIFOCompactionPicker(const ImmutableCFOptions& ioptions,
|
||||
const InternalKeyComparator* icmp)
|
||||
: CompactionPicker(ioptions, icmp) {}
|
||||
|
||||
virtual Compaction* PickCompaction(const std::string& cf_name,
|
||||
const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* version,
|
||||
LogBuffer* log_buffer) override;
|
||||
|
||||
virtual Compaction* CompactRange(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, int input_level, int output_level,
|
||||
uint32_t output_path_id, uint32_t max_subcompactions,
|
||||
const InternalKey* begin, const InternalKey* end,
|
||||
InternalKey** compaction_end, bool* manual_conflict) override;
|
||||
|
||||
// The maximum allowed output level. Always returns 0.
|
||||
virtual int MaxOutputLevel() const override { return 0; }
|
||||
|
||||
virtual bool NeedsCompaction(
|
||||
const VersionStorageInfo* vstorage) const override;
|
||||
|
||||
private:
|
||||
Compaction* PickTTLCompaction(const std::string& cf_name,
|
||||
const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* version,
|
||||
LogBuffer* log_buffer);
|
||||
|
||||
Compaction* PickSizeCompaction(const std::string& cf_name,
|
||||
const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* version,
|
||||
LogBuffer* log_buffer);
|
||||
};
|
||||
} // namespace rocksdb
|
||||
#endif // !ROCKSDB_LITE
|
@ -4,10 +4,12 @@
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#include "db/compaction_picker.h"
|
||||
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include "db/compaction.h"
|
||||
#include "db/compaction_picker_fifo.h"
|
||||
#include "db/compaction_picker_universal.h"
|
||||
|
||||
#include "util/logging.h"
|
||||
|
1
src.mk
1
src.mk
@ -11,6 +11,7 @@ LIB_SOURCES = \
|
||||
db/compaction_iterator.cc \
|
||||
db/compaction_job.cc \
|
||||
db/compaction_picker.cc \
|
||||
db/compaction_picker_fifo.cc \
|
||||
db/compaction_picker_universal.cc \
|
||||
db/convenience.cc \
|
||||
db/db_filesnapshot.cc \
|
||||
|
@ -4580,7 +4580,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
||||
if (FLAGS_max_scan_distance != 0) {
|
||||
if (FLAGS_reverse_iterator) {
|
||||
GenerateKeyFromInt(
|
||||
(uint64_t)std::max((int64_t)0, seek_pos - FLAGS_max_scan_distance),
|
||||
(uint64_t)std::max((int64_t)0,
|
||||
seek_pos - FLAGS_max_scan_distance),
|
||||
FLAGS_num, &lower_bound);
|
||||
options.iterate_lower_bound = &lower_bound;
|
||||
} else {
|
||||
|
Loading…
x
Reference in New Issue
Block a user