Update file indexer to take timestamp into consideration (#6205)
Summary: Exclude timestamp in key comparison during boundary calculation to avoid key versions being excluded. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6205 Differential Revision: D19166765 Pulled By: riversand963 fbshipit-source-id: bbe08816fef8de349a83ebd59a595ad844021f24
This commit is contained in:
parent
941bd15aed
commit
e5b476f551
@ -2021,23 +2021,15 @@ INSTANTIATE_TEST_CASE_P(
|
||||
::testing::Combine(::testing::Bool(), ::testing::Bool(),
|
||||
::testing::Bool(), ::testing::Bool()));
|
||||
|
||||
class DBBasicTestWithTimestampWithParam
|
||||
: public DBTestBase,
|
||||
public testing::WithParamInterface<bool> {
|
||||
class DBBasicTestWithTimestampBase : public DBTestBase {
|
||||
public:
|
||||
DBBasicTestWithTimestampWithParam()
|
||||
: DBTestBase("/db_basic_test_with_timestamp") {}
|
||||
explicit DBBasicTestWithTimestampBase(const std::string& dbname)
|
||||
: DBTestBase(dbname) {}
|
||||
|
||||
protected:
|
||||
class TestComparator : public Comparator {
|
||||
private:
|
||||
const Comparator* cmp_without_ts_;
|
||||
|
||||
class TestComparatorBase : public Comparator {
|
||||
public:
|
||||
explicit TestComparator(size_t ts_sz)
|
||||
: Comparator(ts_sz), cmp_without_ts_(nullptr) {
|
||||
cmp_without_ts_ = BytewiseComparator();
|
||||
}
|
||||
explicit TestComparatorBase(size_t ts_sz) : Comparator(ts_sz) {}
|
||||
|
||||
const char* Name() const override { return "TestComparator"; }
|
||||
|
||||
@ -2055,13 +2047,15 @@ class DBBasicTestWithTimestampWithParam
|
||||
Slice(b.data() + b.size() - timestamp_size(), timestamp_size()));
|
||||
}
|
||||
|
||||
virtual int CompareImpl(const Slice& a, const Slice& b) const = 0;
|
||||
|
||||
int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override {
|
||||
assert(a.size() >= timestamp_size());
|
||||
assert(b.size() >= timestamp_size());
|
||||
Slice k1 = StripTimestampFromUserKey(a, timestamp_size());
|
||||
Slice k2 = StripTimestampFromUserKey(b, timestamp_size());
|
||||
|
||||
return cmp_without_ts_->Compare(k1, k2);
|
||||
return CompareImpl(k1, k2);
|
||||
}
|
||||
|
||||
int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
|
||||
@ -2107,6 +2101,55 @@ class DBBasicTestWithTimestampWithParam
|
||||
}
|
||||
};
|
||||
|
||||
class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
|
||||
public:
|
||||
DBBasicTestWithTimestamp()
|
||||
: DBBasicTestWithTimestampBase("/db_basic_test_with_timestamp") {}
|
||||
|
||||
protected:
|
||||
class TestComparator : public TestComparatorBase {
|
||||
public:
|
||||
const int kKeyPrefixLength =
|
||||
3; // 3: length of "key" in generated keys ("key" + std::to_string(j))
|
||||
explicit TestComparator(size_t ts_sz) : TestComparatorBase(ts_sz) {}
|
||||
|
||||
int CompareImpl(const Slice& a, const Slice& b) const override {
|
||||
int n1 = atoi(
|
||||
std::string(a.data() + kKeyPrefixLength, a.size() - kKeyPrefixLength)
|
||||
.c_str());
|
||||
int n2 = atoi(
|
||||
std::string(b.data() + kKeyPrefixLength, b.size() - kKeyPrefixLength)
|
||||
.c_str());
|
||||
return (n1 < n2) ? -1 : (n1 > n2) ? 1 : 0;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
class DBBasicTestWithTimestampWithParam
|
||||
: public DBBasicTestWithTimestampBase,
|
||||
public testing::WithParamInterface<bool> {
|
||||
public:
|
||||
DBBasicTestWithTimestampWithParam()
|
||||
: DBBasicTestWithTimestampBase(
|
||||
"/db_basic_test_with_timestamp_with_param") {}
|
||||
|
||||
protected:
|
||||
class TestComparator : public TestComparatorBase {
|
||||
private:
|
||||
const Comparator* cmp_without_ts_;
|
||||
|
||||
public:
|
||||
explicit TestComparator(size_t ts_sz)
|
||||
: TestComparatorBase(ts_sz), cmp_without_ts_(nullptr) {
|
||||
cmp_without_ts_ = BytewiseComparator();
|
||||
}
|
||||
|
||||
int CompareImpl(const Slice& a, const Slice& b) const override {
|
||||
return cmp_without_ts_->Compare(a, b);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
TEST_P(DBBasicTestWithTimestampWithParam, PutAndGet) {
|
||||
const int kNumKeysPerFile = 8192;
|
||||
const size_t kNumTimestamps = 6;
|
||||
@ -2204,6 +2247,115 @@ TEST_P(DBBasicTestWithTimestampWithParam, PutAndGet) {
|
||||
INSTANTIATE_TEST_CASE_P(Timestamp, DBBasicTestWithTimestampWithParam,
|
||||
::testing::Bool());
|
||||
|
||||
// A class which remembers the name of each flushed file.
|
||||
class FlushedFileCollector : public EventListener {
|
||||
public:
|
||||
FlushedFileCollector() {}
|
||||
~FlushedFileCollector() override {}
|
||||
|
||||
void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
|
||||
InstrumentedMutexLock lock(&mutex_);
|
||||
flushed_files_.push_back(info.file_path);
|
||||
}
|
||||
|
||||
std::vector<std::string> GetFlushedFiles() {
|
||||
std::vector<std::string> result;
|
||||
{
|
||||
InstrumentedMutexLock lock(&mutex_);
|
||||
result = flushed_files_;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void ClearFlushedFiles() {
|
||||
InstrumentedMutexLock lock(&mutex_);
|
||||
flushed_files_.clear();
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<std::string> flushed_files_;
|
||||
InstrumentedMutex mutex_;
|
||||
};
|
||||
|
||||
TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
|
||||
const int kNumKeysPerFile = 8192;
|
||||
const size_t kNumTimestamps = 2;
|
||||
const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
|
||||
const size_t kSplitPosBase = kNumKeysPerTimestamp / 2;
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
options.env = env_;
|
||||
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
|
||||
|
||||
FlushedFileCollector* collector = new FlushedFileCollector();
|
||||
options.listeners.emplace_back(collector);
|
||||
|
||||
std::string tmp;
|
||||
size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size();
|
||||
TestComparator test_cmp(ts_sz);
|
||||
options.comparator = &test_cmp;
|
||||
BlockBasedTableOptions bbto;
|
||||
bbto.filter_policy.reset(NewBloomFilterPolicy(
|
||||
10 /*bits_per_key*/, false /*use_block_based_builder*/));
|
||||
bbto.whole_key_filtering = true;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||
DestroyAndReopen(options);
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
size_t num_cfs = handles_.size();
|
||||
ASSERT_EQ(2, num_cfs);
|
||||
std::vector<std::string> write_ts_strs(kNumTimestamps);
|
||||
std::vector<std::string> read_ts_strs(kNumTimestamps);
|
||||
std::vector<Slice> write_ts_list;
|
||||
std::vector<Slice> read_ts_list;
|
||||
|
||||
for (size_t i = 0; i != kNumTimestamps; ++i) {
|
||||
write_ts_list.emplace_back(EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
|
||||
read_ts_list.emplace_back(EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
|
||||
const Slice& write_ts = write_ts_list.back();
|
||||
WriteOptions wopts;
|
||||
wopts.timestamp = &write_ts;
|
||||
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
|
||||
for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
|
||||
ASSERT_OK(Put(cf, "key" + std::to_string(j),
|
||||
"value_" + std::to_string(j) + "_" + std::to_string(i),
|
||||
wopts));
|
||||
if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
|
||||
// flush all keys with the same timestamp to two sst files, split at
|
||||
// incremental positions such that lowerlevel[1].smallest.userkey ==
|
||||
// higherlevel[0].largest.userkey
|
||||
ASSERT_OK(Flush(cf));
|
||||
|
||||
// compact files (2 at each level) to a lower level such that all keys
|
||||
// with the same timestamp is at one level, with newer versions at
|
||||
// higher levels.
|
||||
CompactionOptions compact_opt;
|
||||
compact_opt.compression = kNoCompression;
|
||||
db_->CompactFiles(compact_opt, handles_[cf],
|
||||
collector->GetFlushedFiles(),
|
||||
static_cast<int>(kNumTimestamps - i));
|
||||
collector->ClearFlushedFiles();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
const auto& verify_db_func = [&]() {
|
||||
for (size_t i = 0; i != kNumTimestamps; ++i) {
|
||||
ReadOptions ropts;
|
||||
ropts.timestamp = &read_ts_list[i];
|
||||
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
|
||||
ColumnFamilyHandle* cfh = handles_[cf];
|
||||
for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
|
||||
std::string value;
|
||||
ASSERT_OK(db_->Get(ropts, cfh, "key" + std::to_string(j), &value));
|
||||
ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
|
||||
value);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
verify_db_func();
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
|
@ -108,26 +108,30 @@ void FileIndexer::UpdateIndex(Arena* arena, const size_t num_levels,
|
||||
|
||||
CalculateLB(
|
||||
upper_files, lower_files, &index_level,
|
||||
[this](const FileMetaData * a, const FileMetaData * b)->int {
|
||||
return ucmp_->Compare(a->smallest.user_key(), b->largest.user_key());
|
||||
[this](const FileMetaData* a, const FileMetaData* b) -> int {
|
||||
return ucmp_->CompareWithoutTimestamp(a->smallest.user_key(),
|
||||
b->largest.user_key());
|
||||
},
|
||||
[](IndexUnit* index, int32_t f_idx) { index->smallest_lb = f_idx; });
|
||||
CalculateLB(
|
||||
upper_files, lower_files, &index_level,
|
||||
[this](const FileMetaData * a, const FileMetaData * b)->int {
|
||||
return ucmp_->Compare(a->largest.user_key(), b->largest.user_key());
|
||||
[this](const FileMetaData* a, const FileMetaData* b) -> int {
|
||||
return ucmp_->CompareWithoutTimestamp(a->largest.user_key(),
|
||||
b->largest.user_key());
|
||||
},
|
||||
[](IndexUnit* index, int32_t f_idx) { index->largest_lb = f_idx; });
|
||||
CalculateRB(
|
||||
upper_files, lower_files, &index_level,
|
||||
[this](const FileMetaData * a, const FileMetaData * b)->int {
|
||||
return ucmp_->Compare(a->smallest.user_key(), b->smallest.user_key());
|
||||
[this](const FileMetaData* a, const FileMetaData* b) -> int {
|
||||
return ucmp_->CompareWithoutTimestamp(a->smallest.user_key(),
|
||||
b->smallest.user_key());
|
||||
},
|
||||
[](IndexUnit* index, int32_t f_idx) { index->smallest_rb = f_idx; });
|
||||
CalculateRB(
|
||||
upper_files, lower_files, &index_level,
|
||||
[this](const FileMetaData * a, const FileMetaData * b)->int {
|
||||
return ucmp_->Compare(a->largest.user_key(), b->smallest.user_key());
|
||||
[this](const FileMetaData* a, const FileMetaData* b) -> int {
|
||||
return ucmp_->CompareWithoutTimestamp(a->largest.user_key(),
|
||||
b->smallest.user_key());
|
||||
},
|
||||
[](IndexUnit* index, int32_t f_idx) { index->largest_rb = f_idx; });
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user