Create a MergeOperator for Cassandra Row Value

Summary:
This PR implements the MergeOperator for Cassandra Row Values.
Closes https://github.com/facebook/rocksdb/pull/2289

Differential Revision: D5055464

Pulled By: scv119

fbshipit-source-id: 45f276ef8cbc4704279202f6a20c64889bc1adef
This commit is contained in:
Chen Shen 2017-06-16 14:12:52 -07:00 committed by Facebook Github Bot
parent 2c98b06bff
commit cbd825deea
19 changed files with 1664 additions and 0 deletions

View File

@ -500,6 +500,8 @@ set(SOURCES
utilities/memory/memory_util.cc utilities/memory/memory_util.cc
utilities/merge_operators/max.cc utilities/merge_operators/max.cc
utilities/merge_operators/put.cc utilities/merge_operators/put.cc
utilities/merge_operators/cassandra/format.cc
utilities/merge_operators/cassandra/merge_operator.cc
utilities/merge_operators/string_append/stringappend.cc utilities/merge_operators/string_append/stringappend.cc
utilities/merge_operators/string_append/stringappend2.cc utilities/merge_operators/string_append/stringappend2.cc
utilities/merge_operators/uint64add.cc utilities/merge_operators/uint64add.cc
@ -710,6 +712,10 @@ set(TESTS
utilities/geodb/geodb_test.cc utilities/geodb/geodb_test.cc
utilities/lua/rocks_lua_test.cc utilities/lua/rocks_lua_test.cc
utilities/memory/memory_test.cc utilities/memory/memory_test.cc
utilities/merge_operators/cassandra/cassandra_merge_test.cc
utilities/merge_operators/cassandra/format_test.cc
utilities/merge_operators/cassandra/row_merge_test.cc
utilities/merge_operators/cassandra/serialize_test.cc
utilities/merge_operators/string_append/stringappend_test.cc utilities/merge_operators/string_append/stringappend_test.cc
utilities/object_registry_test.cc utilities/object_registry_test.cc
utilities/option_change_migration/option_change_migration_test.cc utilities/option_change_migration/option_change_migration_test.cc
@ -750,6 +756,7 @@ set(TESTUTIL_SOURCE
monitoring/thread_status_updater_debug.cc monitoring/thread_status_updater_debug.cc
table/mock_table.cc table/mock_table.cc
util/fault_injection_test_env.cc util/fault_injection_test_env.cc
utilities/merge_operators/cassandra/test_utils.cc
) )
# test utilities are only build in debug # test utilities are only build in debug
enable_testing() enable_testing()

View File

@ -401,6 +401,10 @@ TESTS = \
skiplist_test \ skiplist_test \
write_buffer_manager_test \ write_buffer_manager_test \
stringappend_test \ stringappend_test \
cassandra_format_test \
cassandra_merge_test \
cassandra_row_merge_test \
cassandra_serialize_test \
ttl_test \ ttl_test \
date_tiered_test \ date_tiered_test \
backupable_db_test \ backupable_db_test \
@ -990,6 +994,18 @@ option_change_migration_test: utilities/option_change_migration/option_change_mi
stringappend_test: utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS) stringappend_test: utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)
cassandra_format_test: utilities/merge_operators/cassandra/format_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
cassandra_merge_test: utilities/merge_operators/cassandra/cassandra_merge_test.o utilities/merge_operators/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
cassandra_row_merge_test: utilities/merge_operators/cassandra/row_merge_test.o utilities/merge_operators/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
cassandra_serialize_test: utilities/merge_operators/cassandra/serialize_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
redis_test: utilities/redis/redis_lists_test.o $(LIBOBJECTS) $(TESTHARNESS) redis_test: utilities/redis/redis_lists_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)

15
TARGETS
View File

@ -222,6 +222,8 @@ cpp_library(
"utilities/memory/memory_util.cc", "utilities/memory/memory_util.cc",
"utilities/merge_operators/max.cc", "utilities/merge_operators/max.cc",
"utilities/merge_operators/put.cc", "utilities/merge_operators/put.cc",
"utilities/merge_operators/cassandra/format.cc",
"utilities/merge_operators/cassandra/merge_operator.cc",
"utilities/merge_operators/string_append/stringappend.cc", "utilities/merge_operators/string_append/stringappend.cc",
"utilities/merge_operators/string_append/stringappend2.cc", "utilities/merge_operators/string_append/stringappend2.cc",
"utilities/merge_operators/uint64add.cc", "utilities/merge_operators/uint64add.cc",
@ -270,6 +272,7 @@ cpp_library(
"utilities/col_buf_encoder.cc", "utilities/col_buf_encoder.cc",
"utilities/col_buf_decoder.cc", "utilities/col_buf_decoder.cc",
"utilities/column_aware_encoding_util.cc", "utilities/column_aware_encoding_util.cc",
"utilities/merge_operators/cassandra/test_utils.cc"
], ],
deps = [":rocksdb_lib"], deps = [":rocksdb_lib"],
preprocessor_flags = rocksdb_preprocessor_flags, preprocessor_flags = rocksdb_preprocessor_flags,
@ -407,6 +410,18 @@ ROCKS_TESTS = [['merger_test', 'table/merger_test.cc', 'serial'],
['stringappend_test', ['stringappend_test',
'utilities/merge_operators/string_append/stringappend_test.cc', 'utilities/merge_operators/string_append/stringappend_test.cc',
'serial'], 'serial'],
['cassandra_format_test',
'utilities/merge_operators/cassandra/format_test.cc',
'serial'],
['cassandra_merge_test',
'utilities/merge_operators/cassandra/cassandra_merge_test.cc',
'serial'],
['cassandra_row_merge_test',
'utilities/merge_operators/cassandra/row_merge_test.cc',
'serial'],
['cassandra_serialize_test',
'utilities/merge_operators/cassandra/serialize_test.cc',
'serial'],
['reduce_levels_test', 'tools/reduce_levels_test.cc', 'serial'], ['reduce_levels_test', 'tools/reduce_levels_test.cc', 'serial'],
['prefix_test', 'db/prefix_test.cc', 'serial'], ['prefix_test', 'db/prefix_test.cc', 'serial'],
['ttl_test', 'utilities/ttl/ttl_test.cc', 'serial'], ['ttl_test', 'utilities/ttl/ttl_test.cc', 'serial'],

View File

@ -7,6 +7,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\
org.rocksdb.BloomFilter\ org.rocksdb.BloomFilter\
org.rocksdb.Checkpoint\ org.rocksdb.Checkpoint\
org.rocksdb.ClockCache\ org.rocksdb.ClockCache\
org.rocksdb.CassandraValueMergeOperator\
org.rocksdb.ColumnFamilyHandle\ org.rocksdb.ColumnFamilyHandle\
org.rocksdb.ColumnFamilyOptions\ org.rocksdb.ColumnFamilyOptions\
org.rocksdb.CompactionOptionsFIFO\ org.rocksdb.CompactionOptionsFIFO\

View File

@ -0,0 +1,47 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include <stdio.h>
#include <stdlib.h>
#include <jni.h>
#include <string>
#include <memory>
#include "include/org_rocksdb_CassandraValueMergeOperator.h"
#include "rocksjni/portal.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/table.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators/cassandra/merge_operator.h"
/*
* Class: org_rocksdb_CassandraValueMergeOperator
* Method: newSharedCassandraValueMergeOperator
* Signature: ()J
*/
jlong Java_org_rocksdb_CassandraValueMergeOperator_newSharedCassandraValueMergeOperator
(JNIEnv* env, jclass jclazz) {
auto* sptr_string_append_op = new std::shared_ptr<rocksdb::MergeOperator>(
rocksdb::CassandraValueMergeOperator::CreateSharedInstance());
return reinterpret_cast<jlong>(sptr_string_append_op);
}
/*
* Class: org_rocksdb_CassandraValueMergeOperator
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_CassandraValueMergeOperator_disposeInternal(
JNIEnv* env, jobject jobj, jlong jhandle) {
auto* sptr_string_append_op =
reinterpret_cast<std::shared_ptr<rocksdb::MergeOperator>* >(jhandle);
delete sptr_string_append_op; // delete std::shared_ptr
}

View File

@ -0,0 +1,22 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
package org.rocksdb;
/**
* CassandraValueMergeOperator is a merge operator that merges two cassandra wide column
* values.
*/
public class CassandraValueMergeOperator extends MergeOperator {
public CassandraValueMergeOperator() {
super(newSharedCassandraValueMergeOperator());
}
private native static long newSharedCassandraValueMergeOperator();
@Override protected final native void disposeInternal(final long handle);
}

7
src.mk
View File

@ -172,6 +172,8 @@ LIB_SOURCES = \
utilities/leveldb_options/leveldb_options.cc \ utilities/leveldb_options/leveldb_options.cc \
utilities/lua/rocks_lua_compaction_filter.cc \ utilities/lua/rocks_lua_compaction_filter.cc \
utilities/memory/memory_util.cc \ utilities/memory/memory_util.cc \
utilities/merge_operators/cassandra/format.cc \
utilities/merge_operators/cassandra/merge_operator.cc \
utilities/merge_operators/max.cc \ utilities/merge_operators/max.cc \
utilities/merge_operators/put.cc \ utilities/merge_operators/put.cc \
utilities/merge_operators/string_append/stringappend.cc \ utilities/merge_operators/string_append/stringappend.cc \
@ -334,6 +336,11 @@ MAIN_SOURCES = \
utilities/lua/rocks_lua_test.cc \ utilities/lua/rocks_lua_test.cc \
utilities/memory/memory_test.cc \ utilities/memory/memory_test.cc \
utilities/merge_operators/string_append/stringappend_test.cc \ utilities/merge_operators/string_append/stringappend_test.cc \
utilities/merge_operators/cassandra/cassandra_merge_test.cc \
utilities/merge_operators/cassandra/test_utils.cc \
utilities/merge_operators/cassandra/format_test.cc \
utilities/merge_operators/cassandra/row_merge_test.cc \
utilities/merge_operators/cassandra/serialize_test.cc \
utilities/object_registry_test.cc \ utilities/object_registry_test.cc \
utilities/option_change_migration/option_change_migration_test.cc \ utilities/option_change_migration/option_change_migration_test.cc \
utilities/options/options_util_test.cc \ utilities/options/options_util_test.cc \

View File

@ -23,6 +23,7 @@ class MergeOperators {
static std::shared_ptr<MergeOperator> CreateStringAppendOperator(); static std::shared_ptr<MergeOperator> CreateStringAppendOperator();
static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator(); static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator();
static std::shared_ptr<MergeOperator> CreateMaxOperator(); static std::shared_ptr<MergeOperator> CreateMaxOperator();
static std::shared_ptr<MergeOperator> CreateCassandraMergeOperator();
// Will return a different merge operator depending on the string. // Will return a different merge operator depending on the string.
// TODO: Hook the "name" up to the actual Name() of the MergeOperators? // TODO: Hook the "name" up to the actual Name() of the MergeOperators?
@ -40,6 +41,8 @@ class MergeOperators {
return CreateStringAppendTESTOperator(); return CreateStringAppendTESTOperator();
} else if (name == "max") { } else if (name == "max") {
return CreateMaxOperator(); return CreateMaxOperator();
} else if (name == "cassandra") {
return CreateCassandraMergeOperator();
} else { } else {
// Empty or unknown, just return nullptr // Empty or unknown, just return nullptr
return nullptr; return nullptr;

View File

@ -0,0 +1,134 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include <iostream>
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "util/testharness.h"
#include "util/random.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/cassandra/merge_operator.h"
#include "utilities/merge_operators/cassandra/test_utils.h"
using namespace rocksdb;
namespace rocksdb {
namespace cassandra {
// Path to the database on file system
const std::string kDbName = test::TmpDir() + "/cassandramerge_test";
class CassandraStore {
public:
explicit CassandraStore(std::shared_ptr<DB> db)
: db_(db),
merge_option_(),
get_option_() {
assert(db);
}
bool Append(const std::string& key, const RowValue& val){
std::string result;
val.Serialize(&result);
Slice valSlice(result.data(), result.size());
auto s = db_->Merge(merge_option_, key, valSlice);
if (s.ok()) {
return true;
} else {
std::cerr << "ERROR " << s.ToString() << std::endl;
return false;
}
}
std::tuple<bool, RowValue> Get(const std::string& key){
std::string result;
auto s = db_->Get(get_option_, key, &result);
if (s.ok()) {
return std::make_tuple(true,
RowValue::Deserialize(result.data(),
result.size()));
}
if (!s.IsNotFound()) {
std::cerr << "ERROR " << s.ToString() << std::endl;
}
return std::make_tuple(false, RowValue(0, 0));
}
private:
std::shared_ptr<DB> db_;
WriteOptions merge_option_;
ReadOptions get_option_;
};
// The class for unit-testing
class CassandraMergeTest : public testing::Test {
public:
CassandraMergeTest() {
DestroyDB(kDbName, Options()); // Start each test with a fresh DB
}
std::shared_ptr<DB> OpenDb() {
DB* db;
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new CassandraValueMergeOperator());
EXPECT_OK(DB::Open(options, kDbName, &db));
return std::shared_ptr<DB>(db);
}
};
// THE TEST CASES BEGIN HERE
TEST_F(CassandraMergeTest, SimpleTest) {
auto db = OpenDb();
CassandraStore store(db);
store.Append("k1", CreateTestRowValue({
std::make_tuple(kTombstone, 0, 5),
std::make_tuple(kColumn, 1, 8),
std::make_tuple(kExpiringColumn, 2, 5),
}));
store.Append("k1",CreateTestRowValue({
std::make_tuple(kColumn, 0, 2),
std::make_tuple(kExpiringColumn, 1, 5),
std::make_tuple(kTombstone, 2, 7),
std::make_tuple(kExpiringColumn, 7, 17),
}));
store.Append("k1", CreateTestRowValue({
std::make_tuple(kExpiringColumn, 0, 6),
std::make_tuple(kTombstone, 1, 5),
std::make_tuple(kColumn, 2, 4),
std::make_tuple(kTombstone, 11, 11),
}));
auto ret = store.Get("k1");
ASSERT_TRUE(std::get<0>(ret));
RowValue& merged = std::get<1>(ret);
EXPECT_EQ(merged.columns_.size(), 5);
VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6);
VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8);
VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7);
VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17);
VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11);
}
} // namespace cassandra
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -0,0 +1,296 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include "format.h"
#include <algorithm>
#include <map>
#include <memory>
#include "utilities/merge_operators/cassandra/serialize.h"
namespace rocksdb {
namespace cassandra {
namespace {
const int32_t kDefaultLocalDeletionTime =
std::numeric_limits<int32_t>::max();
const int64_t kDefaultMarkedForDeleteAt =
std::numeric_limits<int64_t>::min();
}
ColumnBase::ColumnBase(int8_t mask, int8_t index)
: mask_(mask), index_(index) {}
std::size_t ColumnBase::Size() const {
return sizeof(mask_) + sizeof(index_);
}
int8_t ColumnBase::Mask() const {
return mask_;
}
int8_t ColumnBase::Index() const {
return index_;
}
void ColumnBase::Serialize(std::string* dest) const {
rocksdb::cassandra::Serialize<int8_t>(mask_, dest);
rocksdb::cassandra::Serialize<int8_t>(index_, dest);
}
std::unique_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
return Tombstone::Deserialize(src, offset);
} else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
return ExpiringColumn::Deserialize(src, offset);
} else {
return Column::Deserialize(src, offset);
}
}
Column::Column(
int8_t mask,
int8_t index,
int64_t timestamp,
int32_t value_size,
const char* value
) : ColumnBase(mask, index), timestamp_(timestamp),
value_size_(value_size), value_(value) {}
int64_t Column::Timestamp() const {
return timestamp_;
}
std::size_t Column::Size() const {
return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_)
+ value_size_;
}
void Column::Serialize(std::string* dest) const {
ColumnBase::Serialize(dest);
rocksdb::cassandra::Serialize<int64_t>(timestamp_, dest);
rocksdb::cassandra::Serialize<int32_t>(value_size_, dest);
dest->append(value_, value_size_);
}
std::unique_ptr<Column> Column::Deserialize(const char *src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(index);
int64_t timestamp = rocksdb::cassandra::Deserialize<int64_t>(src, offset);
offset += sizeof(timestamp);
int32_t value_size = rocksdb::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(value_size);
return std::unique_ptr<Column>(
new Column(mask, index, timestamp, value_size, src + offset));
}
ExpiringColumn::ExpiringColumn(
int8_t mask,
int8_t index,
int64_t timestamp,
int32_t value_size,
const char* value,
int32_t ttl
) : Column(mask, index, timestamp, value_size, value),
ttl_(ttl) {}
std::size_t ExpiringColumn::Size() const {
return Column::Size() + sizeof(ttl_);
}
void ExpiringColumn::Serialize(std::string* dest) const {
Column::Serialize(dest);
rocksdb::cassandra::Serialize<int32_t>(ttl_, dest);
}
std::unique_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
const char *src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(index);
int64_t timestamp = rocksdb::cassandra::Deserialize<int64_t>(src, offset);
offset += sizeof(timestamp);
int32_t value_size = rocksdb::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(value_size);
const char* value = src + offset;
offset += value_size;
int32_t ttl = rocksdb::cassandra::Deserialize<int32_t>(src, offset);
return std::unique_ptr<ExpiringColumn>(
new ExpiringColumn(mask, index, timestamp, value_size, value, ttl));
}
Tombstone::Tombstone(
int8_t mask,
int8_t index,
int32_t local_deletion_time,
int64_t marked_for_delete_at
) : ColumnBase(mask, index), local_deletion_time_(local_deletion_time),
marked_for_delete_at_(marked_for_delete_at) {}
int64_t Tombstone::Timestamp() const {
return marked_for_delete_at_;
}
std::size_t Tombstone::Size() const {
return ColumnBase::Size() + sizeof(local_deletion_time_)
+ sizeof(marked_for_delete_at_);
}
void Tombstone::Serialize(std::string* dest) const {
ColumnBase::Serialize(dest);
rocksdb::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
}
std::unique_ptr<Tombstone> Tombstone::Deserialize(const char *src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(index);
int32_t local_deletion_time =
rocksdb::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(int32_t);
int64_t marked_for_delete_at =
rocksdb::cassandra::Deserialize<int64_t>(src, offset);
return std::unique_ptr<Tombstone>(
new Tombstone(mask, index, local_deletion_time, marked_for_delete_at));
}
RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
: local_deletion_time_(local_deletion_time),
marked_for_delete_at_(marked_for_delete_at), columns_(),
last_modified_time_(0) {}
RowValue::RowValue(std::vector<std::unique_ptr<ColumnBase>> columns,
int64_t last_modified_time)
: local_deletion_time_(kDefaultLocalDeletionTime),
marked_for_delete_at_(kDefaultMarkedForDeleteAt),
columns_(std::move(columns)), last_modified_time_(last_modified_time) {}
std::size_t RowValue::Size() const {
std::size_t size = sizeof(local_deletion_time_)
+ sizeof(marked_for_delete_at_);
for (const auto& column : columns_) {
size += column -> Size();
}
return size;
}
int64_t RowValue::LastModifiedTime() const {
if (IsTombstone()) {
return marked_for_delete_at_;
} else {
return last_modified_time_;
}
}
bool RowValue::IsTombstone() const {
return marked_for_delete_at_ > kDefaultMarkedForDeleteAt;
}
void RowValue::Serialize(std::string* dest) const {
rocksdb::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
for (const auto& column : columns_) {
column -> Serialize(dest);
}
}
RowValue RowValue::Deserialize(const char *src, std::size_t size) {
std::size_t offset = 0;
assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));
int32_t local_deletion_time =
rocksdb::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(int32_t);
int64_t marked_for_delete_at =
rocksdb::cassandra::Deserialize<int64_t>(src, offset);
offset += sizeof(int64_t);
if (offset == size) {
return RowValue(local_deletion_time, marked_for_delete_at);
}
assert(local_deletion_time == kDefaultLocalDeletionTime);
assert(marked_for_delete_at == kDefaultMarkedForDeleteAt);
std::vector<std::unique_ptr<ColumnBase>> columns;
int64_t last_modified_time = 0;
while (offset < size) {
auto c = ColumnBase::Deserialize(src, offset);
offset += c -> Size();
assert(offset <= size);
last_modified_time = std::max(last_modified_time, c -> Timestamp());
columns.push_back(std::move(c));
}
return RowValue(std::move(columns), last_modified_time);
}
// Merge multiple row values into one.
// For each column in rows with same index, we pick the one with latest
// timestamp. And we also take row tombstone into consideration, by iterating
// each row from reverse timestamp order, and stop once we hit the first
// row tombstone.
RowValue RowValue::Merge(std::vector<RowValue>&& values) {
assert(values.size() > 0);
if (values.size() == 1) {
return std::move(values[0]);
}
// Merge columns by their last modified time, and skip once we hit
// a row tombstone.
std::sort(values.begin(), values.end(),
[](const RowValue& r1, const RowValue& r2) {
return r1.LastModifiedTime() > r2.LastModifiedTime();
});
std::map<int8_t, std::unique_ptr<ColumnBase>> merged_columns;
int64_t tombstone_timestamp = 0;
for (auto& value : values) {
if (value.IsTombstone()) {
if (merged_columns.size() == 0) {
return std::move(value);
}
tombstone_timestamp = value.LastModifiedTime();
break;
}
for (auto& column : value.columns_) {
int8_t index = column->Index();
if (merged_columns.find(index) == merged_columns.end()) {
merged_columns[index] = std::move(column);
} else {
if (column->Timestamp() > merged_columns[index]->Timestamp()) {
merged_columns[index] = std::move(column);
}
}
}
}
int64_t last_modified_time = 0;
std::vector<std::unique_ptr<ColumnBase>> columns;
for (auto& pair: merged_columns) {
// For some row, its last_modified_time > row tombstone_timestamp, but
// it might have rows whose timestamp is ealier than tombstone, so we
// ned to filter these rows.
if (pair.second->Timestamp() <= tombstone_timestamp) {
continue;
}
last_modified_time = std::max(last_modified_time, pair.second->Timestamp());
columns.push_back(std::move(pair.second));
}
return RowValue(std::move(columns), last_modified_time);
}
} // namepsace cassandrda
} // namespace rocksdb

View File

@ -0,0 +1,179 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
/**
* The encoding of Cassandra Row Value.
*
* A Cassandra Row Value could either be a row tombstone,
* or contains multiple columns, it has following fields:
*
* struct row_value {
* int32_t local_deletion_time; // Time in second when the row is deleted,
* // only used for Cassandra tombstone gc.
* int64_t marked_for_delete_at; // Ms that marked this row is deleted.
* struct column_base columns[]; // For non tombstone row, all columns
* // are stored here.
* }
*
* If the local_deletion_time and marked_for_delete_at is set, then this is
* a tombstone, otherwise it contains multiple columns.
*
* There are three type of Columns: Normal Column, Expiring Column and Column
* Tombstone, which have following fields:
*
* // Identify the type of the column.
* enum mask {
* DELETION_MASK = 0x01,
* EXPIRATION_MASK = 0x02,
* };
*
* struct column {
* int8_t mask = 0;
* int8_t index;
* int64_t timestamp;
* int32_t value_length;
* char value[value_length];
* }
*
* struct expiring_column {
* int8_t mask = mask.EXPIRATION_MASK;
* int8_t index;
* int64_t timestamp;
* int32_t value_length;
* char value[value_length];
* int32_t ttl;
* }
*
* struct tombstone_column {
* int8_t mask = mask.DELETION_MASK;
* int8_t index;
* int32_t local_deletion_time; // Similar to row_value's field.
* int64_t marked_for_delete_at;
* }
*/
#pragma once
#include <vector>
#include <memory>
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "util/testharness.h"
namespace rocksdb {
namespace cassandra {
// Identify the type of the column.
enum ColumnTypeMask {
DELETION_MASK = 0x01,
EXPIRATION_MASK = 0x02,
};
class ColumnBase {
public:
ColumnBase(int8_t mask, int8_t index);
virtual ~ColumnBase() = default;
virtual int64_t Timestamp() const = 0;
virtual int8_t Mask() const;
virtual int8_t Index() const;
virtual std::size_t Size() const;
virtual void Serialize(std::string* dest) const;
static std::unique_ptr<ColumnBase> Deserialize(const char* src,
std::size_t offset);
private:
int8_t mask_;
int8_t index_;
};
class Column : public ColumnBase {
public:
Column(int8_t mask, int8_t index, int64_t timestamp,
int32_t value_size, const char* value);
virtual int64_t Timestamp() const override;
virtual std::size_t Size() const override;
virtual void Serialize(std::string* dest) const override;
static std::unique_ptr<Column> Deserialize(const char* src,
std::size_t offset);
private:
int64_t timestamp_;
int32_t value_size_;
const char* value_;
};
class ExpiringColumn : public Column {
public:
ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp,
int32_t value_size, const char* value, int32_t ttl);
virtual std::size_t Size() const override;
virtual void Serialize(std::string* dest) const override;
static std::unique_ptr<ExpiringColumn> Deserialize(const char* src,
std::size_t offset);
private:
int32_t ttl_;
};
class Tombstone : public ColumnBase {
public:
Tombstone(int8_t mask, int8_t index,
int32_t local_deletion_time, int64_t marked_for_delete_at);
virtual int64_t Timestamp() const override;
virtual std::size_t Size() const override;
virtual void Serialize(std::string* dest) const override;
static std::unique_ptr<Tombstone> Deserialize(const char* src,
std::size_t offset);
private:
int32_t local_deletion_time_;
int64_t marked_for_delete_at_;
};
class RowValue {
public:
// Create a Row Tombstone.
RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at);
// Create a Row containing columns.
RowValue(std::vector<std::unique_ptr<ColumnBase>> columns,
int64_t last_modified_time);
RowValue(const RowValue& that) = delete;
RowValue(RowValue&& that) noexcept = default;
RowValue& operator=(const RowValue& that) = delete;
RowValue& operator=(RowValue&& that) = default;
std::size_t Size() const;;
bool IsTombstone() const;
// For Tombstone this returns the marked_for_delete_at_,
// otherwise it returns the max timestamp of containing columns.
int64_t LastModifiedTime() const;
void Serialize(std::string* dest) const;
static RowValue Deserialize(const char* src, std::size_t size);
// Merge multiple rows according to their timestamp.
static RowValue Merge(std::vector<RowValue>&& values);
private:
int32_t local_deletion_time_;
int64_t marked_for_delete_at_;
std::vector<std::unique_ptr<ColumnBase>> columns_;
int64_t last_modified_time_;
FRIEND_TEST(RowValueMergeTest, Merge);
FRIEND_TEST(RowValueMergeTest, MergeWithRowTombstone);
FRIEND_TEST(CassandraMergeTest, SimpleTest);
};
} // namepsace cassandrda
} // namespace rocksdb

View File

@ -0,0 +1,306 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include <cstring>
#include <memory>
#include "util/testharness.h"
#include "utilities/merge_operators/cassandra/format.h"
#include "utilities/merge_operators/cassandra/serialize.h"
using namespace rocksdb::cassandra;
namespace rocksdb {
namespace cassandra {
TEST(ColumnTest, Column) {
char data[4] = {'d', 'a', 't', 'a'};
int8_t mask = 0;
int8_t index = 1;
int64_t timestamp = 1494022807044;
Column c = Column(mask, index, timestamp, sizeof(data), data);
EXPECT_EQ(c.Index(), index);
EXPECT_EQ(c.Timestamp(), timestamp);
EXPECT_EQ(c.Size(), 14 + sizeof(data));
// Verify the serialization.
std::string dest;
dest.reserve(c.Size() * 2);
c.Serialize(&dest);
EXPECT_EQ(dest.size(), c.Size());
std::size_t offset = 0;
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), timestamp);
offset += sizeof(int64_t);
EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(data));
offset += sizeof(int32_t);
EXPECT_TRUE(std::memcmp(data, dest.c_str() + offset, sizeof(data)) == 0);
// Verify the deserialization.
std::unique_ptr<Column> c1 = Column::Deserialize(dest.c_str(), 0);
EXPECT_EQ(c1->Index(), index);
EXPECT_EQ(c1->Timestamp(), timestamp);
EXPECT_EQ(c1->Size(), 14 + sizeof(data));
c1->Serialize(&dest);
EXPECT_EQ(dest.size(), 2 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0);
// Verify the ColumnBase::Deserialization.
std::unique_ptr<ColumnBase> c2 =
ColumnBase::Deserialize(dest.c_str(), c.Size());
c2->Serialize(&dest);
EXPECT_EQ(dest.size(), 3 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size())
== 0);
}
TEST(ExpiringColumnTest, ExpiringColumn) {
char data[4] = {'d', 'a', 't', 'a'};
int8_t mask = ColumnTypeMask::EXPIRATION_MASK;
int8_t index = 3;
int64_t timestamp = 1494022807044;
int32_t ttl = 3600;
ExpiringColumn c = ExpiringColumn(mask, index, timestamp,
sizeof(data), data, ttl);
EXPECT_EQ(c.Index(), index);
EXPECT_EQ(c.Timestamp(), timestamp);
EXPECT_EQ(c.Size(), 18 + sizeof(data));
// Verify the serialization.
std::string dest;
dest.reserve(c.Size() * 2);
c.Serialize(&dest);
EXPECT_EQ(dest.size(), c.Size());
std::size_t offset = 0;
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), timestamp);
offset += sizeof(int64_t);
EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(data));
offset += sizeof(int32_t);
EXPECT_TRUE(std::memcmp(data, dest.c_str() + offset, sizeof(data)) == 0);
offset += sizeof(data);
EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), ttl);
// Verify the deserialization.
std::unique_ptr<ExpiringColumn> c1 =
ExpiringColumn::Deserialize(dest.c_str(), 0);
EXPECT_EQ(c1->Index(), index);
EXPECT_EQ(c1->Timestamp(), timestamp);
EXPECT_EQ(c1->Size(), 18 + sizeof(data));
c1->Serialize(&dest);
EXPECT_EQ(dest.size(), 2 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0);
// Verify the ColumnBase::Deserialization.
std::unique_ptr<ColumnBase> c2 =
ColumnBase::Deserialize(dest.c_str(), c.Size());
c2->Serialize(&dest);
EXPECT_EQ(dest.size(), 3 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size())
== 0);
}
TEST(TombstoneTest, Tombstone) {
int8_t mask = ColumnTypeMask::DELETION_MASK;
int8_t index = 2;
int32_t local_deletion_time = 1494022807;
int64_t marked_for_delete_at = 1494022807044;
Tombstone c = Tombstone(mask, index, local_deletion_time,
marked_for_delete_at);
EXPECT_EQ(c.Index(), index);
EXPECT_EQ(c.Timestamp(), marked_for_delete_at);
EXPECT_EQ(c.Size(), 14);
// Verify the serialization.
std::string dest;
dest.reserve(c.Size() * 2);
c.Serialize(&dest);
EXPECT_EQ(dest.size(), c.Size());
std::size_t offset = 0;
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), local_deletion_time);
offset += sizeof(int32_t);
EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), marked_for_delete_at);
// Verify the deserialization.
std::unique_ptr<Tombstone> c1 = Tombstone::Deserialize(dest.c_str(), 0);
EXPECT_EQ(c1->Index(), index);
EXPECT_EQ(c1->Timestamp(), marked_for_delete_at);
EXPECT_EQ(c1->Size(), 14);
c1->Serialize(&dest);
EXPECT_EQ(dest.size(), 2 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0);
// Verify the ColumnBase::Deserialization.
std::unique_ptr<ColumnBase> c2 =
ColumnBase::Deserialize(dest.c_str(), c.Size());
c2->Serialize(&dest);
EXPECT_EQ(dest.size(), 3 * c.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size())
== 0);
}
TEST(RowValueTest, RowTombstone) {
int32_t local_deletion_time = 1494022807;
int64_t marked_for_delete_at = 1494022807044;
RowValue r = RowValue(local_deletion_time, marked_for_delete_at);
EXPECT_EQ(r.Size(), 12);
EXPECT_EQ(r.IsTombstone(), true);
EXPECT_EQ(r.LastModifiedTime(), marked_for_delete_at);
// Verify the serialization.
std::string dest;
dest.reserve(r.Size() * 2);
r.Serialize(&dest);
EXPECT_EQ(dest.size(), r.Size());
std::size_t offset = 0;
EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), local_deletion_time);
offset += sizeof(int32_t);
EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), marked_for_delete_at);
// Verify the deserialization.
RowValue r1 = RowValue::Deserialize(dest.c_str(), r.Size());
EXPECT_EQ(r1.Size(), 12);
EXPECT_EQ(r1.IsTombstone(), true);
EXPECT_EQ(r1.LastModifiedTime(), marked_for_delete_at);
r1.Serialize(&dest);
EXPECT_EQ(dest.size(), 2 * r.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0);
}
TEST(RowValueTest, RowWithColumns) {
std::vector<std::unique_ptr<ColumnBase>> columns;
int64_t last_modified_time = 1494022807048;
std::size_t columns_data_size = 0;
char e_data[5] = {'e', 'd', 'a', 't', 'a'};
int8_t e_index = 0;
int64_t e_timestamp = 1494022807044;
int32_t e_ttl = 3600;
columns.push_back(std::unique_ptr<ExpiringColumn>(
new ExpiringColumn(ColumnTypeMask::EXPIRATION_MASK, e_index,
e_timestamp, sizeof(e_data), e_data, e_ttl)));
columns_data_size += columns[0]->Size();
char c_data[4] = {'d', 'a', 't', 'a'};
int8_t c_index = 1;
int64_t c_timestamp = 1494022807048;
columns.push_back(std::unique_ptr<Column>(
new Column(0, c_index, c_timestamp, sizeof(c_data), c_data)));
columns_data_size += columns[1]->Size();
int8_t t_index = 2;
int32_t t_local_deletion_time = 1494022801;
int64_t t_marked_for_delete_at = 1494022807043;
columns.push_back(std::unique_ptr<Tombstone>(
new Tombstone(ColumnTypeMask::DELETION_MASK,
t_index, t_local_deletion_time, t_marked_for_delete_at)));
columns_data_size += columns[2]->Size();
RowValue r = RowValue(std::move(columns), last_modified_time);
EXPECT_EQ(r.Size(), columns_data_size + 12);
EXPECT_EQ(r.IsTombstone(), false);
EXPECT_EQ(r.LastModifiedTime(), last_modified_time);
// Verify the serialization.
std::string dest;
dest.reserve(r.Size() * 2);
r.Serialize(&dest);
EXPECT_EQ(dest.size(), r.Size());
std::size_t offset = 0;
EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset),
std::numeric_limits<int32_t>::max());
offset += sizeof(int32_t);
EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset),
std::numeric_limits<int64_t>::min());
offset += sizeof(int64_t);
// Column0: ExpiringColumn
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset),
ColumnTypeMask::EXPIRATION_MASK);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), e_index);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), e_timestamp);
offset += sizeof(int64_t);
EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(e_data));
offset += sizeof(int32_t);
EXPECT_TRUE(std::memcmp(e_data, dest.c_str() + offset, sizeof(e_data)) == 0);
offset += sizeof(e_data);
EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), e_ttl);
offset += sizeof(int32_t);
// Column1: Column
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), 0);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), c_index);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), c_timestamp);
offset += sizeof(int64_t);
EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(c_data));
offset += sizeof(int32_t);
EXPECT_TRUE(std::memcmp(c_data, dest.c_str() + offset, sizeof(c_data)) == 0);
offset += sizeof(c_data);
// Column2: Tombstone
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset),
ColumnTypeMask::DELETION_MASK);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), t_index);
offset += sizeof(int8_t);
EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), t_local_deletion_time);
offset += sizeof(int32_t);
EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), t_marked_for_delete_at);
// Verify the deserialization.
RowValue r1 = RowValue::Deserialize(dest.c_str(), r.Size());
EXPECT_EQ(r1.Size(), columns_data_size + 12);
EXPECT_EQ(r1.IsTombstone(), false);
EXPECT_EQ(r1.LastModifiedTime(), last_modified_time);
r1.Serialize(&dest);
EXPECT_EQ(dest.size(), 2 * r.Size());
EXPECT_TRUE(
std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0);
}
} // namespace cassandra
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -0,0 +1,103 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include "merge_operator.h"
#include <memory>
#include <assert.h>
#include "rocksdb/slice.h"
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/cassandra/format.h"
namespace rocksdb {
namespace cassandra {
// Implementation for the merge operation (merges two Cassandra values)
bool CassandraValueMergeOperator::FullMergeV2(
const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const {
// Clear the *new_value for writing.
merge_out->new_value.clear();
if (merge_in.existing_value == nullptr && merge_in.operand_list.size() == 1) {
// Only one operand
merge_out->existing_operand = merge_in.operand_list.back();
return true;
}
std::vector<RowValue> row_values;
if (merge_in.existing_value) {
row_values.push_back(
RowValue::Deserialize(merge_in.existing_value->data(),
merge_in.existing_value->size()));
}
for (auto& operand : merge_in.operand_list) {
row_values.push_back(RowValue::Deserialize(operand.data(), operand.size()));
}
RowValue merged = RowValue::Merge(std::move(row_values));
merge_out->new_value.reserve(merged.Size());
merged.Serialize(&(merge_out->new_value));
return true;
}
// Implementation for the merge operation (merges two Cassandra values)
bool CassandraValueMergeOperator::PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const {
// Clear the *new_value for writing.
assert(new_value);
new_value->clear();
std::vector<RowValue> row_values;
row_values.push_back(RowValue::Deserialize(left_operand.data(),
left_operand.size()));
row_values.push_back(RowValue::Deserialize(right_operand.data(),
right_operand.size()));
RowValue merged = RowValue::Merge(std::move(row_values));
new_value->reserve(merged.Size());
merged.Serialize(new_value);
return true;
}
bool CassandraValueMergeOperator::PartialMergeMulti(
const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const {
// Clear the *new_value for writing.
assert(new_value);
new_value->clear();
std::vector<RowValue> row_values;
for (auto& operand : operand_list) {
row_values.push_back(RowValue::Deserialize(operand.data(), operand.size()));
}
RowValue merged = RowValue::Merge(std::move(row_values));
new_value->reserve(merged.Size());
merged.Serialize(new_value);
return true;
}
const char* CassandraValueMergeOperator::Name() const {
return "CassandraValueMergeOperator";
}
} // namespace cassandra
std::shared_ptr<MergeOperator>
MergeOperators::CreateCassandraMergeOperator() {
return std::make_shared<rocksdb::cassandra::CassandraValueMergeOperator>();
}
} // namespace rocksdb

View File

@ -0,0 +1,39 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#pragma once
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
namespace rocksdb {
namespace cassandra {
/**
* A MergeOperator for rocksdb that implements Cassandra row value merge.
*/
class CassandraValueMergeOperator : public MergeOperator {
public:
static std::shared_ptr<MergeOperator> CreateSharedInstance();
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override;
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const override;
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const override;
virtual const char* Name() const override;
};
} // namespace cassandra
} // namespace rocksdb

View File

@ -0,0 +1,114 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include <memory>
#include "util/testharness.h"
#include "utilities/merge_operators/cassandra/format.h"
#include "utilities/merge_operators/cassandra/test_utils.h"
namespace rocksdb {
namespace cassandra {
TEST(RowValueMergeTest, Merge) {
std::vector<RowValue> row_values;
row_values.push_back(
CreateTestRowValue({
std::make_tuple(kTombstone, 0, 5),
std::make_tuple(kColumn, 1, 8),
std::make_tuple(kExpiringColumn, 2, 5),
})
);
row_values.push_back(
CreateTestRowValue({
std::make_tuple(kColumn, 0, 2),
std::make_tuple(kExpiringColumn, 1, 5),
std::make_tuple(kTombstone, 2, 7),
std::make_tuple(kExpiringColumn, 7, 17),
})
);
row_values.push_back(
CreateTestRowValue({
std::make_tuple(kExpiringColumn, 0, 6),
std::make_tuple(kTombstone, 1, 5),
std::make_tuple(kColumn, 2, 4),
std::make_tuple(kTombstone, 11, 11),
})
);
RowValue merged = RowValue::Merge(std::move(row_values));
EXPECT_FALSE(merged.IsTombstone());
EXPECT_EQ(merged.columns_.size(), 5);
VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6);
VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8);
VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7);
VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17);
VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11);
}
TEST(RowValueMergeTest, MergeWithRowTombstone) {
std::vector<RowValue> row_values;
// A row tombstone.
row_values.push_back(
CreateRowTombstone(11)
);
// This row's timestamp is smaller than tombstone.
row_values.push_back(
CreateTestRowValue({
std::make_tuple(kColumn, 0, 5),
std::make_tuple(kColumn, 1, 6),
})
);
// Some of the column's row is smaller, some is larger.
row_values.push_back(
CreateTestRowValue({
std::make_tuple(kColumn, 2, 10),
std::make_tuple(kColumn, 3, 12),
})
);
// All of the column's rows are larger than tombstone.
row_values.push_back(
CreateTestRowValue({
std::make_tuple(kColumn, 4, 13),
std::make_tuple(kColumn, 5, 14),
})
);
RowValue merged = RowValue::Merge(std::move(row_values));
EXPECT_FALSE(merged.IsTombstone());
EXPECT_EQ(merged.columns_.size(), 3);
VerifyRowValueColumns(merged.columns_, 0, kColumn, 3, 12);
VerifyRowValueColumns(merged.columns_, 1, kColumn, 4, 13);
VerifyRowValueColumns(merged.columns_, 2, kColumn, 5, 14);
// If the tombstone's timestamp is the latest, then it returns a
// row tombstone.
row_values.push_back(
CreateRowTombstone(15)
);
row_values.push_back(
CreateRowTombstone(17)
);
merged = RowValue::Merge(std::move(row_values));
EXPECT_TRUE(merged.IsTombstone());
EXPECT_EQ(merged.LastModifiedTime(), 17);
}
} // namespace cassandra
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -0,0 +1,77 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
/**
* Helper functions which serialize and deserialize integers
* into bytes in big endian.
*/
#pragma once
namespace rocksdb {
namespace cassandra {
namespace {
const int64_t kCharMask = 0xFFLL;
const int32_t kBitsPerByte = 8;
}
template<typename T>
void Serialize(T val, std::string* dest);
template<typename T>
T Deserialize(const char* src, std::size_t offset=0);
// Specializations
template<>
inline void Serialize<int8_t>(int8_t t, std::string* dest) {
dest->append(1, static_cast<char>(t & kCharMask));
}
template<>
inline void Serialize<int32_t>(int32_t t, std::string* dest) {
for (unsigned long i = 0; i < sizeof(int32_t); i++) {
dest->append(1, static_cast<char>(
(t >> (sizeof(int32_t) - 1 - i) * kBitsPerByte) & kCharMask));
}
}
template<>
inline void Serialize<int64_t>(int64_t t, std::string* dest) {
for (unsigned long i = 0; i < sizeof(int64_t); i++) {
dest->append(
1, static_cast<char>(
(t >> (sizeof(int64_t) - 1 - i) * kBitsPerByte) & kCharMask));
}
}
template<>
inline int8_t Deserialize<int8_t>(const char* src, std::size_t offset) {
return static_cast<int8_t>(src[offset]);
}
template<>
inline int32_t Deserialize<int32_t>(const char* src, std::size_t offset) {
int32_t result = 0;
for (unsigned long i = 0; i < sizeof(int32_t); i++) {
result |= static_cast<int32_t>(static_cast<unsigned char>(src[offset + i]))
<< ((sizeof(int32_t) - 1 - i) * kBitsPerByte);
}
return result;
}
template<>
inline int64_t Deserialize<int64_t>(const char* src, std::size_t offset) {
int64_t result = 0;
for (unsigned long i = 0; i < sizeof(int64_t); i++) {
result |= static_cast<int64_t>(static_cast<unsigned char>(src[offset + i]))
<< ((sizeof(int64_t) - 1 - i) * kBitsPerByte);
}
return result;
}
} // namepsace cassandrda
} // namespace rocksdb

View File

@ -0,0 +1,190 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include "util/testharness.h"
#include "utilities/merge_operators/cassandra/serialize.h"
using namespace rocksdb::cassandra;
namespace rocksdb {
namespace cassandra {
TEST(SerializeTest, SerializeI64) {
std::string dest;
Serialize<int64_t>(0, &dest);
EXPECT_EQ(
std::string(
{'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00'}),
dest);
dest.clear();
Serialize<int64_t>(1, &dest);
EXPECT_EQ(
std::string(
{'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x01'}),
dest);
dest.clear();
Serialize<int64_t>(-1, &dest);
EXPECT_EQ(
std::string(
{'\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff'}),
dest);
dest.clear();
Serialize<int64_t>(9223372036854775807, &dest);
EXPECT_EQ(
std::string(
{'\x7f', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff'}),
dest);
dest.clear();
Serialize<int64_t>(-9223372036854775807, &dest);
EXPECT_EQ(
std::string(
{'\x80', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x01'}),
dest);
}
TEST(SerializeTest, DeserializeI64) {
std::string dest;
std::size_t offset = dest.size();
Serialize<int64_t>(0, &dest);
EXPECT_EQ(0, Deserialize<int64_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int64_t>(1, &dest);
EXPECT_EQ(1, Deserialize<int64_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int64_t>(-1, &dest);
EXPECT_EQ(-1, Deserialize<int64_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int64_t>(-9223372036854775807, &dest);
EXPECT_EQ(-9223372036854775807, Deserialize<int64_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int64_t>(9223372036854775807, &dest);
EXPECT_EQ(9223372036854775807, Deserialize<int64_t>(dest.c_str(), offset));
}
TEST(SerializeTest, SerializeI32) {
std::string dest;
Serialize<int32_t>(0, &dest);
EXPECT_EQ(
std::string(
{'\x00', '\x00', '\x00', '\x00'}),
dest);
dest.clear();
Serialize<int32_t>(1, &dest);
EXPECT_EQ(
std::string(
{'\x00', '\x00', '\x00', '\x01'}),
dest);
dest.clear();
Serialize<int32_t>(-1, &dest);
EXPECT_EQ(
std::string(
{'\xff', '\xff', '\xff', '\xff'}),
dest);
dest.clear();
Serialize<int32_t>(2147483647, &dest);
EXPECT_EQ(
std::string(
{'\x7f', '\xff', '\xff', '\xff'}),
dest);
dest.clear();
Serialize<int32_t>(-2147483648LL, &dest);
EXPECT_EQ(
std::string(
{'\x80', '\x00', '\x00', '\x00'}),
dest);
}
TEST(SerializeTest, DeserializeI32) {
std::string dest;
std::size_t offset = dest.size();
Serialize<int32_t>(0, &dest);
EXPECT_EQ(0, Deserialize<int32_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int32_t>(1, &dest);
EXPECT_EQ(1, Deserialize<int32_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int32_t>(-1, &dest);
EXPECT_EQ(-1, Deserialize<int32_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int32_t>(2147483647, &dest);
EXPECT_EQ(2147483647, Deserialize<int32_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int32_t>(-2147483648LL, &dest);
EXPECT_EQ(-2147483648LL, Deserialize<int32_t>(dest.c_str(), offset));
}
TEST(SerializeTest, SerializeI8) {
std::string dest;
Serialize<int8_t>(0, &dest);
EXPECT_EQ(std::string({'\x00'}), dest);
dest.clear();
Serialize<int8_t>(1, &dest);
EXPECT_EQ(std::string({'\x01'}), dest);
dest.clear();
Serialize<int8_t>(-1, &dest);
EXPECT_EQ(std::string({'\xff'}), dest);
dest.clear();
Serialize<int8_t>(127, &dest);
EXPECT_EQ(std::string({'\x7f'}), dest);
dest.clear();
Serialize<int8_t>(-128, &dest);
EXPECT_EQ(std::string({'\x80'}), dest);
}
TEST(SerializeTest, DeserializeI8) {
std::string dest;
std::size_t offset = dest.size();
Serialize<int8_t>(0, &dest);
EXPECT_EQ(0, Deserialize<int8_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int8_t>(1, &dest);
EXPECT_EQ(1, Deserialize<int8_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int8_t>(-1, &dest);
EXPECT_EQ(-1, Deserialize<int8_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int8_t>(127, &dest);
EXPECT_EQ(127, Deserialize<int8_t>(dest.c_str(), offset));
offset = dest.size();
Serialize<int8_t>(-128, &dest);
EXPECT_EQ(-128, Deserialize<int8_t>(dest.c_str(), offset));
}
} // namespace cassandra
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -0,0 +1,65 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include "test_utils.h"
namespace rocksdb {
namespace cassandra {
const char kData[] = {'d', 'a', 't', 'a'};
const char kExpiringData[] = {'e', 'd', 'a', 't', 'a'};
const int32_t kLocalDeletionTime = 1;
const int32_t kTtl = 100;
const int8_t kColumn = 0;
const int8_t kTombstone = 1;
const int8_t kExpiringColumn = 2;
std::unique_ptr<ColumnBase> CreateTestColumn(int8_t mask,
int8_t index,
int64_t timestamp) {
if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
return std::unique_ptr<Tombstone>(new Tombstone(
mask, index, kLocalDeletionTime, timestamp));
} else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
return std::unique_ptr<ExpiringColumn>(new ExpiringColumn(
mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl));
} else {
return std::unique_ptr<Column>(
new Column(mask, index, timestamp, sizeof(kData), kData));
}
}
RowValue CreateTestRowValue(
std::vector<std::tuple<int8_t, int8_t, int64_t>> column_specs) {
std::vector<std::unique_ptr<ColumnBase>> columns;
int64_t last_modified_time = 0;
for (auto spec: column_specs) {
auto c = CreateTestColumn(std::get<0>(spec), std::get<1>(spec),
std::get<2>(spec));
last_modified_time = std::max(last_modified_time, c -> Timestamp());
columns.push_back(std::move(c));
}
return RowValue(std::move(columns), last_modified_time);
}
RowValue CreateRowTombstone(int64_t timestamp) {
return RowValue(kLocalDeletionTime, timestamp);
}
void VerifyRowValueColumns(
std::vector<std::unique_ptr<ColumnBase>> &columns,
std::size_t index_of_vector,
int8_t expected_mask,
int8_t expected_index,
int64_t expected_timestamp
) {
EXPECT_EQ(expected_timestamp, columns[index_of_vector]->Timestamp());
EXPECT_EQ(expected_mask, columns[index_of_vector]->Mask());
EXPECT_EQ(expected_index, columns[index_of_vector]->Index());
}
}
}

View File

@ -0,0 +1,43 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#pragma once
#include <memory>
#include "util/testharness.h"
#include "utilities/merge_operators/cassandra/format.h"
#include "utilities/merge_operators/cassandra/serialize.h"
namespace rocksdb {
namespace cassandra {
extern const char kData[];
extern const char kExpiringData[];
extern const int32_t kLocalDeletionTime;
extern const int32_t kTtl;
extern const int8_t kColumn;
extern const int8_t kTombstone;
extern const int8_t kExpiringColumn;
std::unique_ptr<ColumnBase> CreateTestColumn(int8_t mask,
int8_t index,
int64_t timestamp);
RowValue CreateTestRowValue(
std::vector<std::tuple<int8_t, int8_t, int64_t>> column_specs);
RowValue CreateRowTombstone(int64_t timestamp);
void VerifyRowValueColumns(
std::vector<std::unique_ptr<ColumnBase>> &columns,
std::size_t index_of_vector,
int8_t expected_mask,
int8_t expected_index,
int64_t expected_timestamp
);
}
}