diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index a2dbad026..fec77ff3f 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -38,6 +38,15 @@ Status ExternalSstFileIngestionJob::Prepare( files_to_ingest_.push_back(file_to_ingest); } + for (const IngestedFileInfo& f : files_to_ingest_) { + if (f.cf_id != + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily && + f.cf_id != cfd_->GetID()) { + return Status::InvalidArgument( + "External file column family id dont match"); + } + } + const Comparator* ucmp = cfd_->internal_comparator().user_comparator(); auto num_files = files_to_ingest_.size(); if (num_files == 0) { @@ -325,6 +334,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( } file_to_ingest->largest_user_key = key.user_key.ToString(); + file_to_ingest->cf_id = static_cast(props->column_family_id); + return status; } diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 8e65ae3b2..954dcb98b 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -36,6 +36,8 @@ struct IngestedFileInfo { uint64_t file_size; // total number of keys in external file uint64_t num_entries; + // Id of column family this file shoule be ingested into + uint32_t cf_id; // Version of external file int version; diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 8295c2d8d..3120d43da 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1780,6 +1780,61 @@ TEST_F(ExternalSSTFileTest, DirtyExit) { ASSERT_NOK(sst_file_writer->Finish()); } +TEST_F(ExternalSSTFileTest, FileWithCFInfo) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko", "toto"}, options); + + SstFileWriter sfw_default(EnvOptions(), options, options.comparator, + handles_[0]); + SstFileWriter sfw_cf1(EnvOptions(), options, options.comparator, handles_[1]); + SstFileWriter sfw_cf2(EnvOptions(), options, options.comparator, handles_[2]); + SstFileWriter sfw_unknown(EnvOptions(), options, options.comparator); + + // default_cf.sst + const std::string cf_default_sst = sst_files_dir_ + "/default_cf.sst"; + ASSERT_OK(sfw_default.Open(cf_default_sst)); + ASSERT_OK(sfw_default.Add("K1", "V1")); + ASSERT_OK(sfw_default.Add("K2", "V2")); + ASSERT_OK(sfw_default.Finish()); + + // cf1.sst + const std::string cf1_sst = sst_files_dir_ + "/cf1.sst"; + ASSERT_OK(sfw_cf1.Open(cf1_sst)); + ASSERT_OK(sfw_cf1.Add("K3", "V1")); + ASSERT_OK(sfw_cf1.Add("K4", "V2")); + ASSERT_OK(sfw_cf1.Finish()); + + // cf_unknown.sst + const std::string unknown_sst = sst_files_dir_ + "/cf_unknown.sst"; + ASSERT_OK(sfw_unknown.Open(unknown_sst)); + ASSERT_OK(sfw_unknown.Add("K5", "V1")); + ASSERT_OK(sfw_unknown.Add("K6", "V2")); + ASSERT_OK(sfw_unknown.Finish()); + + IngestExternalFileOptions ifo; + + // SST CF dont match + ASSERT_NOK(db_->IngestExternalFile(handles_[0], {cf1_sst}, ifo)); + // SST CF dont match + ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf1_sst}, ifo)); + // SST CF match + ASSERT_OK(db_->IngestExternalFile(handles_[1], {cf1_sst}, ifo)); + + // SST CF dont match + ASSERT_NOK(db_->IngestExternalFile(handles_[1], {cf_default_sst}, ifo)); + // SST CF dont match + ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf_default_sst}, ifo)); + // SST CF match + ASSERT_OK(db_->IngestExternalFile(handles_[0], {cf_default_sst}, ifo)); + + // SST CF unknown + ASSERT_OK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo)); + // SST CF unknown + ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo)); + // SST CF unknown + ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo)); +} + #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/include/rocksdb/sst_file_writer.h b/include/rocksdb/sst_file_writer.h index ace372545..8cd1ceea5 100644 --- a/include/rocksdb/sst_file_writer.h +++ b/include/rocksdb/sst_file_writer.h @@ -7,6 +7,7 @@ #include #include "rocksdb/env.h" #include "rocksdb/options.h" +#include "rocksdb/table_properties.h" #include "rocksdb/types.h" namespace rocksdb { @@ -43,8 +44,12 @@ struct ExternalSstFileInfo { // All keys in files generated by SstFileWriter will have sequence number = 0 class SstFileWriter { public: + // User can pass `column_family` to specify that the the generated file will + // be ingested into this column_family, note that passing nullptr means that + // the column_family is unknown. SstFileWriter(const EnvOptions& env_options, const Options& options, - const Comparator* user_comparator); + const Comparator* user_comparator, + ColumnFamilyHandle* column_family = nullptr); ~SstFileWriter(); diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index eca52c523..74feb1a30 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -21,11 +21,12 @@ const std::string ExternalSstFilePropertyNames::kGlobalSeqno = struct SstFileWriter::Rep { Rep(const EnvOptions& _env_options, const Options& options, - const Comparator* _user_comparator) + const Comparator* _user_comparator, ColumnFamilyHandle* _cfh) : env_options(_env_options), ioptions(options), mutable_cf_options(options), - internal_comparator(_user_comparator) {} + internal_comparator(_user_comparator), + cfh(_cfh) {} std::unique_ptr file_writer; std::unique_ptr builder; @@ -34,14 +35,16 @@ struct SstFileWriter::Rep { MutableCFOptions mutable_cf_options; InternalKeyComparator internal_comparator; ExternalSstFileInfo file_info; - std::string column_family_name; InternalKey ikey; + std::string column_family_name; + ColumnFamilyHandle* cfh; }; SstFileWriter::SstFileWriter(const EnvOptions& env_options, const Options& options, - const Comparator* user_comparator) - : rep_(new Rep(env_options, options, user_comparator)) {} + const Comparator* user_comparator, + ColumnFamilyHandle* column_family) + : rep_(new Rep(env_options, options, user_comparator, column_family)) {} SstFileWriter::~SstFileWriter() { if (rep_->builder) { @@ -89,6 +92,18 @@ Status SstFileWriter::Open(const std::string& file_path) { user_collector_factories[i])); } int unknown_level = -1; + uint32_t cf_id; + + if (r->cfh != nullptr) { + // user explicitly specified that this file will be ingested into cfh, + // we can persist this information in the file. + cf_id = r->cfh->GetID(); + r->column_family_name = r->cfh->GetName(); + } else { + r->column_family_name = ""; + cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; + } + TableBuilderOptions table_builder_options( r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories, compression_type, r->ioptions.compression_opts, @@ -100,9 +115,7 @@ Status SstFileWriter::Open(const std::string& file_path) { // TODO(tec) : If table_factory is using compressed block cache, we will // be adding the external sst file blocks into it, which is wasteful. r->builder.reset(r->ioptions.table_factory->NewTableBuilder( - table_builder_options, - TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, - r->file_writer.get())); + table_builder_options, cf_id, r->file_writer.get())); r->file_info.file_path = file_path; r->file_info.file_size = 0;