Merge from github/master

This commit is contained in:
Dmitri Smirnov 2015-07-09 18:01:08 -07:00
commit c903ccc4c2
24 changed files with 724 additions and 227 deletions

View File

@ -5,6 +5,10 @@
* Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info. * Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info.
* Added a new way to report QPS from db_bench (check out --report_file and --report_interval_seconds) * Added a new way to report QPS from db_bench (check out --report_file and --report_interval_seconds)
* Added a cache for individual rows. See DBOptions::row_cache for more info. * Added a cache for individual rows. See DBOptions::row_cache for more info.
* Several new features on EventListener (see include/rocksdb/listener.h):
- OnCompationCompleted() now returns per-compaciton job statistics, defined in include/rocksdb/compaction_job_stats.h.
- Added OnTableFileCreated() and OnTableFileDeleted().
* Add compaction_options_universal.enable_trivial_move to true, to allow trivial move while performing universal compaction. Trivial move will happen only when all the input files are non overlapping.
### Public API changes ### Public API changes
* EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters. * EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters.

View File

@ -59,6 +59,7 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
if (job_context.HaveSomethingToDelete()) { if (job_context.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(job_context); db_->PurgeObsoleteFiles(job_context);
} }
job_context.Clean();
} }
} }

View File

@ -167,8 +167,14 @@ bool Compaction::IsTrivialMove() const {
return false; return false;
} }
// Used in universal compaction, where trivial move can be done if the
// input files are non overlapping
if (cfd_->ioptions()->compaction_options_universal.allow_trivial_move) {
return is_trivial_move_;
}
return (start_level_ != output_level_ && num_input_levels() == 1 && return (start_level_ != output_level_ && num_input_levels() == 1 &&
input(0, 0)->fd.GetPathId() == GetOutputPathId() && input(0, 0)->fd.GetPathId() == output_path_id() &&
InputCompressionMatchesOutput() && InputCompressionMatchesOutput() &&
TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_); TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
} }

View File

@ -109,20 +109,20 @@ class Compaction {
} }
// Maximum size of files to build during this compaction. // Maximum size of files to build during this compaction.
uint64_t MaxOutputFileSize() const { return max_output_file_size_; } uint64_t max_output_file_size() const { return max_output_file_size_; }
// What compression for output // What compression for output
CompressionType OutputCompressionType() const { return output_compression_; } CompressionType output_compression() const { return output_compression_; }
// Whether need to write output file to second DB path. // Whether need to write output file to second DB path.
uint32_t GetOutputPathId() const { return output_path_id_; } uint32_t output_path_id() const { return output_path_id_; }
// Is this a trivial compaction that can be implemented by just // Is this a trivial compaction that can be implemented by just
// moving a single input file to the next level (no merging or splitting) // moving a single input file to the next level (no merging or splitting)
bool IsTrivialMove() const; bool IsTrivialMove() const;
// If true, then the compaction can be done by simply deleting input files. // If true, then the compaction can be done by simply deleting input files.
bool IsDeletionCompaction() const { bool deletion_compaction() const {
return deletion_compaction_; return deletion_compaction_;
} }
@ -150,13 +150,26 @@ class Compaction {
double score() const { return score_; } double score() const { return score_; }
// Is this compaction creating a file in the bottom most level? // Is this compaction creating a file in the bottom most level?
bool BottomMostLevel() { return bottommost_level_; } bool bottommost_level() { return bottommost_level_; }
// Does this compaction include all sst files? // Does this compaction include all sst files?
bool IsFullCompaction() { return is_full_compaction_; } bool is_full_compaction() { return is_full_compaction_; }
// Was this compaction triggered manually by the client? // Was this compaction triggered manually by the client?
bool IsManualCompaction() { return is_manual_compaction_; } bool is_manual_compaction() { return is_manual_compaction_; }
// Used when allow_trivial_move option is set in
// Universal compaction. If all the input files are
// non overlapping, then is_trivial_move_ variable
// will be set true, else false
void set_is_trivial_move(bool trivial_move) {
is_trivial_move_ = trivial_move;
}
// Used when allow_trivial_move option is set in
// Universal compaction. Returns true, if the input files
// are non-overlapping and can be trivially moved.
bool is_trivial_move() { return is_trivial_move_; }
// Return the MutableCFOptions that should be used throughout the compaction // Return the MutableCFOptions that should be used throughout the compaction
// procedure // procedure
@ -238,6 +251,11 @@ class Compaction {
// Is this compaction requested by the client? // Is this compaction requested by the client?
const bool is_manual_compaction_; const bool is_manual_compaction_;
// True if we can do trivial move in Universal multi level
// compaction
bool is_trivial_move_;
// "level_ptrs_" holds indices into "input_version_->levels_", where each // "level_ptrs_" holds indices into "input_version_->levels_", where each
// index remembers which file of an associated level we are currently used // index remembers which file of an associated level we are currently used
// to check KeyNotExistsBeyondOutputLevel() for deletion operation. // to check KeyNotExistsBeyondOutputLevel() for deletion operation.

View File

@ -238,12 +238,12 @@ void CompactionJob::ReportStartedCompaction(
// In the current design, a CompactionJob is always created // In the current design, a CompactionJob is always created
// for non-trivial compaction. // for non-trivial compaction.
assert(compaction->IsTrivialMove() == false || assert(compaction->IsTrivialMove() == false ||
compaction->IsManualCompaction() == true); compaction->is_manual_compaction() == true);
ThreadStatusUtil::SetThreadOperationProperty( ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::COMPACTION_PROP_FLAGS, ThreadStatus::COMPACTION_PROP_FLAGS,
compaction->IsManualCompaction() + compaction->is_manual_compaction() +
(compaction->IsDeletionCompaction() << 1)); (compaction->deletion_compaction() << 1));
ThreadStatusUtil::SetThreadOperationProperty( ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
@ -263,7 +263,7 @@ void CompactionJob::ReportStartedCompaction(
if (compaction_job_stats_) { if (compaction_job_stats_) {
compaction_job_stats_->is_manual_compaction = compaction_job_stats_->is_manual_compaction =
compaction->IsManualCompaction(); compaction->is_manual_compaction();
} }
} }
@ -298,7 +298,7 @@ void CompactionJob::Prepare() {
} }
// Is this compaction producing files at the bottommost level? // Is this compaction producing files at the bottommost level?
bottommost_level_ = compact_->compaction->BottomMostLevel(); bottommost_level_ = compact_->compaction->bottommost_level();
} }
Status CompactionJob::Run() { Status CompactionJob::Run() {
@ -864,7 +864,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
// Close output file if it is big enough // Close output file if it is big enough
if (compact_->builder->FileSize() >= if (compact_->builder->FileSize() >=
compact_->compaction->MaxOutputFileSize()) { compact_->compaction->max_output_file_size()) {
status = FinishCompactionOutputFile(input); status = FinishCompactionOutputFile(input);
if (!status.ok()) { if (!status.ok()) {
break; break;
@ -1160,7 +1160,7 @@ Status CompactionJob::OpenCompactionOutputFile() {
uint64_t file_number = versions_->NewFileNumber(); uint64_t file_number = versions_->NewFileNumber();
// Make the output file // Make the output file
std::string fname = TableFileName(db_options_.db_paths, file_number, std::string fname = TableFileName(db_options_.db_paths, file_number,
compact_->compaction->GetOutputPathId()); compact_->compaction->output_path_id());
Status s = env_->NewWritableFile(fname, &compact_->outfile, env_options_); Status s = env_->NewWritableFile(fname, &compact_->outfile, env_options_);
if (!s.ok()) { if (!s.ok()) {
@ -1174,7 +1174,7 @@ Status CompactionJob::OpenCompactionOutputFile() {
} }
CompactionState::Output out; CompactionState::Output out;
out.number = file_number; out.number = file_number;
out.path_id = compact_->compaction->GetOutputPathId(); out.path_id = compact_->compaction->output_path_id();
out.smallest.Clear(); out.smallest.Clear();
out.largest.Clear(); out.largest.Clear();
out.smallest_seqno = out.largest_seqno = 0; out.smallest_seqno = out.largest_seqno = 0;
@ -1198,7 +1198,7 @@ Status CompactionJob::OpenCompactionOutputFile() {
compact_->builder.reset(NewTableBuilder( compact_->builder.reset(NewTableBuilder(
*cfd->ioptions(), cfd->internal_comparator(), *cfd->ioptions(), cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), compact_->outfile.get(), cfd->int_tbl_prop_collector_factories(), compact_->outfile.get(),
compact_->compaction->OutputCompressionType(), compact_->compaction->output_compression(),
cfd->ioptions()->compression_opts, skip_filters)); cfd->ioptions()->compression_opts, skip_filters));
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);
return s; return s;

View File

@ -76,11 +76,10 @@ class CompactionJobTest : public testing::Test {
largest = internal_key; largest = internal_key;
largest_seqno = sequence_number; largest_seqno = sequence_number;
} }
std::pair<std::string, std::string> key_value( contents.insert({internal_key.Encode().ToString(), value});
{bottommost_internal_key.Encode().ToString(), value});
contents.insert(key_value);
if (i == 1 || k < kKeysPerFile / 2) { if (i == 1 || k < kKeysPerFile / 2) {
expected_results.insert(key_value); expected_results.insert(
{bottommost_internal_key.Encode().ToString(), value});
} }
} }
@ -97,7 +96,7 @@ class CompactionJobTest : public testing::Test {
mutable_cf_options_, &edit, &mutex_); mutable_cf_options_, &edit, &mutex_);
mutex_.Unlock(); mutex_.Unlock();
} }
versions_->SetLastSequence(sequence_number); versions_->SetLastSequence(sequence_number + 1);
return expected_results; return expected_results;
} }
@ -169,9 +168,8 @@ void VerifyInitializationOfCompactionJobStats(
void VerifyCompactionJobStats( void VerifyCompactionJobStats(
const CompactionJobStats& compaction_job_stats, const CompactionJobStats& compaction_job_stats,
const std::vector<FileMetaData*>& files, const std::vector<FileMetaData*>& files,
size_t num_output_files, size_t num_output_files) {
uint64_t min_elapsed_time) { ASSERT_GE(compaction_job_stats.elapsed_micros, 0U);
ASSERT_GE(compaction_job_stats.elapsed_micros, min_elapsed_time);
ASSERT_EQ(compaction_job_stats.num_input_files, files.size()); ASSERT_EQ(compaction_job_stats.num_input_files, files.size());
ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files); ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files);
} }
@ -210,7 +208,6 @@ TEST_F(CompactionJobTest, Simple) {
std::move(yield_callback), &event_logger, false, std::move(yield_callback), &event_logger, false,
db_name, &compaction_job_stats); db_name, &compaction_job_stats);
auto start_micros = Env::Default()->NowMicros();
VerifyInitializationOfCompactionJobStats(compaction_job_stats); VerifyInitializationOfCompactionJobStats(compaction_job_stats);
compaction_job.Prepare(); compaction_job.Prepare();
@ -224,7 +221,7 @@ TEST_F(CompactionJobTest, Simple) {
VerifyCompactionJobStats( VerifyCompactionJobStats(
compaction_job_stats, compaction_job_stats,
files, 1, (Env::Default()->NowMicros() - start_micros) / 2); files, 1);
mock_table_factory_->AssertLatestFile(expected_results); mock_table_factory_->AssertLatestFile(expected_results);
ASSERT_EQ(yield_callback_called, 20000); ASSERT_EQ(yield_callback_called, 20000);

View File

@ -15,6 +15,7 @@
#include <inttypes.h> #include <inttypes.h>
#include <limits> #include <limits>
#include <queue>
#include <string> #include <string>
#include <utility> #include <utility>
@ -37,6 +38,64 @@ uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
return sum; return sum;
} }
// Used in universal compaction when trivial move is enabled.
// This structure is used for the construction of min heap
// that contains the file meta data, the level of the file
// and the index of the file in that level
struct InputFileInfo {
FileMetaData* f;
size_t level;
size_t index;
};
// Used in universal compaction when trivial move is enabled.
// This comparator is used for the construction of min heap
// based on the smallest key of the file.
struct UserKeyComparator {
explicit UserKeyComparator(const Comparator* ucmp) { ucmp_ = ucmp; }
bool operator()(InputFileInfo i1, InputFileInfo i2) const {
return (ucmp_->Compare(i1.f->smallest.user_key(),
i2.f->smallest.user_key()) > 0);
}
private:
const Comparator* ucmp_;
};
typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
UserKeyComparator> SmallestKeyHeap;
// This function creates the heap that is used to find if the files are
// overlapping during universal compaction when the allow_trivial_move
// is set.
SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
SmallestKeyHeap smallest_key_priority_q =
SmallestKeyHeap(UserKeyComparator(ucmp));
InputFileInfo input_file;
for (size_t l = 0; l < c->num_input_levels(); l++) {
if (c->num_input_files(l) != 0) {
if (l == 0 && c->start_level() == 0) {
for (size_t i = 0; i < c->num_input_files(0); i++) {
input_file.f = c->input(0, i);
input_file.level = 0;
input_file.index = i;
smallest_key_priority_q.push(std::move(input_file));
}
} else {
input_file.f = c->input(l, 0);
input_file.level = l;
input_file.index = 0;
smallest_key_priority_q.push(std::move(input_file));
}
}
}
return smallest_key_priority_q;
}
} // anonymous namespace } // anonymous namespace
// Determine compression type, based on user options, level of the output // Determine compression type, based on user options, level of the output
@ -1106,6 +1165,50 @@ void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
} // namespace } // namespace
#endif #endif
// Algorithm that checks to see if there are any overlapping
// files in the input
bool CompactionPicker::IsInputNonOverlapping(Compaction* c) {
auto comparator = icmp_->user_comparator();
int first_iter = 1;
InputFileInfo prev, curr, next;
SmallestKeyHeap smallest_key_priority_q =
create_level_heap(c, icmp_->user_comparator());
while (!smallest_key_priority_q.empty()) {
curr = smallest_key_priority_q.top();
smallest_key_priority_q.pop();
if (first_iter) {
prev = curr;
first_iter = 0;
} else {
if (comparator->Compare(prev.f->largest.user_key(),
curr.f->smallest.user_key()) >= 0) {
// found overlapping files, return false
return false;
}
assert(comparator->Compare(curr.f->largest.user_key(),
prev.f->largest.user_key()) > 0);
prev = curr;
}
next.f = nullptr;
if (curr.level != 0 && curr.index < c->num_input_files(curr.level) - 1) {
next.f = c->input(curr.level, curr.index + 1);
next.level = curr.level;
next.index = curr.index + 1;
}
if (next.f) {
smallest_key_priority_q.push(std::move(next));
}
}
return true;
}
// Universal style of compaction. Pick files that are contiguous in // Universal style of compaction. Pick files that are contiguous in
// time-range to compact. // time-range to compact.
// //
@ -1168,6 +1271,10 @@ Compaction* UniversalCompactionPicker::PickCompaction(
return nullptr; return nullptr;
} }
if (ioptions_.compaction_options_universal.allow_trivial_move == true) {
c->set_is_trivial_move(IsInputNonOverlapping(c));
}
// validate that all the chosen files of L0 are non overlapping in time // validate that all the chosen files of L0 are non overlapping in time
#ifndef NDEBUG #ifndef NDEBUG
SequenceNumber prev_smallest_seqno = 0U; SequenceNumber prev_smallest_seqno = 0U;

View File

@ -105,6 +105,12 @@ class CompactionPicker {
const VersionStorageInfo* vstorage, const VersionStorageInfo* vstorage,
const CompactionOptions& compact_options) const; const CompactionOptions& compact_options) const;
// Used in universal compaction when the enabled_trivial_move
// option is set. Checks whether there are any overlapping files
// in the input. Returns true if the input files are non
// overlapping.
bool IsInputNonOverlapping(Compaction* c);
protected: protected:
int NumberLevels() const { return ioptions_.num_levels; } int NumberLevels() const { return ioptions_.num_levels; }

View File

@ -77,6 +77,8 @@ class CompactionPickerTest : public testing::Test {
f->fd = FileDescriptor(file_number, path_id, file_size); f->fd = FileDescriptor(file_number, path_id, file_size);
f->smallest = InternalKey(smallest, smallest_seq, kTypeValue); f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
f->largest = InternalKey(largest, largest_seq, kTypeValue); f->largest = InternalKey(largest, largest_seq, kTypeValue);
f->smallest_seqno = smallest_seq;
f->largest_seqno = largest_seq;
f->compensated_file_size = file_size; f->compensated_file_size = file_size;
f->refs = 0; f->refs = 0;
vstorage_->AddFile(level, f); vstorage_->AddFile(level, f);
@ -365,6 +367,64 @@ TEST_F(CompactionPickerTest, NeedsCompactionUniversal) {
vstorage_->CompactionScore(0) >= 1); vstorage_->CompactionScore(0) >= 1);
} }
} }
// Tests if the files can be trivially moved in multi level
// universal compaction when allow_trivial_move option is set
// In this test as the input files overlaps, they cannot
// be trivially moved.
TEST_F(CompactionPickerTest, CannotTrivialMoveUniversal) {
const uint64_t kFileSize = 100000;
ioptions_.compaction_options_universal.allow_trivial_move = true;
NewVersionStorage(1, kCompactionStyleUniversal);
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
// must return false when there's no files.
ASSERT_EQ(universal_compaction_picker.NeedsCompaction(vstorage_.get()),
false);
NewVersionStorage(3, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(0, 2U, "201", "250", kFileSize, 0, 401, 450);
Add(0, 4U, "260", "300", kFileSize, 0, 260, 300);
Add(1, 5U, "100", "151", kFileSize, 0, 200, 251);
Add(1, 3U, "301", "350", kFileSize, 0, 101, 150);
Add(2, 6U, "120", "200", kFileSize, 0, 20, 100);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(!compaction->is_trivial_move());
}
// Tests if the files can be trivially moved in multi level
// universal compaction when allow_trivial_move option is set
// In this test as the input files doesn't overlaps, they should
// be trivially moved.
TEST_F(CompactionPickerTest, AllowsTrivialMoveUniversal) {
const uint64_t kFileSize = 100000;
ioptions_.compaction_options_universal.allow_trivial_move = true;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(3, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(0, 2U, "201", "250", kFileSize, 0, 401, 450);
Add(0, 4U, "260", "300", kFileSize, 0, 260, 300);
Add(1, 5U, "010", "080", kFileSize, 0, 200, 251);
Add(2, 3U, "301", "350", kFileSize, 0, 101, 150);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction->is_trivial_move());
}
TEST_F(CompactionPickerTest, NeedsCompactionFIFO) { TEST_F(CompactionPickerTest, NeedsCompactionFIFO) {
NewVersionStorage(1, kCompactionStyleFIFO); NewVersionStorage(1, kCompactionStyleFIFO);
@ -419,6 +479,76 @@ TEST_F(CompactionPickerTest, ParentIndexResetBug) {
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
} }
// This test checks ExpandWhileOverlapping() by having overlapping user keys
// ranges (with different sequence numbers) in the input files.
TEST_F(CompactionPickerTest, OverlappingUserKeys) {
NewVersionStorage(6, kCompactionStyleLevel);
Add(1, 1U, "100", "150", 1U);
// Overlapping user keys
Add(1, 2U, "200", "400", 1U);
Add(1, 3U, "400", "500", 1000000000U, 0, 0);
Add(2, 4U, "600", "700", 1U);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_levels());
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(3U, compaction->input(0, 1)->fd.GetNumber());
}
TEST_F(CompactionPickerTest, OverlappingUserKeys2) {
NewVersionStorage(6, kCompactionStyleLevel);
// Overlapping user keys on same level and output level
Add(1, 1U, "200", "400", 1000000000U);
Add(1, 2U, "400", "500", 1U, 0, 0);
Add(2, 3U, "400", "600", 1U);
// The following file is not in the compaction despite overlapping user keys
Add(2, 4U, "600", "700", 1U, 0, 0);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->num_input_files(1));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(3U, compaction->input(1, 0)->fd.GetNumber());
}
TEST_F(CompactionPickerTest, OverlappingUserKeys3) {
NewVersionStorage(6, kCompactionStyleLevel);
// Chain of overlapping user key ranges (forces ExpandWhileOverlapping() to
// expand multiple times)
Add(1, 1U, "100", "150", 1U);
Add(1, 2U, "150", "200", 1U, 0, 0);
Add(1, 3U, "200", "250", 1000000000U, 0, 0);
Add(1, 4U, "250", "300", 1U, 0, 0);
Add(1, 5U, "300", "350", 1U, 0, 0);
// Output level overlaps with the beginning and the end of the chain
Add(2, 6U, "050", "100", 1U);
Add(2, 7U, "350", "400", 1U);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(5U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->num_input_files(1));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(3U, compaction->input(0, 2)->fd.GetNumber());
ASSERT_EQ(4U, compaction->input(0, 3)->fd.GetNumber());
ASSERT_EQ(5U, compaction->input(0, 4)->fd.GetNumber());
ASSERT_EQ(6U, compaction->input(1, 0)->fd.GetNumber());
ASSERT_EQ(7U, compaction->input(1, 1)->fd.GetNumber());
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -527,6 +527,25 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
versions_->GetObsoleteFiles(&job_context->sst_delete_files, versions_->GetObsoleteFiles(&job_context->sst_delete_files,
job_context->min_pending_output); job_context->min_pending_output);
uint64_t min_log_number = versions_->MinLogNumber();
if (!alive_log_files_.empty()) {
// find newly obsoleted log files
while (alive_log_files_.begin()->number < min_log_number) {
auto& earliest = *alive_log_files_.begin();
job_context->log_delete_files.push_back(earliest.number);
total_log_size_ -= earliest.size;
alive_log_files_.pop_front();
// Current log should always stay alive since it can't have
// number < MinLogNumber().
assert(alive_log_files_.size());
}
}
// We're just cleaning up for DB::Write().
assert(job_context->logs_to_free.empty());
job_context->logs_to_free = logs_to_free_;
logs_to_free_.clear();
// store the current filenum, lognum, etc // store the current filenum, lognum, etc
job_context->manifest_file_number = versions_->manifest_file_number(); job_context->manifest_file_number = versions_->manifest_file_number();
job_context->pending_manifest_file_number = job_context->pending_manifest_file_number =
@ -1309,17 +1328,6 @@ Status DBImpl::FlushMemTableToOutputFile(
VersionStorageInfo::LevelSummaryStorage tmp; VersionStorageInfo::LevelSummaryStorage tmp;
LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(), LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp)); cfd->current()->storage_info()->LevelSummary(&tmp));
if (disable_delete_obsolete_files_ == 0) {
// add to deletion state
while (alive_log_files_.size() &&
alive_log_files_.begin()->number < versions_->MinLogNumber()) {
const auto& earliest = *alive_log_files_.begin();
job_context->log_delete_files.push_back(earliest.number);
total_log_size_ -= earliest.size;
alive_log_files_.pop_front();
}
}
} }
if (!s.ok() && !s.IsShutdownInProgress() && db_options_.paranoid_checks && if (!s.ok() && !s.IsShutdownInProgress() && db_options_.paranoid_checks &&
@ -1609,7 +1617,7 @@ Status DBImpl::CompactFilesImpl(
assert(c); assert(c);
c->SetInputVersion(version); c->SetInputVersion(version);
// deletion compaction currently not allowed in CompactFiles. // deletion compaction currently not allowed in CompactFiles.
assert(!c->IsDeletionCompaction()); assert(!c->deletion_compaction());
auto yield_callback = [&]() { auto yield_callback = [&]() {
return CallFlushDuringCompaction( return CallFlushDuringCompaction(
@ -1620,7 +1628,7 @@ Status DBImpl::CompactFilesImpl(
CompactionJob compaction_job( CompactionJob compaction_job(
job_context->job_id, c.get(), db_options_, env_options_, versions_.get(), job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
&shutting_down_, log_buffer, directories_.GetDbDir(), &shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->GetOutputPathId()), stats_, directories_.GetDataDir(c->output_path_id()), stats_,
snapshots_.GetAll(), table_cache_, std::move(yield_callback), snapshots_.GetAll(), table_cache_, std::move(yield_callback),
&event_logger_, c->mutable_cf_options()->paranoid_file_checks, dbname_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, dbname_,
nullptr); // Here we pass a nullptr for CompactionJobStats because nullptr); // Here we pass a nullptr for CompactionJobStats because
@ -2145,7 +2153,9 @@ void DBImpl::RecordFlushIOStats() {
void DBImpl::BGWorkFlush(void* db) { void DBImpl::BGWorkFlush(void* db) {
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
TEST_SYNC_POINT("DBImpl::BGWorkFlush");
reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush(); reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
} }
void DBImpl::BGWorkCompaction(void* db) { void DBImpl::BGWorkCompaction(void* db) {
@ -2238,10 +2248,6 @@ void DBImpl::BackgroundCallFlush() {
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
// We're just cleaning up for DB::Write()
job_context.logs_to_free = logs_to_free_;
logs_to_free_.clear();
// If flush failed, we want to delete all temporary files that we might have // If flush failed, we want to delete all temporary files that we might have
// created. Thus, we force full scan in FindObsoleteFiles() // created. Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
@ -2308,10 +2314,6 @@ void DBImpl::BackgroundCallCompaction() {
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
// We're just cleaning up for DB::Write()
job_context.logs_to_free = logs_to_free_;
logs_to_free_.clear();
// If compaction failed, we want to delete all temporary files that we might // If compaction failed, we want to delete all temporary files that we might
// have created (they might not be all recorded in job_context in case of a // have created (they might not be all recorded in job_context in case of a
// failure). Thus, we force full scan in FindObsoleteFiles() // failure). Thus, we force full scan in FindObsoleteFiles()
@ -2502,7 +2504,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
if (!c) { if (!c) {
// Nothing to do // Nothing to do
LogToBuffer(log_buffer, "Compaction nothing to do"); LogToBuffer(log_buffer, "Compaction nothing to do");
} else if (c->IsDeletionCompaction()) { } else if (c->deletion_compaction()) {
// TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
// file if there is alive snapshot pointing to it // file if there is alive snapshot pointing to it
assert(c->num_input_files(1) == 0); assert(c->num_input_files(1) == 0);
@ -2536,8 +2538,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
// Move files to next level // Move files to next level
int32_t moved_files = 0; int32_t moved_files = 0;
int64_t moved_bytes = 0; int64_t moved_bytes = 0;
for (size_t i = 0; i < c->num_input_files(0); i++) { for (unsigned int l = 0; l < c->num_input_levels(); l++) {
FileMetaData* f = c->input(0, i); if (l == static_cast<unsigned int>(c->output_level())) {
continue;
}
for (size_t i = 0; i < c->num_input_files(l); i++) {
FileMetaData* f = c->input(l, i);
c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
c->edit()->AddFile(c->output_level(), f->fd.GetNumber(), c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
@ -2546,11 +2552,13 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogToBuffer(log_buffer, LogToBuffer(log_buffer,
"[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n", "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
c->column_family_data()->GetName().c_str(), f->fd.GetNumber(), c->column_family_data()->GetName().c_str(),
c->output_level(), f->fd.GetFileSize()); f->fd.GetNumber(), c->output_level(), f->fd.GetFileSize());
++moved_files; ++moved_files;
moved_bytes += f->fd.GetFileSize(); moved_bytes += f->fd.GetFileSize();
} }
}
status = versions_->LogAndApply(c->column_family_data(), status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(), *c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir()); &mutex_, directories_.GetDbDir());
@ -2589,7 +2597,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
CompactionJob compaction_job( CompactionJob compaction_job(
job_context->job_id, c.get(), db_options_, env_options_, job_context->job_id, c.get(), db_options_, env_options_,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->GetOutputPathId()), stats_, directories_.GetDataDir(c->output_path_id()), stats_,
snapshots_.GetAll(), table_cache_, std::move(yield_callback), snapshots_.GetAll(), table_cache_, std::move(yield_callback),
&event_logger_, c->mutable_cf_options()->paranoid_file_checks, &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
dbname_, &compaction_job_stats); dbname_, &compaction_job_stats);

View File

@ -290,6 +290,8 @@ class DBImpl : public DB {
size_t TEST_LogsToFreeSize(); size_t TEST_LogsToFreeSize();
uint64_t TEST_LogfileNumber();
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Returns the list of live files in 'live' and the list // Returns the list of live files in 'live' and the list

View File

@ -148,5 +148,10 @@ size_t DBImpl::TEST_LogsToFreeSize() {
return logs_to_free_.size(); return logs_to_free_.size();
} }
uint64_t DBImpl::TEST_LogfileNumber() {
InstrumentedMutexLock l(&mutex_);
return logfile_number_;
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

View File

@ -350,9 +350,6 @@ void DBIter::MergeValuesNewToOld() {
void DBIter::Prev() { void DBIter::Prev() {
assert(valid_); assert(valid_);
if (direction_ == kForward) { if (direction_ == kForward) {
if (!iter_->Valid()) {
iter_->SeekToLast();
}
FindPrevUserKey(); FindPrevUserKey();
direction_ = kReverse; direction_ = kReverse;
} }
@ -556,7 +553,7 @@ void DBIter::FindNextUserKey() {
ParsedInternalKey ikey; ParsedInternalKey ikey;
FindParseableKey(&ikey, kForward); FindParseableKey(&ikey, kForward);
while (iter_->Valid() && while (iter_->Valid() &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) { user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) {
iter_->Next(); iter_->Next();
FindParseableKey(&ikey, kForward); FindParseableKey(&ikey, kForward);
} }
@ -571,7 +568,7 @@ void DBIter::FindPrevUserKey() {
ParsedInternalKey ikey; ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse); FindParseableKey(&ikey, kReverse);
while (iter_->Valid() && while (iter_->Valid() &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) >= 0) { user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) {
if (num_skipped >= max_skip_) { if (num_skipped >= max_skip_) {
num_skipped = 0; num_skipped = 0;
IterKey last_key; IterKey last_key;

View File

@ -1668,7 +1668,9 @@ TEST_F(DBIteratorTest, DBIterator8) {
ASSERT_EQ(db_iter->value().ToString(), "0"); ASSERT_EQ(db_iter->value().ToString(), "0");
} }
TEST_F(DBIteratorTest, DBIterator9) { // TODO(3.13): fix the issue of Seek() then Prev() which might not necessary
// return the biggest element smaller than the seek key.
TEST_F(DBIteratorTest, DISABLED_DBIterator9) {
Options options; Options options;
options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
{ {
@ -1716,7 +1718,9 @@ TEST_F(DBIteratorTest, DBIterator9) {
} }
} }
TEST_F(DBIteratorTest, DBIterator10) { // TODO(3.13): fix the issue of Seek() then Prev() which might not necessary
// return the biggest element smaller than the seek key.
TEST_F(DBIteratorTest, DISABLED_DBIterator10) {
Options options; Options options;
TestIterator* internal_iter = new TestIterator(BytewiseComparator()); TestIterator* internal_iter = new TestIterator(BytewiseComparator());

View File

@ -4534,7 +4534,50 @@ TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionMultiLevels) {
ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i)); ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
} }
} }
// Tests universal compaction with trivial move enabled
TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) {
int32_t trivial_move = 0;
int32_t non_trivial_move = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:TrivialMove",
[&](void* arg) { trivial_move++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial",
[&](void* arg) { non_trivial_move++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.allow_trivial_move = true;
options.num_levels = 3;
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 3;
options.max_background_compactions = 1;
options.target_file_size_base = 32 * 1024;
options = CurrentOptions(options);
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// Trigger compaction if size amplification exceeds 110%
options.compaction_options_universal.max_size_amplification_percent = 110;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
Random rnd(301);
int num_keys = 15000;
for (int i = 0; i < num_keys; i++) {
ASSERT_OK(Put(1, Key(i), Key(i)));
}
std::vector<std::string> values;
ASSERT_OK(Flush(1));
dbfull()->TEST_WaitForCompact();
ASSERT_GT(trivial_move, 0);
ASSERT_EQ(non_trivial_move, 0);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels, INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels,
DBTestUniversalCompactionMultiLevels, DBTestUniversalCompactionMultiLevels,
::testing::Values(3, 20)); ::testing::Values(3, 20));
@ -8669,28 +8712,78 @@ TEST_F(DBTest, TransactionLogIteratorCorruptedLog) {
// //
// Test WAL recovery for the various modes available // Test WAL recovery for the various modes available
// TODO krad:
// 1. Add tests when there are more than one log file
// //
class RecoveryTestHelper { class RecoveryTestHelper {
public: public:
// Recreate and fill the store with some data
static size_t FillData(DBTest* test, const Options& options) {
size_t count = 0;
test->DestroyAndReopen(options); // Number of WAL files to generate
static const int kWALFilesCount = 10;
// Starting number for the WAL file name like 00010.log
static const int kWALFileOffset = 10;
// Keys to be written per WAL file
static const int kKeysPerWALFile = 1024;
// Size of the value
static const int kValueSize = 10;
for (int i = 0; i < 1024; i++) { // Create WAL files with values filled in
test->Put("key" + ToString(i), test->DummyString(10)); static void FillData(DBTest* test, Options& options,
++count; const size_t wal_count, size_t & count) {
DBOptions & db_options = options;
count = 0;
shared_ptr<Cache> table_cache = NewLRUCache(50000, 16);
EnvOptions env_options;
WriteBuffer write_buffer(db_options.db_write_buffer_size);
unique_ptr<VersionSet> versions;
unique_ptr<WalManager> wal_manager;
WriteController write_controller;
versions.reset(new VersionSet(test->dbname_, &db_options, env_options,
table_cache.get(), &write_buffer,
&write_controller));
wal_manager.reset(new WalManager(db_options, env_options));
std::unique_ptr<log::Writer> current_log_writer;
for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
uint64_t current_log_number = j;
std::string fname = LogFileName(test->dbname_, current_log_number);
unique_ptr<WritableFile> file;
ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
current_log_writer.reset(new log::Writer(std::move(file)));
for (int i = 0; i < kKeysPerWALFile; i++) {
std::string key = "key" + ToString(count++);
std::string value = test->DummyString(kValueSize);
assert(current_log_writer.get() != nullptr);
uint64_t seq = versions->LastSequence() + 1;
WriteBatch batch;
batch.Put(key, value);
WriteBatchInternal::SetSequence(&batch, seq);
current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
versions->SetLastSequence(seq);
} }
}
}
// Recreate and fill the store with some data
static size_t FillData(DBTest* test, Options& options) {
options.create_if_missing = true;
test->DestroyAndReopen(options);
test->Close();
size_t count = 0;
FillData(test, options, kWALFilesCount, count);
return count; return count;
} }
// Read back all the keys we wrote and return the number of keys found // Read back all the keys we wrote and return the number of keys found
static size_t GetData(DBTest* test) { static size_t GetData(DBTest* test) {
size_t count = 0; size_t count = 0;
for (size_t i = 0; i < 1024; i++) { for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
if (test->Get("key" + ToString(i)) != "NOT_FOUND") { if (test->Get("key" + ToString(i)) != "NOT_FOUND") {
++count; ++count;
} }
@ -8698,6 +8791,30 @@ class RecoveryTestHelper {
return count; return count;
} }
// Manuall corrupt the specified WAL
static void CorruptWAL(DBTest * test, Options& options,
const double off, const double len,
const int wal_file_id, const bool trunc = false) {
Env* env = options.env;
std::string fname = LogFileName(test->dbname_, wal_file_id);
uint64_t size;
ASSERT_OK(env->GetFileSize(fname, &size));
ASSERT_GT(size, 0);
#ifdef OS_WIN
// Windows disk cache behaves differently. When we truncate
// the original content is still in the cache due to the original
// handle is still open. Generally, in Windows, one prohibits
// shared access to files and it is not needed for WAL but we allow
// it to induce corruption at various tests.
test->Close();
#endif
if (trunc) {
ASSERT_EQ(0, truncate(fname.c_str(), size * off));
} else {
InduceCorruption(fname, size * off, size * len);
}
}
// Overwrite data with 'a' from offset for length len // Overwrite data with 'a' from offset for length len
static void InduceCorruption(const std::string& filename, uint32_t offset, static void InduceCorruption(const std::string& filename, uint32_t offset,
uint32_t len) { uint32_t len) {
@ -8714,32 +8831,6 @@ class RecoveryTestHelper {
close(fd); close(fd);
} }
// Corrupt the last WAL file from (filesize * off) for length (filesize * len)
static void CorruptWAL(DBTest* test, const double off, const double len,
const bool trunc = false) {
rocksdb::VectorLogPtr wal_files;
ASSERT_OK(test->dbfull()->GetSortedWalFiles(wal_files));
ASSERT_EQ(wal_files.size(), 1);
const auto logfile_path =
test->dbname_ + "/" + wal_files.front()->PathName();
auto size = wal_files.front()->SizeFileBytes();
#ifdef OS_WIN
// Windows disk cache behaves differently. When we truncate
// the original content is still in the cache due to the original
// handle is still open. Generally, in Windows, one prohibits
// shared access to files and it is not needed for WAL but we allow
// it to induce corruption at various tests.
test->Close();
#endif
if (trunc) {
ASSERT_EQ(0, truncate(logfile_path.c_str(), size * off));
} else {
InduceCorruption(logfile_path, size * off, size * len);
}
}
}; };
// Test scope: // Test scope:
@ -8747,18 +8838,23 @@ class RecoveryTestHelper {
// at the end of any of the logs // at the end of any of the logs
// - We do not expect to open the data store for corruption // - We do not expect to open the data store for corruption
TEST_F(DBTest, kTolerateCorruptedTailRecords) { TEST_F(DBTest, kTolerateCorruptedTailRecords) {
for (auto trunc : {true, false}) { const int jstart = RecoveryTestHelper::kWALFileOffset;
for (int i = 0; i < 4; i++) { const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
for (auto trunc : {true, false}) { /* Corruption style */
for (int i = 0; i < 4; i++) { /* Corruption offset position */
for (int j = jstart; j < jend; j++) { /* WAL file */
// Fill data for testing // Fill data for testing
Options options = CurrentOptions(); Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options); const size_t row_count = RecoveryTestHelper::FillData(this, options);
// test checksum failure or parsing // test checksum failure or parsing
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc); RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
/*len%=*/ .1, /*wal=*/ j, trunc);
if (trunc) { if (trunc) {
options.wal_recovery_mode = options.wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords; WALRecoveryMode::kTolerateCorruptedTailRecords;
options.create_if_missing = false;
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
const size_t recovered_row_count = RecoveryTestHelper::GetData(this); const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
ASSERT_TRUE(i == 0 || recovered_row_count > 0); ASSERT_TRUE(i == 0 || recovered_row_count > 0);
@ -8770,56 +8866,76 @@ TEST_F(DBTest, kTolerateCorruptedTailRecords) {
} }
} }
} }
}
} }
// Test scope: // Test scope:
// We don't expect the data store to be opened if there is any corruption // We don't expect the data store to be opened if there is any corruption
// (leading, middle or trailing -- incomplete writes or corruption) // (leading, middle or trailing -- incomplete writes or corruption)
TEST_F(DBTest, kAbsoluteConsistency) { TEST_F(DBTest, kAbsoluteConsistency) {
const int jstart = RecoveryTestHelper::kWALFileOffset;
const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
// Verify clean slate behavior
Options options = CurrentOptions(); Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options); const size_t row_count = RecoveryTestHelper::FillData(this, options);
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency; options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
options.create_if_missing = false;
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count); ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);
for (auto trunc : {true, false}) { for (auto trunc : {true, false}) { /* Corruption style */
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) { /* Corruption offset position */
if (trunc && i == 0) { if (trunc && i == 0) {
continue; continue;
} }
options = CurrentOptions();
RecoveryTestHelper::FillData(this, options);
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc); for (int j = jstart; j < jend; j++) { /* wal files */
// fill with new date
RecoveryTestHelper::FillData(this, options);
// corrupt the wal
RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
/*len%=*/.1, j, trunc);
// verify
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency; options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
options.create_if_missing = false;
ASSERT_NOK(TryReopen(options)); ASSERT_NOK(TryReopen(options));
} }
} }
}
} }
// Test scope: // Test scope:
// - We expect to open data store under all circumstances // - We expect to open data store under all circumstances
// - We expect only data upto the point where the first error was encountered // - We expect only data upto the point where the first error was encountered
TEST_F(DBTest, kPointInTimeRecovery) { TEST_F(DBTest, kPointInTimeRecovery) {
for (auto trunc : {true, false}) { const int jstart = RecoveryTestHelper::kWALFileOffset;
for (int i = 0; i < 4; i++) { const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
const int maxkeys = RecoveryTestHelper::kWALFilesCount *
RecoveryTestHelper::kKeysPerWALFile;
for (auto trunc : {true, false}) { /* Corruption style */
for (int i = 0; i < 4; i++) { /* Offset of corruption */
for (int j = jstart; j < jend; j++) { /* WAL file */
// Fill data for testing // Fill data for testing
Options options = CurrentOptions(); Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options); const size_t row_count = RecoveryTestHelper::FillData(this, options);
// test checksum failure or parsing // Corrupt the wal
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc); RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
/*len%=*/.1, j, trunc);
// Verify
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
options.create_if_missing = false;
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
// Probe data for invariants
size_t recovered_row_count = RecoveryTestHelper::GetData(this); size_t recovered_row_count = RecoveryTestHelper::GetData(this);
ASSERT_LT(recovered_row_count, row_count); ASSERT_LT(recovered_row_count, row_count);
// verify that the keys are sequential and there is no break
bool expect_data = true; bool expect_data = true;
for (size_t j = 0; j < 1024; ++j) { for (size_t k = 0; k < maxkeys; ++k) {
bool found = Get("key" + ToString(i)) != "NOT_FOUND"; bool found = Get("key" + ToString(i)) != "NOT_FOUND";
if (expect_data && !found) { if (expect_data && !found) {
expect_data = false; expect_data = false;
@ -8827,8 +8943,15 @@ TEST_F(DBTest, kPointInTimeRecovery) {
ASSERT_EQ(found, expect_data); ASSERT_EQ(found, expect_data);
} }
ASSERT_TRUE(i != 0 || recovered_row_count == 0); const size_t min = RecoveryTestHelper::kKeysPerWALFile *
ASSERT_TRUE(i != 1 || recovered_row_count < row_count / 2); (j - RecoveryTestHelper::kWALFileOffset);
ASSERT_GE(recovered_row_count, min);
if (!trunc && i != 0) {
const size_t max = RecoveryTestHelper::kKeysPerWALFile *
(j - RecoveryTestHelper::kWALFileOffset + 1);
ASSERT_LE(recovered_row_count, max);
}
}
} }
} }
} }
@ -8837,17 +8960,26 @@ TEST_F(DBTest, kPointInTimeRecovery) {
// - We expect to open the data store under all scenarios // - We expect to open the data store under all scenarios
// - We expect to have recovered records past the corruption zone // - We expect to have recovered records past the corruption zone
TEST_F(DBTest, kSkipAnyCorruptedRecords) { TEST_F(DBTest, kSkipAnyCorruptedRecords) {
for (auto trunc : {true, false}) { const int jstart = RecoveryTestHelper::kWALFileOffset;
for (int i = 0; i < 4; i++) { const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
for (auto trunc : {true, false}) { /* Corruption style */
for (int i = 0; i < 4; i++) { /* Corruption offset */
for (int j = jstart; j < jend; j++) { /* wal files */
// Fill data for testing // Fill data for testing
Options options = CurrentOptions(); Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options); const size_t row_count = RecoveryTestHelper::FillData(this, options);
// induce leading corruption // Corrupt the WAL
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc); RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3,
/*len%=*/.1, j, trunc);
// Verify behavior
options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords; options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
options.create_if_missing = false;
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
// Probe data for invariants
size_t recovered_row_count = RecoveryTestHelper::GetData(this); size_t recovered_row_count = RecoveryTestHelper::GetData(this);
ASSERT_LT(recovered_row_count, row_count); ASSERT_LT(recovered_row_count, row_count);
@ -8856,6 +8988,7 @@ TEST_F(DBTest, kSkipAnyCorruptedRecords) {
} }
} }
} }
}
} }
TEST_F(DBTest, TransactionLogIteratorBatchOperations) { TEST_F(DBTest, TransactionLogIteratorBatchOperations) {
@ -11274,10 +11407,10 @@ TEST_F(DBTest, ThreadStatusSingleCompaction) {
{"CompactionJob::Run():Start", "DBTest::ThreadStatusSingleCompaction:1"}, {"CompactionJob::Run():Start", "DBTest::ThreadStatusSingleCompaction:1"},
{"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run():End"}, {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run():End"},
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (int tests = 0; tests < 2; ++tests) { for (int tests = 0; tests < 2; ++tests) {
DestroyAndReopen(options); DestroyAndReopen(options);
rocksdb::SyncPoint::GetInstance()->ClearTrace();
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301); Random rnd(301);
// The Put Phase. // The Put Phase.
@ -11309,8 +11442,8 @@ TEST_F(DBTest, ThreadStatusSingleCompaction) {
// repeat the test with disabling thread tracking. // repeat the test with disabling thread tracking.
options.enable_thread_tracking = false; options.enable_thread_tracking = false;
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
} }
TEST_F(DBTest, PreShutdownManualCompaction) { TEST_F(DBTest, PreShutdownManualCompaction) {
@ -12183,7 +12316,7 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg); Compaction* compaction = reinterpret_cast<Compaction*>(arg);
if (compaction->output_level() == 4) { if (compaction->output_level() == 4) {
ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression); ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
num_lz4.fetch_add(1); num_lz4.fetch_add(1);
} }
}); });
@ -12218,10 +12351,10 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg); Compaction* compaction = reinterpret_cast<Compaction*>(arg);
if (compaction->output_level() == 4 && compaction->start_level() == 3) { if (compaction->output_level() == 4 && compaction->start_level() == 3) {
ASSERT_TRUE(compaction->OutputCompressionType() == kZlibCompression); ASSERT_TRUE(compaction->output_compression() == kZlibCompression);
num_zlib.fetch_add(1); num_zlib.fetch_add(1);
} else { } else {
ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression); ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
num_lz4.fetch_add(1); num_lz4.fetch_add(1);
} }
}); });
@ -12696,6 +12829,7 @@ TEST_F(DBTest, DontDeletePendingOutputs) {
dbfull()->FindObsoleteFiles(&job_context, true /*force*/); dbfull()->FindObsoleteFiles(&job_context, true /*force*/);
dbfull()->TEST_UnlockMutex(); dbfull()->TEST_UnlockMutex();
dbfull()->PurgeObsoleteFiles(job_context); dbfull()->PurgeObsoleteFiles(job_context);
job_context.Clean();
}; };
env_->table_write_callback_ = &purge_obsolete_files_function; env_->table_write_callback_ = &purge_obsolete_files_function;
@ -14137,7 +14271,9 @@ TEST_F(DBTest, RowCache) {
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1); ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
} }
TEST_F(DBTest, PrevAfterMerge) { // TODO(3.13): fix the issue of Seek() + Prev() which might not necessary
// return the biggest key which is smaller than the seek key.
TEST_F(DBTest, DISABLED_PrevAfterMerge) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.merge_operator = MergeOperators::CreatePutOperator(); options.merge_operator = MergeOperators::CreatePutOperator();
@ -14160,6 +14296,40 @@ TEST_F(DBTest, PrevAfterMerge) {
ASSERT_EQ("1", it->key().ToString()); ASSERT_EQ("1", it->key().ToString());
} }
TEST_F(DBTest, DeletingOldWalAfterDrop) {
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{ { "Test:AllowFlushes", "DBImpl::BGWorkFlush" },
{ "DBImpl::BGWorkFlush:done", "Test:WaitForFlush"} });
rocksdb::SyncPoint::GetInstance()->ClearTrace();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Options options = CurrentOptions();
options.max_total_wal_size = 8192;
options.compression = kNoCompression;
options.write_buffer_size = 1 << 20;
options.level0_file_num_compaction_trigger = (1<<30);
options.level0_slowdown_writes_trigger = (1<<30);
options.level0_stop_writes_trigger = (1<<30);
options.disable_auto_compactions = true;
DestroyAndReopen(options);
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
CreateColumnFamilies({"cf1", "cf2"}, options);
ASSERT_OK(Put(0, "key1", DummyString(8192)));
ASSERT_OK(Put(0, "key2", DummyString(8192)));
// the oldest wal should now be getting_flushed
ASSERT_OK(db_->DropColumnFamily(handles_[0]));
// all flushes should now do nothing because their CF is dropped
TEST_SYNC_POINT("Test:AllowFlushes");
TEST_SYNC_POINT("Test:WaitForFlush");
uint64_t lognum1 = dbfull()->TEST_LogfileNumber();
ASSERT_OK(Put(1, "key3", DummyString(8192)));
ASSERT_OK(Put(1, "key4", DummyString(8192)));
// new wal should have been created
uint64_t lognum2 = dbfull()->TEST_LogfileNumber();
EXPECT_GT(lognum2, lognum1);
}
} // namespace rocksdb } // namespace rocksdb
#endif #endif

View File

@ -169,6 +169,7 @@ void ForwardIterator::Cleanup(bool release_sv) {
if (job_context.HaveSomethingToDelete()) { if (job_context.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(job_context); db_->PurgeObsoleteFiles(job_context);
} }
job_context.Clean();
} }
} }
} }

View File

@ -83,6 +83,10 @@ struct JobContext {
new_superversion = create_superversion ? new SuperVersion() : nullptr; new_superversion = create_superversion ? new SuperVersion() : nullptr;
} }
// For non-empty JobContext Clean() has to be called at least once before
// before destruction (see asserts in ~JobContext()). Should be called with
// unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally
// doing potentially slow Clean() with locked DB mutex.
void Clean() { void Clean() {
// free pending memtables // free pending memtables
for (auto m : memtables_to_free) { for (auto m : memtables_to_free) {
@ -109,6 +113,7 @@ struct JobContext {
assert(memtables_to_free.size() == 0); assert(memtables_to_free.size() == 0);
assert(superversions_to_free.size() == 0); assert(superversions_to_free.size() == 0);
assert(new_superversion == nullptr); assert(new_superversion == nullptr);
assert(logs_to_free.size() == 0);
} }
}; };

View File

@ -612,7 +612,9 @@ class VersionSet {
uint64_t MinLogNumber() const { uint64_t MinLogNumber() const {
uint64_t min_log_num = std::numeric_limits<uint64_t>::max(); uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (min_log_num > cfd->GetLogNumber()) { // It's safe to ignore dropped column families here:
// cfd->IsDropped() becomes true after the drop is persisted in MANIFEST.
if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
min_log_num = cfd->GetLogNumber(); min_log_num = cfd->GetLogNumber();
} }
} }

View File

@ -622,6 +622,7 @@ enum InfoLogLevel : unsigned char {
WARN_LEVEL, WARN_LEVEL,
ERROR_LEVEL, ERROR_LEVEL,
FATAL_LEVEL, FATAL_LEVEL,
HEADER_LEVEL,
NUM_INFO_LOG_LEVELS, NUM_INFO_LOG_LEVELS,
}; };

View File

@ -69,6 +69,11 @@ class CompactionOptionsUniversal {
// Default: kCompactionStopStyleTotalSize // Default: kCompactionStopStyleTotalSize
CompactionStopStyle stop_style; CompactionStopStyle stop_style;
// Option to optimize the universal multi level compaction by enabling
// trivial move for non overlapping files.
// Default: false
bool allow_trivial_move;
// Default set of parameters // Default set of parameters
CompactionOptionsUniversal() CompactionOptionsUniversal()
: size_ratio(1), : size_ratio(1),
@ -76,7 +81,8 @@ class CompactionOptionsUniversal {
max_merge_width(UINT_MAX), max_merge_width(UINT_MAX),
max_size_amplification_percent(200), max_size_amplification_percent(200),
compression_size_percent(-1), compression_size_percent(-1),
stop_style(kCompactionStopStyleTotalSize) {} stop_style(kCompactionStopStyleTotalSize),
allow_trivial_move(false) {}
}; };
} // namespace rocksdb } // namespace rocksdb

View File

@ -9,8 +9,8 @@
#include "table/merger.h" #include "table/merger.h"
#include <vector>
#include <queue> #include <queue>
#include <vector>
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
@ -215,6 +215,12 @@ class MergingIterator : public Iterator {
} }
} }
direction_ = kReverse; direction_ = kReverse;
// Note that we don't do assert(current_ == CurrentReverse()) here
// because it is possible to have some keys larger than the seek-key
// inserted between Seek() and SeekToLast(), which makes current_ not
// equal to CurrentReverse().
//
// assert(current_ == CurrentReverse());
} }
current_->Prev(); current_->Prev();

View File

@ -261,28 +261,29 @@ TEST_F(AutoRollLoggerTest, InfoLogLevel) {
// becomes out of scope. // becomes out of scope.
{ {
AutoRollLogger logger(Env::Default(), kTestDir, "", log_size, 0); AutoRollLogger logger(Env::Default(), kTestDir, "", log_size, 0);
for (int log_level = InfoLogLevel::FATAL_LEVEL; for (int log_level = InfoLogLevel::HEADER_LEVEL;
log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) { log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) {
logger.SetInfoLogLevel((InfoLogLevel)log_level); logger.SetInfoLogLevel((InfoLogLevel)log_level);
for (int log_type = InfoLogLevel::DEBUG_LEVEL; for (int log_type = InfoLogLevel::DEBUG_LEVEL;
log_type <= InfoLogLevel::FATAL_LEVEL; log_type++) { log_type <= InfoLogLevel::HEADER_LEVEL; log_type++) {
// log messages with log level smaller than log_level will not be // log messages with log level smaller than log_level will not be
// logged. // logged.
LogMessage((InfoLogLevel)log_type, &logger, kSampleMessage.c_str()); LogMessage((InfoLogLevel)log_type, &logger, kSampleMessage.c_str());
} }
log_lines += InfoLogLevel::FATAL_LEVEL - log_level + 1; log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1;
} }
for (int log_level = InfoLogLevel::FATAL_LEVEL; for (int log_level = InfoLogLevel::HEADER_LEVEL;
log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) { log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) {
logger.SetInfoLogLevel((InfoLogLevel)log_level); logger.SetInfoLogLevel((InfoLogLevel)log_level);
// again, messages with level smaller than log_level will not be logged. // again, messages with level smaller than log_level will not be logged.
Log(InfoLogLevel::HEADER_LEVEL, &logger, "%s", kSampleMessage.c_str());
Debug(&logger, "%s", kSampleMessage.c_str()); Debug(&logger, "%s", kSampleMessage.c_str());
Info(&logger, "%s", kSampleMessage.c_str()); Info(&logger, "%s", kSampleMessage.c_str());
Warn(&logger, "%s", kSampleMessage.c_str()); Warn(&logger, "%s", kSampleMessage.c_str());
Error(&logger, "%s", kSampleMessage.c_str()); Error(&logger, "%s", kSampleMessage.c_str());
Fatal(&logger, "%s", kSampleMessage.c_str()); Fatal(&logger, "%s", kSampleMessage.c_str());
log_lines += InfoLogLevel::FATAL_LEVEL - log_level + 1; log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1;
} }
} }
std::ifstream inFile(AutoRollLoggerTest::kLogFile.c_str()); std::ifstream inFile(AutoRollLoggerTest::kLogFile.c_str());
@ -336,19 +337,31 @@ TEST_F(AutoRollLoggerTest, LogHeaderTest) {
static const size_t LOG_MAX_SIZE = 1024 * 5; static const size_t LOG_MAX_SIZE = 1024 * 5;
static const std::string HEADER_STR = "Log header line"; static const std::string HEADER_STR = "Log header line";
// test_num == 0 -> standard call to Header()
// test_num == 1 -> call to Log() with InfoLogLevel::HEADER_LEVEL
for (int test_num = 0; test_num < 2; test_num++) {
InitTestDb(); InitTestDb();
AutoRollLogger logger(Env::Default(), kTestDir, /*db_log_dir=*/ "", AutoRollLogger logger(Env::Default(), kTestDir, /*db_log_dir=*/ "",
LOG_MAX_SIZE, /*log_file_time_to_roll=*/ 0); LOG_MAX_SIZE, /*log_file_time_to_roll=*/ 0);
// log some headers if (test_num == 0) {
// Log some headers explicitly using Header()
for (size_t i = 0; i < MAX_HEADERS; i++) { for (size_t i = 0; i < MAX_HEADERS; i++) {
Header(&logger, "%s %d", HEADER_STR.c_str(), i); Header(&logger, "%s %d", HEADER_STR.c_str(), i);
} }
} else if (test_num == 1) {
// HEADER_LEVEL should make this behave like calling Header()
for (size_t i = 0; i < MAX_HEADERS; i++) {
Log(InfoLogLevel::HEADER_LEVEL, &logger, "%s %d",
HEADER_STR.c_str(), i);
}
}
const string& newfname = logger.TEST_log_fname().c_str(); const string& newfname = logger.TEST_log_fname().c_str();
// log enough data to cause a roll over // Log enough data to cause a roll over
int i = 0; int i = 0;
for (size_t iter = 0; iter < 2; iter++) { for (size_t iter = 0; iter < 2; iter++) {
while (logger.GetLogFileSize() < LOG_MAX_SIZE) { while (logger.GetLogFileSize() < LOG_MAX_SIZE) {
@ -359,7 +372,7 @@ TEST_F(AutoRollLoggerTest, LogHeaderTest) {
Info(&logger, "Rollover"); Info(&logger, "Rollover");
} }
// Flus the log for the latest file // Flush the log for the latest file
LogFlush(&logger); LogFlush(&logger);
const list<string> oldfiles = GetOldFileNames(newfname); const list<string> oldfiles = GetOldFileNames(newfname);
@ -372,6 +385,7 @@ TEST_F(AutoRollLoggerTest, LogHeaderTest) {
// verify that the old log contains all the header logs // verify that the old log contains all the header logs
ASSERT_EQ(GetLinesCount(oldfname, HEADER_STR), MAX_HEADERS); ASSERT_EQ(GetLinesCount(oldfname, HEADER_STR), MAX_HEADERS);
} }
}
} }
TEST_F(AutoRollLoggerTest, LogFileExistence) { TEST_F(AutoRollLoggerTest, LogFileExistence) {

View File

@ -61,7 +61,13 @@ void Log(const InfoLogLevel log_level, Logger* info_log, const char* format,
if (info_log && info_log->GetInfoLogLevel() <= log_level) { if (info_log && info_log->GetInfoLogLevel() <= log_level) {
va_list ap; va_list ap;
va_start(ap, format); va_start(ap, format);
if (log_level == InfoLogLevel::HEADER_LEVEL) {
info_log->LogHeader(format, ap);
} else {
info_log->Logv(log_level, format, ap); info_log->Logv(log_level, format, ap);
}
va_end(ap); va_end(ap);
} }
} }

View File

@ -487,26 +487,6 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
copy_file_buffer_size_(kDefaultCopyFileBufferSize), copy_file_buffer_size_(kDefaultCopyFileBufferSize),
read_only_(read_only) { read_only_(read_only) {
// set up threads perform copies from files_to_copy_ in the background
for (int t = 0; t < options_.max_background_operations; t++) {
threads_.emplace_back([&]() {
CopyWorkItem work_item;
while (files_to_copy_.read(work_item)) {
CopyResult result;
result.status = CopyFile(work_item.src_path,
work_item.dst_path,
work_item.src_env,
work_item.dst_env,
work_item.sync,
work_item.rate_limiter,
&result.size,
&result.checksum_value,
work_item.size_limit);
work_item.result.set_value(std::move(result));
}
});
}
if (read_only_) { if (read_only_) {
Log(options_.info_log, "Starting read_only backup engine"); Log(options_.info_log, "Starting read_only backup engine");
} }
@ -626,6 +606,27 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
if (!read_only_) { if (!read_only_) {
PutLatestBackupFileContents(latest_backup_id_); // Ignore errors PutLatestBackupFileContents(latest_backup_id_); // Ignore errors
} }
// set up threads perform copies from files_to_copy_ in the background
for (int t = 0; t < options_.max_background_operations; t++) {
threads_.emplace_back([&]() {
CopyWorkItem work_item;
while (files_to_copy_.read(work_item)) {
CopyResult result;
result.status = CopyFile(work_item.src_path,
work_item.dst_path,
work_item.src_env,
work_item.dst_env,
work_item.sync,
work_item.rate_limiter,
&result.size,
&result.checksum_value,
work_item.size_limit);
work_item.result.set_value(std::move(result));
}
});
}
Log(options_.info_log, "Initialized BackupEngine"); Log(options_.info_log, "Initialized BackupEngine");
} }