support merge and delete in file ingestion
Summary: Previously sst_file_writer only supports kTypeValue, we need kTypeMerge and kTypeDeletion also as user requested. Closes https://github.com/facebook/rocksdb/pull/2361 Differential Revision: D5139402 Pulled By: lightmark fbshipit-source-id: 092a60756d01692539d817a3765ebfd58a8d7f88
This commit is contained in:
parent
c2c62ad4e6
commit
f7bb1a0060
21
db/c.cc
21
db/c.cc
@ -2852,7 +2852,26 @@ void rocksdb_sstfilewriter_open(rocksdb_sstfilewriter_t* writer,
|
||||
void rocksdb_sstfilewriter_add(rocksdb_sstfilewriter_t* writer, const char* key,
|
||||
size_t keylen, const char* val, size_t vallen,
|
||||
char** errptr) {
|
||||
SaveError(errptr, writer->rep->Add(Slice(key, keylen), Slice(val, vallen)));
|
||||
SaveError(errptr, writer->rep->Put(Slice(key, keylen), Slice(val, vallen)));
|
||||
}
|
||||
|
||||
void rocksdb_sstfilewriter_put(rocksdb_sstfilewriter_t* writer, const char* key,
|
||||
size_t keylen, const char* val, size_t vallen,
|
||||
char** errptr) {
|
||||
SaveError(errptr, writer->rep->Put(Slice(key, keylen), Slice(val, vallen)));
|
||||
}
|
||||
|
||||
void rocksdb_sstfilewriter_merge(rocksdb_sstfilewriter_t* writer,
|
||||
const char* key, size_t keylen,
|
||||
const char* val, size_t vallen,
|
||||
char** errptr) {
|
||||
SaveError(errptr, writer->rep->Merge(Slice(key, keylen), Slice(val, vallen)));
|
||||
}
|
||||
|
||||
void rocksdb_sstfilewriter_delete(rocksdb_sstfilewriter_t* writer,
|
||||
const char* key, size_t keylen,
|
||||
char** errptr) {
|
||||
SaveError(errptr, writer->rep->Delete(Slice(key, keylen)));
|
||||
}
|
||||
|
||||
void rocksdb_sstfilewriter_finish(rocksdb_sstfilewriter_t* writer,
|
||||
|
12
db/c_test.c
12
db/c_test.c
@ -569,11 +569,11 @@ int main(int argc, char** argv) {
|
||||
unlink(sstfilename);
|
||||
rocksdb_sstfilewriter_open(writer, sstfilename, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_sstfilewriter_add(writer, "sstk1", 5, "v1", 2, &err);
|
||||
rocksdb_sstfilewriter_put(writer, "sstk1", 5, "v1", 2, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_sstfilewriter_add(writer, "sstk2", 5, "v2", 2, &err);
|
||||
rocksdb_sstfilewriter_put(writer, "sstk2", 5, "v2", 2, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_sstfilewriter_add(writer, "sstk3", 5, "v3", 2, &err);
|
||||
rocksdb_sstfilewriter_put(writer, "sstk3", 5, "v3", 2, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_sstfilewriter_finish(writer, &err);
|
||||
CheckNoError(err);
|
||||
@ -590,11 +590,11 @@ int main(int argc, char** argv) {
|
||||
unlink(sstfilename);
|
||||
rocksdb_sstfilewriter_open(writer, sstfilename, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_sstfilewriter_add(writer, "sstk2", 5, "v4", 2, &err);
|
||||
rocksdb_sstfilewriter_put(writer, "sstk2", 5, "v4", 2, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_sstfilewriter_add(writer, "sstk22", 6, "v5", 2, &err);
|
||||
rocksdb_sstfilewriter_put(writer, "sstk22", 6, "v5", 2, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_sstfilewriter_add(writer, "sstk3", 5, "v6", 2, &err);
|
||||
rocksdb_sstfilewriter_put(writer, "sstk3", 5, "v6", 2, &err);
|
||||
CheckNoError(err);
|
||||
rocksdb_sstfilewriter_finish(writer, &err);
|
||||
CheckNoError(err);
|
||||
|
@ -20,28 +20,6 @@ class DBMergeOperatorTest : public DBTestBase {
|
||||
DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
|
||||
};
|
||||
|
||||
// A test merge operator mimics put but also fails if one of merge operands is
|
||||
// "corrupted".
|
||||
class TestPutOperator : public MergeOperator {
|
||||
public:
|
||||
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
|
||||
MergeOperationOutput* merge_out) const override {
|
||||
if (merge_in.existing_value != nullptr &&
|
||||
*(merge_in.existing_value) == "corrupted") {
|
||||
return false;
|
||||
}
|
||||
for (auto value : merge_in.operand_list) {
|
||||
if (value == "corrupted") {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
merge_out->existing_operand = merge_in.operand_list.back();
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual const char* Name() const override { return "TestPutOperator"; }
|
||||
};
|
||||
|
||||
TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
|
||||
Options options;
|
||||
options.create_if_missing = true;
|
||||
|
@ -586,6 +586,28 @@ class OnFileDeletionListener : public EventListener {
|
||||
};
|
||||
#endif
|
||||
|
||||
// A test merge operator mimics put but also fails if one of merge operands is
|
||||
// "corrupted".
|
||||
class TestPutOperator : public MergeOperator {
|
||||
public:
|
||||
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
|
||||
MergeOperationOutput* merge_out) const override {
|
||||
if (merge_in.existing_value != nullptr &&
|
||||
*(merge_in.existing_value) == "corrupted") {
|
||||
return false;
|
||||
}
|
||||
for (auto value : merge_in.operand_list) {
|
||||
if (value == "corrupted") {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
merge_out->existing_operand = merge_in.operand_list.back();
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual const char* Name() const override { return "TestPutOperator"; }
|
||||
};
|
||||
|
||||
class DBTestBase : public testing::Test {
|
||||
protected:
|
||||
// Sequence of option configurations to try
|
||||
|
@ -40,8 +40,10 @@ class ExternalSSTFileBasicTest : public DBTestBase {
|
||||
}
|
||||
|
||||
Status GenerateAndAddExternalFile(
|
||||
const Options options, std::vector<int> keys, int file_id,
|
||||
const Options options, std::vector<int> keys,
|
||||
const std::vector<ValueType>& value_types, int file_id,
|
||||
std::map<std::string, std::string>* true_data) {
|
||||
assert(value_types.size() == 1 || keys.size() == value_types.size());
|
||||
std::string file_path = sst_files_dir_ + ToString(file_id);
|
||||
SstFileWriter sst_file_writer(EnvOptions(), options);
|
||||
|
||||
@ -49,11 +51,28 @@ class ExternalSSTFileBasicTest : public DBTestBase {
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
for (int k : keys) {
|
||||
std::string key = Key(k);
|
||||
std::string value = Key(k) + ToString(file_id);
|
||||
s = sst_file_writer.Add(key, value);
|
||||
(*true_data)[key] = value;
|
||||
for (size_t i = 0; i < keys.size(); i++) {
|
||||
std::string key = Key(keys[i]);
|
||||
std::string value = Key(keys[i]) + ToString(file_id);
|
||||
ValueType value_type =
|
||||
(value_types.size() == 1 ? value_types[0] : value_types[i]);
|
||||
switch (value_type) {
|
||||
case ValueType::kTypeValue:
|
||||
s = sst_file_writer.Put(key, value);
|
||||
(*true_data)[key] = value;
|
||||
break;
|
||||
case ValueType::kTypeMerge:
|
||||
s = sst_file_writer.Merge(key, value);
|
||||
// we only use TestPutOperator in this test
|
||||
(*true_data)[key] = value;
|
||||
break;
|
||||
case ValueType::kTypeDeletion:
|
||||
s = sst_file_writer.Delete(key);
|
||||
true_data->erase(key);
|
||||
break;
|
||||
default:
|
||||
return Status::InvalidArgument("Value type is not supported");
|
||||
}
|
||||
if (!s.ok()) {
|
||||
sst_file_writer.Finish();
|
||||
return s;
|
||||
@ -66,10 +85,17 @@ class ExternalSSTFileBasicTest : public DBTestBase {
|
||||
ifo.allow_global_seqno = true;
|
||||
s = db_->IngestExternalFile({file_path}, ifo);
|
||||
}
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
Status GenerateAndAddExternalFile(
|
||||
const Options options, std::vector<int> keys, const ValueType value_type,
|
||||
int file_id, std::map<std::string, std::string>* true_data) {
|
||||
return GenerateAndAddExternalFile(options, keys,
|
||||
std::vector<ValueType>(1, value_type),
|
||||
file_id, true_data);
|
||||
}
|
||||
|
||||
~ExternalSSTFileBasicTest() { test::DestroyDir(env_, sst_files_dir_); }
|
||||
|
||||
protected:
|
||||
@ -89,7 +115,7 @@ TEST_F(ExternalSSTFileBasicTest, Basic) {
|
||||
std::string file1 = sst_files_dir_ + "file1.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file1));
|
||||
for (int k = 0; k < 100; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file1_info;
|
||||
Status s = sst_file_writer.Finish(&file1_info);
|
||||
@ -103,7 +129,7 @@ TEST_F(ExternalSSTFileBasicTest, Basic) {
|
||||
ASSERT_EQ(file1_info.smallest_key, Key(0));
|
||||
ASSERT_EQ(file1_info.largest_key, Key(99));
|
||||
// sst_file_writer already finished, cannot add this value
|
||||
s = sst_file_writer.Add(Key(100), "bad_val");
|
||||
s = sst_file_writer.Put(Key(100), "bad_val");
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
|
||||
DestroyAndReopen(options);
|
||||
@ -128,7 +154,7 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) {
|
||||
std::string file1 = sst_files_dir_ + "file1.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file1));
|
||||
for (int k = 0; k < 100; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file1_info;
|
||||
Status s = sst_file_writer.Finish(&file1_info);
|
||||
@ -142,7 +168,7 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) {
|
||||
std::string file2 = sst_files_dir_ + "file2.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file2));
|
||||
for (int k = 100; k < 300; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file2_info;
|
||||
s = sst_file_writer.Finish(&file2_info);
|
||||
@ -156,7 +182,7 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) {
|
||||
std::string file3 = sst_files_dir_ + "file3.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file3));
|
||||
for (int k = 110; k < 125; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
|
||||
}
|
||||
ExternalSstFileInfo file3_info;
|
||||
s = sst_file_writer.Finish(&file3_info);
|
||||
@ -191,33 +217,36 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
|
||||
|
||||
int file_id = 1;
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6}, file_id++,
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6},
|
||||
ValueType::kTypeValue, file_id++,
|
||||
&true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13}, file_id++,
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13},
|
||||
ValueType::kTypeValue, file_id++,
|
||||
|
||||
&true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(
|
||||
GenerateAndAddExternalFile(options, {1, 4, 6}, file_id++, &true_data));
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 4, 6}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {11, 15, 19}, file_id++,
|
||||
&true_data));
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {11, 15, 19}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(
|
||||
GenerateAndAddExternalFile(options, {120, 130}, file_id++, &true_data));
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {120, 130}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(
|
||||
GenerateAndAddExternalFile(options, {1, 130}, file_id++, &true_data));
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 130}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
|
||||
|
||||
@ -228,17 +257,115 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
|
||||
}
|
||||
SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {60, 61, 62}, file_id++,
|
||||
&true_data));
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {60, 61, 62}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {40, 41, 42}, file_id++,
|
||||
&true_data));
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {40, 41, 42}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {20, 30, 40}, file_id++,
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {20, 30, 40}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
|
||||
|
||||
const Snapshot* snapshot = db_->GetSnapshot();
|
||||
|
||||
// We will need a seqno for the file regardless if the file overwrite
|
||||
// keys in the DB or not because we have a snapshot
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1000, 1002}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// A global seqno will be assigned anyway because of the snapshot
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {2000, 3002}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// A global seqno will be assigned anyway because of the snapshot
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 20, 40, 100, 150},
|
||||
ValueType::kTypeValue, file_id++,
|
||||
&true_data));
|
||||
// A global seqno will be assigned anyway because of the snapshot
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
|
||||
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {5000, 5001}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// No snapshot anymore, no need to assign a seqno
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
|
||||
|
||||
size_t kcnt = 0;
|
||||
VerifyDBFromMap(true_data, &kcnt, false);
|
||||
} while (ChangeCompactOptions());
|
||||
}
|
||||
|
||||
TEST_F(ExternalSSTFileBasicTest, IngestFileWithMultipleValueType) {
|
||||
do {
|
||||
Options options = CurrentOptions();
|
||||
options.merge_operator.reset(new TestPutOperator());
|
||||
DestroyAndReopen(options);
|
||||
std::map<std::string, std::string> true_data;
|
||||
|
||||
int file_id = 1;
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6},
|
||||
ValueType::kTypeValue, file_id++,
|
||||
&true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13},
|
||||
ValueType::kTypeValue, file_id++,
|
||||
|
||||
&true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 4, 6}, ValueType::kTypeMerge, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {11, 15, 19},
|
||||
ValueType::kTypeDeletion, file_id++,
|
||||
&true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {120, 130}, ValueType::kTypeMerge, file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 130}, ValueType::kTypeDeletion, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
|
||||
|
||||
// Write some keys through normal write path
|
||||
for (int i = 0; i < 50; i++) {
|
||||
ASSERT_OK(Put(Key(i), "memtable"));
|
||||
true_data[Key(i)] = "memtable";
|
||||
}
|
||||
SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {60, 61, 62}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {40, 41, 42}, ValueType::kTypeMerge, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {20, 30, 40},
|
||||
ValueType::kTypeDeletion, file_id++,
|
||||
&true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
|
||||
@ -247,25 +374,143 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
|
||||
|
||||
// We will need a seqno for the file regardless if the file overwrite
|
||||
// keys in the DB or not because we have a snapshot
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1002}, file_id++,
|
||||
&true_data));
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1000, 1002}, ValueType::kTypeMerge, file_id++, &true_data));
|
||||
// A global seqno will be assigned anyway because of the snapshot
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {2000, 3002}, file_id++,
|
||||
&true_data));
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {2000, 3002}, ValueType::kTypeMerge, file_id++, &true_data));
|
||||
// A global seqno will be assigned anyway because of the snapshot
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 20, 40, 100, 150},
|
||||
file_id++, &true_data));
|
||||
ValueType::kTypeMerge, file_id++,
|
||||
&true_data));
|
||||
// A global seqno will be assigned anyway because of the snapshot
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
|
||||
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {5000, 5001}, file_id++,
|
||||
&true_data));
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {5000, 5001}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// No snapshot anymore, no need to assign a seqno
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
|
||||
|
||||
size_t kcnt = 0;
|
||||
VerifyDBFromMap(true_data, &kcnt, false);
|
||||
} while (ChangeCompactOptions());
|
||||
}
|
||||
|
||||
TEST_F(ExternalSSTFileBasicTest, IngestFileWithMixedValueType) {
|
||||
do {
|
||||
Options options = CurrentOptions();
|
||||
options.merge_operator.reset(new TestPutOperator());
|
||||
DestroyAndReopen(options);
|
||||
std::map<std::string, std::string> true_data;
|
||||
|
||||
int file_id = 1;
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 2, 3, 4, 5, 6},
|
||||
{ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue,
|
||||
ValueType::kTypeMerge, ValueType::kTypeValue, ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {10, 11, 12, 13},
|
||||
{ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue,
|
||||
ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 4, 6}, {ValueType::kTypeDeletion, ValueType::kTypeValue,
|
||||
ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {11, 15, 19}, {ValueType::kTypeDeletion, ValueType::kTypeMerge,
|
||||
ValueType::kTypeValue},
|
||||
file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {120, 130}, {ValueType::kTypeValue, ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 130}, {ValueType::kTypeMerge, ValueType::kTypeDeletion},
|
||||
file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
|
||||
|
||||
// Write some keys through normal write path
|
||||
for (int i = 0; i < 50; i++) {
|
||||
ASSERT_OK(Put(Key(i), "memtable"));
|
||||
true_data[Key(i)] = "memtable";
|
||||
}
|
||||
SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {60, 61, 62},
|
||||
{ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue},
|
||||
file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {40, 41, 42}, {ValueType::kTypeValue, ValueType::kTypeDeletion,
|
||||
ValueType::kTypeDeletion},
|
||||
file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {20, 30, 40},
|
||||
{ValueType::kTypeDeletion, ValueType::kTypeDeletion,
|
||||
ValueType::kTypeDeletion},
|
||||
file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
|
||||
|
||||
const Snapshot* snapshot = db_->GetSnapshot();
|
||||
|
||||
// We will need a seqno for the file regardless if the file overwrite
|
||||
// keys in the DB or not because we have a snapshot
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1000, 1002}, {ValueType::kTypeValue, ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// A global seqno will be assigned anyway because of the snapshot
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {2000, 3002}, {ValueType::kTypeValue, ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// A global seqno will be assigned anyway because of the snapshot
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 20, 40, 100, 150},
|
||||
{ValueType::kTypeDeletion, ValueType::kTypeDeletion,
|
||||
ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// A global seqno will be assigned anyway because of the snapshot
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
|
||||
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {5000, 5001}, {ValueType::kTypeValue, ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// No snapshot anymore, no need to assign a seqno
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
|
||||
|
||||
@ -280,7 +525,7 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
|
||||
|
||||
size_t total_fadvised_bytes = 0;
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SstFileWriter::InvalidatePageCache", [&](void* arg) {
|
||||
"SstFileWriter::Rep::InvalidatePageCache", [&](void* arg) {
|
||||
size_t fadvise_size = *(reinterpret_cast<size_t*>(arg));
|
||||
total_fadvised_bytes += fadvise_size;
|
||||
});
|
||||
@ -293,19 +538,18 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
|
||||
new SstFileWriter(EnvOptions(), options, nullptr, false));
|
||||
ASSERT_OK(sst_file_writer->Open(sst_file_path));
|
||||
for (int i = 0; i < kNumKeys; i++) {
|
||||
ASSERT_OK(sst_file_writer->Add(Key(i), Key(i)));
|
||||
ASSERT_OK(sst_file_writer->Put(Key(i), Key(i)));
|
||||
}
|
||||
ASSERT_OK(sst_file_writer->Finish());
|
||||
// fadvise disabled
|
||||
ASSERT_EQ(total_fadvised_bytes, 0);
|
||||
|
||||
|
||||
sst_file_path = sst_files_dir_ + "file_fadvise_enable.sst";
|
||||
sst_file_writer.reset(
|
||||
new SstFileWriter(EnvOptions(), options, nullptr, true));
|
||||
ASSERT_OK(sst_file_writer->Open(sst_file_path));
|
||||
for (int i = 0; i < kNumKeys; i++) {
|
||||
ASSERT_OK(sst_file_writer->Add(Key(i), Key(i)));
|
||||
ASSERT_OK(sst_file_writer->Put(Key(i), Key(i)));
|
||||
}
|
||||
ASSERT_OK(sst_file_writer->Finish());
|
||||
// fadvise enabled
|
||||
|
@ -63,7 +63,7 @@ class ExternalSSTFileTest : public DBTestBase {
|
||||
return s;
|
||||
}
|
||||
for (auto& entry : data) {
|
||||
s = sst_file_writer.Add(entry.first, entry.second);
|
||||
s = sst_file_writer.Put(entry.first, entry.second);
|
||||
if (!s.ok()) {
|
||||
sst_file_writer.Finish();
|
||||
return s;
|
||||
@ -125,7 +125,7 @@ class ExternalSSTFileTest : public DBTestBase {
|
||||
return s;
|
||||
}
|
||||
for (auto& entry : data) {
|
||||
s = sst_file_writer.Add(entry.first, entry.second);
|
||||
s = sst_file_writer.Put(entry.first, entry.second);
|
||||
if (!s.ok()) {
|
||||
sst_file_writer.Finish();
|
||||
return s;
|
||||
@ -211,7 +211,7 @@ TEST_F(ExternalSSTFileTest, Basic) {
|
||||
std::string file1 = sst_files_dir_ + "file1.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file1));
|
||||
for (int k = 0; k < 100; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file1_info;
|
||||
Status s = sst_file_writer.Finish(&file1_info);
|
||||
@ -225,17 +225,17 @@ TEST_F(ExternalSSTFileTest, Basic) {
|
||||
ASSERT_EQ(file1_info.smallest_key, Key(0));
|
||||
ASSERT_EQ(file1_info.largest_key, Key(99));
|
||||
// sst_file_writer already finished, cannot add this value
|
||||
s = sst_file_writer.Add(Key(100), "bad_val");
|
||||
s = sst_file_writer.Put(Key(100), "bad_val");
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
|
||||
// file2.sst (100 => 199)
|
||||
std::string file2 = sst_files_dir_ + "file2.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file2));
|
||||
for (int k = 100; k < 200; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
// Cannot add this key because it's not after last added key
|
||||
s = sst_file_writer.Add(Key(99), "bad_val");
|
||||
s = sst_file_writer.Put(Key(99), "bad_val");
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
ExternalSstFileInfo file2_info;
|
||||
s = sst_file_writer.Finish(&file2_info);
|
||||
@ -250,7 +250,7 @@ TEST_F(ExternalSSTFileTest, Basic) {
|
||||
std::string file3 = sst_files_dir_ + "file3.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file3));
|
||||
for (int k = 195; k < 300; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
|
||||
}
|
||||
ExternalSstFileInfo file3_info;
|
||||
s = sst_file_writer.Finish(&file3_info);
|
||||
@ -268,7 +268,7 @@ TEST_F(ExternalSSTFileTest, Basic) {
|
||||
std::string file4 = sst_files_dir_ + "file4.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file4));
|
||||
for (int k = 30; k < 40; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
|
||||
}
|
||||
ExternalSstFileInfo file4_info;
|
||||
s = sst_file_writer.Finish(&file4_info);
|
||||
@ -282,7 +282,7 @@ TEST_F(ExternalSSTFileTest, Basic) {
|
||||
std::string file5 = sst_files_dir_ + "file5.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file5));
|
||||
for (int k = 400; k < 500; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file5_info;
|
||||
s = sst_file_writer.Finish(&file5_info);
|
||||
@ -444,7 +444,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
|
||||
std::string file1 = sst_files_dir_ + "file1.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file1));
|
||||
for (int k = 0; k < 100; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file1_info;
|
||||
Status s = sst_file_writer.Finish(&file1_info);
|
||||
@ -454,17 +454,17 @@ TEST_F(ExternalSSTFileTest, AddList) {
|
||||
ASSERT_EQ(file1_info.smallest_key, Key(0));
|
||||
ASSERT_EQ(file1_info.largest_key, Key(99));
|
||||
// sst_file_writer already finished, cannot add this value
|
||||
s = sst_file_writer.Add(Key(100), "bad_val");
|
||||
s = sst_file_writer.Put(Key(100), "bad_val");
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
|
||||
// file2.sst (100 => 199)
|
||||
std::string file2 = sst_files_dir_ + "file2.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file2));
|
||||
for (int k = 100; k < 200; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
// Cannot add this key because it's not after last added key
|
||||
s = sst_file_writer.Add(Key(99), "bad_val");
|
||||
s = sst_file_writer.Put(Key(99), "bad_val");
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
ExternalSstFileInfo file2_info;
|
||||
s = sst_file_writer.Finish(&file2_info);
|
||||
@ -479,7 +479,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
|
||||
std::string file3 = sst_files_dir_ + "file3.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file3));
|
||||
for (int k = 195; k < 200; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
|
||||
}
|
||||
ExternalSstFileInfo file3_info;
|
||||
s = sst_file_writer.Finish(&file3_info);
|
||||
@ -494,7 +494,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
|
||||
std::string file4 = sst_files_dir_ + "file4.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file4));
|
||||
for (int k = 30; k < 40; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
|
||||
}
|
||||
ExternalSstFileInfo file4_info;
|
||||
s = sst_file_writer.Finish(&file4_info);
|
||||
@ -508,7 +508,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
|
||||
std::string file5 = sst_files_dir_ + "file5.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file5));
|
||||
for (int k = 200; k < 300; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file5_info;
|
||||
s = sst_file_writer.Finish(&file5_info);
|
||||
@ -639,7 +639,7 @@ TEST_F(ExternalSSTFileTest, AddListAtomicity) {
|
||||
files[i] = sst_files_dir_ + "file" + std::to_string(i) + ".sst";
|
||||
ASSERT_OK(sst_file_writer.Open(files[i]));
|
||||
for (int k = i * 100; k < (i + 1) * 100; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
Status s = sst_file_writer.Finish(&files_info[i]);
|
||||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
@ -676,7 +676,7 @@ TEST_F(ExternalSSTFileTest, PurgeObsoleteFilesBug) {
|
||||
ASSERT_OK(s);
|
||||
for (int i = 0; i < 500; i++) {
|
||||
std::string k = Key(i);
|
||||
s = sst_file_writer.Add(k, k + "_val");
|
||||
s = sst_file_writer.Put(k, k + "_val");
|
||||
ASSERT_OK(s);
|
||||
}
|
||||
|
||||
@ -719,7 +719,7 @@ TEST_F(ExternalSSTFileTest, SkipSnapshot) {
|
||||
std::string file1 = sst_files_dir_ + "file1.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file1));
|
||||
for (int k = 0; k < 100; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file1_info;
|
||||
Status s = sst_file_writer.Finish(&file1_info);
|
||||
@ -733,7 +733,7 @@ TEST_F(ExternalSSTFileTest, SkipSnapshot) {
|
||||
std::string file2 = sst_files_dir_ + "file2.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file2));
|
||||
for (int k = 100; k < 300; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file2_info;
|
||||
s = sst_file_writer.Finish(&file2_info);
|
||||
@ -763,7 +763,7 @@ TEST_F(ExternalSSTFileTest, SkipSnapshot) {
|
||||
std::string file3 = sst_files_dir_ + "file3.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file3));
|
||||
for (int k = 300; k < 400; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file3_info;
|
||||
s = sst_file_writer.Finish(&file3_info);
|
||||
@ -809,7 +809,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) {
|
||||
ASSERT_OK(sst_file_writer.Open(file_names[file_idx]));
|
||||
|
||||
for (int k = range_start; k < range_end; k++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k)));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k)));
|
||||
}
|
||||
|
||||
Status s = sst_file_writer.Finish();
|
||||
@ -960,7 +960,7 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) {
|
||||
std::string file_name = sst_files_dir_ + env_->GenerateUniqueId();
|
||||
ASSERT_OK(sst_file_writer.Open(file_name));
|
||||
for (int k = range_start; k <= range_end; k++) {
|
||||
s = sst_file_writer.Add(Key(k), range_val);
|
||||
s = sst_file_writer.Put(Key(k), range_val);
|
||||
ASSERT_OK(s);
|
||||
}
|
||||
ExternalSstFileInfo file_info;
|
||||
@ -1358,7 +1358,7 @@ TEST_F(ExternalSSTFileTest, AddExternalSstFileWithCustomCompartor) {
|
||||
int range_end = i * 10;
|
||||
int range_start = range_end + 15;
|
||||
for (int k = (range_start - 1); k >= range_end; k--) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k)));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k)));
|
||||
}
|
||||
ExternalSstFileInfo file_info;
|
||||
ASSERT_OK(sst_file_writer.Finish(&file_info));
|
||||
@ -1448,12 +1448,12 @@ TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) {
|
||||
|
||||
std::string suffix(100, 'X');
|
||||
ASSERT_OK(sst_file_writer.Open(file_path));
|
||||
ASSERT_OK(sst_file_writer.Add("A" + suffix, "VAL"));
|
||||
ASSERT_OK(sst_file_writer.Add("BB" + suffix, "VAL"));
|
||||
ASSERT_OK(sst_file_writer.Add("CC" + suffix, "VAL"));
|
||||
ASSERT_OK(sst_file_writer.Add("CXD" + suffix, "VAL"));
|
||||
ASSERT_OK(sst_file_writer.Add("CZZZ" + suffix, "VAL"));
|
||||
ASSERT_OK(sst_file_writer.Add("ZAAAX" + suffix, "VAL"));
|
||||
ASSERT_OK(sst_file_writer.Put("A" + suffix, "VAL"));
|
||||
ASSERT_OK(sst_file_writer.Put("BB" + suffix, "VAL"));
|
||||
ASSERT_OK(sst_file_writer.Put("CC" + suffix, "VAL"));
|
||||
ASSERT_OK(sst_file_writer.Put("CXD" + suffix, "VAL"));
|
||||
ASSERT_OK(sst_file_writer.Put("CZZZ" + suffix, "VAL"));
|
||||
ASSERT_OK(sst_file_writer.Put("ZAAAX" + suffix, "VAL"));
|
||||
|
||||
ASSERT_OK(sst_file_writer.Finish());
|
||||
ASSERT_OK(DeprecatedAddFile({file_path}));
|
||||
@ -1748,22 +1748,22 @@ TEST_F(ExternalSSTFileTest, FileWithCFInfo) {
|
||||
// default_cf.sst
|
||||
const std::string cf_default_sst = sst_files_dir_ + "/default_cf.sst";
|
||||
ASSERT_OK(sfw_default.Open(cf_default_sst));
|
||||
ASSERT_OK(sfw_default.Add("K1", "V1"));
|
||||
ASSERT_OK(sfw_default.Add("K2", "V2"));
|
||||
ASSERT_OK(sfw_default.Put("K1", "V1"));
|
||||
ASSERT_OK(sfw_default.Put("K2", "V2"));
|
||||
ASSERT_OK(sfw_default.Finish());
|
||||
|
||||
// cf1.sst
|
||||
const std::string cf1_sst = sst_files_dir_ + "/cf1.sst";
|
||||
ASSERT_OK(sfw_cf1.Open(cf1_sst));
|
||||
ASSERT_OK(sfw_cf1.Add("K3", "V1"));
|
||||
ASSERT_OK(sfw_cf1.Add("K4", "V2"));
|
||||
ASSERT_OK(sfw_cf1.Put("K3", "V1"));
|
||||
ASSERT_OK(sfw_cf1.Put("K4", "V2"));
|
||||
ASSERT_OK(sfw_cf1.Finish());
|
||||
|
||||
// cf_unknown.sst
|
||||
const std::string unknown_sst = sst_files_dir_ + "/cf_unknown.sst";
|
||||
ASSERT_OK(sfw_unknown.Open(unknown_sst));
|
||||
ASSERT_OK(sfw_unknown.Add("K5", "V1"));
|
||||
ASSERT_OK(sfw_unknown.Add("K6", "V2"));
|
||||
ASSERT_OK(sfw_unknown.Put("K5", "V1"));
|
||||
ASSERT_OK(sfw_unknown.Put("K6", "V2"));
|
||||
ASSERT_OK(sfw_unknown.Finish());
|
||||
|
||||
IngestExternalFileOptions ifo;
|
||||
@ -1864,7 +1864,7 @@ TEST_F(ExternalSSTFileTest, SnapshotInconsistencyBug) {
|
||||
SstFileWriter sst_file_writer(EnvOptions(), options);
|
||||
ASSERT_OK(sst_file_writer.Open(sst_file_path));
|
||||
for (int i = 0; i < kNumKeys; i++) {
|
||||
ASSERT_OK(sst_file_writer.Add(Key(i), Key(i) + "_V2"));
|
||||
ASSERT_OK(sst_file_writer.Put(Key(i), Key(i) + "_V2"));
|
||||
}
|
||||
ASSERT_OK(sst_file_writer.Finish());
|
||||
|
||||
|
@ -1117,6 +1117,15 @@ extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_open(
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_add(
|
||||
rocksdb_sstfilewriter_t* writer, const char* key, size_t keylen,
|
||||
const char* val, size_t vallen, char** errptr);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_put(
|
||||
rocksdb_sstfilewriter_t* writer, const char* key, size_t keylen,
|
||||
const char* val, size_t vallen, char** errptr);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_merge(
|
||||
rocksdb_sstfilewriter_t* writer, const char* key, size_t keylen,
|
||||
const char* val, size_t vallen, char** errptr);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_delete(
|
||||
rocksdb_sstfilewriter_t* writer, const char* key, size_t keylen,
|
||||
char** errptr);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_finish(
|
||||
rocksdb_sstfilewriter_t* writer, char** errptr);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_destroy(
|
||||
|
@ -17,6 +17,12 @@
|
||||
#include "rocksdb/table_properties.h"
|
||||
#include "rocksdb/types.h"
|
||||
|
||||
#if defined(__GNUC__) || defined(__clang__)
|
||||
#define ROCKSDB_DEPRECATED_FUNC __attribute__((__deprecated__))
|
||||
#elif _WIN32
|
||||
#define ROCKSDB_DEPRECATED_FUNC __declspec(deprecated)
|
||||
#endif
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
class Comparator;
|
||||
@ -76,9 +82,21 @@ class SstFileWriter {
|
||||
// Prepare SstFileWriter to write into file located at "file_path".
|
||||
Status Open(const std::string& file_path);
|
||||
|
||||
// Add key, value to currently opened file
|
||||
// Add a Put key with value to currently opened file (deprecated)
|
||||
// REQUIRES: key is after any previously added key according to comparator.
|
||||
Status Add(const Slice& user_key, const Slice& value);
|
||||
ROCKSDB_DEPRECATED_FUNC Status Add(const Slice& user_key, const Slice& value);
|
||||
|
||||
// Add a Put key with value to currently opened file
|
||||
// REQUIRES: key is after any previously added key according to comparator.
|
||||
Status Put(const Slice& user_key, const Slice& value);
|
||||
|
||||
// Add a Merge key with value to currently opened file
|
||||
// REQUIRES: key is after any previously added key according to comparator.
|
||||
Status Merge(const Slice& user_key, const Slice& value);
|
||||
|
||||
// Add a deletion key to currently opened file
|
||||
// REQUIRES: key is after any previously added key according to comparator.
|
||||
Status Delete(const Slice& user_key);
|
||||
|
||||
// Finalize writing to sst file and close file.
|
||||
//
|
||||
@ -91,7 +109,6 @@ class SstFileWriter {
|
||||
|
||||
private:
|
||||
void InvalidatePageCache(bool closing);
|
||||
|
||||
struct Rep;
|
||||
std::unique_ptr<Rep> rep_;
|
||||
};
|
||||
|
@ -83,13 +83,64 @@ void Java_org_rocksdb_SstFileWriter_add(JNIEnv *env, jobject jobj,
|
||||
auto *key_slice = reinterpret_cast<rocksdb::Slice *>(jkey_handle);
|
||||
auto *value_slice = reinterpret_cast<rocksdb::Slice *>(jvalue_handle);
|
||||
rocksdb::Status s =
|
||||
reinterpret_cast<rocksdb::SstFileWriter *>(jhandle)->Add(*key_slice,
|
||||
reinterpret_cast<rocksdb::SstFileWriter *>(jhandle)->Put(*key_slice,
|
||||
*value_slice);
|
||||
if (!s.ok()) {
|
||||
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_SstFileWriter
|
||||
* Method: put
|
||||
* Signature: (JJJ)V
|
||||
*/
|
||||
void Java_org_rocksdb_SstFileWriter_put(JNIEnv *env, jobject jobj,
|
||||
jlong jhandle, jlong jkey_handle,
|
||||
jlong jvalue_handle) {
|
||||
auto *key_slice = reinterpret_cast<rocksdb::Slice *>(jkey_handle);
|
||||
auto *value_slice = reinterpret_cast<rocksdb::Slice *>(jvalue_handle);
|
||||
rocksdb::Status s =
|
||||
reinterpret_cast<rocksdb::SstFileWriter *>(jhandle)->Put(*key_slice,
|
||||
*value_slice);
|
||||
if (!s.ok()) {
|
||||
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_SstFileWriter
|
||||
* Method: merge
|
||||
* Signature: (JJJ)V
|
||||
*/
|
||||
void Java_org_rocksdb_SstFileWriter_merge(JNIEnv *env, jobject jobj,
|
||||
jlong jhandle, jlong jkey_handle,
|
||||
jlong jvalue_handle) {
|
||||
auto *key_slice = reinterpret_cast<rocksdb::Slice *>(jkey_handle);
|
||||
auto *value_slice = reinterpret_cast<rocksdb::Slice *>(jvalue_handle);
|
||||
rocksdb::Status s =
|
||||
reinterpret_cast<rocksdb::SstFileWriter *>(jhandle)->Merge(*key_slice,
|
||||
*value_slice);
|
||||
if (!s.ok()) {
|
||||
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_SstFileWriter
|
||||
* Method: delete
|
||||
* Signature: (JJJ)V
|
||||
*/
|
||||
void Java_org_rocksdb_SstFileWriter_delete(JNIEnv *env, jobject jobj,
|
||||
jlong jhandle, jlong jkey_handle) {
|
||||
auto *key_slice = reinterpret_cast<rocksdb::Slice *>(jkey_handle);
|
||||
rocksdb::Status s =
|
||||
reinterpret_cast<rocksdb::SstFileWriter *>(jhandle)->Delete(*key_slice);
|
||||
if (!s.ok()) {
|
||||
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_rocksdb_SstFileWriter
|
||||
* Method: finish
|
||||
|
@ -246,10 +246,14 @@ bool BlockIter::ParseNextKey() {
|
||||
|
||||
if (global_seqno_ != kDisableGlobalSequenceNumber) {
|
||||
// If we are reading a file with a global sequence number we should
|
||||
// expect that all encoded sequence numbers are zeros and all value
|
||||
// types are kTypeValue
|
||||
// expect that all encoded sequence numbers are zeros and any value
|
||||
// type is kTypeValue, kTypeMerge or kTypeDeletion
|
||||
assert(GetInternalKeySeqno(key_.GetInternalKey()) == 0);
|
||||
assert(ExtractValueType(key_.GetInternalKey()) == ValueType::kTypeValue);
|
||||
|
||||
ValueType value_type = ExtractValueType(key_.GetInternalKey());
|
||||
assert(value_type == ValueType::kTypeValue ||
|
||||
value_type == ValueType::kTypeMerge ||
|
||||
value_type == ValueType::kTypeDeletion);
|
||||
|
||||
if (key_pinned_) {
|
||||
// TODO(tec): Investigate updating the seqno in the loaded block
|
||||
@ -261,7 +265,7 @@ bool BlockIter::ParseNextKey() {
|
||||
key_pinned_ = false;
|
||||
}
|
||||
|
||||
key_.UpdateInternalKey(global_seqno_, ValueType::kTypeValue);
|
||||
key_.UpdateInternalKey(global_seqno_, value_type);
|
||||
}
|
||||
|
||||
value_ = Slice(p + non_shared, value_length);
|
||||
|
@ -56,6 +56,67 @@ struct SstFileWriter::Rep {
|
||||
// The size of the file during the last time we called Fadvise to remove
|
||||
// cached pages from page cache.
|
||||
uint64_t last_fadvise_size;
|
||||
Status Add(const Slice& user_key, const Slice& value,
|
||||
const ValueType value_type) {
|
||||
if (!builder) {
|
||||
return Status::InvalidArgument("File is not opened");
|
||||
}
|
||||
|
||||
if (file_info.num_entries == 0) {
|
||||
file_info.smallest_key.assign(user_key.data(), user_key.size());
|
||||
} else {
|
||||
if (internal_comparator.user_comparator()->Compare(
|
||||
user_key, file_info.largest_key) <= 0) {
|
||||
// Make sure that keys are added in order
|
||||
return Status::InvalidArgument("Keys must be added in order");
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(tec) : For external SST files we could omit the seqno and type.
|
||||
switch (value_type) {
|
||||
case ValueType::kTypeValue:
|
||||
ikey.Set(user_key, 0 /* Sequence Number */,
|
||||
ValueType::kTypeValue /* Put */);
|
||||
break;
|
||||
case ValueType::kTypeMerge:
|
||||
ikey.Set(user_key, 0 /* Sequence Number */,
|
||||
ValueType::kTypeMerge /* Merge */);
|
||||
break;
|
||||
case ValueType::kTypeDeletion:
|
||||
ikey.Set(user_key, 0 /* Sequence Number */,
|
||||
ValueType::kTypeDeletion /* Delete */);
|
||||
break;
|
||||
default:
|
||||
return Status::InvalidArgument("Value type is not supported");
|
||||
}
|
||||
builder->Add(ikey.Encode(), value);
|
||||
|
||||
// update file info
|
||||
file_info.num_entries++;
|
||||
file_info.largest_key.assign(user_key.data(), user_key.size());
|
||||
file_info.file_size = builder->FileSize();
|
||||
|
||||
InvalidatePageCache(false /* closing */);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void InvalidatePageCache(bool closing) {
|
||||
if (invalidate_page_cache == false) {
|
||||
// Fadvise disabled
|
||||
return;
|
||||
}
|
||||
uint64_t bytes_since_last_fadvise =
|
||||
builder->FileSize() - last_fadvise_size;
|
||||
if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
|
||||
TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
|
||||
&(bytes_since_last_fadvise));
|
||||
// Tell the OS that we dont need this file in page cache
|
||||
file_writer->InvalidateCache(0, 0);
|
||||
last_fadvise_size = builder->FileSize();
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
|
||||
@ -149,34 +210,19 @@ Status SstFileWriter::Open(const std::string& file_path) {
|
||||
}
|
||||
|
||||
Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
|
||||
Rep* r = rep_.get();
|
||||
if (!r->builder) {
|
||||
return Status::InvalidArgument("File is not opened");
|
||||
}
|
||||
return rep_->Add(user_key, value, ValueType::kTypeValue);
|
||||
}
|
||||
|
||||
if (r->file_info.num_entries == 0) {
|
||||
r->file_info.smallest_key.assign(user_key.data(), user_key.size());
|
||||
} else {
|
||||
if (r->internal_comparator.user_comparator()->Compare(
|
||||
user_key, r->file_info.largest_key) <= 0) {
|
||||
// Make sure that keys are added in order
|
||||
return Status::InvalidArgument("Keys must be added in order");
|
||||
}
|
||||
}
|
||||
Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
|
||||
return rep_->Add(user_key, value, ValueType::kTypeValue);
|
||||
}
|
||||
|
||||
// TODO(tec) : For external SST files we could omit the seqno and type.
|
||||
r->ikey.Set(user_key, 0 /* Sequence Number */,
|
||||
ValueType::kTypeValue /* Put */);
|
||||
r->builder->Add(r->ikey.Encode(), value);
|
||||
Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
|
||||
return rep_->Add(user_key, value, ValueType::kTypeMerge);
|
||||
}
|
||||
|
||||
// Update file info
|
||||
r->file_info.num_entries++;
|
||||
r->file_info.largest_key.assign(user_key.data(), user_key.size());
|
||||
r->file_info.file_size = r->builder->FileSize();
|
||||
|
||||
InvalidatePageCache(false /* closing */);
|
||||
|
||||
return Status::OK();
|
||||
Status SstFileWriter::Delete(const Slice& user_key) {
|
||||
return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
|
||||
}
|
||||
|
||||
Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
|
||||
@ -193,7 +239,7 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
|
||||
|
||||
if (s.ok()) {
|
||||
s = r->file_writer->Sync(r->ioptions.use_fsync);
|
||||
InvalidatePageCache(true /* closing */);
|
||||
r->InvalidatePageCache(true /* closing */);
|
||||
if (s.ok()) {
|
||||
s = r->file_writer->Close();
|
||||
}
|
||||
@ -210,24 +256,6 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
|
||||
return s;
|
||||
}
|
||||
|
||||
void SstFileWriter::InvalidatePageCache(bool closing) {
|
||||
Rep* r = rep_.get();
|
||||
if (r->invalidate_page_cache == false) {
|
||||
// Fadvise disabled
|
||||
return;
|
||||
}
|
||||
|
||||
uint64_t bytes_since_last_fadvise =
|
||||
r->builder->FileSize() - r->last_fadvise_size;
|
||||
if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
|
||||
TEST_SYNC_POINT_CALLBACK("SstFileWriter::InvalidatePageCache",
|
||||
&(bytes_since_last_fadvise));
|
||||
// Tell the OS that we dont need this file in page cache
|
||||
r->file_writer->InvalidateCache(0, 0);
|
||||
r->last_fadvise_size = r->builder->FileSize();
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t SstFileWriter::FileSize() {
|
||||
return rep_->file_info.file_size;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user