Allow ingesting overlapping files (#5539)

Summary:
Currently IngestExternalFile() fails when its input files' ranges overlap. This condition doesn't need to hold for files that are to be ingested in L0, though.

This commit allows overlapping files and forces their target level to L0.

Additionally, ingest job's completion is logged to EventLogger, analogous to flush and compaction jobs.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5539

Differential Revision: D17370660

Pulled By: riversand963

fbshipit-source-id: 749a3899b17d1be267a5afd5b0a99d96b38ab2f3
This commit is contained in:
Igor Canadi 2019-09-13 14:48:18 -07:00 committed by Facebook Github Bot
parent 83a6a614e9
commit 97631357aa
4 changed files with 108 additions and 26 deletions

View File

@ -3763,9 +3763,9 @@ Status DBImpl::IngestExternalFiles(
std::vector<ExternalSstFileIngestionJob> ingestion_jobs; std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
for (const auto& arg : args) { for (const auto& arg : args) {
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd(); auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
ingestion_jobs.emplace_back(env_, versions_.get(), cfd, ingestion_jobs.emplace_back(
immutable_db_options_, env_options_, env_, versions_.get(), cfd, immutable_db_options_, env_options_,
&snapshots_, arg.options, &directories_); &snapshots_, arg.options, &directories_, &event_logger_);
} }
std::vector<std::pair<bool, Status>> exec_results; std::vector<std::pair<bool, Status>> exec_results;
for (size_t i = 0; i != num_cfs; ++i) { for (size_t i = 0; i != num_cfs; ++i) {
@ -3895,19 +3895,21 @@ Status DBImpl::IngestExternalFiles(
} }
} }
if (status.ok()) { if (status.ok()) {
bool should_increment_last_seqno = int consumed_seqno_count =
ingestion_jobs[0].ShouldIncrementLastSequence(); ingestion_jobs[0].ConsumedSequenceNumbersCount();
#ifndef NDEBUG #ifndef NDEBUG
for (size_t i = 1; i != num_cfs; ++i) { for (size_t i = 1; i != num_cfs; ++i) {
assert(should_increment_last_seqno == assert(!!consumed_seqno_count ==
ingestion_jobs[i].ShouldIncrementLastSequence()); !!ingestion_jobs[i].ConsumedSequenceNumbersCount());
consumed_seqno_count +=
ingestion_jobs[i].ConsumedSequenceNumbersCount();
} }
#endif #endif
if (should_increment_last_seqno) { if (consumed_seqno_count > 0) {
const SequenceNumber last_seqno = versions_->LastSequence(); const SequenceNumber last_seqno = versions_->LastSequence();
versions_->SetLastAllocatedSequence(last_seqno + 1); versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastPublishedSequence(last_seqno + 1); versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastSequence(last_seqno + 1); versions_->SetLastSequence(last_seqno + consumed_seqno_count);
} }
autovector<ColumnFamilyData*> cfds_to_commit; autovector<ColumnFamilyData*> cfds_to_commit;
autovector<const MutableCFOptions*> mutable_cf_options_list; autovector<const MutableCFOptions*> mutable_cf_options_list;

View File

@ -1070,6 +1070,47 @@ TEST_P(ExternalSSTFileBasicTest, IngestExternalFileWithCorruptedPropsBlock) {
} while (ChangeOptionsForFileIngestionTest()); } while (ChangeOptionsForFileIngestionTest());
} }
TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) {
Options options = CurrentOptions();
std::vector<std::string> files;
{
SstFileWriter sst_file_writer(EnvOptions(), options);
std::string file1 = sst_files_dir_ + "file1.sst";
ASSERT_OK(sst_file_writer.Open(file1));
ASSERT_OK(sst_file_writer.Put("a", "z"));
ASSERT_OK(sst_file_writer.Put("i", "m"));
ExternalSstFileInfo file1_info;
ASSERT_OK(sst_file_writer.Finish(&file1_info));
files.push_back(std::move(file1));
}
{
SstFileWriter sst_file_writer(EnvOptions(), options);
std::string file2 = sst_files_dir_ + "file2.sst";
ASSERT_OK(sst_file_writer.Open(file2));
ASSERT_OK(sst_file_writer.Put("i", "k"));
ExternalSstFileInfo file2_info;
ASSERT_OK(sst_file_writer.Finish(&file2_info));
files.push_back(std::move(file2));
}
IngestExternalFileOptions ifo;
ASSERT_OK(db_->IngestExternalFile(files, ifo));
ASSERT_EQ(Get("a"), "z");
ASSERT_EQ(Get("i"), "k");
int total_keys = 0;
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
total_keys++;
}
delete iter;
ASSERT_EQ(total_keys, 2);
ASSERT_EQ(2, NumTableFilesAtLevel(0));
}
INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest, INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
testing::Values(std::make_tuple(true, true), testing::Values(std::make_tuple(true, true),
std::make_tuple(true, false), std::make_tuple(true, false),

View File

@ -71,11 +71,16 @@ Status ExternalSstFileIngestionJob::Prepare(
for (size_t i = 0; i < num_files - 1; i++) { for (size_t i = 0; i < num_files - 1; i++) {
if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key, if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
sorted_files[i + 1]->smallest_internal_key) >= 0) { sorted_files[i + 1]->smallest_internal_key) >= 0) {
return Status::NotSupported("Files have overlapping ranges"); files_overlap_ = true;
break;
} }
} }
} }
if (ingestion_options_.ingest_behind && files_overlap_) {
return Status::NotSupported("Files have overlapping ranges");
}
for (IngestedFileInfo& f : files_to_ingest_) { for (IngestedFileInfo& f : files_to_ingest_) {
if (f.num_entries == 0 && f.num_range_deletions == 0) { if (f.num_entries == 0 && f.num_range_deletions == 0) {
return Status::InvalidArgument("File contain no entries"); return Status::InvalidArgument("File contain no entries");
@ -212,7 +217,7 @@ Status ExternalSstFileIngestionJob::Run() {
} }
// It is safe to use this instead of LastAllocatedSequence since we are // It is safe to use this instead of LastAllocatedSequence since we are
// the only active writer, and hence they are equal // the only active writer, and hence they are equal
const SequenceNumber last_seqno = versions_->LastSequence(); SequenceNumber last_seqno = versions_->LastSequence();
edit_.SetColumnFamily(cfd_->GetID()); edit_.SetColumnFamily(cfd_->GetID());
// The levels that the files will be ingested into // The levels that the files will be ingested into
@ -222,8 +227,8 @@ Status ExternalSstFileIngestionJob::Run() {
status = CheckLevelForIngestedBehindFile(&f); status = CheckLevelForIngestedBehindFile(&f);
} else { } else {
status = AssignLevelAndSeqnoForIngestedFile( status = AssignLevelAndSeqnoForIngestedFile(
super_version, force_global_seqno, cfd_->ioptions()->compaction_style, super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
&f, &assigned_seqno); last_seqno, &f, &assigned_seqno);
} }
if (!status.ok()) { if (!status.ok()) {
return status; return status;
@ -231,8 +236,10 @@ Status ExternalSstFileIngestionJob::Run() {
status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno); status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run", TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
&assigned_seqno); &assigned_seqno);
if (assigned_seqno == last_seqno + 1) { if (assigned_seqno > last_seqno) {
consumed_seqno_ = true; assert(assigned_seqno == last_seqno + 1);
last_seqno = assigned_seqno;
++consumed_seqno_count_;
} }
if (!status.ok()) { if (!status.ok()) {
return status; return status;
@ -250,6 +257,13 @@ void ExternalSstFileIngestionJob::UpdateStats() {
uint64_t total_keys = 0; uint64_t total_keys = 0;
uint64_t total_l0_files = 0; uint64_t total_l0_files = 0;
uint64_t total_time = env_->NowMicros() - job_start_time_; uint64_t total_time = env_->NowMicros() - job_start_time_;
EventLoggerStream stream = event_logger_->Log();
stream << "event"
<< "ingest_finished";
stream << "files_ingested";
stream.StartArray();
for (IngestedFileInfo& f : files_to_ingest_) { for (IngestedFileInfo& f : files_to_ingest_) {
InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1); InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1);
stats.micros = total_time; stats.micros = total_time;
@ -277,7 +291,18 @@ void ExternalSstFileIngestionJob::UpdateStats() {
"(global_seqno=%" PRIu64 ")\n", "(global_seqno=%" PRIu64 ")\n",
f.external_file_path.c_str(), f.picked_level, f.external_file_path.c_str(), f.picked_level,
f.internal_file_path.c_str(), f.assigned_seqno); f.internal_file_path.c_str(), f.assigned_seqno);
stream << "file" << f.internal_file_path << "level" << f.picked_level;
} }
stream.EndArray();
stream << "lsm_state";
stream.StartArray();
auto vstorage = cfd_->current()->storage_info();
for (int level = 0; level < vstorage->num_levels(); ++level) {
stream << vstorage->NumLevelFiles(level);
}
stream.EndArray();
cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL, cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL,
total_keys); total_keys);
cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL, cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL,
@ -301,7 +326,8 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
f.internal_file_path.c_str(), s.ToString().c_str()); f.internal_file_path.c_str(), s.ToString().c_str());
} }
} }
consumed_seqno_ = false; consumed_seqno_count_ = 0;
files_overlap_ = false;
} else if (status.ok() && ingestion_options_.move_files) { } else if (status.ok() && ingestion_options_.move_files) {
// The files were moved and added successfully, remove original file links // The files were moved and added successfully, remove original file links
for (IngestedFileInfo& f : files_to_ingest_) { for (IngestedFileInfo& f : files_to_ingest_) {
@ -479,13 +505,13 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style, SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) { SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest,
SequenceNumber* assigned_seqno) {
Status status; Status status;
*assigned_seqno = 0; *assigned_seqno = 0;
const SequenceNumber last_seqno = versions_->LastSequence();
if (force_global_seqno) { if (force_global_seqno) {
*assigned_seqno = last_seqno + 1; *assigned_seqno = last_seqno + 1;
if (compaction_style == kCompactionStyleUniversal) { if (compaction_style == kCompactionStyleUniversal || files_overlap_) {
file_to_ingest->picked_level = 0; file_to_ingest->picked_level = 0;
return status; return status;
} }
@ -547,6 +573,12 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
target_level = lvl; target_level = lvl;
} }
} }
// If files overlap, we have to ingest them at level 0 and assign the newest
// sequence number
if (files_overlap_) {
target_level = 0;
*assigned_seqno = last_seqno + 1;
}
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile", "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
&overlap_with_db); &overlap_with_db);

View File

@ -12,6 +12,7 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/snapshot_impl.h" #include "db/snapshot_impl.h"
#include "logging/event_logger.h"
#include "options/db_options.h" #include "options/db_options.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
@ -71,7 +72,7 @@ class ExternalSstFileIngestionJob {
const ImmutableDBOptions& db_options, const EnvOptions& env_options, const ImmutableDBOptions& db_options, const EnvOptions& env_options,
SnapshotList* db_snapshots, SnapshotList* db_snapshots,
const IngestExternalFileOptions& ingestion_options, const IngestExternalFileOptions& ingestion_options,
Directories* directories) Directories* directories, EventLogger* event_logger)
: env_(env), : env_(env),
versions_(versions), versions_(versions),
cfd_(cfd), cfd_(cfd),
@ -80,8 +81,9 @@ class ExternalSstFileIngestionJob {
db_snapshots_(db_snapshots), db_snapshots_(db_snapshots),
ingestion_options_(ingestion_options), ingestion_options_(ingestion_options),
directories_(directories), directories_(directories),
event_logger_(event_logger),
job_start_time_(env_->NowMicros()), job_start_time_(env_->NowMicros()),
consumed_seqno_(false) { consumed_seqno_count_(0) {
assert(directories != nullptr); assert(directories != nullptr);
} }
@ -116,8 +118,8 @@ class ExternalSstFileIngestionJob {
return files_to_ingest_; return files_to_ingest_;
} }
// Whether to increment VersionSet's seqno after this job runs // How many sequence numbers did we consume as part of the ingest job?
bool ShouldIncrementLastSequence() const { return consumed_seqno_; } int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; }
private: private:
// Open the external file and populate `file_to_ingest` with all the // Open the external file and populate `file_to_ingest` with all the
@ -132,6 +134,7 @@ class ExternalSstFileIngestionJob {
Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv, Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv,
bool force_global_seqno, bool force_global_seqno,
CompactionStyle compaction_style, CompactionStyle compaction_style,
SequenceNumber last_seqno,
IngestedFileInfo* file_to_ingest, IngestedFileInfo* file_to_ingest,
SequenceNumber* assigned_seqno); SequenceNumber* assigned_seqno);
@ -163,9 +166,13 @@ class ExternalSstFileIngestionJob {
autovector<IngestedFileInfo> files_to_ingest_; autovector<IngestedFileInfo> files_to_ingest_;
const IngestExternalFileOptions& ingestion_options_; const IngestExternalFileOptions& ingestion_options_;
Directories* directories_; Directories* directories_;
EventLogger* event_logger_;
VersionEdit edit_; VersionEdit edit_;
uint64_t job_start_time_; uint64_t job_start_time_;
bool consumed_seqno_; int consumed_seqno_count_;
// Set in ExternalSstFileIngestionJob::Prepare(), if true all files are
// ingested in L0
bool files_overlap_{false};
}; };
} // namespace rocksdb } // namespace rocksdb