fix deletion-triggered compaction in table builder
Summary: It was broken when `NotifyCollectTableCollectorsOnFinish` was introduced. That function called `Finish` on each of the `TablePropertiesCollector`s, and `CompactOnDeletionCollector::Finish()` was resetting all its internal state. Then, when we checked whether compaction is necessary, the flag had already been cleared. Fixed above issue by avoiding resetting internal state during `Finish()`. Multiple calls to `Finish()` are allowed, but callers cannot invoke `AddUserKey()` on the collector after any finishes. Closes https://github.com/facebook/rocksdb/pull/2936 Differential Revision: D5918659 Pulled By: ajkr fbshipit-source-id: 4f05e9d80e50ee762ba1e611d8d22620029dca6b
This commit is contained in:
parent
385049baf2
commit
5df172da2f
@ -13,6 +13,7 @@
|
|||||||
#include "db/db_test_util.h"
|
#include "db/db_test_util.h"
|
||||||
#include "port/stack_trace.h"
|
#include "port/stack_trace.h"
|
||||||
#include "rocksdb/db.h"
|
#include "rocksdb/db.h"
|
||||||
|
#include "rocksdb/utilities/table_properties_collectors.h"
|
||||||
#include "util/testharness.h"
|
#include "util/testharness.h"
|
||||||
#include "util/testutil.h"
|
#include "util/testutil.h"
|
||||||
|
|
||||||
@ -250,6 +251,37 @@ TEST_F(DBTablePropertiesTest, GetColumnFamilyNameProperty) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBTablePropertiesTest, DeletionTriggeredCompactionMarking) {
|
||||||
|
const int kNumKeys = 1000;
|
||||||
|
const int kWindowSize = 100;
|
||||||
|
const int kNumDelsTrigger = 90;
|
||||||
|
|
||||||
|
Options opts = CurrentOptions();
|
||||||
|
opts.table_properties_collector_factories.emplace_back(
|
||||||
|
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
|
||||||
|
Reopen(opts);
|
||||||
|
|
||||||
|
// add an L1 file to prevent tombstones from dropping due to obsolescence
|
||||||
|
// during flush
|
||||||
|
Put(Key(0), "val");
|
||||||
|
Flush();
|
||||||
|
MoveFilesToLevel(1);
|
||||||
|
|
||||||
|
for (int i = 0; i < kNumKeys; ++i) {
|
||||||
|
if (i >= kNumKeys - kWindowSize &&
|
||||||
|
i < kNumKeys - kWindowSize + kNumDelsTrigger) {
|
||||||
|
Delete(Key(i));
|
||||||
|
} else {
|
||||||
|
Put(Key(i), "val");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Flush();
|
||||||
|
|
||||||
|
dbfull()->TEST_WaitForCompact();
|
||||||
|
ASSERT_EQ(0, NumTableFilesAtLevel(0));
|
||||||
|
ASSERT_GT(NumTableFilesAtLevel(1), 0);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
@ -12,26 +12,16 @@
|
|||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
CompactOnDeletionCollector::CompactOnDeletionCollector(
|
CompactOnDeletionCollector::CompactOnDeletionCollector(
|
||||||
size_t sliding_window_size,
|
size_t sliding_window_size, size_t deletion_trigger)
|
||||||
size_t deletion_trigger) {
|
: bucket_size_((sliding_window_size + kNumBuckets - 1) / kNumBuckets),
|
||||||
deletion_trigger_ = deletion_trigger;
|
current_bucket_(0),
|
||||||
|
num_keys_in_current_bucket_(0),
|
||||||
// First, compute the number of keys in each bucket.
|
num_deletions_in_observation_window_(0),
|
||||||
bucket_size_ =
|
deletion_trigger_(deletion_trigger),
|
||||||
(sliding_window_size + kNumBuckets - 1) / kNumBuckets;
|
need_compaction_(false),
|
||||||
|
finished_(false) {
|
||||||
assert(bucket_size_ > 0U);
|
assert(bucket_size_ > 0U);
|
||||||
|
memset(num_deletions_in_buckets_, 0, sizeof(size_t) * kNumBuckets);
|
||||||
Reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
void CompactOnDeletionCollector::Reset() {
|
|
||||||
for (int i = 0; i < kNumBuckets; ++i) {
|
|
||||||
num_deletions_in_buckets_[i] = 0;
|
|
||||||
}
|
|
||||||
current_bucket_ = 0;
|
|
||||||
num_keys_in_current_bucket_ = 0;
|
|
||||||
num_deletions_in_observation_window_ = 0;
|
|
||||||
need_compaction_ = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddUserKey() will be called when a new key/value pair is inserted into the
|
// AddUserKey() will be called when a new key/value pair is inserted into the
|
||||||
@ -43,6 +33,7 @@ Status CompactOnDeletionCollector::AddUserKey(
|
|||||||
const Slice& key, const Slice& value,
|
const Slice& key, const Slice& value,
|
||||||
EntryType type, SequenceNumber seq,
|
EntryType type, SequenceNumber seq,
|
||||||
uint64_t file_size) {
|
uint64_t file_size) {
|
||||||
|
assert(!finished_);
|
||||||
if (need_compaction_) {
|
if (need_compaction_) {
|
||||||
// If the output file already needs to be compacted, skip the check.
|
// If the output file already needs to be compacted, skip the check.
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
|
@ -61,7 +61,7 @@ class CompactOnDeletionCollector : public TablePropertiesCollector {
|
|||||||
// @params properties User will add their collected statistics to
|
// @params properties User will add their collected statistics to
|
||||||
// `properties`.
|
// `properties`.
|
||||||
virtual Status Finish(UserCollectedProperties* properties) override {
|
virtual Status Finish(UserCollectedProperties* properties) override {
|
||||||
Reset();
|
finished_ = true;
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -98,6 +98,7 @@ class CompactOnDeletionCollector : public TablePropertiesCollector {
|
|||||||
size_t deletion_trigger_;
|
size_t deletion_trigger_;
|
||||||
// true if the current SST file needs to be compacted.
|
// true if the current SST file needs to be compacted.
|
||||||
bool need_compaction_;
|
bool need_compaction_;
|
||||||
|
bool finished_;
|
||||||
};
|
};
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
#endif // !ROCKSDB_LITE
|
#endif // !ROCKSDB_LITE
|
||||||
|
@ -58,12 +58,12 @@ int main(int argc, char** argv) {
|
|||||||
const int kBias = (kNumDeletionTrigger + kBucketSize - 1) / kBucketSize;
|
const int kBias = (kNumDeletionTrigger + kBucketSize - 1) / kBucketSize;
|
||||||
// Simple test
|
// Simple test
|
||||||
{
|
{
|
||||||
std::unique_ptr<rocksdb::TablePropertiesCollector> collector;
|
|
||||||
auto factory = rocksdb::NewCompactOnDeletionCollectorFactory(
|
auto factory = rocksdb::NewCompactOnDeletionCollectorFactory(
|
||||||
kWindowSize, kNumDeletionTrigger);
|
kWindowSize, kNumDeletionTrigger);
|
||||||
collector.reset(factory->CreateTablePropertiesCollector(context));
|
|
||||||
const int kSample = 10;
|
const int kSample = 10;
|
||||||
for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) {
|
for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) {
|
||||||
|
std::unique_ptr<rocksdb::TablePropertiesCollector> collector(
|
||||||
|
factory->CreateTablePropertiesCollector(context));
|
||||||
int deletions = 0;
|
int deletions = 0;
|
||||||
for (int i = 0; i < kPaddedWindowSize; ++i) {
|
for (int i = 0; i < kPaddedWindowSize; ++i) {
|
||||||
if (i % kSample < delete_rate) {
|
if (i % kSample < delete_rate) {
|
||||||
@ -90,12 +90,12 @@ int main(int argc, char** argv) {
|
|||||||
|
|
||||||
// Only one section of a file satisfies the compaction trigger
|
// Only one section of a file satisfies the compaction trigger
|
||||||
{
|
{
|
||||||
std::unique_ptr<rocksdb::TablePropertiesCollector> collector;
|
|
||||||
auto factory = rocksdb::NewCompactOnDeletionCollectorFactory(
|
auto factory = rocksdb::NewCompactOnDeletionCollectorFactory(
|
||||||
kWindowSize, kNumDeletionTrigger);
|
kWindowSize, kNumDeletionTrigger);
|
||||||
collector.reset(factory->CreateTablePropertiesCollector(context));
|
|
||||||
const int kSample = 10;
|
const int kSample = 10;
|
||||||
for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) {
|
for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) {
|
||||||
|
std::unique_ptr<rocksdb::TablePropertiesCollector> collector(
|
||||||
|
factory->CreateTablePropertiesCollector(context));
|
||||||
int deletions = 0;
|
int deletions = 0;
|
||||||
for (int section = 0; section < 5; ++section) {
|
for (int section = 0; section < 5; ++section) {
|
||||||
int initial_entries = rnd.Uniform(kWindowSize) + kWindowSize;
|
int initial_entries = rnd.Uniform(kWindowSize) + kWindowSize;
|
||||||
|
Loading…
Reference in New Issue
Block a user