Allow user to specify a CF for SST files generated by SstFileWriter
Summary: Allow user to explicitly specify that the generated file by SstFileWriter will be ingested in a specific CF. This allow us to persist the CF id in the generated file Closes https://github.com/facebook/rocksdb/pull/1615 Differential Revision: D4270422 Pulled By: IslamAbdelRahman fbshipit-source-id: 7fb954e
This commit is contained in:
parent
314828c973
commit
7768975517
@ -38,6 +38,15 @@ Status ExternalSstFileIngestionJob::Prepare(
|
||||
files_to_ingest_.push_back(file_to_ingest);
|
||||
}
|
||||
|
||||
for (const IngestedFileInfo& f : files_to_ingest_) {
|
||||
if (f.cf_id !=
|
||||
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
|
||||
f.cf_id != cfd_->GetID()) {
|
||||
return Status::InvalidArgument(
|
||||
"External file column family id dont match");
|
||||
}
|
||||
}
|
||||
|
||||
const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
|
||||
auto num_files = files_to_ingest_.size();
|
||||
if (num_files == 0) {
|
||||
@ -325,6 +334,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
|
||||
}
|
||||
file_to_ingest->largest_user_key = key.user_key.ToString();
|
||||
|
||||
file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
|
@ -36,6 +36,8 @@ struct IngestedFileInfo {
|
||||
uint64_t file_size;
|
||||
// total number of keys in external file
|
||||
uint64_t num_entries;
|
||||
// Id of column family this file shoule be ingested into
|
||||
uint32_t cf_id;
|
||||
// Version of external file
|
||||
int version;
|
||||
|
||||
|
@ -1780,6 +1780,61 @@ TEST_F(ExternalSSTFileTest, DirtyExit) {
|
||||
ASSERT_NOK(sst_file_writer->Finish());
|
||||
}
|
||||
|
||||
TEST_F(ExternalSSTFileTest, FileWithCFInfo) {
|
||||
Options options = CurrentOptions();
|
||||
CreateAndReopenWithCF({"koko", "toto"}, options);
|
||||
|
||||
SstFileWriter sfw_default(EnvOptions(), options, options.comparator,
|
||||
handles_[0]);
|
||||
SstFileWriter sfw_cf1(EnvOptions(), options, options.comparator, handles_[1]);
|
||||
SstFileWriter sfw_cf2(EnvOptions(), options, options.comparator, handles_[2]);
|
||||
SstFileWriter sfw_unknown(EnvOptions(), options, options.comparator);
|
||||
|
||||
// default_cf.sst
|
||||
const std::string cf_default_sst = sst_files_dir_ + "/default_cf.sst";
|
||||
ASSERT_OK(sfw_default.Open(cf_default_sst));
|
||||
ASSERT_OK(sfw_default.Add("K1", "V1"));
|
||||
ASSERT_OK(sfw_default.Add("K2", "V2"));
|
||||
ASSERT_OK(sfw_default.Finish());
|
||||
|
||||
// cf1.sst
|
||||
const std::string cf1_sst = sst_files_dir_ + "/cf1.sst";
|
||||
ASSERT_OK(sfw_cf1.Open(cf1_sst));
|
||||
ASSERT_OK(sfw_cf1.Add("K3", "V1"));
|
||||
ASSERT_OK(sfw_cf1.Add("K4", "V2"));
|
||||
ASSERT_OK(sfw_cf1.Finish());
|
||||
|
||||
// cf_unknown.sst
|
||||
const std::string unknown_sst = sst_files_dir_ + "/cf_unknown.sst";
|
||||
ASSERT_OK(sfw_unknown.Open(unknown_sst));
|
||||
ASSERT_OK(sfw_unknown.Add("K5", "V1"));
|
||||
ASSERT_OK(sfw_unknown.Add("K6", "V2"));
|
||||
ASSERT_OK(sfw_unknown.Finish());
|
||||
|
||||
IngestExternalFileOptions ifo;
|
||||
|
||||
// SST CF dont match
|
||||
ASSERT_NOK(db_->IngestExternalFile(handles_[0], {cf1_sst}, ifo));
|
||||
// SST CF dont match
|
||||
ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf1_sst}, ifo));
|
||||
// SST CF match
|
||||
ASSERT_OK(db_->IngestExternalFile(handles_[1], {cf1_sst}, ifo));
|
||||
|
||||
// SST CF dont match
|
||||
ASSERT_NOK(db_->IngestExternalFile(handles_[1], {cf_default_sst}, ifo));
|
||||
// SST CF dont match
|
||||
ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf_default_sst}, ifo));
|
||||
// SST CF match
|
||||
ASSERT_OK(db_->IngestExternalFile(handles_[0], {cf_default_sst}, ifo));
|
||||
|
||||
// SST CF unknown
|
||||
ASSERT_OK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo));
|
||||
// SST CF unknown
|
||||
ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo));
|
||||
// SST CF unknown
|
||||
ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo));
|
||||
}
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <string>
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/options.h"
|
||||
#include "rocksdb/table_properties.h"
|
||||
#include "rocksdb/types.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -43,8 +44,12 @@ struct ExternalSstFileInfo {
|
||||
// All keys in files generated by SstFileWriter will have sequence number = 0
|
||||
class SstFileWriter {
|
||||
public:
|
||||
// User can pass `column_family` to specify that the the generated file will
|
||||
// be ingested into this column_family, note that passing nullptr means that
|
||||
// the column_family is unknown.
|
||||
SstFileWriter(const EnvOptions& env_options, const Options& options,
|
||||
const Comparator* user_comparator);
|
||||
const Comparator* user_comparator,
|
||||
ColumnFamilyHandle* column_family = nullptr);
|
||||
|
||||
~SstFileWriter();
|
||||
|
||||
|
@ -21,11 +21,12 @@ const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
|
||||
|
||||
struct SstFileWriter::Rep {
|
||||
Rep(const EnvOptions& _env_options, const Options& options,
|
||||
const Comparator* _user_comparator)
|
||||
const Comparator* _user_comparator, ColumnFamilyHandle* _cfh)
|
||||
: env_options(_env_options),
|
||||
ioptions(options),
|
||||
mutable_cf_options(options),
|
||||
internal_comparator(_user_comparator) {}
|
||||
internal_comparator(_user_comparator),
|
||||
cfh(_cfh) {}
|
||||
|
||||
std::unique_ptr<WritableFileWriter> file_writer;
|
||||
std::unique_ptr<TableBuilder> builder;
|
||||
@ -34,14 +35,16 @@ struct SstFileWriter::Rep {
|
||||
MutableCFOptions mutable_cf_options;
|
||||
InternalKeyComparator internal_comparator;
|
||||
ExternalSstFileInfo file_info;
|
||||
std::string column_family_name;
|
||||
InternalKey ikey;
|
||||
std::string column_family_name;
|
||||
ColumnFamilyHandle* cfh;
|
||||
};
|
||||
|
||||
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
|
||||
const Options& options,
|
||||
const Comparator* user_comparator)
|
||||
: rep_(new Rep(env_options, options, user_comparator)) {}
|
||||
const Comparator* user_comparator,
|
||||
ColumnFamilyHandle* column_family)
|
||||
: rep_(new Rep(env_options, options, user_comparator, column_family)) {}
|
||||
|
||||
SstFileWriter::~SstFileWriter() {
|
||||
if (rep_->builder) {
|
||||
@ -89,6 +92,18 @@ Status SstFileWriter::Open(const std::string& file_path) {
|
||||
user_collector_factories[i]));
|
||||
}
|
||||
int unknown_level = -1;
|
||||
uint32_t cf_id;
|
||||
|
||||
if (r->cfh != nullptr) {
|
||||
// user explicitly specified that this file will be ingested into cfh,
|
||||
// we can persist this information in the file.
|
||||
cf_id = r->cfh->GetID();
|
||||
r->column_family_name = r->cfh->GetName();
|
||||
} else {
|
||||
r->column_family_name = "";
|
||||
cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
|
||||
}
|
||||
|
||||
TableBuilderOptions table_builder_options(
|
||||
r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
|
||||
compression_type, r->ioptions.compression_opts,
|
||||
@ -100,9 +115,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
|
||||
// TODO(tec) : If table_factory is using compressed block cache, we will
|
||||
// be adding the external sst file blocks into it, which is wasteful.
|
||||
r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
|
||||
table_builder_options,
|
||||
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
|
||||
r->file_writer.get()));
|
||||
table_builder_options, cf_id, r->file_writer.get()));
|
||||
|
||||
r->file_info.file_path = file_path;
|
||||
r->file_info.file_size = 0;
|
||||
|
Loading…
Reference in New Issue
Block a user