From a7a04b6898be9e62c16042b9a89df3774291be77 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Mon, 26 Oct 2020 13:50:03 -0700 Subject: [PATCH] Integrate BlobFileBuilder into the compaction process (#7573) Summary: Similarly to how https://github.com/facebook/rocksdb/issues/7345 integrated blob file writing into the flush process, the patch adds support for writing blob files to the compaction logic. Namely, if `enable_blob_files` is set, large values encountered during compaction are extracted to blob files and replaced with blob indexes. The resulting blob files are then logged to the MANIFEST as part of the compaction job's `VersionEdit` and added to the `Version` alongside any table files written by the compaction. Any errors during blob file building fail the compaction job. There will be a separate follow-up patch to perform blob garbage collection during compactions. In addition, the patch continues to chip away at the mess around computing various compaction related statistics by eliminating some code duplication and by making the `num_output_files` and `bytes_written` stats more consistent for flushes, compactions, and recovery. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7573 Test Plan: `make check` Reviewed By: riversand963 Differential Revision: D24404696 Pulled By: ltamasi fbshipit-source-id: 21216af3a172ad3ce8f85d11cd30923784ae426c --- db/builder.cc | 17 ++- db/compaction/compaction_job.cc | 202 ++++++++++++++++++------- db/compaction/compaction_job.h | 5 +- db/compaction/compaction_job_test.cc | 2 +- db/db_compaction_test.cc | 175 +++++++++++++++++++++ db/db_flush_test.cc | 26 +++- db/db_impl/db_impl_compaction_flush.cc | 16 +- db/db_impl/db_impl_open.cc | 16 +- db/db_wal_test.cc | 12 +- db/flush_job.cc | 16 +- 10 files changed, 388 insertions(+), 99 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 90cfbbffb..206ff257f 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -212,6 +212,7 @@ Status BuildTable( } else if (!c_iter.status().ok()) { s = c_iter.status(); } + if (s.ok()) { auto range_del_it = range_del_agg->NewIterator(); for (range_del_it->SeekToFirst(); range_del_it->Valid(); @@ -222,10 +223,6 @@ Status BuildTable( meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(), tombstone.seq_, internal_comparator); } - - if (blob_file_builder) { - s = blob_file_builder->Finish(); - } } TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable"); @@ -273,6 +270,14 @@ Status BuildTable( s = *io_status; } + if (blob_file_builder) { + if (s.ok()) { + s = blob_file_builder->Finish(); + } + + blob_file_builder.reset(); + } + // TODO Also check the IO status when create the Iterator. if (s.ok() && !empty) { @@ -318,6 +323,8 @@ Status BuildTable( } if (!s.ok() || meta->fd.GetFileSize() == 0) { + TEST_SYNC_POINT("BuildTable:BeforeDeleteFile"); + constexpr IODebugContext* dbg = nullptr; Status ignored = fs->DeleteFile(fname, IOOptions(), dbg); @@ -330,8 +337,6 @@ Status BuildTable( ignored = fs->DeleteFile(blob_file_path, IOOptions(), dbg); ignored.PermitUncheckedError(); } - - blob_file_additions->clear(); } } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index bcb90a2d1..6485c3de2 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -20,6 +20,8 @@ #include #include +#include "db/blob/blob_file_addition.h" +#include "db/blob/blob_file_builder.h" #include "db/builder.h" #include "db/db_impl/db_impl.h" #include "db/db_iter.h" @@ -138,6 +140,7 @@ struct CompactionJob::SubcompactionState { // State kept for output being generated std::vector outputs; + std::vector blob_file_additions; std::unique_ptr outfile; std::unique_ptr builder; @@ -231,21 +234,13 @@ struct CompactionJob::CompactionState { std::vector sub_compact_states; Status status; - uint64_t total_bytes; - uint64_t num_output_records; + size_t num_output_files = 0; + uint64_t total_bytes = 0; + size_t num_blob_output_files = 0; + uint64_t total_blob_bytes = 0; + uint64_t num_output_records = 0; - explicit CompactionState(Compaction* c) - : compaction(c), - total_bytes(0), - num_output_records(0) {} - - size_t NumOutputFiles() { - size_t total = 0; - for (auto& s : sub_compact_states) { - total += s.outputs.size(); - } - return total; - } + explicit CompactionState(Compaction* c) : compaction(c) {} Slice SmallestUserKey() { for (const auto& sub_compact_state : sub_compact_states) { @@ -272,11 +267,29 @@ struct CompactionJob::CompactionState { }; void CompactionJob::AggregateStatistics() { + assert(compact_); + for (SubcompactionState& sc : compact_->sub_compact_states) { + auto& outputs = sc.outputs; + + if (!outputs.empty() && !outputs.back().meta.fd.file_size) { + // An error occurred, so ignore the last output. + outputs.pop_back(); + } + + compact_->num_output_files += outputs.size(); compact_->total_bytes += sc.total_bytes; + + const auto& blobs = sc.blob_file_additions; + + compact_->num_blob_output_files += blobs.size(); + + for (const auto& blob : blobs) { + compact_->total_blob_bytes += blob.GetTotalBlobBytes(); + } + compact_->num_output_records += sc.num_output_records; - } - for (SubcompactionState& sc : compact_->sub_compact_states) { + compaction_job_stats_->Add(sc.compaction_job_stats); } } @@ -286,7 +299,8 @@ CompactionJob::CompactionJob( const FileOptions& file_options, VersionSet* versions, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, - FSDirectory* db_directory, FSDirectory* output_directory, Statistics* stats, + FSDirectory* db_directory, FSDirectory* output_directory, + FSDirectory* blob_output_directory, Statistics* stats, InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, @@ -317,6 +331,7 @@ CompactionJob::CompactionJob( log_buffer_(log_buffer), db_directory_(db_directory), output_directory_(output_directory), + blob_output_directory_(blob_output_directory), stats_(stats), db_mutex_(db_mutex), db_error_handler_(db_error_handler), @@ -604,18 +619,34 @@ Status CompactionJob::Run() { // Check if any thread encountered an error during execution Status status; IOStatus io_s; + bool wrote_new_blob_files = false; + for (const auto& state : compact_->sub_compact_states) { if (!state.status.ok()) { status = state.status; io_s = state.io_status; break; } + + if (!state.blob_file_additions.empty()) { + wrote_new_blob_files = true; + } } + if (io_status_.ok()) { io_status_ = io_s; } - if (status.ok() && output_directory_) { - io_s = output_directory_->Fsync(IOOptions(), nullptr); + if (status.ok()) { + constexpr IODebugContext* dbg = nullptr; + + if (output_directory_) { + io_s = output_directory_->Fsync(IOOptions(), dbg); + } + + if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ && + blob_output_directory_ != output_directory_) { + io_s = blob_output_directory_->Fsync(IOOptions(), dbg); + } } if (io_status_.ok()) { io_status_ = io_s; @@ -721,6 +752,7 @@ Status CompactionJob::Run() { // Finish up all book-keeping to unify the subcompaction results AggregateStatistics(); UpdateCompactionStats(); + RecordCompactionIOStats(); LogFlush(db_options_.info_log); TEST_SYNC_POINT("CompactionJob::Run():End"); @@ -730,11 +762,16 @@ Status CompactionJob::Run() { } Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { + assert(compact_); + AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_INSTALL); db_mutex_->AssertHeld(); Status status = compact_->status; + ColumnFamilyData* cfd = compact_->compaction->column_family_data(); + assert(cfd); + cfd->internal_stats()->AddCompactionStats( compact_->compaction->output_level(), thread_pri_, compaction_stats_); @@ -744,6 +781,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { if (!versions_->io_status().ok()) { io_status_ = versions_->io_status(); } + VersionStorageInfo::LevelSummaryStorage tmp; auto vstorage = cfd->current()->storage_info(); const auto& stats = compaction_stats_; @@ -768,6 +806,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { stats.bytes_written / static_cast(stats.micros); } + const std::string& column_family_name = cfd->GetName(); + ROCKS_LOG_BUFFER( log_buffer_, "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " @@ -775,8 +815,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "write-amplify(%.1f) %s, records in: %" PRIu64 ", records dropped: %" PRIu64 " output_compression: %s\n", - cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), bytes_read_per_sec, - bytes_written_per_sec, compact_->compaction->output_level(), + column_family_name.c_str(), vstorage->LevelSummary(&tmp), + bytes_read_per_sec, bytes_written_per_sec, + compact_->compaction->output_level(), stats.num_input_files_in_non_output_levels, stats.num_input_files_in_output_level, stats.num_output_files, stats.bytes_read_non_output_levels / 1048576.0, @@ -787,6 +828,15 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { CompressionTypeToString(compact_->compaction->output_compression()) .c_str()); + const auto& blob_files = vstorage->GetBlobFiles(); + if (!blob_files.empty()) { + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 + "\n", + column_family_name.c_str(), blob_files.begin()->first, + blob_files.rbegin()->first); + } + UpdateCompactionJobStats(stats); auto stream = event_logger_->LogToBuffer(log_buffer_); @@ -795,11 +845,18 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { << "compaction_time_micros" << stats.micros << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level" << compact_->compaction->output_level() << "num_output_files" - << compact_->NumOutputFiles() << "total_output_size" - << compact_->total_bytes << "num_input_records" - << stats.num_input_records << "num_output_records" - << compact_->num_output_records << "num_subcompactions" - << compact_->sub_compact_states.size() << "output_compression" + << compact_->num_output_files << "total_output_size" + << compact_->total_bytes; + + if (compact_->num_blob_output_files > 0) { + stream << "num_blob_output_files" << compact_->num_blob_output_files + << "total_blob_output_size" << compact_->total_blob_bytes; + } + + stream << "num_input_records" << stats.num_input_records + << "num_output_records" << compact_->num_output_records + << "num_subcompactions" << compact_->sub_compact_states.size() + << "output_compression" << CompressionTypeToString(compact_->compaction->output_compression()); stream << "num_single_delete_mismatches" @@ -823,12 +880,18 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { } stream.EndArray(); + if (!blob_files.empty()) { + stream << "blob_file_head" << blob_files.begin()->first; + stream << "blob_file_tail" << blob_files.rbegin()->first; + } + CleanupCompaction(); return status; } void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { - assert(sub_compact != nullptr); + assert(sub_compact); + assert(sub_compact->compaction); uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000; @@ -899,6 +962,22 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { snapshot_checker_, compact_->compaction->level(), db_options_.statistics.get()); + const MutableCFOptions* mutable_cf_options = + sub_compact->compaction->mutable_cf_options(); + assert(mutable_cf_options); + + std::vector blob_file_paths; + + std::unique_ptr blob_file_builder( + mutable_cf_options->enable_blob_files + ? new BlobFileBuilder( + versions_, env_, fs_.get(), + sub_compact->compaction->immutable_cf_options(), + mutable_cf_options, &file_options_, job_id_, cfd->GetID(), + cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_, + &blob_file_paths, &sub_compact->blob_file_additions) + : nullptr); + TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); TEST_SYNC_POINT_CALLBACK( "CompactionJob::Run():PausingManualCompaction:1", @@ -921,7 +1000,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), /*expect_valid_internal_key=*/true, &range_del_agg, - /* blob_file_builder */ nullptr, db_options_.allow_data_in_errors, + blob_file_builder.get(), db_options_.allow_data_in_errors, sub_compact->compaction, compaction_filter, shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log)); @@ -1093,6 +1172,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); } + if (blob_file_builder) { + if (status.ok()) { + status = blob_file_builder->Finish(); + } + + blob_file_builder.reset(); + } + sub_compact->compaction_job_stats.cpu_micros = env_->NowCPUNanos() / 1000 - prev_cpu_micros; @@ -1479,9 +1566,13 @@ Status CompactionJob::FinishCompactionOutputFile( Status CompactionJob::InstallCompactionResults( const MutableCFOptions& mutable_cf_options) { + assert(compact_); + db_mutex_->AssertHeld(); auto* compaction = compact_->compaction; + assert(compaction); + // paranoia: verify that the files that we started with // still exist in the current version and in the same original level. // This ensures that a concurrent compaction did not erroneously @@ -1497,23 +1588,32 @@ Status CompactionJob::InstallCompactionResults( { Compaction::InputLevelSummaryBuffer inputs_summary; - ROCKS_LOG_INFO( - db_options_.info_log, "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes", - compaction->column_family_data()->GetName().c_str(), job_id_, - compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes); + ROCKS_LOG_INFO(db_options_.info_log, + "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes", + compaction->column_family_data()->GetName().c_str(), job_id_, + compaction->InputLevelSummary(&inputs_summary), + compact_->total_bytes + compact_->total_blob_bytes); } + VersionEdit* const edit = compaction->edit(); + assert(edit); + // Add compaction inputs - compaction->AddInputDeletions(compact_->compaction->edit()); + compaction->AddInputDeletions(edit); for (const auto& sub_compact : compact_->sub_compact_states) { for (const auto& out : sub_compact.outputs) { - compaction->edit()->AddFile(compaction->output_level(), out.meta); + edit->AddFile(compaction->output_level(), out.meta); + } + + for (const auto& blob : sub_compact.blob_file_additions) { + edit->AddBlobFile(blob); } } + return versions_->LogAndApply(compaction->column_family_data(), - mutable_cf_options, compaction->edit(), - db_mutex_, db_directory_); + mutable_cf_options, edit, db_mutex_, + db_directory_); } void CompactionJob::RecordCompactionIOStats() { @@ -1689,6 +1789,8 @@ void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) { #endif // !ROCKSDB_LITE void CompactionJob::UpdateCompactionStats() { + assert(compact_); + Compaction* compaction = compact_->compaction; compaction_stats_.num_input_files_in_non_output_levels = 0; compaction_stats_.num_input_files_in_output_level = 0; @@ -1706,27 +1808,15 @@ void CompactionJob::UpdateCompactionStats() { } } - uint64_t num_output_records = 0; + compaction_stats_.num_output_files = + static_cast(compact_->num_output_files) + + static_cast(compact_->num_blob_output_files); + compaction_stats_.bytes_written = + compact_->total_bytes + compact_->total_blob_bytes; - for (const auto& sub_compact : compact_->sub_compact_states) { - size_t num_output_files = sub_compact.outputs.size(); - if (sub_compact.builder != nullptr) { - // An error occurred so ignore the last output. - assert(num_output_files > 0); - --num_output_files; - } - compaction_stats_.num_output_files += static_cast(num_output_files); - - num_output_records += sub_compact.num_output_records; - - for (const auto& out : sub_compact.outputs) { - compaction_stats_.bytes_written += out.meta.fd.file_size; - } - } - - if (compaction_stats_.num_input_records > num_output_records) { + if (compaction_stats_.num_input_records > compact_->num_output_records) { compaction_stats_.num_dropped_records = - compaction_stats_.num_input_records - num_output_records; + compaction_stats_.num_input_records - compact_->num_output_records; } } @@ -1765,7 +1855,7 @@ void CompactionJob::UpdateCompactionJobStats( compaction_job_stats_->num_output_records = compact_->num_output_records; compaction_job_stats_->num_output_files = stats.num_output_files; - if (compact_->NumOutputFiles() > 0U) { + if (stats.num_output_files > 0) { CopyPrefix(compact_->SmallestUserKey(), CompactionJobStats::kMaxPrefixLength, &compaction_job_stats_->smallest_output_key_prefix); diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index aafad8d3a..2c36b408d 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -68,8 +68,8 @@ class CompactionJob { const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, FSDirectory* db_directory, FSDirectory* output_directory, - Statistics* stats, InstrumentedMutex* db_mutex, - ErrorHandler* db_error_handler, + FSDirectory* blob_output_directory, Statistics* stats, + InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, @@ -169,6 +169,7 @@ class CompactionJob { LogBuffer* log_buffer_; FSDirectory* db_directory_; FSDirectory* output_directory_; + FSDirectory* blob_output_directory_; Statistics* stats_; InstrumentedMutex* db_mutex_; ErrorHandler* db_error_handler_; diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 210042ca0..8fbefa676 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -340,7 +340,7 @@ class CompactionJobTest : public testing::Test { CompactionJob compaction_job( 0, &compaction, db_options_, env_options_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr, - nullptr, nullptr, &mutex_, &error_handler_, snapshots, + nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger, false, false, dbname_, &compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index aac39d980..d90c31bda 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -5845,6 +5845,181 @@ TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) { ASSERT_EQ("0,5", FilesPerLevel(0)); } +TEST_F(DBCompactionTest, CompactionWithBlob) { + Options options; + options.env = env_; + options.disable_auto_compactions = true; + + Reopen(options); + + constexpr char first_key[] = "first_key"; + constexpr char second_key[] = "second_key"; + constexpr char first_value[] = "first_value"; + constexpr char second_value[] = "second_value"; + constexpr char third_value[] = "third_value"; + + ASSERT_OK(Put(first_key, first_value)); + ASSERT_OK(Put(second_key, first_value)); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(first_key, second_value)); + ASSERT_OK(Put(second_key, second_value)); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(first_key, third_value)); + ASSERT_OK(Put(second_key, third_value)); + ASSERT_OK(Flush()); + + options.enable_blob_files = true; + + Reopen(options); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + ASSERT_EQ(Get(first_key), third_value); + ASSERT_EQ(Get(second_key), third_value); + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& l1_files = storage_info->LevelFiles(1); + ASSERT_EQ(l1_files.size(), 1); + + const FileMetaData* const table_file = l1_files[0]; + assert(table_file); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_EQ(blob_files.size(), 1); + + const auto& blob_file = blob_files.begin()->second; + assert(blob_file); + + ASSERT_EQ(table_file->smallest.user_key(), first_key); + ASSERT_EQ(table_file->largest.user_key(), second_key); + ASSERT_EQ(table_file->oldest_blob_file_number, + blob_file->GetBlobFileNumber()); + + ASSERT_EQ(blob_file->GetTotalBlobCount(), 2); + + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const uint64_t expected_bytes = + table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes(); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + ASSERT_EQ(compaction_stats[1].bytes_written, expected_bytes); + ASSERT_EQ(compaction_stats[1].num_output_files, 2); +} + +class DBCompactionTestBlobError + : public DBCompactionTest, + public testing::WithParamInterface { + public: + DBCompactionTestBlobError() + : fault_injection_env_(env_), sync_point_(GetParam()) {} + ~DBCompactionTestBlobError() { Close(); } + + FaultInjectionTestEnv fault_injection_env_; + std::string sync_point_; +}; + +INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobError, DBCompactionTestBlobError, + ::testing::ValuesIn(std::vector{ + "BlobFileBuilder::WriteBlobToFile:AddRecord", + "BlobFileBuilder::WriteBlobToFile:AppendFooter"})); + +TEST_P(DBCompactionTestBlobError, CompactionError) { + Options options; + options.disable_auto_compactions = true; + options.env = env_; + + Reopen(options); + + constexpr char first_key[] = "first_key"; + constexpr char second_key[] = "second_key"; + constexpr char first_value[] = "first_value"; + constexpr char second_value[] = "second_value"; + constexpr char third_value[] = "third_value"; + + ASSERT_OK(Put(first_key, first_value)); + ASSERT_OK(Put(second_key, first_value)); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(first_key, second_value)); + ASSERT_OK(Put(second_key, second_value)); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(first_key, third_value)); + ASSERT_OK(Put(second_key, third_value)); + ASSERT_OK(Flush()); + + options.enable_blob_files = true; + options.env = &fault_injection_env_; + + Reopen(options); + + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(false, + Status::IOError(sync_point_)); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), begin, end).IsIOError()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& l1_files = storage_info->LevelFiles(1); + ASSERT_TRUE(l1_files.empty()); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_TRUE(blob_files.empty()); + + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + + if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") { + ASSERT_EQ(compaction_stats[1].bytes_written, 0); + ASSERT_EQ(compaction_stats[1].num_output_files, 0); + } else { + // SST file writing succeeded; blob file writing failed (during Finish) + ASSERT_GT(compaction_stats[1].bytes_written, 0); + ASSERT_EQ(compaction_stats[1].num_output_files, 1); + } +} + #endif // !defined(ROCKSDB_LITE) } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 7edc6738c..1789ce44e 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -453,6 +453,7 @@ TEST_F(DBFlushTest, FlushWithBlob) { options.enable_blob_files = true; options.min_blob_size = min_blob_size; options.disable_auto_compactions = true; + options.env = env_; Reopen(options); @@ -525,10 +526,12 @@ TEST_F(DBFlushTest, FlushWithBlob) { class DBFlushTestBlobError : public DBFlushTest, public testing::WithParamInterface { public: - DBFlushTestBlobError() : fault_injection_env_(env_) {} + DBFlushTestBlobError() + : fault_injection_env_(env_), sync_point_(GetParam()) {} ~DBFlushTestBlobError() { Close(); } FaultInjectionTestEnv fault_injection_env_; + std::string sync_point_; }; INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError, @@ -546,11 +549,12 @@ TEST_P(DBFlushTestBlobError, FlushError) { ASSERT_OK(Put("key", "blob")); - SyncPoint::GetInstance()->SetCallBack(GetParam(), [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, Status::IOError()); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(false, + Status::IOError(sync_point_)); }); SyncPoint::GetInstance()->SetCallBack( - "BuildTable:BeforeFinishBuildTable", [this](void* /* arg */) { + "BuildTable:BeforeDeleteFile", [this](void* /* arg */) { fault_injection_env_.SetFilesystemActive(true); }); SyncPoint::GetInstance()->EnableProcessing(); @@ -599,11 +603,19 @@ TEST_P(DBFlushTestBlobError, FlushError) { const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); ASSERT_FALSE(compaction_stats.empty()); - ASSERT_EQ(compaction_stats[0].bytes_written, 0); - ASSERT_EQ(compaction_stats[0].num_output_files, 0); + + if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") { + ASSERT_EQ(compaction_stats[0].bytes_written, 0); + ASSERT_EQ(compaction_stats[0].num_output_files, 0); + } else { + // SST file writing succeeded; blob file writing failed (during Finish) + ASSERT_GT(compaction_stats[0].bytes_written, 0); + ASSERT_EQ(compaction_stats[0].num_output_files, 1); + } const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); - ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], 0); + ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], + compaction_stats[0].bytes_written); #endif // ROCKSDB_LITE } diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 95855e4c7..3c8a3a454 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1147,9 +1147,10 @@ Status DBImpl::CompactFilesImpl( job_context->job_id, c.get(), immutable_db_options_, file_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), - GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_, - &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, - snapshot_checker, table_cache_, &event_logger_, + GetDataDir(c->column_family_data(), c->output_path_id()), + GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_, + snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, + table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, @@ -2954,10 +2955,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, job_context->job_id, c.get(), immutable_db_options_, file_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), - GetDataDir(c->column_family_data(), c->output_path_id()), stats_, - &mutex_, &error_handler_, snapshot_seqs, - earliest_write_conflict_snapshot, snapshot_checker, table_cache_, - &event_logger_, c->mutable_cf_options()->paranoid_file_checks, + GetDataDir(c->column_family_data(), c->output_path_id()), + GetDataDir(c->column_family_data(), 0), stats_, &mutex_, + &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, + snapshot_checker, table_cache_, &event_logger_, + c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, thread_pri, io_tracer_, is_manual ? &manual_compaction_paused_ : nullptr, db_id_, diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index bd3770588..8f660924e 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1393,7 +1393,6 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. const bool has_output = meta.fd.GetFileSize() > 0; - assert(has_output || blob_file_additions.empty()); constexpr int level = 0; @@ -1413,15 +1412,16 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, if (has_output) { stats.bytes_written = meta.fd.GetFileSize(); - - const auto& blobs = edit->GetBlobFileAdditions(); - for (const auto& blob : blobs) { - stats.bytes_written += blob.GetTotalBlobBytes(); - } - - stats.num_output_files = static_cast(blobs.size()) + 1; + stats.num_output_files = 1; } + const auto& blobs = edit->GetBlobFileAdditions(); + for (const auto& blob : blobs) { + stats.bytes_written += blob.GetTotalBlobBytes(); + } + + stats.num_output_files += static_cast(blobs.size()); + cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats); cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, stats.bytes_written); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index f17861b3f..56d9b35ae 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -380,6 +380,7 @@ TEST_F(DBWALTest, RecoverWithBlob) { options.min_blob_size = min_blob_size; options.avoid_flush_during_recovery = false; options.disable_auto_compactions = true; + options.env = env_; Reopen(options); @@ -440,10 +441,12 @@ class DBRecoveryTestBlobError : public DBWALTest, public testing::WithParamInterface { public: - DBRecoveryTestBlobError() : fault_injection_env_(env_) {} + DBRecoveryTestBlobError() + : fault_injection_env_(env_), sync_point_(GetParam()) {} ~DBRecoveryTestBlobError() { Close(); } FaultInjectionTestEnv fault_injection_env_; + std::string sync_point_; }; INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError, @@ -457,11 +460,12 @@ TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) { // Reopen with blob files enabled but make blob file writing fail during // recovery. - SyncPoint::GetInstance()->SetCallBack(GetParam(), [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, Status::IOError()); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(false, + Status::IOError(sync_point_)); }); SyncPoint::GetInstance()->SetCallBack( - "BuildTable:BeforeFinishBuildTable", [this](void* /* arg */) { + "BuildTable:BeforeDeleteFile", [this](void* /* arg */) { fault_injection_env_.SetFilesystemActive(true); }); SyncPoint::GetInstance()->EnableProcessing(); diff --git a/db/flush_job.cc b/db/flush_job.cc index 6e2a60ff9..504a232b6 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -438,7 +438,6 @@ Status FlushJob::WriteLevel0Table() { // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. const bool has_output = meta_.fd.GetFileSize() > 0; - assert(has_output || blob_file_additions.empty()); if (s.ok() && has_output) { // if we have more than 1 background thread, then we cannot @@ -467,15 +466,16 @@ Status FlushJob::WriteLevel0Table() { if (has_output) { stats.bytes_written = meta_.fd.GetFileSize(); - - const auto& blobs = edit_->GetBlobFileAdditions(); - for (const auto& blob : blobs) { - stats.bytes_written += blob.GetTotalBlobBytes(); - } - - stats.num_output_files = static_cast(blobs.size()) + 1; + stats.num_output_files = 1; } + const auto& blobs = edit_->GetBlobFileAdditions(); + for (const auto& blob : blobs) { + stats.bytes_written += blob.GetTotalBlobBytes(); + } + + stats.num_output_files += static_cast(blobs.size()); + RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros); cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats); cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,