diff --git a/db/db_impl.cc b/db/db_impl.cc index 0088ee56d..0d6158a89 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2879,6 +2879,10 @@ Status DBImpl::IngestExternalFile( ColumnFamilyHandle* column_family, const std::vector& external_files, const IngestExternalFileOptions& ingestion_options) { + if (external_files.empty()) { + return Status::InvalidArgument("external_files is empty"); + } + Status status; auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); @@ -2896,6 +2900,9 @@ Status DBImpl::IngestExternalFile( immutable_db_options_, env_options_, &snapshots_, ingestion_options); + SuperVersionContext dummy_sv_ctx(/* create_superversion */ true); + VersionEdit dummy_edit; + uint64_t next_file_number = 0; std::list::iterator pending_output_elem; { InstrumentedMutexLock l(&mutex_); @@ -2906,10 +2913,27 @@ Status DBImpl::IngestExternalFile( // Make sure that bg cleanup wont delete the files that we are ingesting pending_output_elem = CaptureCurrentFileNumberInPendingOutputs(); + + // If crash happen after a hard link established, Recover function may + // reuse the file number that has already assigned to the internal file, + // and this will overwrite the external file. To protect the external + // file, we have to make sure the file number will never being reused. + next_file_number = versions_->FetchAddFileNumber(external_files.size()); + auto cf_options = cfd->GetLatestMutableCFOptions(); + status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_, + directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options); + } + } + dummy_sv_ctx.Clean(); + if (!status.ok()) { + return status; } SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); - status = ingestion_job.Prepare(external_files, super_version); + status = ingestion_job.Prepare(external_files, next_file_number, + super_version); CleanupSuperVersion(super_version); if (!status.ok()) { InstrumentedMutexLock l(&mutex_); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index f4f1680f9..b71063818 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -2091,7 +2091,8 @@ void DBImpl::InstallSuperVersionAndScheduleWork( old_sv->mutable_cf_options.max_write_buffer_number; } - if (sv_context->new_superversion == nullptr) { + // this branch is unlikely to step in + if (UNLIKELY(sv_context->new_superversion == nullptr)) { sv_context->NewSuperVersion(); } cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options); diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 2fc8b61c6..a3761e0d4 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -29,7 +29,9 @@ namespace rocksdb { Status ExternalSstFileIngestionJob::Prepare( - const std::vector& external_files_paths, SuperVersion* sv) { + const std::vector& external_files_paths, + uint64_t next_file_number, + SuperVersion* sv) { Status status; // Read the information of files we are ingesting @@ -90,7 +92,7 @@ Status ExternalSstFileIngestionJob::Prepare( // Copy/Move external files into DB for (IngestedFileInfo& f : files_to_ingest_) { - f.fd = FileDescriptor(versions_->NewFileNumber(), 0, f.file_size); + f.fd = FileDescriptor(next_file_number++, 0, f.file_size); const std::string path_outside_db = f.external_file_path; const std::string path_inside_db = diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 4ba604d47..29981bc10 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -89,6 +89,7 @@ class ExternalSstFileIngestionJob { // Prepare the job by copying external files into the DB. Status Prepare(const std::vector& external_files_paths, + uint64_t next_file_number, SuperVersion* sv); // Check if we need to flush the memtable before running the ingestion job diff --git a/db/version_set.h b/db/version_set.h index 6fe205651..fe8f26339 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -832,6 +832,11 @@ class VersionSet { // Allocate and return a new file number uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); } + // Fetch And Add n new file number + uint64_t FetchAddFileNumber(uint64_t n) { + return next_file_number_.fetch_add(n); + } + // Return the last sequence number. uint64_t LastSequence() const { return last_sequence_.load(std::memory_order_acquire); diff --git a/tools/db_stress.cc b/tools/db_stress.cc index b0612ad2e..e599df93e 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1627,11 +1627,15 @@ class StressTest { if (!FLAGS_verbose) { return; } - fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") ", cf, key, sz); + std::string tmp; + tmp.reserve(sz * 2 + 16); + char buf[4]; for (size_t i = 0; i < sz; i++) { - fprintf(stdout, "%X", value[i]); + snprintf(buf, 4, "%X", value[i]); + tmp.append(buf); } - fprintf(stdout, "\n"); + fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") %s\n", + cf, key, sz, tmp.c_str()); } static int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {