Add CompactOnDeletionCollector in utilities/table_properties_collectors.
Summary: This diff adds CompactOnDeletionCollector in utilities/table_properties_collectors, which applies a sliding window to a sst file and mark this file as need-compaction when it observe enough deletion entries within the consecutive keys covered by the sliding window. Test Plan: compact_on_deletion_collector_test Reviewers: igor, anthony, IslamAbdelRahman, kradhakrishnan, yoshinorim, sdong Reviewed By: sdong Subscribers: maykov, dhruba Differential Revision: https://reviews.facebook.net/D41175
This commit is contained in:
parent
20b244fcca
commit
26894303c1
@ -216,6 +216,7 @@ set(SOURCES
|
||||
utilities/merge_operators/uint64add.cc
|
||||
utilities/redis/redis_lists.cc
|
||||
utilities/spatialdb/spatial_db.cc
|
||||
utilities/table_properties_collectors/compact_on_deletion_collector.cc
|
||||
utilities/transactions/optimistic_transaction_impl.cc
|
||||
utilities/transactions/optimistic_transaction_db_impl.cc
|
||||
utilities/ttl/db_ttl_impl.cc
|
||||
@ -327,6 +328,7 @@ set(TESTS
|
||||
utilities/merge_operators/string_append/stringappend_test.cc
|
||||
utilities/redis/redis_lists_test.cc
|
||||
utilities/spatialdb/spatial_db_test.cc
|
||||
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc
|
||||
utilities/transactions/optimistic_transaction_test.cc
|
||||
utilities/ttl/ttl_test.cc
|
||||
utilities/write_batch_with_index/write_batch_with_index_test.cc
|
||||
|
8
Makefile
8
Makefile
@ -300,8 +300,9 @@ TESTS = \
|
||||
perf_context_test \
|
||||
optimistic_transaction_test \
|
||||
write_callback_test \
|
||||
compaction_job_stats_test \
|
||||
heap_test
|
||||
heap_test \
|
||||
compact_on_deletion_collector_test \
|
||||
compaction_job_stats_test
|
||||
|
||||
SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/)
|
||||
|
||||
@ -757,6 +758,9 @@ compaction_job_test: db/compaction_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
compaction_job_stats_test: db/compaction_job_stats_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
compact_on_deletion_collector_test: utilities/table_properties_collectors/compact_on_deletion_collector_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
wal_manager_test: db/wal_manager_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
|
29
include/rocksdb/utilities/table_properties_collectors.h
Normal file
29
include/rocksdb/utilities/table_properties_collectors.h
Normal file
@ -0,0 +1,29 @@
|
||||
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#pragma once
|
||||
#ifndef ROCKSDB_LITE
|
||||
#include <memory>
|
||||
|
||||
#include "rocksdb/table_properties.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
// Creates a factory of a table property collector that marks a SST
|
||||
// file as need-compaction when it observe at least "D" deletion
|
||||
// entries in any "N" consecutive entires.
|
||||
//
|
||||
// @param sliding_window_size "N". Note that this number will be
|
||||
// round up to the smallest multilpe of 128 that is no less
|
||||
// than the specified size.
|
||||
// @param deletion_trigger "D". Note that even when "N" is changed,
|
||||
// the specified number for "D" will not be changed.
|
||||
extern std::shared_ptr<TablePropertiesCollectorFactory>
|
||||
NewCompactOnDeletionCollectorFactory(
|
||||
size_t sliding_window_size,
|
||||
size_t deletion_trigger);
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // !ROCKSDB_LITE
|
2
src.mk
2
src.mk
@ -111,6 +111,7 @@ LIB_SOURCES = \
|
||||
utilities/merge_operators/uint64add.cc \
|
||||
utilities/redis/redis_lists.cc \
|
||||
utilities/spatialdb/spatial_db.cc \
|
||||
utilities/table_properties_collectors/compact_on_deletion_collector.cc \
|
||||
utilities/transactions/optimistic_transaction_impl.cc \
|
||||
utilities/transactions/optimistic_transaction_db_impl.cc \
|
||||
utilities/ttl/db_ttl_impl.cc \
|
||||
@ -227,6 +228,7 @@ TEST_BENCH_SOURCES = \
|
||||
utilities/merge_operators/string_append/stringappend_test.cc \
|
||||
utilities/redis/redis_lists_test.cc \
|
||||
utilities/spatialdb/spatial_db_test.cc \
|
||||
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \
|
||||
utilities/transactions/optimistic_transaction_test.cc \
|
||||
utilities/ttl/ttl_test.cc \
|
||||
utilities/write_batch_with_index/write_batch_with_index_test.cc \
|
||||
|
@ -0,0 +1,93 @@
|
||||
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
#include <memory>
|
||||
|
||||
#include "rocksdb/utilities/table_properties_collectors.h"
|
||||
#include "utilities/table_properties_collectors/compact_on_deletion_collector.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
CompactOnDeletionCollector::CompactOnDeletionCollector(
|
||||
size_t sliding_window_size,
|
||||
size_t deletion_trigger) {
|
||||
deletion_trigger_ = deletion_trigger;
|
||||
|
||||
// First, compute the number of keys in each bucket.
|
||||
bucket_size_ =
|
||||
(sliding_window_size + kNumBuckets - 1) / kNumBuckets;
|
||||
assert(bucket_size_ > 0U);
|
||||
|
||||
Reset();
|
||||
}
|
||||
|
||||
void CompactOnDeletionCollector::Reset() {
|
||||
for (int i = 0; i < kNumBuckets; ++i) {
|
||||
num_deletions_in_buckets_[i] = 0;
|
||||
}
|
||||
current_bucket_ = 0;
|
||||
num_keys_in_current_bucket_ = 0;
|
||||
num_deletions_in_observation_window_ = 0;
|
||||
need_compaction_ = false;
|
||||
}
|
||||
|
||||
// AddUserKey() will be called when a new key/value pair is inserted into the
|
||||
// table.
|
||||
// @params key the user key that is inserted into the table.
|
||||
// @params value the value that is inserted into the table.
|
||||
// @params file_size file size up to now
|
||||
Status CompactOnDeletionCollector::AddUserKey(
|
||||
const Slice& key, const Slice& value,
|
||||
EntryType type, SequenceNumber seq,
|
||||
uint64_t file_size) {
|
||||
if (need_compaction_) {
|
||||
// If the output file already needs to be compacted, skip the check.
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
if (num_keys_in_current_bucket_ == bucket_size_) {
|
||||
// When the current bucket is full, advance the cursor of the
|
||||
// ring buffer to the next bucket.
|
||||
current_bucket_ = (current_bucket_ + 1) % kNumBuckets;
|
||||
|
||||
// Update the current count of observed deletion keys by excluding
|
||||
// the number of deletion keys in the oldest bucket in the
|
||||
// observation window.
|
||||
assert(num_deletions_in_observation_window_ >=
|
||||
num_deletions_in_buckets_[current_bucket_]);
|
||||
num_deletions_in_observation_window_ -=
|
||||
num_deletions_in_buckets_[current_bucket_];
|
||||
num_deletions_in_buckets_[current_bucket_] = 0;
|
||||
num_keys_in_current_bucket_ = 0;
|
||||
}
|
||||
|
||||
num_keys_in_current_bucket_++;
|
||||
if (type == kEntryDelete) {
|
||||
num_deletions_in_observation_window_++;
|
||||
num_deletions_in_buckets_[current_bucket_]++;
|
||||
if (num_deletions_in_observation_window_ >= deletion_trigger_) {
|
||||
need_compaction_ = true;
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
TablePropertiesCollector* CompactOnDeletionCollectorFactory::
|
||||
CreateTablePropertiesCollector() {
|
||||
return new CompactOnDeletionCollector(
|
||||
sliding_window_size_, deletion_trigger_);
|
||||
}
|
||||
|
||||
std::shared_ptr<TablePropertiesCollectorFactory>
|
||||
NewCompactOnDeletionCollectorFactory(
|
||||
size_t sliding_window_size,
|
||||
size_t deletion_trigger) {
|
||||
return std::shared_ptr<TablePropertiesCollectorFactory>(
|
||||
new CompactOnDeletionCollectorFactory(
|
||||
sliding_window_size, deletion_trigger));
|
||||
}
|
||||
} // namespace rocksdb
|
||||
#endif // !ROCKSDB_LITE
|
@ -0,0 +1,101 @@
|
||||
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
namespace rocksdb {
|
||||
|
||||
// A factory of a table property collector that marks a SST
|
||||
// file as need-compaction when it observe at least "D" deletion
|
||||
// entries in any "N" consecutive entires.
|
||||
class CompactOnDeletionCollectorFactory
|
||||
: public TablePropertiesCollectorFactory {
|
||||
public:
|
||||
// A factory of a table property collector that marks a SST
|
||||
// file as need-compaction when it observe at least "D" deletion
|
||||
// entries in any "N" consecutive entires.
|
||||
//
|
||||
// @param sliding_window_size "N"
|
||||
// @param deletion_trigger "D"
|
||||
CompactOnDeletionCollectorFactory(
|
||||
size_t sliding_window_size,
|
||||
size_t deletion_trigger) :
|
||||
sliding_window_size_(sliding_window_size),
|
||||
deletion_trigger_(deletion_trigger) {}
|
||||
|
||||
virtual ~CompactOnDeletionCollectorFactory() {}
|
||||
|
||||
virtual TablePropertiesCollector* CreateTablePropertiesCollector() override;
|
||||
|
||||
virtual const char* Name() const override {
|
||||
return "CompactOnDeletionCollector";
|
||||
}
|
||||
|
||||
private:
|
||||
size_t sliding_window_size_;
|
||||
size_t deletion_trigger_;
|
||||
};
|
||||
|
||||
class CompactOnDeletionCollector : public TablePropertiesCollector {
|
||||
public:
|
||||
CompactOnDeletionCollector(
|
||||
size_t sliding_window_size,
|
||||
size_t deletion_trigger);
|
||||
|
||||
// AddUserKey() will be called when a new key/value pair is inserted into the
|
||||
// table.
|
||||
// @params key the user key that is inserted into the table.
|
||||
// @params value the value that is inserted into the table.
|
||||
// @params file_size file size up to now
|
||||
virtual Status AddUserKey(const Slice& key, const Slice& value,
|
||||
EntryType type, SequenceNumber seq,
|
||||
uint64_t file_size) override;
|
||||
|
||||
// Finish() will be called when a table has already been built and is ready
|
||||
// for writing the properties block.
|
||||
// @params properties User will add their collected statistics to
|
||||
// `properties`.
|
||||
virtual Status Finish(UserCollectedProperties* properties) override {
|
||||
Reset();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Return the human-readable properties, where the key is property name and
|
||||
// the value is the human-readable form of value.
|
||||
virtual UserCollectedProperties GetReadableProperties() const override {
|
||||
return UserCollectedProperties();
|
||||
}
|
||||
|
||||
// The name of the properties collector can be used for debugging purpose.
|
||||
virtual const char* Name() const {
|
||||
return "CompactOnDeletionCollector";
|
||||
}
|
||||
|
||||
// EXPERIMENTAL Return whether the output file should be further compacted
|
||||
virtual bool NeedCompact() const {
|
||||
return need_compaction_;
|
||||
}
|
||||
|
||||
static const int kNumBuckets = 128;
|
||||
|
||||
private:
|
||||
void Reset();
|
||||
|
||||
// A ring buffer that used to count the number of deletion entries for every
|
||||
// "bucket_size_" keys.
|
||||
size_t num_deletions_in_buckets_[kNumBuckets];
|
||||
// the number of keys in a bucket
|
||||
size_t bucket_size_;
|
||||
|
||||
size_t current_bucket_;
|
||||
size_t num_keys_in_current_bucket_;
|
||||
size_t num_deletions_in_observation_window_;
|
||||
size_t deletion_trigger_;
|
||||
// true if the current SST file needs to be compacted.
|
||||
bool need_compaction_;
|
||||
};
|
||||
} // namespace rocksdb
|
||||
#endif // !ROCKSDB_LITE
|
@ -0,0 +1,177 @@
|
||||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
#include <algorithm>
|
||||
#include <cmath>
|
||||
#include <vector>
|
||||
|
||||
#include "rocksdb/table.h"
|
||||
#include "rocksdb/utilities/table_properties_collectors.h"
|
||||
#include "util/random.h"
|
||||
#include "utilities/table_properties_collectors/compact_on_deletion_collector.h"
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
const int kWindowSizes[] =
|
||||
{1000, 10000, 10000, 127, 128, 129, 255, 256, 257, 2, 10000};
|
||||
const int kDeletionTriggers[] =
|
||||
{500, 9500, 4323, 47, 61, 128, 250, 250, 250, 2, 2};
|
||||
|
||||
std::vector<int> window_sizes;
|
||||
std::vector<int> deletion_triggers;
|
||||
// deterministic tests
|
||||
for (int test = 0; test < 9; ++test) {
|
||||
window_sizes.emplace_back(kWindowSizes[test]);
|
||||
deletion_triggers.emplace_back(kDeletionTriggers[test]);
|
||||
}
|
||||
|
||||
// randomize tests
|
||||
rocksdb::Random rnd(301);
|
||||
const int kMaxTestSize = 100000l;
|
||||
for (int random_test = 0; random_test < 100; random_test++) {
|
||||
int window_size = rnd.Uniform(kMaxTestSize) + 1;
|
||||
int deletion_trigger = rnd.Uniform(window_size);
|
||||
window_sizes.emplace_back(window_size);
|
||||
deletion_triggers.emplace_back(deletion_trigger);
|
||||
}
|
||||
|
||||
assert(window_sizes.size() == deletion_triggers.size());
|
||||
|
||||
for (size_t test = 0; test < window_sizes.size(); ++test) {
|
||||
const int kBucketSize = 128;
|
||||
const int kWindowSize = window_sizes[test];
|
||||
const int kPaddedWindowSize =
|
||||
kBucketSize * ((window_sizes[test] + kBucketSize - 1) / kBucketSize);
|
||||
const int kNumDeletionTrigger = deletion_triggers[test];
|
||||
const int kBias = (kNumDeletionTrigger + kBucketSize - 1) / kBucketSize;
|
||||
// Simple test
|
||||
{
|
||||
std::unique_ptr<rocksdb::TablePropertiesCollector> collector;
|
||||
auto factory = rocksdb::NewCompactOnDeletionCollectorFactory(
|
||||
kWindowSize, kNumDeletionTrigger);
|
||||
collector.reset(
|
||||
factory->CreateTablePropertiesCollector());
|
||||
const int kSample = 10;
|
||||
for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) {
|
||||
int deletions = 0;
|
||||
for (int i = 0; i < kPaddedWindowSize; ++i) {
|
||||
if (i % kSample < delete_rate) {
|
||||
collector->AddUserKey("hello", "rocksdb",
|
||||
rocksdb::kEntryDelete, 0, 0);
|
||||
deletions++;
|
||||
} else {
|
||||
collector->AddUserKey("hello", "rocksdb",
|
||||
rocksdb::kEntryPut, 0, 0);
|
||||
}
|
||||
}
|
||||
if (collector->NeedCompact() !=
|
||||
(deletions >= kNumDeletionTrigger) &&
|
||||
std::abs(deletions - kNumDeletionTrigger) > kBias) {
|
||||
fprintf(stderr, "[Error] collector->NeedCompact() != (%d >= %d)"
|
||||
" with kWindowSize = %d and kNumDeletionTrigger = %d\n",
|
||||
deletions, kNumDeletionTrigger,
|
||||
kWindowSize, kNumDeletionTrigger);
|
||||
assert(false);
|
||||
}
|
||||
collector->Finish(nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
// Only one section of a file satisfies the compaction trigger
|
||||
{
|
||||
std::unique_ptr<rocksdb::TablePropertiesCollector> collector;
|
||||
auto factory = rocksdb::NewCompactOnDeletionCollectorFactory(
|
||||
kWindowSize, kNumDeletionTrigger);
|
||||
collector.reset(
|
||||
factory->CreateTablePropertiesCollector());
|
||||
const int kSample = 10;
|
||||
for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) {
|
||||
int deletions = 0;
|
||||
for (int section = 0; section < 5; ++section) {
|
||||
int initial_entries = rnd.Uniform(kWindowSize) + kWindowSize;
|
||||
for (int i = 0; i < initial_entries; ++i) {
|
||||
collector->AddUserKey("hello", "rocksdb",
|
||||
rocksdb::kEntryPut, 0, 0);
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < kPaddedWindowSize; ++i) {
|
||||
if (i % kSample < delete_rate) {
|
||||
collector->AddUserKey("hello", "rocksdb",
|
||||
rocksdb::kEntryDelete, 0, 0);
|
||||
deletions++;
|
||||
} else {
|
||||
collector->AddUserKey("hello", "rocksdb",
|
||||
rocksdb::kEntryPut, 0, 0);
|
||||
}
|
||||
}
|
||||
for (int section = 0; section < 5; ++section) {
|
||||
int ending_entries = rnd.Uniform(kWindowSize) + kWindowSize;
|
||||
for (int i = 0; i < ending_entries; ++i) {
|
||||
collector->AddUserKey("hello", "rocksdb",
|
||||
rocksdb::kEntryPut, 0, 0);
|
||||
}
|
||||
}
|
||||
if (collector->NeedCompact() != (deletions >= kNumDeletionTrigger) &&
|
||||
std::abs(deletions - kNumDeletionTrigger) > kBias) {
|
||||
fprintf(stderr, "[Error] collector->NeedCompact() %d != (%d >= %d)"
|
||||
" with kWindowSize = %d, kNumDeletionTrigger = %d\n",
|
||||
collector->NeedCompact(),
|
||||
deletions, kNumDeletionTrigger, kWindowSize,
|
||||
kNumDeletionTrigger);
|
||||
assert(false);
|
||||
}
|
||||
collector->Finish(nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
// TEST 3: Issues a lots of deletes, but their density is not
|
||||
// high enough to trigger compaction.
|
||||
{
|
||||
std::unique_ptr<rocksdb::TablePropertiesCollector> collector;
|
||||
auto factory = rocksdb::NewCompactOnDeletionCollectorFactory(
|
||||
kWindowSize, kNumDeletionTrigger);
|
||||
collector.reset(
|
||||
factory->CreateTablePropertiesCollector());
|
||||
assert(collector->NeedCompact() == false);
|
||||
// Insert "kNumDeletionTrigger * 0.95" deletions for every
|
||||
// "kWindowSize" and verify compaction is not needed.
|
||||
const int kDeletionsPerSection = kNumDeletionTrigger * 95 / 100;
|
||||
if (kDeletionsPerSection >= 0) {
|
||||
for (int section = 0; section < 200; ++section) {
|
||||
for (int i = 0; i < kPaddedWindowSize; ++i) {
|
||||
if (i < kDeletionsPerSection) {
|
||||
collector->AddUserKey("hello", "rocksdb",
|
||||
rocksdb::kEntryDelete, 0, 0);
|
||||
} else {
|
||||
collector->AddUserKey("hello", "rocksdb",
|
||||
rocksdb::kEntryPut, 0, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (collector->NeedCompact() &&
|
||||
std::abs(kDeletionsPerSection - kNumDeletionTrigger) > kBias) {
|
||||
fprintf(stderr, "[Error] collector->NeedCompact() != false"
|
||||
" with kWindowSize = %d and kNumDeletionTrigger = %d\n",
|
||||
kWindowSize, kNumDeletionTrigger);
|
||||
assert(false);
|
||||
}
|
||||
collector->Finish(nullptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
fprintf(stderr, "PASSED\n");
|
||||
}
|
||||
#else
|
||||
int main(int argc, char** argv) {
|
||||
fprintf(stderr, "SKIPPED as RocksDBLite does not include utilities.\n");
|
||||
return 0;
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
Loading…
Reference in New Issue
Block a user