Support timestamps in SstFileWriter (#8899)

Summary:
As a first step of supporting user-defined timestamps with ingestion, the
patch adds timestamp support to `SstFileWriter`; namely, it adds new
versions of the `Put` and `Delete` APIs that take timestamps. (`Merge`
and `DeleteRange` are currently not supported with user-defined timestamps
in general but once those features are implemented, we can handle them
in `SstFileWriter` in a similar fashion.) The new APIs validate the size of
the timestamp provided by the client. Similarly, calls to the pre-existing
timestamp-less APIs are now disallowed when user-defined timestamps are
in use according to the comparator.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8899

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D30850699

Pulled By: ltamasi

fbshipit-source-id: 779154373618f19b8f0797976bb7286783c57b67
This commit is contained in:
Levi Tamasi 2021-09-09 18:57:01 -07:00 committed by Facebook GitHub Bot
parent 0aad4ca0ff
commit 7e78d7c540
4 changed files with 295 additions and 20 deletions

View File

@ -112,21 +112,40 @@ class SstFileWriter {
// Add a Put key with value to currently opened file (deprecated) // Add a Put key with value to currently opened file (deprecated)
// REQUIRES: key is after any previously added key according to comparator. // REQUIRES: key is after any previously added key according to comparator.
// REQUIRES: comparator is *not* timestamp-aware.
ROCKSDB_DEPRECATED_FUNC Status Add(const Slice& user_key, const Slice& value); ROCKSDB_DEPRECATED_FUNC Status Add(const Slice& user_key, const Slice& value);
// Add a Put key with value to currently opened file // Add a Put key with value to currently opened file
// REQUIRES: key is after any previously added key according to comparator. // REQUIRES: key is after any previously added key according to comparator.
// REQUIRES: comparator is *not* timestamp-aware.
Status Put(const Slice& user_key, const Slice& value); Status Put(const Slice& user_key, const Slice& value);
// Add a Put (key with timestamp, value) to the currently opened file
// REQUIRES: key is after any previously added key according to the
// comparator.
// REQUIRES: the timestamp's size is equal to what is expected by
// the comparator.
Status Put(const Slice& user_key, const Slice& timestamp, const Slice& value);
// Add a Merge key with value to currently opened file // Add a Merge key with value to currently opened file
// REQUIRES: key is after any previously added key according to comparator. // REQUIRES: key is after any previously added key according to comparator.
// REQUIRES: comparator is *not* timestamp-aware.
Status Merge(const Slice& user_key, const Slice& value); Status Merge(const Slice& user_key, const Slice& value);
// Add a deletion key to currently opened file // Add a deletion key to currently opened file
// REQUIRES: key is after any previously added key according to comparator. // REQUIRES: key is after any previously added key according to comparator.
// REQUIRES: comparator is *not* timestamp-aware.
Status Delete(const Slice& user_key); Status Delete(const Slice& user_key);
// Add a deletion key with timestamp to the currently opened file
// REQUIRES: key is after any previously added key according to the
// comparator.
// REQUIRES: the timestamp's size is equal to what is expected by
// the comparator.
Status Delete(const Slice& user_key, const Slice& timestamp);
// Add a range deletion tombstone to currently opened file // Add a range deletion tombstone to currently opened file
// REQUIRES: comparator is *not* timestamp-aware.
Status DeleteRange(const Slice& begin_key, const Slice& end_key); Status DeleteRange(const Slice& begin_key, const Slice& end_key);
// Finalize writing to sst file and close file. // Finalize writing to sst file and close file.

View File

@ -522,7 +522,8 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
if (global_seqno_ != kDisableGlobalSequenceNumber) { if (global_seqno_ != kDisableGlobalSequenceNumber) {
// If we are reading a file with a global sequence number we should // If we are reading a file with a global sequence number we should
// expect that all encoded sequence numbers are zeros and any value // expect that all encoded sequence numbers are zeros and any value
// type is kTypeValue, kTypeMerge, kTypeDeletion, or kTypeRangeDeletion. // type is kTypeValue, kTypeMerge, kTypeDeletion,
// kTypeDeletionWithTimestamp, or kTypeRangeDeletion.
uint64_t packed = ExtractInternalKeyFooter(raw_key_.GetKey()); uint64_t packed = ExtractInternalKeyFooter(raw_key_.GetKey());
SequenceNumber seqno; SequenceNumber seqno;
ValueType value_type; ValueType value_type;
@ -530,6 +531,7 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
assert(value_type == ValueType::kTypeValue || assert(value_type == ValueType::kTypeValue ||
value_type == ValueType::kTypeMerge || value_type == ValueType::kTypeMerge ||
value_type == ValueType::kTypeDeletion || value_type == ValueType::kTypeDeletion ||
value_type == ValueType::kTypeDeletionWithTimestamp ||
value_type == ValueType::kTypeRangeDeletion); value_type == ValueType::kTypeRangeDeletion);
assert(seqno == 0); assert(seqno == 0);
} }

View File

@ -195,6 +195,224 @@ TEST_F(SstFileReaderTest, ReadFileWithGlobalSeqno) {
ASSERT_OK(DestroyDB(db_name, options)); ASSERT_OK(DestroyDB(db_name, options));
} }
TEST_F(SstFileReaderTest, TimestampSizeMismatch) {
SstFileWriter writer(soptions_, options_);
ASSERT_OK(writer.Open(sst_name_));
// Comparator is not timestamp-aware; calls to APIs taking timestamps should
// fail.
ASSERT_NOK(writer.Put("key", EncodeAsUint64(100), "value"));
ASSERT_NOK(writer.Delete("another_key", EncodeAsUint64(200)));
}
class SstFileReaderTimestampTest : public testing::Test {
public:
SstFileReaderTimestampTest() {
Env* env = Env::Default();
EXPECT_OK(test::CreateEnvFromSystem(ConfigOptions(), &env, &env_guard_));
EXPECT_NE(nullptr, env);
options_.env = env;
options_.comparator = test::ComparatorWithU64Ts();
sst_name_ = test::PerThreadDBPath("sst_file_ts");
}
~SstFileReaderTimestampTest() {
EXPECT_OK(options_.env->DeleteFile(sst_name_));
}
struct KeyValueDesc {
KeyValueDesc(std::string k, std::string ts, std::string v)
: key(std::move(k)), timestamp(std::move(ts)), value(std::move(v)) {}
std::string key;
std::string timestamp;
std::string value;
};
struct InputKeyValueDesc : public KeyValueDesc {
InputKeyValueDesc(std::string k, std::string ts, std::string v, bool is_del,
bool use_contig_buf)
: KeyValueDesc(std::move(k), std::move(ts), std::move(v)),
is_delete(is_del),
use_contiguous_buffer(use_contig_buf) {}
bool is_delete = false;
bool use_contiguous_buffer = false;
};
struct OutputKeyValueDesc : public KeyValueDesc {
OutputKeyValueDesc(std::string k, std::string ts, std::string v)
: KeyValueDesc(std::move(k), std::string(ts), std::string(v)) {}
};
void CreateFile(const std::vector<InputKeyValueDesc>& descs) {
SstFileWriter writer(soptions_, options_);
ASSERT_OK(writer.Open(sst_name_));
for (const auto& desc : descs) {
if (desc.is_delete) {
if (desc.use_contiguous_buffer) {
std::string key_with_ts(desc.key + desc.timestamp);
ASSERT_OK(writer.Delete(Slice(key_with_ts.data(), desc.key.size()),
Slice(key_with_ts.data() + desc.key.size(),
desc.timestamp.size())));
} else {
ASSERT_OK(writer.Delete(desc.key, desc.timestamp));
}
} else {
if (desc.use_contiguous_buffer) {
std::string key_with_ts(desc.key + desc.timestamp);
ASSERT_OK(writer.Put(Slice(key_with_ts.data(), desc.key.size()),
Slice(key_with_ts.data() + desc.key.size(),
desc.timestamp.size()),
desc.value));
} else {
ASSERT_OK(writer.Put(desc.key, desc.timestamp, desc.value));
}
}
}
ASSERT_OK(writer.Finish());
}
void CheckFile(const std::string& timestamp,
const std::vector<OutputKeyValueDesc>& descs) {
SstFileReader reader(options_);
ASSERT_OK(reader.Open(sst_name_));
ASSERT_OK(reader.VerifyChecksum());
Slice ts_slice(timestamp);
ReadOptions read_options;
read_options.timestamp = &ts_slice;
std::unique_ptr<Iterator> iter(reader.NewIterator(read_options));
iter->SeekToFirst();
for (const auto& desc : descs) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), desc.key);
ASSERT_EQ(iter->timestamp(), desc.timestamp);
ASSERT_EQ(iter->value(), desc.value);
iter->Next();
}
ASSERT_FALSE(iter->Valid());
}
protected:
std::shared_ptr<Env> env_guard_;
Options options_;
EnvOptions soptions_;
std::string sst_name_;
};
TEST_F(SstFileReaderTimestampTest, Basic) {
std::vector<InputKeyValueDesc> input_descs;
for (uint64_t k = 0; k < kNumKeys; k += 4) {
// A Put with key k, timestamp k that gets overwritten by a subsequent Put
// with timestamp (k + 1). Note that the comparator uses descending order
// for the timestamp part, so we add the later Put first.
input_descs.emplace_back(
/* key */ EncodeAsString(k), /* timestamp */ EncodeAsUint64(k + 1),
/* value */ EncodeAsString(k * 2), /* is_delete */ false,
/* use_contiguous_buffer */ false);
input_descs.emplace_back(
/* key */ EncodeAsString(k), /* timestamp */ EncodeAsUint64(k),
/* value */ EncodeAsString(k * 3), /* is_delete */ false,
/* use_contiguous_buffer */ true);
// A Put with key (k + 2), timestamp (k + 2) that gets cancelled out by a
// Delete with timestamp (k + 3). Note that the comparator uses descending
// order for the timestamp part, so we add the Delete first.
input_descs.emplace_back(/* key */ EncodeAsString(k + 2),
/* timestamp */ EncodeAsUint64(k + 3),
/* value */ std::string(), /* is_delete */ true,
/* use_contiguous_buffer */ (k % 8) == 0);
input_descs.emplace_back(
/* key */ EncodeAsString(k + 2), /* timestamp */ EncodeAsUint64(k + 2),
/* value */ EncodeAsString(k * 5), /* is_delete */ false,
/* use_contiguous_buffer */ (k % 8) != 0);
}
CreateFile(input_descs);
// Note: below, we check the results as of each timestamp in the range,
// updating the expected result as needed.
std::vector<OutputKeyValueDesc> output_descs;
for (uint64_t ts = 0; ts < kNumKeys; ++ts) {
const uint64_t k = ts - (ts % 4);
switch (ts % 4) {
case 0: // Initial Put for key k
output_descs.emplace_back(/* key */ EncodeAsString(k),
/* timestamp */ EncodeAsUint64(ts),
/* value */ EncodeAsString(k * 3));
break;
case 1: // Second Put for key k
assert(output_descs.back().key == EncodeAsString(k));
assert(output_descs.back().timestamp == EncodeAsUint64(ts - 1));
assert(output_descs.back().value == EncodeAsString(k * 3));
output_descs.back().timestamp = EncodeAsUint64(ts);
output_descs.back().value = EncodeAsString(k * 2);
break;
case 2: // Put for key (k + 2)
output_descs.emplace_back(/* key */ EncodeAsString(k + 2),
/* timestamp */ EncodeAsUint64(ts),
/* value */ EncodeAsString(k * 5));
break;
case 3: // Delete for key (k + 2)
assert(output_descs.back().key == EncodeAsString(k + 2));
assert(output_descs.back().timestamp == EncodeAsUint64(ts - 1));
assert(output_descs.back().value == EncodeAsString(k * 5));
output_descs.pop_back();
break;
}
CheckFile(EncodeAsUint64(ts), output_descs);
}
}
TEST_F(SstFileReaderTimestampTest, TimestampsOutOfOrder) {
SstFileWriter writer(soptions_, options_);
ASSERT_OK(writer.Open(sst_name_));
// Note: KVs that have the same user key disregarding timestamps should be in
// descending order of timestamps.
ASSERT_OK(writer.Put("key", EncodeAsUint64(1), "value1"));
ASSERT_NOK(writer.Put("key", EncodeAsUint64(2), "value2"));
}
TEST_F(SstFileReaderTimestampTest, TimestampSizeMismatch) {
SstFileWriter writer(soptions_, options_);
ASSERT_OK(writer.Open(sst_name_));
// Comparator expects 64-bit timestamps; timestamps with other sizes as well
// as calls to the timestamp-less APIs should be rejected.
ASSERT_NOK(writer.Put("key", "not_an_actual_64_bit_timestamp", "value"));
ASSERT_NOK(writer.Delete("another_key", "timestamp_of_unexpected_size"));
ASSERT_NOK(writer.Put("key_without_timestamp", "value"));
ASSERT_NOK(writer.Merge("another_key_missing_a_timestamp", "merge_operand"));
ASSERT_NOK(writer.Delete("yet_another_key_still_no_timestamp"));
ASSERT_NOK(writer.DeleteRange("begin_key_timestamp_absent",
"end_key_with_a_complete_lack_of_timestamps"));
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

View File

@ -63,8 +63,8 @@ struct SstFileWriter::Rep {
std::string db_session_id; std::string db_session_id;
uint64_t next_file_number = 1; uint64_t next_file_number = 1;
Status Add(const Slice& user_key, const Slice& value, Status AddImpl(const Slice& user_key, const Slice& value,
const ValueType value_type) { ValueType value_type) {
if (!builder) { if (!builder) {
return Status::InvalidArgument("File is not opened"); return Status::InvalidArgument("File is not opened");
} }
@ -80,23 +80,14 @@ struct SstFileWriter::Rep {
} }
} }
// TODO(tec) : For external SST files we could omit the seqno and type. assert(value_type == kTypeValue || value_type == kTypeMerge ||
switch (value_type) { value_type == kTypeDeletion ||
case ValueType::kTypeValue: value_type == kTypeDeletionWithTimestamp);
ikey.Set(user_key, 0 /* Sequence Number */,
ValueType::kTypeValue /* Put */); constexpr SequenceNumber sequence_number = 0;
break;
case ValueType::kTypeMerge: ikey.Set(user_key, sequence_number, value_type);
ikey.Set(user_key, 0 /* Sequence Number */,
ValueType::kTypeMerge /* Merge */);
break;
case ValueType::kTypeDeletion:
ikey.Set(user_key, 0 /* Sequence Number */,
ValueType::kTypeDeletion /* Delete */);
break;
default:
return Status::InvalidArgument("Value type is not supported");
}
builder->Add(ikey.Encode(), value); builder->Add(ikey.Encode(), value);
// update file info // update file info
@ -108,7 +99,42 @@ struct SstFileWriter::Rep {
return Status::OK(); return Status::OK();
} }
Status Add(const Slice& user_key, const Slice& value, ValueType value_type) {
if (internal_comparator.timestamp_size() != 0) {
return Status::InvalidArgument("Timestamp size mismatch");
}
return AddImpl(user_key, value, value_type);
}
Status Add(const Slice& user_key, const Slice& timestamp, const Slice& value,
ValueType value_type) {
const size_t timestamp_size = timestamp.size();
if (internal_comparator.timestamp_size() != timestamp_size) {
return Status::InvalidArgument("Timestamp size mismatch");
}
const size_t user_key_size = user_key.size();
if (user_key.data() + user_key_size == timestamp.data()) {
Slice user_key_with_ts(user_key.data(), user_key_size + timestamp_size);
return AddImpl(user_key_with_ts, value, value_type);
}
std::string user_key_with_ts;
user_key_with_ts.reserve(user_key_size + timestamp_size);
user_key_with_ts.append(user_key.data(), user_key_size);
user_key_with_ts.append(timestamp.data(), timestamp_size);
return AddImpl(user_key_with_ts, value, value_type);
}
Status DeleteRange(const Slice& begin_key, const Slice& end_key) { Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
if (internal_comparator.timestamp_size() != 0) {
return Status::InvalidArgument("Timestamp size mismatch");
}
if (!builder) { if (!builder) {
return Status::InvalidArgument("File is not opened"); return Status::InvalidArgument("File is not opened");
} }
@ -294,6 +320,11 @@ Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
return rep_->Add(user_key, value, ValueType::kTypeValue); return rep_->Add(user_key, value, ValueType::kTypeValue);
} }
Status SstFileWriter::Put(const Slice& user_key, const Slice& timestamp,
const Slice& value) {
return rep_->Add(user_key, timestamp, value, ValueType::kTypeValue);
}
Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) { Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
return rep_->Add(user_key, value, ValueType::kTypeMerge); return rep_->Add(user_key, value, ValueType::kTypeMerge);
} }
@ -302,6 +333,11 @@ Status SstFileWriter::Delete(const Slice& user_key) {
return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion); return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
} }
Status SstFileWriter::Delete(const Slice& user_key, const Slice& timestamp) {
return rep_->Add(user_key, timestamp, Slice(),
ValueType::kTypeDeletionWithTimestamp);
}
Status SstFileWriter::DeleteRange(const Slice& begin_key, Status SstFileWriter::DeleteRange(const Slice& begin_key,
const Slice& end_key) { const Slice& end_key) {
return rep_->DeleteRange(begin_key, end_key); return rep_->DeleteRange(begin_key, end_key);