d057e8326d
Summary: - Changed MergeOperator, CompactionFilter, and CompactionFilterFactory into Customizable classes. - Added Options/Configurable/Object Registration for TTL and Cassandra variants - Changed the StringAppend MergeOperators to accept a string delimiter rather than a simple char. Made the delimiter into a configurable option - Added tests for new functionality Pull Request resolved: https://github.com/facebook/rocksdb/pull/8481 Reviewed By: zhichao-cao Differential Revision: D30136050 Pulled By: mrambacher fbshipit-source-id: 271d1772835935b6773abaf018ee71e42f9491af
422 lines
14 KiB
C++
422 lines
14 KiB
C++
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include <iostream>
|
|
|
|
#include "db/db_impl/db_impl.h"
|
|
#include "rocksdb/convenience.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/merge_operator.h"
|
|
#include "rocksdb/utilities/object_registry.h"
|
|
#include "test_util/testharness.h"
|
|
#include "util/cast_util.h"
|
|
#include "util/random.h"
|
|
#include "utilities/cassandra/cassandra_compaction_filter.h"
|
|
#include "utilities/cassandra/merge_operator.h"
|
|
#include "utilities/cassandra/test_utils.h"
|
|
#include "utilities/merge_operators.h"
|
|
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
namespace cassandra {
|
|
|
|
// Path to the database on file system
|
|
const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test");
|
|
|
|
class CassandraStore {
|
|
public:
|
|
explicit CassandraStore(std::shared_ptr<DB> db)
|
|
: db_(db), write_option_(), get_option_() {
|
|
assert(db);
|
|
}
|
|
|
|
bool Append(const std::string& key, const RowValue& val){
|
|
std::string result;
|
|
val.Serialize(&result);
|
|
Slice valSlice(result.data(), result.size());
|
|
auto s = db_->Merge(write_option_, key, valSlice);
|
|
|
|
if (s.ok()) {
|
|
return true;
|
|
} else {
|
|
std::cerr << "ERROR " << s.ToString() << std::endl;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
bool Put(const std::string& key, const RowValue& val) {
|
|
std::string result;
|
|
val.Serialize(&result);
|
|
Slice valSlice(result.data(), result.size());
|
|
auto s = db_->Put(write_option_, key, valSlice);
|
|
if (s.ok()) {
|
|
return true;
|
|
} else {
|
|
std::cerr << "ERROR " << s.ToString() << std::endl;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
Status Flush() {
|
|
Status s = dbfull()->TEST_FlushMemTable();
|
|
if (s.ok()) {
|
|
s = dbfull()->TEST_WaitForCompact();
|
|
}
|
|
return s;
|
|
}
|
|
|
|
Status Compact() {
|
|
return dbfull()->TEST_CompactRange(0, nullptr, nullptr,
|
|
db_->DefaultColumnFamily());
|
|
}
|
|
|
|
std::tuple<bool, RowValue> Get(const std::string& key){
|
|
std::string result;
|
|
auto s = db_->Get(get_option_, key, &result);
|
|
|
|
if (s.ok()) {
|
|
return std::make_tuple(true,
|
|
RowValue::Deserialize(result.data(),
|
|
result.size()));
|
|
}
|
|
|
|
if (!s.IsNotFound()) {
|
|
std::cerr << "ERROR " << s.ToString() << std::endl;
|
|
}
|
|
|
|
return std::make_tuple(false, RowValue(0, 0));
|
|
}
|
|
|
|
private:
|
|
std::shared_ptr<DB> db_;
|
|
WriteOptions write_option_;
|
|
ReadOptions get_option_;
|
|
|
|
DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_.get()); }
|
|
};
|
|
|
|
class TestCompactionFilterFactory : public CompactionFilterFactory {
|
|
public:
|
|
explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
|
|
int32_t gc_grace_period_in_seconds)
|
|
: purge_ttl_on_expiration_(purge_ttl_on_expiration),
|
|
gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
|
|
|
|
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
|
const CompactionFilter::Context& /*context*/) override {
|
|
return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
|
|
purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
|
|
}
|
|
|
|
const char* Name() const override { return "TestCompactionFilterFactory"; }
|
|
|
|
private:
|
|
bool purge_ttl_on_expiration_;
|
|
int32_t gc_grace_period_in_seconds_;
|
|
};
|
|
|
|
|
|
// The class for unit-testing
|
|
class CassandraFunctionalTest : public testing::Test {
|
|
public:
|
|
CassandraFunctionalTest() {
|
|
DestroyDB(kDbName, Options()); // Start each test with a fresh DB
|
|
}
|
|
|
|
std::shared_ptr<DB> OpenDb() {
|
|
DB* db;
|
|
Options options;
|
|
options.create_if_missing = true;
|
|
options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
|
|
auto* cf_factory = new TestCompactionFilterFactory(
|
|
purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
|
|
options.compaction_filter_factory.reset(cf_factory);
|
|
EXPECT_OK(DB::Open(options, kDbName, &db));
|
|
return std::shared_ptr<DB>(db);
|
|
}
|
|
|
|
bool purge_ttl_on_expiration_ = false;
|
|
int32_t gc_grace_period_in_seconds_ = 100;
|
|
};
|
|
|
|
// THE TEST CASES BEGIN HERE
|
|
|
|
TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
|
|
CassandraStore store(OpenDb());
|
|
int64_t now = time(nullptr);
|
|
|
|
store.Append("k1", CreateTestRowValue({
|
|
CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
|
|
CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
|
|
CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
|
|
}));
|
|
store.Append("k1",CreateTestRowValue({
|
|
CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
|
|
CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
|
|
CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
|
|
CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
|
|
}));
|
|
store.Append("k1", CreateTestRowValue({
|
|
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
|
|
CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
|
|
CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
|
|
CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
|
|
}));
|
|
|
|
auto ret = store.Get("k1");
|
|
|
|
ASSERT_TRUE(std::get<0>(ret));
|
|
RowValue& merged = std::get<1>(ret);
|
|
EXPECT_EQ(merged.get_columns().size(), 5);
|
|
VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 0,
|
|
ToMicroSeconds(now + 6));
|
|
VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 1,
|
|
ToMicroSeconds(now + 8));
|
|
VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 2,
|
|
ToMicroSeconds(now + 7));
|
|
VerifyRowValueColumns(merged.get_columns(), 3, kExpiringColumn, 7,
|
|
ToMicroSeconds(now + 17));
|
|
VerifyRowValueColumns(merged.get_columns(), 4, kTombstone, 11,
|
|
ToMicroSeconds(now + 11));
|
|
}
|
|
|
|
TEST_F(CassandraFunctionalTest,
|
|
CompactionShouldConvertExpiredColumnsToTombstone) {
|
|
CassandraStore store(OpenDb());
|
|
int64_t now= time(nullptr);
|
|
|
|
store.Append("k1", CreateTestRowValue({
|
|
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
|
|
CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired
|
|
CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
|
|
}));
|
|
|
|
ASSERT_OK(store.Flush());
|
|
|
|
store.Append("k1",CreateTestRowValue({
|
|
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
|
|
CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
|
|
}));
|
|
|
|
ASSERT_OK(store.Flush());
|
|
ASSERT_OK(store.Compact());
|
|
|
|
auto ret = store.Get("k1");
|
|
ASSERT_TRUE(std::get<0>(ret));
|
|
RowValue& merged = std::get<1>(ret);
|
|
EXPECT_EQ(merged.get_columns().size(), 4);
|
|
VerifyRowValueColumns(merged.get_columns(), 0, kTombstone, 0,
|
|
ToMicroSeconds(now - 10));
|
|
VerifyRowValueColumns(merged.get_columns(), 1, kExpiringColumn, 1,
|
|
ToMicroSeconds(now - kTtl + 10));
|
|
VerifyRowValueColumns(merged.get_columns(), 2, kColumn, 2,
|
|
ToMicroSeconds(now));
|
|
VerifyRowValueColumns(merged.get_columns(), 3, kTombstone, 3,
|
|
ToMicroSeconds(now));
|
|
}
|
|
|
|
|
|
TEST_F(CassandraFunctionalTest,
|
|
CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
|
|
purge_ttl_on_expiration_ = true;
|
|
CassandraStore store(OpenDb());
|
|
int64_t now = time(nullptr);
|
|
|
|
store.Append("k1", CreateTestRowValue({
|
|
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
|
|
CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired
|
|
CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
|
|
}));
|
|
|
|
ASSERT_OK(store.Flush());
|
|
|
|
store.Append("k1",CreateTestRowValue({
|
|
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
|
|
CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
|
|
}));
|
|
|
|
ASSERT_OK(store.Flush());
|
|
ASSERT_OK(store.Compact());
|
|
|
|
auto ret = store.Get("k1");
|
|
ASSERT_TRUE(std::get<0>(ret));
|
|
RowValue& merged = std::get<1>(ret);
|
|
EXPECT_EQ(merged.get_columns().size(), 3);
|
|
VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 1,
|
|
ToMicroSeconds(now));
|
|
VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 2,
|
|
ToMicroSeconds(now));
|
|
VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 3,
|
|
ToMicroSeconds(now));
|
|
}
|
|
|
|
TEST_F(CassandraFunctionalTest,
|
|
CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) {
|
|
purge_ttl_on_expiration_ = true;
|
|
CassandraStore store(OpenDb());
|
|
int64_t now = time(nullptr);
|
|
|
|
store.Append("k1", CreateTestRowValue({
|
|
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)),
|
|
CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)),
|
|
}));
|
|
|
|
ASSERT_OK(store.Flush());
|
|
|
|
store.Append("k1",CreateTestRowValue({
|
|
CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)),
|
|
}));
|
|
|
|
ASSERT_OK(store.Flush());
|
|
ASSERT_OK(store.Compact());
|
|
ASSERT_FALSE(std::get<0>(store.Get("k1")));
|
|
}
|
|
|
|
TEST_F(CassandraFunctionalTest,
|
|
CompactionShouldRemoveTombstoneExceedingGCGracePeriod) {
|
|
purge_ttl_on_expiration_ = true;
|
|
CassandraStore store(OpenDb());
|
|
int64_t now = time(nullptr);
|
|
|
|
store.Append("k1", CreateTestRowValue({
|
|
CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
|
|
CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))
|
|
}));
|
|
|
|
store.Append("k2", CreateTestRowValue({
|
|
CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now))
|
|
}));
|
|
|
|
ASSERT_OK(store.Flush());
|
|
|
|
store.Append("k1",CreateTestRowValue({
|
|
CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
|
|
}));
|
|
|
|
ASSERT_OK(store.Flush());
|
|
ASSERT_OK(store.Compact());
|
|
|
|
auto ret = store.Get("k1");
|
|
ASSERT_TRUE(std::get<0>(ret));
|
|
RowValue& gced = std::get<1>(ret);
|
|
EXPECT_EQ(gced.get_columns().size(), 1);
|
|
VerifyRowValueColumns(gced.get_columns(), 0, kColumn, 1, ToMicroSeconds(now));
|
|
}
|
|
|
|
TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
|
|
purge_ttl_on_expiration_ = true;
|
|
CassandraStore store(OpenDb());
|
|
int64_t now = time(nullptr);
|
|
|
|
store.Put("k1", CreateTestRowValue({
|
|
CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
|
|
}));
|
|
|
|
ASSERT_OK(store.Flush());
|
|
ASSERT_OK(store.Compact());
|
|
ASSERT_FALSE(std::get<0>(store.Get("k1")));
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
TEST_F(CassandraFunctionalTest, LoadMergeOperator) {
|
|
ConfigOptions config_options;
|
|
std::shared_ptr<MergeOperator> mo;
|
|
config_options.ignore_unsupported_options = false;
|
|
|
|
ASSERT_NOK(MergeOperator::CreateFromString(
|
|
config_options, CassandraValueMergeOperator::kClassName(), &mo));
|
|
|
|
config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
|
|
"cassandra");
|
|
|
|
ASSERT_OK(MergeOperator::CreateFromString(
|
|
config_options, CassandraValueMergeOperator::kClassName(), &mo));
|
|
ASSERT_NE(mo, nullptr);
|
|
ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
|
|
mo.reset();
|
|
ASSERT_OK(MergeOperator::CreateFromString(
|
|
config_options,
|
|
std::string("operands_limit=20;gc_grace_period_in_seconds=42;id=") +
|
|
CassandraValueMergeOperator::kClassName(),
|
|
&mo));
|
|
ASSERT_NE(mo, nullptr);
|
|
ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
|
|
const auto* opts = mo->GetOptions<CassandraOptions>();
|
|
ASSERT_NE(opts, nullptr);
|
|
ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
|
|
ASSERT_EQ(opts->operands_limit, 20);
|
|
}
|
|
|
|
TEST_F(CassandraFunctionalTest, LoadCompactionFilter) {
|
|
ConfigOptions config_options;
|
|
const CompactionFilter* filter = nullptr;
|
|
config_options.ignore_unsupported_options = false;
|
|
|
|
ASSERT_NOK(CompactionFilter::CreateFromString(
|
|
config_options, CassandraCompactionFilter::kClassName(), &filter));
|
|
config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
|
|
"cassandra");
|
|
|
|
ASSERT_OK(CompactionFilter::CreateFromString(
|
|
config_options, CassandraCompactionFilter::kClassName(), &filter));
|
|
ASSERT_NE(filter, nullptr);
|
|
ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
|
|
delete filter;
|
|
filter = nullptr;
|
|
ASSERT_OK(CompactionFilter::CreateFromString(
|
|
config_options,
|
|
std::string(
|
|
"purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
|
|
CassandraCompactionFilter::kClassName(),
|
|
&filter));
|
|
ASSERT_NE(filter, nullptr);
|
|
ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
|
|
const auto* opts = filter->GetOptions<CassandraOptions>();
|
|
ASSERT_NE(opts, nullptr);
|
|
ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
|
|
ASSERT_TRUE(opts->purge_ttl_on_expiration);
|
|
delete filter;
|
|
}
|
|
|
|
TEST_F(CassandraFunctionalTest, LoadCompactionFilterFactory) {
|
|
ConfigOptions config_options;
|
|
std::shared_ptr<CompactionFilterFactory> factory;
|
|
|
|
config_options.ignore_unsupported_options = false;
|
|
ASSERT_NOK(CompactionFilterFactory::CreateFromString(
|
|
config_options, CassandraCompactionFilterFactory::kClassName(),
|
|
&factory));
|
|
config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
|
|
"cassandra");
|
|
|
|
ASSERT_OK(CompactionFilterFactory::CreateFromString(
|
|
config_options, CassandraCompactionFilterFactory::kClassName(),
|
|
&factory));
|
|
ASSERT_NE(factory, nullptr);
|
|
ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
|
|
factory.reset();
|
|
ASSERT_OK(CompactionFilterFactory::CreateFromString(
|
|
config_options,
|
|
std::string(
|
|
"purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
|
|
CassandraCompactionFilterFactory::kClassName(),
|
|
&factory));
|
|
ASSERT_NE(factory, nullptr);
|
|
ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
|
|
const auto* opts = factory->GetOptions<CassandraOptions>();
|
|
ASSERT_NE(opts, nullptr);
|
|
ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
|
|
ASSERT_TRUE(opts->purge_ttl_on_expiration);
|
|
}
|
|
#endif // ROCKSDB_LITE
|
|
|
|
} // namespace cassandra
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|