diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 8bafabd5d..8b9fe7a97 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -607,6 +607,69 @@ Status CompactionJob::Run() { } } + if (status.ok()) { + thread_pool.clear(); + std::vector files_meta; + for (const auto& state : compact_->sub_compact_states) { + for (const auto& output : state.outputs) { + files_meta.emplace_back(&output.meta); + } + } + ColumnFamilyData* cfd = compact_->compaction->column_family_data(); + auto prefix_extractor = + compact_->compaction->mutable_cf_options()->prefix_extractor.get(); + std::atomic next_file_meta_idx(0); + auto verify_table = [&](Status& output_status) { + while (true) { + size_t file_idx = next_file_meta_idx.fetch_add(1); + if (file_idx >= files_meta.size()) { + break; + } + // Verify that the table is usable + // We set for_compaction to false and don't OptimizeForCompactionTableRead + // here because this is a special case after we finish the table building + // No matter whether use_direct_io_for_flush_and_compaction is true, + // we will regard this verification as user reads since the goal is + // to cache it here for further user reads + InternalIterator* iter = cfd->table_cache()->NewIterator( + ReadOptions(), env_options_, cfd->internal_comparator(), + files_meta[file_idx]->fd, nullptr /* range_del_agg */, + prefix_extractor, nullptr, + cfd->internal_stats()->GetFileReadHist( + compact_->compaction->output_level()), + false, nullptr /* arena */, false /* skip_filters */, + compact_->compaction->output_level()); + auto s = iter->status(); + + if (s.ok() && paranoid_file_checks_) { + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {} + s = iter->status(); + } + + delete iter; + + if (!s.ok()) { + output_status = s; + break; + } + } + }; + for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) { + thread_pool.emplace_back(verify_table, + std::ref(compact_->sub_compact_states[i].status)); + } + verify_table(compact_->sub_compact_states[0].status); + for (auto& thread : thread_pool) { + thread.join(); + } + for (const auto& state : compact_->sub_compact_states) { + if (!state.status.ok()) { + status = state.status; + break; + } + } + } + TablePropertiesCollection tp; for (const auto& state : compact_->sub_compact_states) { for (const auto& output : state.outputs) { @@ -1175,43 +1238,16 @@ Status CompactionJob::FinishCompactionOutputFile( ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); TableProperties tp; if (s.ok() && current_entries > 0) { - // Verify that the table is usable - // We set for_compaction to false and don't OptimizeForCompactionTableRead - // here because this is a special case after we finish the table building - // No matter whether use_direct_io_for_flush_and_compaction is true, - // we will regard this verification as user reads since the goal is - // to cache it here for further user reads - InternalIterator* iter = cfd->table_cache()->NewIterator( - ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd, - nullptr /* range_del_agg */, - sub_compact->compaction->mutable_cf_options()->prefix_extractor.get(), - nullptr, - cfd->internal_stats()->GetFileReadHist( - compact_->compaction->output_level()), - false, nullptr /* arena */, false /* skip_filters */, - compact_->compaction->output_level()); - s = iter->status(); - - if (s.ok() && paranoid_file_checks_) { - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - } - s = iter->status(); - } - - delete iter; - // Output to event logger and fire events. - if (s.ok()) { - tp = sub_compact->builder->GetTableProperties(); - sub_compact->current_output()->table_properties = - std::make_shared(tp); - ROCKS_LOG_INFO(db_options_.info_log, - "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 - " keys, %" PRIu64 " bytes%s", - cfd->GetName().c_str(), job_id_, output_number, - current_entries, current_bytes, - meta->marked_for_compaction ? " (need compaction)" : ""); - } + tp = sub_compact->builder->GetTableProperties(); + sub_compact->current_output()->table_properties = + std::make_shared(tp); + ROCKS_LOG_INFO(db_options_.info_log, + "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 + " keys, %" PRIu64 " bytes%s", + cfd->GetName().c_str(), job_id_, output_number, + current_entries, current_bytes, + meta->marked_for_compaction ? " (need compaction)" : ""); } std::string fname; FileDescriptor output_fd; diff --git a/db/version_builder.cc b/db/version_builder.cc index 2d381ec61..9b034c5c8 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -405,17 +405,13 @@ class VersionBuilder::Rep { } }; - if (max_threads <= 1) { - load_handlers_func(); - } else { - std::vector threads; - for (int i = 0; i < max_threads; i++) { - threads.emplace_back(load_handlers_func); - } - - for (auto& t : threads) { - t.join(); - } + std::vector threads; + for (int i = 1; i < max_threads; i++) { + threads.emplace_back(load_handlers_func); + } + load_handlers_func(); + for (auto& t : threads) { + t.join(); } }