diff --git a/db/db_impl.cc b/db/db_impl.cc index 26044da32..61bdfc4da 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -6,7 +6,6 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. - #include "db/db_impl.h" #ifndef __STDC_FORMAT_MACROS diff --git a/db/db_impl.h b/db/db_impl.h index de9f4f8a1..34d27f5ac 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -258,10 +258,11 @@ class DBImpl : public DB { using DB::AddFile; virtual Status AddFile(ColumnFamilyHandle* column_family, - const ExternalSstFileInfo* file_info, + const std::vector& file_info_list, bool move_file) override; virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::string& file_path, bool move_file) override; + const std::vector& file_path_list, + bool move_file) override; #endif // ROCKSDB_LITE @@ -637,13 +638,17 @@ class DBImpl : public DB { // Finds the lowest level in the DB that the ingested file can be added to // REQUIRES: mutex_ held int PickLevelForIngestedFile(ColumnFamilyData* cfd, - const ExternalSstFileInfo* file_info); + const ExternalSstFileInfo& file_info); Status CompactFilesImpl( const CompactionOptions& compact_options, ColumnFamilyData* cfd, Version* version, const std::vector& input_file_names, const int output_level, int output_path_id, JobContext* job_context, LogBuffer* log_buffer); + Status ReadExternalSstFileInfo(ColumnFamilyHandle* column_family, + const std::string& file_path, + ExternalSstFileInfo* file_info); + #endif // ROCKSDB_LITE ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name); diff --git a/db/db_impl_add_file.cc b/db/db_impl_add_file.cc index b76c47064..3019f3391 100644 --- a/db/db_impl_add_file.cc +++ b/db/db_impl_add_file.cc @@ -21,16 +21,33 @@ #include "util/sync_point.h" namespace rocksdb { + +namespace { +// RAII Timer +class StatsTimer { + public: + explicit StatsTimer(Env* env, uint64_t* counter) + : env_(env), counter_(counter), start_micros_(env_->NowMicros()) {} + ~StatsTimer() { *counter_ += env_->NowMicros() - start_micros_; } + + private: + Env* env_; + uint64_t* counter_; + uint64_t start_micros_; +}; +} // anonymous namespace + #ifndef ROCKSDB_LITE -Status DBImpl::AddFile(ColumnFamilyHandle* column_family, - const std::string& file_path, bool move_file) { + +Status DBImpl::ReadExternalSstFileInfo(ColumnFamilyHandle* column_family, + const std::string& file_path, + ExternalSstFileInfo* file_info) { Status status; auto cfh = reinterpret_cast(column_family); - ColumnFamilyData* cfd = cfh->cfd(); + auto cfd = cfh->cfd(); - ExternalSstFileInfo file_info; - file_info.file_path = file_path; - status = env_->GetFileSize(file_path, &file_info.file_size); + file_info->file_path = file_path; + status = env_->GetFileSize(file_path, &file_info->file_size); if (!status.ok()) { return status; } @@ -49,7 +66,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, status = cfd->ioptions()->table_factory->NewTableReader( TableReaderOptions(*cfd->ioptions(), env_options_, cfd->internal_comparator()), - std::move(sst_file_reader), file_info.file_size, &table_reader); + std::move(sst_file_reader), file_info->file_size, &table_reader); if (!status.ok()) { return status; } @@ -63,17 +80,16 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, return Status::InvalidArgument("Generated table version not found"); } - file_info.version = + file_info->version = DecodeFixed32(external_sst_file_version_iter->second.c_str()); - if (file_info.version == 1) { + if (file_info->version == 1) { // version 1 imply that all sequence numbers in table equal 0 - file_info.sequence_number = 0; + file_info->sequence_number = 0; } else { return Status::InvalidArgument("Generated table version is not supported"); } - // Get number of entries in table - file_info.num_entries = table_reader->GetTableProperties()->num_entries; + file_info->num_entries = table_reader->GetTableProperties()->num_entries; ParsedInternalKey key; std::unique_ptr iter( @@ -87,7 +103,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, if (key.sequence != 0) { return Status::Corruption("Generated table have non zero sequence number"); } - file_info.smallest_key = key.user_key.ToString(); + file_info->smallest_key = key.user_key.ToString(); // Get last (largest) key from file iter->SeekToLast(); @@ -97,70 +113,145 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, if (key.sequence != 0) { return Status::Corruption("Generated table have non zero sequence number"); } - file_info.largest_key = key.user_key.ToString(); + file_info->largest_key = key.user_key.ToString(); - return AddFile(column_family, &file_info, move_file); + return Status::OK(); } Status DBImpl::AddFile(ColumnFamilyHandle* column_family, - const ExternalSstFileInfo* file_info, bool move_file) { + const std::vector& file_path_list, + bool move_file) { + Status status; + auto num_files = file_path_list.size(); + if (num_files == 0) { + return Status::InvalidArgument("The list of files is empty"); + } + + std::vector file_infos(num_files); + std::vector file_info_list(num_files); + for (size_t i = 0; i < num_files; i++) { + status = ReadExternalSstFileInfo(column_family, file_path_list[i], + &file_info_list[i]); + if (!status.ok()) { + return status; + } + } + return AddFile(column_family, file_info_list, move_file); +} + +Status DBImpl::AddFile(ColumnFamilyHandle* column_family, + const std::vector& file_info_list, + bool move_file) { Status status; auto cfh = reinterpret_cast(column_family); ColumnFamilyData* cfd = cfh->cfd(); - const uint64_t start_micros = env_->NowMicros(); - if (file_info->num_entries == 0) { - return Status::InvalidArgument("File contain no entries"); - } - if (file_info->version != 1) { - return Status::InvalidArgument("Generated table version is not supported"); - } - // version 1 imply that file have only Put Operations with Sequence Number = 0 - - FileMetaData meta; - meta.smallest = - InternalKey(file_info->smallest_key, file_info->sequence_number, - ValueType::kTypeValue); - meta.largest = InternalKey(file_info->largest_key, file_info->sequence_number, - ValueType::kTypeValue); - if (!meta.smallest.Valid() || !meta.largest.Valid()) { - return Status::Corruption("Generated table have corrupted keys"); - } - meta.smallest_seqno = file_info->sequence_number; - meta.largest_seqno = file_info->sequence_number; - if (meta.smallest_seqno != 0 || meta.largest_seqno != 0) { - return Status::InvalidArgument( - "Non zero sequence numbers are not supported"); + auto num_files = file_info_list.size(); + if (num_files == 0) { + return Status::InvalidArgument("The list of files is empty"); } - // Generate a location for the new table - std::list::iterator pending_outputs_inserted_elem; + // Verify that passed files dont have overlapping ranges + if (num_files > 1) { + std::vector sorted_file_info_list(num_files); + for (size_t i = 0; i < num_files; i++) { + sorted_file_info_list[i] = &file_info_list[i]; + } + + auto* vstorage = cfd->current()->storage_info(); + std::sort( + sorted_file_info_list.begin(), sorted_file_info_list.end(), + [&vstorage, &file_info_list](const ExternalSstFileInfo* info1, + const ExternalSstFileInfo* info2) { + return vstorage->InternalComparator()->user_comparator()->Compare( + info1->smallest_key, info2->smallest_key) < 0; + }); + + for (size_t i = 0; i < num_files - 1; i++) { + if (sorted_file_info_list[i]->largest_key >= + sorted_file_info_list[i + 1]->smallest_key) { + return Status::NotSupported("Cannot add overlapping range among files"); + } + } + } + + std::vector micro_list(num_files, 0); + std::vector meta_list(num_files); + for (size_t i = 0; i < num_files; i++) { + StatsTimer t(env_, µ_list[i]); + if (file_info_list[i].num_entries == 0) { + return Status::InvalidArgument("File contain no entries"); + } + if (file_info_list[i].version != 1) { + return Status::InvalidArgument( + "Generated table version is not supported"); + } + // version 1 imply that file have only Put Operations with Sequence Number = + // 0 + + meta_list[i].smallest = + InternalKey(file_info_list[i].smallest_key, + file_info_list[i].sequence_number, ValueType::kTypeValue); + meta_list[i].largest = + InternalKey(file_info_list[i].largest_key, + file_info_list[i].sequence_number, ValueType::kTypeValue); + if (!meta_list[i].smallest.Valid() || !meta_list[i].largest.Valid()) { + return Status::Corruption("Generated table have corrupted keys"); + } + meta_list[i].smallest_seqno = file_info_list[i].sequence_number; + meta_list[i].largest_seqno = file_info_list[i].sequence_number; + if (meta_list[i].smallest_seqno != 0 || meta_list[i].largest_seqno != 0) { + return Status::InvalidArgument( + "Non zero sequence numbers are not supported"); + } + } + + std::vector::iterator> pending_outputs_inserted_elem_list( + num_files); + // Generate locations for the new tables { InstrumentedMutexLock l(&mutex_); - pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); - meta.fd = - FileDescriptor(versions_->NewFileNumber(), 0, file_info->file_size); - } - - std::string db_fname = TableFileName( - db_options_.db_paths, meta.fd.GetNumber(), meta.fd.GetPathId()); - - if (move_file) { - status = env_->LinkFile(file_info->file_path, db_fname); - if (status.IsNotSupported()) { - // Original file is on a different FS, use copy instead of hard linking - status = CopyFile(env_, file_info->file_path, db_fname, 0); + for (size_t i = 0; i < num_files; i++) { + StatsTimer t(env_, µ_list[i]); + pending_outputs_inserted_elem_list[i] = + CaptureCurrentFileNumberInPendingOutputs(); + meta_list[i].fd = FileDescriptor(versions_->NewFileNumber(), 0, + file_info_list[i].file_size); + } + } + + // Copy/Move external files into DB + std::vector db_fname_list(num_files); + size_t j = 0; + for (; j < num_files; j++) { + StatsTimer t(env_, µ_list[j]); + db_fname_list[j] = + TableFileName(db_options_.db_paths, meta_list[j].fd.GetNumber(), + meta_list[j].fd.GetPathId()); + if (move_file) { + status = env_->LinkFile(file_info_list[j].file_path, db_fname_list[j]); + if (status.IsNotSupported()) { + // Original file is on a different FS, use copy instead of hard linking + status = + CopyFile(env_, file_info_list[j].file_path, db_fname_list[j], 0); + } + } else { + status = CopyFile(env_, file_info_list[j].file_path, db_fname_list[j], 0); + } + TEST_SYNC_POINT("DBImpl::AddFile:FileCopied"); + if (!status.ok()) { + for (size_t i = 0; i < j; i++) { + Status s = env_->DeleteFile(db_fname_list[i]); + if (!s.ok()) { + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + db_fname_list[i].c_str(), s.ToString().c_str()); + } + } + return status; } - } else { - status = CopyFile(env_, file_info->file_path, db_fname, 0); - } - TEST_SYNC_POINT("DBImpl::AddFile:FileCopied"); - if (!status.ok()) { - return status; } - // The level the file will be ingested into - int target_level = 0; { InstrumentedMutexLock l(&mutex_); const MutableCFOptions mutable_cf_options = @@ -183,35 +274,46 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, ro.total_order_seek = true; ScopedArenaIterator iter(NewInternalIterator(ro, cfd, sv, &arena)); - InternalKey range_start(file_info->smallest_key, kMaxSequenceNumber, - kTypeValue); - iter->Seek(range_start.Encode()); - status = iter->status(); + for (size_t i = 0; i < num_files; i++) { + StatsTimer t(env_, µ_list[i]); + InternalKey range_start(file_info_list[i].smallest_key, + kMaxSequenceNumber, kTypeValue); + iter->Seek(range_start.Encode()); + status = iter->status(); - if (status.ok() && iter->Valid()) { - ParsedInternalKey seek_result; - if (ParseInternalKey(iter->key(), &seek_result)) { - auto* vstorage = cfd->current()->storage_info(); - if (vstorage->InternalComparator()->user_comparator()->Compare( - seek_result.user_key, file_info->largest_key) <= 0) { - status = Status::NotSupported("Cannot add overlapping range"); + if (status.ok() && iter->Valid()) { + ParsedInternalKey seek_result; + if (ParseInternalKey(iter->key(), &seek_result)) { + auto* vstorage = cfd->current()->storage_info(); + if (vstorage->InternalComparator()->user_comparator()->Compare( + seek_result.user_key, file_info_list[i].largest_key) <= 0) { + status = Status::NotSupported("Cannot add overlapping range"); + break; + } + } else { + status = Status::Corruption("DB have corrupted keys"); + break; } - } else { - status = Status::Corruption("DB have corrupted keys"); } } } + // The level the file will be ingested into + std::vector target_level_list(num_files, 0); if (status.ok()) { - // Add file to the lowest possible level - target_level = PickLevelForIngestedFile(cfd, file_info); + // Add files to L0 VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); - edit.AddFile(target_level, meta.fd.GetNumber(), meta.fd.GetPathId(), - meta.fd.GetFileSize(), meta.smallest, meta.largest, - meta.smallest_seqno, meta.largest_seqno, - meta.marked_for_compaction); - + for (size_t i = 0; i < num_files; i++) { + StatsTimer t(env_, µ_list[i]); + // Add file to the lowest possible level + target_level_list[i] = PickLevelForIngestedFile(cfd, file_info_list[i]); + edit.AddFile(target_level_list[i], meta_list[i].fd.GetNumber(), + meta_list[i].fd.GetPathId(), meta_list[i].fd.GetFileSize(), + meta_list[i].smallest, meta_list[i].largest, + meta_list[i].smallest_seqno, meta_list[i].largest_seqno, + meta_list[i].marked_for_compaction); + } status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, directories_.GetDbDir()); } @@ -220,45 +322,41 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, if (status.ok()) { delete InstallSuperVersionAndScheduleWork(cfd, nullptr, mutable_cf_options); - + } + for (size_t i = 0; i < num_files; i++) { // Update internal stats InternalStats::CompactionStats stats(1); - stats.micros = env_->NowMicros() - start_micros; - stats.bytes_written = meta.fd.GetFileSize(); + stats.micros = micro_list[i]; + stats.bytes_written = meta_list[i].fd.GetFileSize(); stats.num_output_files = 1; - cfd->internal_stats()->AddCompactionStats(target_level, stats); + cfd->internal_stats()->AddCompactionStats(target_level_list[i], stats); cfd->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE, - meta.fd.GetFileSize()); + meta_list[i].fd.GetFileSize()); + ReleaseFileNumberFromPendingOutputs( + pending_outputs_inserted_elem_list[i]); } - ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); } if (!status.ok()) { - // We failed to add the file to the database - Status s = env_->DeleteFile(db_fname); - if (!s.ok()) { - Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, - "AddFile() clean up for file %s failed : %s", db_fname.c_str(), - s.ToString().c_str()); + // We failed to add the files to the database + for (size_t i = 0; i < num_files; i++) { + Status s = env_->DeleteFile(db_fname_list[i]); + if (!s.ok()) { + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + db_fname_list[i].c_str(), s.ToString().c_str()); + } } - } else { - // File was ingested successfully - Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "New file %" PRIu64 - " was added to L%d (Size: %.2f MB, " - "entries: %" PRIu64 ")", - meta.fd.GetNumber(), target_level, - static_cast(meta.fd.GetFileSize()) / 1048576.0, - file_info->num_entries); - - if (move_file) { - // The file was moved and added successfully, remove original file link - Status s = env_->DeleteFile(file_info->file_path); + } else if (status.ok() && move_file) { + // The files were moved and added successfully, remove original file links + for (size_t i = 0; i < num_files; i++) { + Status s = env_->DeleteFile(file_info_list[i].file_path); if (!s.ok()) { Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "%s was added to DB successfully but failed to remove original " - "file link : %s", - file_info->file_path.c_str(), s.ToString().c_str()); + "file " + "link : %s", + file_info_list[i].file_path.c_str(), s.ToString().c_str()); } } } @@ -267,15 +365,15 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, // Finds the lowest level in the DB that the ingested file can be added to int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd, - const ExternalSstFileInfo* file_info) { + const ExternalSstFileInfo& file_info) { mutex_.AssertHeld(); int target_level = 0; auto* vstorage = cfd->current()->storage_info(); auto* ucmp = vstorage->InternalComparator()->user_comparator(); - Slice file_smallest_user_key(file_info->smallest_key); - Slice file_largest_user_key(file_info->largest_key); + Slice file_smallest_user_key(file_info.smallest_key); + Slice file_largest_user_key(file_info.largest_key); for (int lvl = cfd->NumberLevels() - 1; lvl >= vstorage->base_level(); lvl--) { diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 5de7b6d3b..8616caaf2 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -843,7 +843,7 @@ TEST_F(DBSSTTest, AddExternalSstFile) { DestroyAndReopen(options); // Add file using file path - s = db_->AddFile(file1); + s = db_->AddFile(std::vector(1, file1)); ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); for (int k = 0; k < 100; k++) { @@ -853,23 +853,23 @@ TEST_F(DBSSTTest, AddExternalSstFile) { // Add file while holding a snapshot will fail const Snapshot* s1 = db_->GetSnapshot(); if (s1 != nullptr) { - ASSERT_NOK(db_->AddFile(&file2_info)); + ASSERT_NOK(db_->AddFile(std::vector(1, file2_info))); db_->ReleaseSnapshot(s1); } // We can add the file after releaseing the snapshot - ASSERT_OK(db_->AddFile(&file2_info)); + ASSERT_OK(db_->AddFile(std::vector(1, file2_info))); ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); for (int k = 0; k < 200; k++) { ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); } - // This file have overlapping values with the exisitng data - s = db_->AddFile(file3); + // This file has overlapping values with the exisitng data + s = db_->AddFile(std::vector(1, file3)); ASSERT_FALSE(s.ok()) << s.ToString(); - // This file have overlapping values with the exisitng data - s = db_->AddFile(&file4_info); + // This file has overlapping values with the exisitng data + s = db_->AddFile(std::vector(1, file4_info)); ASSERT_FALSE(s.ok()) << s.ToString(); // Overwrite values of keys divisible by 5 @@ -879,7 +879,7 @@ TEST_F(DBSSTTest, AddExternalSstFile) { ASSERT_NE(db_->GetLatestSequenceNumber(), 0U); // Key range of file5 (400 => 499) dont overlap with any keys in DB - ASSERT_OK(db_->AddFile(file5)); + ASSERT_OK(db_->AddFile(std::vector(1, file5))); // Make sure values are correct before and after flush/compaction for (int i = 0; i < 2; i++) { @@ -908,13 +908,13 @@ TEST_F(DBSSTTest, AddExternalSstFile) { } // We deleted range (400 => 499) but cannot add file5 because // of the range tombstones - ASSERT_NOK(db_->AddFile(file5)); + ASSERT_NOK(db_->AddFile(std::vector(1, file5))); // Compacting the DB will remove the tombstones ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); // Now we can add the file - ASSERT_OK(db_->AddFile(file5)); + ASSERT_OK(db_->AddFile(std::vector(1, file5))); // Verify values of file5 in DB for (int k = 400; k < 500; k++) { @@ -925,6 +925,230 @@ TEST_F(DBSSTTest, AddExternalSstFile) { kSkipFIFOCompaction)); } +TEST_F(DBSSTTest, AddExternalSstFileList) { + do { + std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/"; + env_->CreateDir(sst_files_folder); + Options options = CurrentOptions(); + options.env = env_; + + SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); + + // file1.sst (0 => 99) + std::string file1 = sst_files_folder + "file1.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + for (int k = 0; k < 100; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file1_info; + Status s = sst_file_writer.Finish(&file1_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file1_info.file_path, file1); + ASSERT_EQ(file1_info.num_entries, 100); + ASSERT_EQ(file1_info.smallest_key, Key(0)); + ASSERT_EQ(file1_info.largest_key, Key(99)); + // sst_file_writer already finished, cannot add this value + s = sst_file_writer.Add(Key(100), "bad_val"); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // file2.sst (100 => 199) + std::string file2 = sst_files_folder + "file2.sst"; + ASSERT_OK(sst_file_writer.Open(file2)); + for (int k = 100; k < 200; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + // Cannot add this key because it's not after last added key + s = sst_file_writer.Add(Key(99), "bad_val"); + ASSERT_FALSE(s.ok()) << s.ToString(); + ExternalSstFileInfo file2_info; + s = sst_file_writer.Finish(&file2_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file2_info.file_path, file2); + ASSERT_EQ(file2_info.num_entries, 100); + ASSERT_EQ(file2_info.smallest_key, Key(100)); + ASSERT_EQ(file2_info.largest_key, Key(199)); + + // file3.sst (195 => 199) + // This file values overlap with file2 values + std::string file3 = sst_files_folder + "file3.sst"; + ASSERT_OK(sst_file_writer.Open(file3)); + for (int k = 195; k < 200; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file3_info; + s = sst_file_writer.Finish(&file3_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file3_info.file_path, file3); + ASSERT_EQ(file3_info.num_entries, 5); + ASSERT_EQ(file3_info.smallest_key, Key(195)); + ASSERT_EQ(file3_info.largest_key, Key(199)); + + // file4.sst (30 => 39) + // This file values overlap with file1 values + std::string file4 = sst_files_folder + "file4.sst"; + ASSERT_OK(sst_file_writer.Open(file4)); + for (int k = 30; k < 40; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file4_info; + s = sst_file_writer.Finish(&file4_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file4_info.file_path, file4); + ASSERT_EQ(file4_info.num_entries, 10); + ASSERT_EQ(file4_info.smallest_key, Key(30)); + ASSERT_EQ(file4_info.largest_key, Key(39)); + + // file5.sst (200 => 299) + std::string file5 = sst_files_folder + "file5.sst"; + ASSERT_OK(sst_file_writer.Open(file5)); + for (int k = 200; k < 300; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file5_info; + s = sst_file_writer.Finish(&file5_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file5_info.file_path, file5); + ASSERT_EQ(file5_info.num_entries, 100); + ASSERT_EQ(file5_info.smallest_key, Key(200)); + ASSERT_EQ(file5_info.largest_key, Key(299)); + + // list 1 has internal key range conflict + std::vector file_list0({file1, file2}); + std::vector file_list1({file3, file2, file1}); + std::vector file_list2({file5}); + std::vector file_list3({file3, file4}); + + std::vector info_list0({file1_info, file2_info}); + std::vector info_list1( + {file3_info, file2_info, file1_info}); + std::vector info_list2({file5_info}); + std::vector info_list3({file3_info, file4_info}); + + DestroyAndReopen(options); + + // This list of files have key ranges are overlapping with each other + s = db_->AddFile(file_list1); + ASSERT_FALSE(s.ok()) << s.ToString(); + s = db_->AddFile(info_list1); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // Add files using file path list + s = db_->AddFile(file_list0); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); + for (int k = 0; k < 200; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + + // Add file while holding a snapshot will fail + const Snapshot* s1 = db_->GetSnapshot(); + if (s1 != nullptr) { + ASSERT_NOK(db_->AddFile(info_list2)); + db_->ReleaseSnapshot(s1); + } + // We can add the file after releaseing the snapshot + ASSERT_OK(db_->AddFile(info_list2)); + ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); + for (int k = 0; k < 300; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + + // This file list has overlapping values with the exisitng data + s = db_->AddFile(file_list3); + ASSERT_FALSE(s.ok()) << s.ToString(); + s = db_->AddFile(info_list3); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // Overwrite values of keys divisible by 5 + for (int k = 0; k < 200; k += 5) { + ASSERT_OK(Put(Key(k), Key(k) + "_val_new")); + } + ASSERT_NE(db_->GetLatestSequenceNumber(), 0U); + + // Make sure values are correct before and after flush/compaction + for (int i = 0; i < 2; i++) { + for (int k = 0; k < 200; k++) { + std::string value = Key(k) + "_val"; + if (k % 5 == 0) { + value += "_new"; + } + ASSERT_EQ(Get(Key(k)), value); + } + for (int k = 200; k < 300; k++) { + std::string value = Key(k) + "_val"; + ASSERT_EQ(Get(Key(k)), value); + } + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + } + + // Delete keys in range (200 => 299) + for (int k = 200; k < 300; k++) { + ASSERT_OK(Delete(Key(k))); + } + // We deleted range (200 => 299) but cannot add file5 because + // of the range tombstones + ASSERT_NOK(db_->AddFile(file_list2)); + + // Compacting the DB will remove the tombstones + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + // Now we can add the file + ASSERT_OK(db_->AddFile(file_list2)); + + // Verify values of file5 in DB + for (int k = 200; k < 300; k++) { + std::string value = Key(k) + "_val"; + ASSERT_EQ(Get(Key(k)), value); + } + } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | + kSkipFIFOCompaction)); +} + +TEST_F(DBSSTTest, AddExternalSstFileListAtomicity) { + do { + std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/"; + env_->CreateDir(sst_files_folder); + Options options = CurrentOptions(); + options.env = env_; + + SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); + + // files[0].sst (0 => 99) + // files[1].sst (100 => 199) + // ... + // file[8].sst (800 => 899) + size_t n = 9; + std::vector files(n); + std::vector files_info(n); + for (size_t i = 0; i < n; i++) { + files[i] = sst_files_folder + "file" + std::to_string(i) + ".sst"; + ASSERT_OK(sst_file_writer.Open(files[i])); + for (size_t k = i * 100; k < (i + 1) * 100; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + Status s = sst_file_writer.Finish(&files_info[i]); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(files_info[i].file_path, files[i]); + ASSERT_EQ(files_info[i].num_entries, 100); + ASSERT_EQ(files_info[i].smallest_key, Key(i * 100)); + ASSERT_EQ(files_info[i].largest_key, Key((i + 1) * 100 - 1)); + } + files.push_back(sst_files_folder + "file" + std::to_string(n) + ".sst"); + auto s = db_->AddFile(files); + ASSERT_NOK(s) << s.ToString(); + for (size_t k = 0; k < n * 100; k++) { + ASSERT_EQ("NOT_FOUND", Get(Key(k))); + } + s = db_->AddFile(files_info); + ASSERT_OK(s); + for (size_t k = 0; k < n * 100; k++) { + std::string value = Key(k) + "_val"; + ASSERT_EQ(Get(Key(k)), value); + } + } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | + kSkipFIFOCompaction)); +} // This test reporduce a bug that can happen in some cases if the DB started // purging obsolete files when we are adding an external sst file. // This situation may result in deleting the file while it's being added. @@ -963,7 +1187,7 @@ TEST_F(DBSSTTest, AddExternalSstFilePurgeObsoleteFilesBug) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - s = db_->AddFile(sst_file_path); + s = db_->AddFile(std::vector(1, sst_file_path)); ASSERT_OK(s); for (int i = 0; i < 500; i++) { @@ -1025,17 +1249,19 @@ TEST_F(DBSSTTest, AddExternalSstFileNoCopy) { ASSERT_EQ(file3_info.num_entries, 15); ASSERT_EQ(file3_info.smallest_key, Key(110)); ASSERT_EQ(file3_info.largest_key, Key(124)); - - s = db_->AddFile(&file1_info, true /* move file */); + s = db_->AddFile(std::vector(1, file1_info), + true /* move file */); ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_EQ(Status::NotFound(), env_->FileExists(file1)); - s = db_->AddFile(&file2_info, false /* copy file */); + s = db_->AddFile(std::vector(1, file2_info), + false /* copy file */); ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_OK(env_->FileExists(file2)); // This file have overlapping values with the exisitng data - s = db_->AddFile(&file3_info, true /* move file */); + s = db_->AddFile(std::vector(1, file2_info), + true /* move file */); ASSERT_FALSE(s.ok()) << s.ToString(); ASSERT_OK(env_->FileExists(file3)); @@ -1101,7 +1327,8 @@ TEST_F(DBSSTTest, AddExternalSstFileMultiThreaded) { // sometimes we use copy, sometimes link .. the result should be the same bool move_file = (thread_id % 3 == 0); - Status s = db_->AddFile(file_names[file_idx], move_file); + Status s = db_->AddFile(std::vector(1, file_names[file_idx]), + move_file); if (s.ok()) { files_added++; } @@ -1198,7 +1425,7 @@ TEST_F(DBSSTTest, AddExternalSstFileOverlappingRanges) { ASSERT_OK(s); // Insert the generated file - s = db_->AddFile(&file_info); + s = db_->AddFile(std::vector(1, file_info)); auto it = true_data.lower_bound(Key(range_start)); if (it != true_data.end() && it->first <= Key(range_end)) { diff --git a/db/db_test.cc b/db/db_test.cc index d40a5a8c4..f54a1bd05 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2642,12 +2642,12 @@ class ModelDB : public DB { #ifndef ROCKSDB_LITE using DB::AddFile; virtual Status AddFile(ColumnFamilyHandle* column_family, - const ExternalSstFileInfo* file_path, + const std::vector& file_info_list, bool move_file) override { return Status::NotSupported("Not implemented."); } virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::string& file_path, + const std::vector& file_path_list, bool move_file) override { return Status::NotSupported("Not implemented."); } diff --git a/db/db_test_util.cc b/db/db_test_util.cc index be31ffba2..cb76a36f9 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1098,7 +1098,7 @@ Status DBTestBase::GenerateAndAddExternalFile(const Options options, s = sst_file_writer.Finish(); if (s.ok()) { - s = db_->AddFile(file_path); + s = db_->AddFile(std::vector(1, file_path)); } return s; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index e644303b0..8a703232f 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -12,15 +12,16 @@ #include #include #include -#include #include #include +#include #include "rocksdb/immutable_options.h" #include "rocksdb/iterator.h" #include "rocksdb/listener.h" #include "rocksdb/metadata.h" #include "rocksdb/options.h" #include "rocksdb/snapshot.h" +#include "rocksdb/sst_file_writer.h" #include "rocksdb/thread_status.h" #include "rocksdb/transaction_log.h" #include "rocksdb/types.h" @@ -580,14 +581,14 @@ class DB { } #if defined(__GNUC__) || defined(__clang__) - __attribute__((deprecated)) + __attribute__((__deprecated__)) #elif _WIN32 __declspec(deprecated) #endif - virtual Status - CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, - const Slice* end, bool change_level = false, - int target_level = -1, uint32_t target_path_id = 0) { + virtual Status + CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, + const Slice* end, bool change_level = false, + int target_level = -1, uint32_t target_path_id = 0) { CompactRangeOptions options; options.change_level = change_level; options.target_level = target_level; @@ -595,14 +596,13 @@ class DB { return CompactRange(options, column_family, begin, end); } #if defined(__GNUC__) || defined(__clang__) - __attribute__((deprecated)) + __attribute__((__deprecated__)) #elif _WIN32 __declspec(deprecated) #endif - virtual Status - CompactRange(const Slice* begin, const Slice* end, - bool change_level = false, int target_level = -1, - uint32_t target_path_id = 0) { + virtual Status + CompactRange(const Slice* begin, const Slice* end, bool change_level = false, + int target_level = -1, uint32_t target_path_id = 0) { CompactRangeOptions options; options.change_level = change_level; options.target_level = target_level; @@ -792,32 +792,77 @@ class DB { GetColumnFamilyMetaData(DefaultColumnFamily(), metadata); } - // Load table file located at "file_path" into "column_family", a pointer to - // ExternalSstFileInfo can be used instead of "file_path" to do a blind add - // that wont need to read the file, move_file can be set to true to - // move the file instead of copying it. + // Batch load table files whose paths stored in "file_path_list" into + // "column_family", a vector of ExternalSstFileInfo can be used + // instead of "file_path_list" to do a blind batch add that wont + // need to read the file, move_file can be set to true to + // move the files instead of copying them. // // Current Requirements: - // (1) Key range in loaded table file don't overlap with + // (1) The key ranges of the files don't overlap with each other + // (1) The key range of any file in list doesn't overlap with // existing keys or tombstones in DB. // (2) No snapshots are held. // - // Notes: We will try to ingest the file to the lowest possible level + // Notes: We will try to ingest the files to the lowest possible level // even if the file compression dont match the level compression virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::string& file_path, + const std::vector& file_path_list, bool move_file = false) = 0; - virtual Status AddFile(const std::string& file_path, bool move_file = false) { - return AddFile(DefaultColumnFamily(), file_path, move_file); + virtual Status AddFile(const std::vector& file_path_list, + bool move_file = false) { + return AddFile(DefaultColumnFamily(), file_path_list, move_file); + } +#if defined(__GNUC__) || defined(__clang__) + __attribute__((__deprecated__)) +#elif _WIN32 + __declspec(deprecated) +#endif + virtual Status + AddFile(ColumnFamilyHandle* column_family, const std::string& file_path, + bool move_file = false) { + return AddFile(column_family, std::vector(1, file_path), + move_file); + } +#if defined(__GNUC__) || defined(__clang__) + __attribute__((__deprecated__)) +#elif _WIN32 + __declspec(deprecated) +#endif + virtual Status + AddFile(const std::string& file_path, bool move_file = false) { + return AddFile(DefaultColumnFamily(), + std::vector(1, file_path), move_file); } // Load table file with information "file_info" into "column_family" virtual Status AddFile(ColumnFamilyHandle* column_family, - const ExternalSstFileInfo* file_info, + const std::vector& file_info_list, bool move_file = false) = 0; - virtual Status AddFile(const ExternalSstFileInfo* file_info, + virtual Status AddFile(const std::vector& file_info_list, bool move_file = false) { - return AddFile(DefaultColumnFamily(), file_info, move_file); + return AddFile(DefaultColumnFamily(), file_info_list, move_file); + } +#if defined(__GNUC__) || defined(__clang__) + __attribute__((__deprecated__)) +#elif _WIN32 + __declspec(deprecated) +#endif + virtual Status + AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_info, bool move_file = false) { + return AddFile(column_family, + std::vector(1, *file_info), move_file); + } +#if defined(__GNUC__) || defined(__clang__) + __attribute__((__deprecated__)) +#elif _WIN32 + __declspec(deprecated) +#endif + virtual Status + AddFile(const ExternalSstFileInfo* file_info, bool move_file = false) { + return AddFile(DefaultColumnFamily(), + std::vector(1, *file_info), move_file); } #endif // ROCKSDB_LITE diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 542f6c654..32eb1ed84 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -65,14 +65,14 @@ class StackableDB : public DB { using DB::AddFile; virtual Status AddFile(ColumnFamilyHandle* column_family, - const ExternalSstFileInfo* file_info, + const std::vector& file_info_list, bool move_file) override { - return db_->AddFile(column_family, file_info, move_file); + return db_->AddFile(column_family, file_info_list, move_file); } virtual Status AddFile(ColumnFamilyHandle* column_family, - const std::string& file_path, + const std::vector& file_path_list, bool move_file) override { - return db_->AddFile(column_family, file_path, move_file); + return db_->AddFile(column_family, file_path_list, move_file); } using DB::KeyMayExist;