From cbd825deea1ab6ce033110a25b234f84203e1e2d Mon Sep 17 00:00:00 2001 From: Chen Shen Date: Fri, 16 Jun 2017 14:12:52 -0700 Subject: [PATCH] Create a MergeOperator for Cassandra Row Value Summary: This PR implements the MergeOperator for Cassandra Row Values. Closes https://github.com/facebook/rocksdb/pull/2289 Differential Revision: D5055464 Pulled By: scv119 fbshipit-source-id: 45f276ef8cbc4704279202f6a20c64889bc1adef --- CMakeLists.txt | 7 + Makefile | 16 + TARGETS | 15 + java/Makefile | 1 + java/rocksjni/cassandra_value_operator.cc | 47 +++ .../rocksdb/CassandraValueMergeOperator.java | 22 ++ src.mk | 7 + utilities/merge_operators.h | 3 + .../cassandra/cassandra_merge_test.cc | 134 ++++++++ utilities/merge_operators/cassandra/format.cc | 296 +++++++++++++++++ utilities/merge_operators/cassandra/format.h | 179 ++++++++++ .../merge_operators/cassandra/format_test.cc | 306 ++++++++++++++++++ .../cassandra/merge_operator.cc | 103 ++++++ .../cassandra/merge_operator.h | 39 +++ .../cassandra/row_merge_test.cc | 114 +++++++ .../merge_operators/cassandra/serialize.h | 77 +++++ .../cassandra/serialize_test.cc | 190 +++++++++++ .../merge_operators/cassandra/test_utils.cc | 65 ++++ .../merge_operators/cassandra/test_utils.h | 43 +++ 19 files changed, 1664 insertions(+) create mode 100644 java/rocksjni/cassandra_value_operator.cc create mode 100644 java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java create mode 100644 utilities/merge_operators/cassandra/cassandra_merge_test.cc create mode 100644 utilities/merge_operators/cassandra/format.cc create mode 100644 utilities/merge_operators/cassandra/format.h create mode 100644 utilities/merge_operators/cassandra/format_test.cc create mode 100644 utilities/merge_operators/cassandra/merge_operator.cc create mode 100644 utilities/merge_operators/cassandra/merge_operator.h create mode 100644 utilities/merge_operators/cassandra/row_merge_test.cc create mode 100644 utilities/merge_operators/cassandra/serialize.h create 
mode 100644 utilities/merge_operators/cassandra/serialize_test.cc create mode 100644 utilities/merge_operators/cassandra/test_utils.cc create mode 100644 utilities/merge_operators/cassandra/test_utils.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 80cf032fa..4227c1367 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -500,6 +500,8 @@ set(SOURCES utilities/memory/memory_util.cc utilities/merge_operators/max.cc utilities/merge_operators/put.cc + utilities/merge_operators/cassandra/format.cc + utilities/merge_operators/cassandra/merge_operator.cc utilities/merge_operators/string_append/stringappend.cc utilities/merge_operators/string_append/stringappend2.cc utilities/merge_operators/uint64add.cc @@ -710,6 +712,10 @@ set(TESTS utilities/geodb/geodb_test.cc utilities/lua/rocks_lua_test.cc utilities/memory/memory_test.cc + utilities/merge_operators/cassandra/cassandra_merge_test.cc + utilities/merge_operators/cassandra/format_test.cc + utilities/merge_operators/cassandra/row_merge_test.cc + utilities/merge_operators/cassandra/serialize_test.cc utilities/merge_operators/string_append/stringappend_test.cc utilities/object_registry_test.cc utilities/option_change_migration/option_change_migration_test.cc @@ -750,6 +756,7 @@ set(TESTUTIL_SOURCE monitoring/thread_status_updater_debug.cc table/mock_table.cc util/fault_injection_test_env.cc + utilities/merge_operators/cassandra/test_utils.cc ) # test utilities are only build in debug enable_testing() diff --git a/Makefile b/Makefile index c858bf397..4c48beac4 100644 --- a/Makefile +++ b/Makefile @@ -401,6 +401,10 @@ TESTS = \ skiplist_test \ write_buffer_manager_test \ stringappend_test \ + cassandra_format_test \ + cassandra_merge_test \ + cassandra_row_merge_test \ + cassandra_serialize_test \ ttl_test \ date_tiered_test \ backupable_db_test \ @@ -990,6 +994,18 @@ option_change_migration_test: utilities/option_change_migration/option_change_mi stringappend_test: 
utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +cassandra_format_test: utilities/merge_operators/cassandra/format_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + +cassandra_merge_test: utilities/merge_operators/cassandra/cassandra_merge_test.o utilities/merge_operators/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + +cassandra_row_merge_test: utilities/merge_operators/cassandra/row_merge_test.o utilities/merge_operators/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + +cassandra_serialize_test: utilities/merge_operators/cassandra/serialize_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + redis_test: utilities/redis/redis_lists_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 36437006b..fa0be6be1 100644 --- a/TARGETS +++ b/TARGETS @@ -222,6 +222,8 @@ cpp_library( "utilities/memory/memory_util.cc", "utilities/merge_operators/max.cc", "utilities/merge_operators/put.cc", + "utilities/merge_operators/cassandra/format.cc", + "utilities/merge_operators/cassandra/merge_operator.cc", "utilities/merge_operators/string_append/stringappend.cc", "utilities/merge_operators/string_append/stringappend2.cc", "utilities/merge_operators/uint64add.cc", @@ -270,6 +272,7 @@ cpp_library( "utilities/col_buf_encoder.cc", "utilities/col_buf_decoder.cc", "utilities/column_aware_encoding_util.cc", + "utilities/merge_operators/cassandra/test_utils.cc" ], deps = [":rocksdb_lib"], preprocessor_flags = rocksdb_preprocessor_flags, @@ -407,6 +410,18 @@ ROCKS_TESTS = [['merger_test', 'table/merger_test.cc', 'serial'], ['stringappend_test', 'utilities/merge_operators/string_append/stringappend_test.cc', 'serial'], + ['cassandra_format_test', + 'utilities/merge_operators/cassandra/format_test.cc', + 'serial'], + ['cassandra_merge_test', + 'utilities/merge_operators/cassandra/cassandra_merge_test.cc', + 'serial'], + ['cassandra_row_merge_test', + 
'utilities/merge_operators/cassandra/row_merge_test.cc', + 'serial'], + ['cassandra_serialize_test', + 'utilities/merge_operators/cassandra/serialize_test.cc', + 'serial'], ['reduce_levels_test', 'tools/reduce_levels_test.cc', 'serial'], ['prefix_test', 'db/prefix_test.cc', 'serial'], ['ttl_test', 'utilities/ttl/ttl_test.cc', 'serial'], diff --git a/java/Makefile b/java/Makefile index b64955a07..c90c261f4 100644 --- a/java/Makefile +++ b/java/Makefile @@ -7,6 +7,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\ org.rocksdb.BloomFilter\ org.rocksdb.Checkpoint\ org.rocksdb.ClockCache\ + org.rocksdb.CassandraValueMergeOperator\ org.rocksdb.ColumnFamilyHandle\ org.rocksdb.ColumnFamilyOptions\ org.rocksdb.CompactionOptionsFIFO\ diff --git a/java/rocksjni/cassandra_value_operator.cc b/java/rocksjni/cassandra_value_operator.cc new file mode 100644 index 000000000..17410bd12 --- /dev/null +++ b/java/rocksjni/cassandra_value_operator.cc @@ -0,0 +1,47 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. 
+ +#include +#include +#include +#include +#include + +#include "include/org_rocksdb_CassandraValueMergeOperator.h" +#include "rocksjni/portal.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/statistics.h" +#include "rocksdb/memtablerep.h" +#include "rocksdb/table.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators/cassandra/merge_operator.h" + +/* + * Class: org_rocksdb_CassandraValueMergeOperator + * Method: newSharedCassandraValueMergeOperator + * Signature: ()J + */ +jlong Java_org_rocksdb_CassandraValueMergeOperator_newSharedCassandraValueMergeOperator +(JNIEnv* env, jclass jclazz) { + auto* sptr_string_append_op = new std::shared_ptr( + rocksdb::CassandraValueMergeOperator::CreateSharedInstance()); + return reinterpret_cast(sptr_string_append_op); +} + +/* + * Class: org_rocksdb_CassandraValueMergeOperator + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_CassandraValueMergeOperator_disposeInternal( + JNIEnv* env, jobject jobj, jlong jhandle) { + auto* sptr_string_append_op = + reinterpret_cast* >(jhandle); + delete sptr_string_append_op; // delete std::shared_ptr +} diff --git a/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java b/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java new file mode 100644 index 000000000..71c77b643 --- /dev/null +++ b/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java @@ -0,0 +1,22 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. 
+ +package org.rocksdb; + +/** + * CassandraValueMergeOperator is a merge operator that merges two cassandra wide column + * values. + */ +public class CassandraValueMergeOperator extends MergeOperator { + public CassandraValueMergeOperator() { + super(newSharedCassandraValueMergeOperator()); + } + + private native static long newSharedCassandraValueMergeOperator(); + + @Override protected final native void disposeInternal(final long handle); +} diff --git a/src.mk b/src.mk index 6f9c8233f..732bf3a5a 100644 --- a/src.mk +++ b/src.mk @@ -172,6 +172,8 @@ LIB_SOURCES = \ utilities/leveldb_options/leveldb_options.cc \ utilities/lua/rocks_lua_compaction_filter.cc \ utilities/memory/memory_util.cc \ + utilities/merge_operators/cassandra/format.cc \ + utilities/merge_operators/cassandra/merge_operator.cc \ utilities/merge_operators/max.cc \ utilities/merge_operators/put.cc \ utilities/merge_operators/string_append/stringappend.cc \ @@ -334,6 +336,11 @@ MAIN_SOURCES = \ utilities/lua/rocks_lua_test.cc \ utilities/memory/memory_test.cc \ utilities/merge_operators/string_append/stringappend_test.cc \ + utilities/merge_operators/cassandra/cassandra_merge_test.cc \ + utilities/merge_operators/cassandra/test_utils.cc \ + utilities/merge_operators/cassandra/format_test.cc \ + utilities/merge_operators/cassandra/row_merge_test.cc \ + utilities/merge_operators/cassandra/serialize_test.cc \ utilities/object_registry_test.cc \ utilities/option_change_migration/option_change_migration_test.cc \ utilities/options/options_util_test.cc \ diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h index 919f23c0b..06c18e08f 100644 --- a/utilities/merge_operators.h +++ b/utilities/merge_operators.h @@ -23,6 +23,7 @@ class MergeOperators { static std::shared_ptr CreateStringAppendOperator(); static std::shared_ptr CreateStringAppendTESTOperator(); static std::shared_ptr CreateMaxOperator(); + static std::shared_ptr CreateCassandraMergeOperator(); // Will return a different merge 
operator depending on the string. // TODO: Hook the "name" up to the actual Name() of the MergeOperators? @@ -40,6 +41,8 @@ class MergeOperators { return CreateStringAppendTESTOperator(); } else if (name == "max") { return CreateMaxOperator(); + } else if (name == "cassandra") { + return CreateCassandraMergeOperator(); } else { // Empty or unknown, just return nullptr return nullptr; diff --git a/utilities/merge_operators/cassandra/cassandra_merge_test.cc b/utilities/merge_operators/cassandra/cassandra_merge_test.cc new file mode 100644 index 000000000..b898ca0e9 --- /dev/null +++ b/utilities/merge_operators/cassandra/cassandra_merge_test.cc @@ -0,0 +1,134 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. 
+ +#include + +#include "rocksdb/db.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/utilities/db_ttl.h" +#include "util/testharness.h" +#include "util/random.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/cassandra/merge_operator.h" +#include "utilities/merge_operators/cassandra/test_utils.h" + +using namespace rocksdb; + +namespace rocksdb { +namespace cassandra { + +// Path to the database on file system +const std::string kDbName = test::TmpDir() + "/cassandramerge_test"; + +class CassandraStore { + public: + explicit CassandraStore(std::shared_ptr db) + : db_(db), + merge_option_(), + get_option_() { + assert(db); + } + + bool Append(const std::string& key, const RowValue& val){ + std::string result; + val.Serialize(&result); + Slice valSlice(result.data(), result.size()); + auto s = db_->Merge(merge_option_, key, valSlice); + + if (s.ok()) { + return true; + } else { + std::cerr << "ERROR " << s.ToString() << std::endl; + return false; + } + } + + std::tuple Get(const std::string& key){ + std::string result; + auto s = db_->Get(get_option_, key, &result); + + if (s.ok()) { + return std::make_tuple(true, + RowValue::Deserialize(result.data(), + result.size())); + } + + if (!s.IsNotFound()) { + std::cerr << "ERROR " << s.ToString() << std::endl; + } + + return std::make_tuple(false, RowValue(0, 0)); + } + + private: + std::shared_ptr db_; + WriteOptions merge_option_; + ReadOptions get_option_; +}; + + +// The class for unit-testing +class CassandraMergeTest : public testing::Test { + public: + CassandraMergeTest() { + DestroyDB(kDbName, Options()); // Start each test with a fresh DB + } + + std::shared_ptr OpenDb() { + DB* db; + Options options; + options.create_if_missing = true; + options.merge_operator.reset(new CassandraValueMergeOperator()); + EXPECT_OK(DB::Open(options, kDbName, &db)); + return std::shared_ptr(db); + } +}; + +// THE TEST CASES BEGIN HERE + +TEST_F(CassandraMergeTest, SimpleTest) { + auto db = 
OpenDb(); + CassandraStore store(db); + + store.Append("k1", CreateTestRowValue({ + std::make_tuple(kTombstone, 0, 5), + std::make_tuple(kColumn, 1, 8), + std::make_tuple(kExpiringColumn, 2, 5), + })); + store.Append("k1",CreateTestRowValue({ + std::make_tuple(kColumn, 0, 2), + std::make_tuple(kExpiringColumn, 1, 5), + std::make_tuple(kTombstone, 2, 7), + std::make_tuple(kExpiringColumn, 7, 17), + })); + store.Append("k1", CreateTestRowValue({ + std::make_tuple(kExpiringColumn, 0, 6), + std::make_tuple(kTombstone, 1, 5), + std::make_tuple(kColumn, 2, 4), + std::make_tuple(kTombstone, 11, 11), + })); + + auto ret = store.Get("k1"); + + ASSERT_TRUE(std::get<0>(ret)); + RowValue& merged = std::get<1>(ret); + EXPECT_EQ(merged.columns_.size(), 5); + VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6); + VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8); + VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7); + VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17); + VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11); +} + + +} // namespace cassandra +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/utilities/merge_operators/cassandra/format.cc b/utilities/merge_operators/cassandra/format.cc new file mode 100644 index 000000000..2a65acb99 --- /dev/null +++ b/utilities/merge_operators/cassandra/format.cc @@ -0,0 +1,296 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. 
+ +#include "format.h" + +#include +#include +#include + +#include "utilities/merge_operators/cassandra/serialize.h" + +namespace rocksdb { +namespace cassandra { +namespace { +const int32_t kDefaultLocalDeletionTime = + std::numeric_limits::max(); +const int64_t kDefaultMarkedForDeleteAt = + std::numeric_limits::min(); +} + +ColumnBase::ColumnBase(int8_t mask, int8_t index) + : mask_(mask), index_(index) {} + +std::size_t ColumnBase::Size() const { + return sizeof(mask_) + sizeof(index_); +} + +int8_t ColumnBase::Mask() const { + return mask_; +} + +int8_t ColumnBase::Index() const { + return index_; +} + +void ColumnBase::Serialize(std::string* dest) const { + rocksdb::cassandra::Serialize(mask_, dest); + rocksdb::cassandra::Serialize(index_, dest); +} + +std::unique_ptr ColumnBase::Deserialize(const char* src, + std::size_t offset) { + int8_t mask = rocksdb::cassandra::Deserialize(src, offset); + if ((mask & ColumnTypeMask::DELETION_MASK) != 0) { + return Tombstone::Deserialize(src, offset); + } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) { + return ExpiringColumn::Deserialize(src, offset); + } else { + return Column::Deserialize(src, offset); + } +} + +Column::Column( + int8_t mask, + int8_t index, + int64_t timestamp, + int32_t value_size, + const char* value +) : ColumnBase(mask, index), timestamp_(timestamp), + value_size_(value_size), value_(value) {} + +int64_t Column::Timestamp() const { + return timestamp_; +} + +std::size_t Column::Size() const { + return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_) + + value_size_; +} + +void Column::Serialize(std::string* dest) const { + ColumnBase::Serialize(dest); + rocksdb::cassandra::Serialize(timestamp_, dest); + rocksdb::cassandra::Serialize(value_size_, dest); + dest->append(value_, value_size_); +} + +std::unique_ptr Column::Deserialize(const char *src, + std::size_t offset) { + int8_t mask = rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(mask); + int8_t index = 
rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(index); + int64_t timestamp = rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(timestamp); + int32_t value_size = rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(value_size); + return std::unique_ptr( + new Column(mask, index, timestamp, value_size, src + offset)); +} + +ExpiringColumn::ExpiringColumn( + int8_t mask, + int8_t index, + int64_t timestamp, + int32_t value_size, + const char* value, + int32_t ttl +) : Column(mask, index, timestamp, value_size, value), + ttl_(ttl) {} + +std::size_t ExpiringColumn::Size() const { + return Column::Size() + sizeof(ttl_); +} + +void ExpiringColumn::Serialize(std::string* dest) const { + Column::Serialize(dest); + rocksdb::cassandra::Serialize(ttl_, dest); +} + +std::unique_ptr ExpiringColumn::Deserialize( + const char *src, + std::size_t offset) { + int8_t mask = rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(mask); + int8_t index = rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(index); + int64_t timestamp = rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(timestamp); + int32_t value_size = rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(value_size); + const char* value = src + offset; + offset += value_size; + int32_t ttl = rocksdb::cassandra::Deserialize(src, offset); + return std::unique_ptr( + new ExpiringColumn(mask, index, timestamp, value_size, value, ttl)); +} + +Tombstone::Tombstone( + int8_t mask, + int8_t index, + int32_t local_deletion_time, + int64_t marked_for_delete_at +) : ColumnBase(mask, index), local_deletion_time_(local_deletion_time), + marked_for_delete_at_(marked_for_delete_at) {} + +int64_t Tombstone::Timestamp() const { + return marked_for_delete_at_; +} + +std::size_t Tombstone::Size() const { + return ColumnBase::Size() + sizeof(local_deletion_time_) + + sizeof(marked_for_delete_at_); +} + +void Tombstone::Serialize(std::string* 
dest) const { + ColumnBase::Serialize(dest); + rocksdb::cassandra::Serialize(local_deletion_time_, dest); + rocksdb::cassandra::Serialize(marked_for_delete_at_, dest); +} + +std::unique_ptr Tombstone::Deserialize(const char *src, + std::size_t offset) { + int8_t mask = rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(mask); + int8_t index = rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(index); + int32_t local_deletion_time = + rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(int32_t); + int64_t marked_for_delete_at = + rocksdb::cassandra::Deserialize(src, offset); + return std::unique_ptr( + new Tombstone(mask, index, local_deletion_time, marked_for_delete_at)); +} + +RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at) + : local_deletion_time_(local_deletion_time), + marked_for_delete_at_(marked_for_delete_at), columns_(), + last_modified_time_(0) {} + +RowValue::RowValue(std::vector> columns, + int64_t last_modified_time) + : local_deletion_time_(kDefaultLocalDeletionTime), + marked_for_delete_at_(kDefaultMarkedForDeleteAt), + columns_(std::move(columns)), last_modified_time_(last_modified_time) {} + +std::size_t RowValue::Size() const { + std::size_t size = sizeof(local_deletion_time_) + + sizeof(marked_for_delete_at_); + for (const auto& column : columns_) { + size += column -> Size(); + } + return size; +} + +int64_t RowValue::LastModifiedTime() const { + if (IsTombstone()) { + return marked_for_delete_at_; + } else { + return last_modified_time_; + } +} + +bool RowValue::IsTombstone() const { + return marked_for_delete_at_ > kDefaultMarkedForDeleteAt; +} + +void RowValue::Serialize(std::string* dest) const { + rocksdb::cassandra::Serialize(local_deletion_time_, dest); + rocksdb::cassandra::Serialize(marked_for_delete_at_, dest); + for (const auto& column : columns_) { + column -> Serialize(dest); + } +} + +RowValue RowValue::Deserialize(const char *src, std::size_t size) { + 
std::size_t offset = 0; + assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_)); + int32_t local_deletion_time = + rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(int32_t); + int64_t marked_for_delete_at = + rocksdb::cassandra::Deserialize(src, offset); + offset += sizeof(int64_t); + if (offset == size) { + return RowValue(local_deletion_time, marked_for_delete_at); + } + + assert(local_deletion_time == kDefaultLocalDeletionTime); + assert(marked_for_delete_at == kDefaultMarkedForDeleteAt); + std::vector> columns; + int64_t last_modified_time = 0; + while (offset < size) { + auto c = ColumnBase::Deserialize(src, offset); + offset += c -> Size(); + assert(offset <= size); + last_modified_time = std::max(last_modified_time, c -> Timestamp()); + columns.push_back(std::move(c)); + } + + return RowValue(std::move(columns), last_modified_time); +} + +// Merge multiple row values into one. +// For each column in rows with same index, we pick the one with latest +// timestamp. And we also take row tombstone into consideration, by iterating +// each row from reverse timestamp order, and stop once we hit the first +// row tombstone. +RowValue RowValue::Merge(std::vector&& values) { + assert(values.size() > 0); + if (values.size() == 1) { + return std::move(values[0]); + } + + // Merge columns by their last modified time, and skip once we hit + // a row tombstone. 
+ std::sort(values.begin(), values.end(), + [](const RowValue& r1, const RowValue& r2) { + return r1.LastModifiedTime() > r2.LastModifiedTime(); + }); + + std::map> merged_columns; + int64_t tombstone_timestamp = 0; + + for (auto& value : values) { + if (value.IsTombstone()) { + if (merged_columns.size() == 0) { + return std::move(value); + } + tombstone_timestamp = value.LastModifiedTime(); + break; + } + for (auto& column : value.columns_) { + int8_t index = column->Index(); + if (merged_columns.find(index) == merged_columns.end()) { + merged_columns[index] = std::move(column); + } else { + if (column->Timestamp() > merged_columns[index]->Timestamp()) { + merged_columns[index] = std::move(column); + } + } + } + } + + int64_t last_modified_time = 0; + std::vector> columns; + for (auto& pair: merged_columns) { + // A row's last_modified_time can be later than tombstone_timestamp while + // some of its columns are still no newer than the tombstone, so we + // need to filter those columns out. + if (pair.second->Timestamp() <= tombstone_timestamp) { + continue; + } + last_modified_time = std::max(last_modified_time, pair.second->Timestamp()); + columns.push_back(std::move(pair.second)); + } + return RowValue(std::move(columns), last_modified_time); +} + +} // namespace cassandra +} // namespace rocksdb diff --git a/utilities/merge_operators/cassandra/format.h b/utilities/merge_operators/cassandra/format.h new file mode 100644 index 000000000..9aabc6267 --- /dev/null +++ b/utilities/merge_operators/cassandra/format.h @@ -0,0 +1,179 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree.
+ +/** + * The encoding of a Cassandra Row Value. + * + * A Cassandra Row Value is either a row tombstone + * or a set of columns. It has the following fields: + * + * struct row_value { + * int32_t local_deletion_time; // Time in seconds when the row was deleted, + * // only used for Cassandra tombstone gc. + * int64_t marked_for_delete_at; // Ms at which this row was marked deleted. + * struct column_base columns[]; // For a non-tombstone row, all columns + * // are stored here. + * } + * + * If local_deletion_time and marked_for_delete_at are set, then this is + * a tombstone; otherwise it contains multiple columns. + * + * There are three types of columns: Normal Column, Expiring Column and Column + * Tombstone, which have the following fields: + * + * // Identify the type of the column. + * enum mask { + * DELETION_MASK = 0x01, + * EXPIRATION_MASK = 0x02, + * }; + * + * struct column { + * int8_t mask = 0; + * int8_t index; + * int64_t timestamp; + * int32_t value_length; + * char value[value_length]; + * } + * + * struct expiring_column { + * int8_t mask = mask.EXPIRATION_MASK; + * int8_t index; + * int64_t timestamp; + * int32_t value_length; + * char value[value_length]; + * int32_t ttl; + * } + * + * struct tombstone_column { + * int8_t mask = mask.DELETION_MASK; + * int8_t index; + * int32_t local_deletion_time; // Similar to row_value's field. + * int64_t marked_for_delete_at; + * } + */ + +#pragma once +#include +#include +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "util/testharness.h" + +namespace rocksdb { +namespace cassandra { + +// Identify the type of the column.
+enum ColumnTypeMask { + DELETION_MASK = 0x01, + EXPIRATION_MASK = 0x02, +}; + +class ColumnBase { +public: + ColumnBase(int8_t mask, int8_t index); + virtual ~ColumnBase() = default; + + virtual int64_t Timestamp() const = 0; + virtual int8_t Mask() const; + virtual int8_t Index() const; + virtual std::size_t Size() const; + virtual void Serialize(std::string* dest) const; + + static std::unique_ptr Deserialize(const char* src, + std::size_t offset); + +private: + int8_t mask_; + int8_t index_; +}; + +class Column : public ColumnBase { +public: + Column(int8_t mask, int8_t index, int64_t timestamp, + int32_t value_size, const char* value); + + virtual int64_t Timestamp() const override; + virtual std::size_t Size() const override; + virtual void Serialize(std::string* dest) const override; + + static std::unique_ptr Deserialize(const char* src, + std::size_t offset); + +private: + int64_t timestamp_; + int32_t value_size_; + const char* value_; +}; + +class ExpiringColumn : public Column { +public: + ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp, + int32_t value_size, const char* value, int32_t ttl); + + virtual std::size_t Size() const override; + virtual void Serialize(std::string* dest) const override; + + static std::unique_ptr Deserialize(const char* src, + std::size_t offset); + +private: + int32_t ttl_; +}; + +class Tombstone : public ColumnBase { +public: + Tombstone(int8_t mask, int8_t index, + int32_t local_deletion_time, int64_t marked_for_delete_at); + + virtual int64_t Timestamp() const override; + virtual std::size_t Size() const override; + virtual void Serialize(std::string* dest) const override; + + static std::unique_ptr Deserialize(const char* src, + std::size_t offset); + +private: + int32_t local_deletion_time_; + int64_t marked_for_delete_at_; +}; + +class RowValue { +public: + // Create a Row Tombstone. + RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at); + // Create a Row containing columns.
+ RowValue(std::vector> columns, + int64_t last_modified_time); + RowValue(const RowValue& that) = delete; + RowValue(RowValue&& that) noexcept = default; + RowValue& operator=(const RowValue& that) = delete; + RowValue& operator=(RowValue&& that) = default; + + std::size_t Size() const; + bool IsTombstone() const; + // For a row tombstone this returns marked_for_delete_at_, + // otherwise it returns the max timestamp of the contained columns. + int64_t LastModifiedTime() const; + void Serialize(std::string* dest) const; + + static RowValue Deserialize(const char* src, std::size_t size); + // Merge multiple rows according to their timestamp. + static RowValue Merge(std::vector&& values); + +private: + int32_t local_deletion_time_; + int64_t marked_for_delete_at_; + std::vector> columns_; + int64_t last_modified_time_; + + FRIEND_TEST(RowValueMergeTest, Merge); + FRIEND_TEST(RowValueMergeTest, MergeWithRowTombstone); + FRIEND_TEST(CassandraMergeTest, SimpleTest); +}; + +} // namespace cassandra +} // namespace rocksdb diff --git a/utilities/merge_operators/cassandra/format_test.cc b/utilities/merge_operators/cassandra/format_test.cc new file mode 100644 index 000000000..381cf0828 --- /dev/null +++ b/utilities/merge_operators/cassandra/format_test.cc @@ -0,0 +1,306 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree.
+ +#include +#include +#include "util/testharness.h" +#include "utilities/merge_operators/cassandra/format.h" +#include "utilities/merge_operators/cassandra/serialize.h" + +using namespace rocksdb::cassandra; + +namespace rocksdb { +namespace cassandra { + +TEST(ColumnTest, Column) { + char data[4] = {'d', 'a', 't', 'a'}; + int8_t mask = 0; + int8_t index = 1; + int64_t timestamp = 1494022807044; + Column c = Column(mask, index, timestamp, sizeof(data), data); + + EXPECT_EQ(c.Index(), index); + EXPECT_EQ(c.Timestamp(), timestamp); + EXPECT_EQ(c.Size(), 14 + sizeof(data)); + + // Verify the serialization. + std::string dest; + dest.reserve(c.Size() * 2); + c.Serialize(&dest); + + EXPECT_EQ(dest.size(), c.Size()); + std::size_t offset = 0; + EXPECT_EQ(Deserialize(dest.c_str(), offset), mask); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), timestamp); + offset += sizeof(int64_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), sizeof(data)); + offset += sizeof(int32_t); + EXPECT_TRUE(std::memcmp(data, dest.c_str() + offset, sizeof(data)) == 0); + + // Verify the deserialization. + std::unique_ptr c1 = Column::Deserialize(dest.c_str(), 0); + EXPECT_EQ(c1->Index(), index); + EXPECT_EQ(c1->Timestamp(), timestamp); + EXPECT_EQ(c1->Size(), 14 + sizeof(data)); + + c1->Serialize(&dest); + EXPECT_EQ(dest.size(), 2 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0); + + // Verify the ColumnBase::Deserialization. 
+ std::unique_ptr c2 = + ColumnBase::Deserialize(dest.c_str(), c.Size()); + c2->Serialize(&dest); + EXPECT_EQ(dest.size(), 3 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size()) + == 0); +} + +TEST(ExpiringColumnTest, ExpiringColumn) { + char data[4] = {'d', 'a', 't', 'a'}; + int8_t mask = ColumnTypeMask::EXPIRATION_MASK; + int8_t index = 3; + int64_t timestamp = 1494022807044; + int32_t ttl = 3600; + ExpiringColumn c = ExpiringColumn(mask, index, timestamp, + sizeof(data), data, ttl); + + EXPECT_EQ(c.Index(), index); + EXPECT_EQ(c.Timestamp(), timestamp); + EXPECT_EQ(c.Size(), 18 + sizeof(data)); + + // Verify the serialization. + std::string dest; + dest.reserve(c.Size() * 2); + c.Serialize(&dest); + + EXPECT_EQ(dest.size(), c.Size()); + std::size_t offset = 0; + EXPECT_EQ(Deserialize(dest.c_str(), offset), mask); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), timestamp); + offset += sizeof(int64_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), sizeof(data)); + offset += sizeof(int32_t); + EXPECT_TRUE(std::memcmp(data, dest.c_str() + offset, sizeof(data)) == 0); + offset += sizeof(data); + EXPECT_EQ(Deserialize(dest.c_str(), offset), ttl); + + // Verify the deserialization. + std::unique_ptr c1 = + ExpiringColumn::Deserialize(dest.c_str(), 0); + EXPECT_EQ(c1->Index(), index); + EXPECT_EQ(c1->Timestamp(), timestamp); + EXPECT_EQ(c1->Size(), 18 + sizeof(data)); + + c1->Serialize(&dest); + EXPECT_EQ(dest.size(), 2 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0); + + // Verify the ColumnBase::Deserialization. 
+ std::unique_ptr c2 = + ColumnBase::Deserialize(dest.c_str(), c.Size()); + c2->Serialize(&dest); + EXPECT_EQ(dest.size(), 3 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size()) + == 0); +} + +TEST(TombstoneTest, Tombstone) { + int8_t mask = ColumnTypeMask::DELETION_MASK; + int8_t index = 2; + int32_t local_deletion_time = 1494022807; + int64_t marked_for_delete_at = 1494022807044; + Tombstone c = Tombstone(mask, index, local_deletion_time, + marked_for_delete_at); + + EXPECT_EQ(c.Index(), index); + EXPECT_EQ(c.Timestamp(), marked_for_delete_at); + EXPECT_EQ(c.Size(), 14); + + // Verify the serialization. + std::string dest; + dest.reserve(c.Size() * 2); + c.Serialize(&dest); + + EXPECT_EQ(dest.size(), c.Size()); + std::size_t offset = 0; + EXPECT_EQ(Deserialize(dest.c_str(), offset), mask); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), local_deletion_time); + offset += sizeof(int32_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), marked_for_delete_at); + + // Verify the deserialization. + std::unique_ptr c1 = Tombstone::Deserialize(dest.c_str(), 0); + EXPECT_EQ(c1->Index(), index); + EXPECT_EQ(c1->Timestamp(), marked_for_delete_at); + EXPECT_EQ(c1->Size(), 14); + + c1->Serialize(&dest); + EXPECT_EQ(dest.size(), 2 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0); + + // Verify the ColumnBase::Deserialization. 
+ std::unique_ptr c2 = + ColumnBase::Deserialize(dest.c_str(), c.Size()); + c2->Serialize(&dest); + EXPECT_EQ(dest.size(), 3 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size()) + == 0); +} + +TEST(RowValueTest, RowTombstone) { + int32_t local_deletion_time = 1494022807; + int64_t marked_for_delete_at = 1494022807044; + RowValue r = RowValue(local_deletion_time, marked_for_delete_at); + + EXPECT_EQ(r.Size(), 12); + EXPECT_EQ(r.IsTombstone(), true); + EXPECT_EQ(r.LastModifiedTime(), marked_for_delete_at); + + // Verify the serialization. + std::string dest; + dest.reserve(r.Size() * 2); + r.Serialize(&dest); + + EXPECT_EQ(dest.size(), r.Size()); + std::size_t offset = 0; + EXPECT_EQ(Deserialize(dest.c_str(), offset), local_deletion_time); + offset += sizeof(int32_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), marked_for_delete_at); + + // Verify the deserialization. + RowValue r1 = RowValue::Deserialize(dest.c_str(), r.Size()); + EXPECT_EQ(r1.Size(), 12); + EXPECT_EQ(r1.IsTombstone(), true); + EXPECT_EQ(r1.LastModifiedTime(), marked_for_delete_at); + + r1.Serialize(&dest); + EXPECT_EQ(dest.size(), 2 * r.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0); +} + +TEST(RowValueTest, RowWithColumns) { + std::vector> columns; + int64_t last_modified_time = 1494022807048; + std::size_t columns_data_size = 0; + + char e_data[5] = {'e', 'd', 'a', 't', 'a'}; + int8_t e_index = 0; + int64_t e_timestamp = 1494022807044; + int32_t e_ttl = 3600; + columns.push_back(std::unique_ptr( + new ExpiringColumn(ColumnTypeMask::EXPIRATION_MASK, e_index, + e_timestamp, sizeof(e_data), e_data, e_ttl))); + columns_data_size += columns[0]->Size(); + + char c_data[4] = {'d', 'a', 't', 'a'}; + int8_t c_index = 1; + int64_t c_timestamp = 1494022807048; + columns.push_back(std::unique_ptr( + new Column(0, c_index, c_timestamp, sizeof(c_data), c_data))); + columns_data_size += columns[1]->Size(); + + 
int8_t t_index = 2; + int32_t t_local_deletion_time = 1494022801; + int64_t t_marked_for_delete_at = 1494022807043; + columns.push_back(std::unique_ptr( + new Tombstone(ColumnTypeMask::DELETION_MASK, + t_index, t_local_deletion_time, t_marked_for_delete_at))); + columns_data_size += columns[2]->Size(); + + RowValue r = RowValue(std::move(columns), last_modified_time); + + EXPECT_EQ(r.Size(), columns_data_size + 12); + EXPECT_EQ(r.IsTombstone(), false); + EXPECT_EQ(r.LastModifiedTime(), last_modified_time); + + // Verify the serialization. + std::string dest; + dest.reserve(r.Size() * 2); + r.Serialize(&dest); + + EXPECT_EQ(dest.size(), r.Size()); + std::size_t offset = 0; + EXPECT_EQ(Deserialize(dest.c_str(), offset), + std::numeric_limits::max()); + offset += sizeof(int32_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), + std::numeric_limits::min()); + offset += sizeof(int64_t); + + // Column0: ExpiringColumn + EXPECT_EQ(Deserialize(dest.c_str(), offset), + ColumnTypeMask::EXPIRATION_MASK); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), e_index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), e_timestamp); + offset += sizeof(int64_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), sizeof(e_data)); + offset += sizeof(int32_t); + EXPECT_TRUE(std::memcmp(e_data, dest.c_str() + offset, sizeof(e_data)) == 0); + offset += sizeof(e_data); + EXPECT_EQ(Deserialize(dest.c_str(), offset), e_ttl); + offset += sizeof(int32_t); + + // Column1: Column + EXPECT_EQ(Deserialize(dest.c_str(), offset), 0); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), c_index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), c_timestamp); + offset += sizeof(int64_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), sizeof(c_data)); + offset += sizeof(int32_t); + EXPECT_TRUE(std::memcmp(c_data, dest.c_str() + offset, sizeof(c_data)) == 0); + offset += sizeof(c_data); + + // Column2: 
Tombstone + EXPECT_EQ(Deserialize(dest.c_str(), offset), + ColumnTypeMask::DELETION_MASK); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), t_index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), t_local_deletion_time); + offset += sizeof(int32_t); + EXPECT_EQ(Deserialize(dest.c_str(), offset), t_marked_for_delete_at); + + // Verify the deserialization. + RowValue r1 = RowValue::Deserialize(dest.c_str(), r.Size()); + EXPECT_EQ(r1.Size(), columns_data_size + 12); + EXPECT_EQ(r1.IsTombstone(), false); + EXPECT_EQ(r1.LastModifiedTime(), last_modified_time); + + r1.Serialize(&dest); + EXPECT_EQ(dest.size(), 2 * r.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0); +} + +} // namespace cassandra +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/utilities/merge_operators/cassandra/merge_operator.cc b/utilities/merge_operators/cassandra/merge_operator.cc new file mode 100644 index 000000000..308b84598 --- /dev/null +++ b/utilities/merge_operators/cassandra/merge_operator.cc @@ -0,0 +1,103 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. 
+ +#include "merge_operator.h" + +#include +#include + +#include "rocksdb/slice.h" +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/cassandra/format.h" + +namespace rocksdb { +namespace cassandra { + +// Implementation for the merge operation (merges two Cassandra values) +bool CassandraValueMergeOperator::FullMergeV2( + const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { + // Clear the *new_value for writing. + merge_out->new_value.clear(); + + if (merge_in.existing_value == nullptr && merge_in.operand_list.size() == 1) { + // Only one operand + merge_out->existing_operand = merge_in.operand_list.back(); + return true; + } + + std::vector row_values; + if (merge_in.existing_value) { + row_values.push_back( + RowValue::Deserialize(merge_in.existing_value->data(), + merge_in.existing_value->size())); + } + + for (auto& operand : merge_in.operand_list) { + row_values.push_back(RowValue::Deserialize(operand.data(), operand.size())); + } + + RowValue merged = RowValue::Merge(std::move(row_values)); + merge_out->new_value.reserve(merged.Size()); + merged.Serialize(&(merge_out->new_value)); + + return true; +} + +// Implementation for the merge operation (merges two Cassandra values) +bool CassandraValueMergeOperator::PartialMerge(const Slice& key, + const Slice& left_operand, + const Slice& right_operand, + std::string* new_value, + Logger* logger) const { + // Clear the *new_value for writing. 
+ assert(new_value); + new_value->clear(); + + std::vector row_values; + row_values.push_back(RowValue::Deserialize(left_operand.data(), + left_operand.size())); + row_values.push_back(RowValue::Deserialize(right_operand.data(), + right_operand.size())); + RowValue merged = RowValue::Merge(std::move(row_values)); + new_value->reserve(merged.Size()); + merged.Serialize(new_value); + return true; +} + +bool CassandraValueMergeOperator::PartialMergeMulti( + const Slice& key, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const { + // Clear the *new_value for writing. + assert(new_value); + new_value->clear(); + + std::vector row_values; + for (auto& operand : operand_list) { + row_values.push_back(RowValue::Deserialize(operand.data(), operand.size())); + } + RowValue merged = RowValue::Merge(std::move(row_values)); + new_value->reserve(merged.Size()); + merged.Serialize(new_value); + return true; +} + +const char* CassandraValueMergeOperator::Name() const { + return "CassandraValueMergeOperator"; +} + +} // namespace cassandra + +std::shared_ptr + MergeOperators::CreateCassandraMergeOperator() { + return std::make_shared(); +} + +} // namespace rocksdb diff --git a/utilities/merge_operators/cassandra/merge_operator.h b/utilities/merge_operators/cassandra/merge_operator.h new file mode 100644 index 000000000..b824c6182 --- /dev/null +++ b/utilities/merge_operators/cassandra/merge_operator.h @@ -0,0 +1,39 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. 
+ +#pragma once +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" + +namespace rocksdb { +namespace cassandra { + +/** + * A MergeOperator for rocksdb that implements Cassandra row value merge. + */ +class CassandraValueMergeOperator : public MergeOperator { +public: + static std::shared_ptr CreateSharedInstance(); + + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; + + virtual bool PartialMerge(const Slice& key, + const Slice& left_operand, + const Slice& right_operand, + std::string* new_value, + Logger* logger) const override; + + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const override; + + virtual const char* Name() const override; +}; +} // namespace cassandra +} // namespace rocksdb diff --git a/utilities/merge_operators/cassandra/row_merge_test.cc b/utilities/merge_operators/cassandra/row_merge_test.cc new file mode 100644 index 000000000..1bd7e3ff9 --- /dev/null +++ b/utilities/merge_operators/cassandra/row_merge_test.cc @@ -0,0 +1,114 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. 
+ +#include +#include "util/testharness.h" +#include "utilities/merge_operators/cassandra/format.h" +#include "utilities/merge_operators/cassandra/test_utils.h" + +namespace rocksdb { +namespace cassandra { + +TEST(RowValueMergeTest, Merge) { + std::vector row_values; + row_values.push_back( + CreateTestRowValue({ + std::make_tuple(kTombstone, 0, 5), + std::make_tuple(kColumn, 1, 8), + std::make_tuple(kExpiringColumn, 2, 5), + }) + ); + + row_values.push_back( + CreateTestRowValue({ + std::make_tuple(kColumn, 0, 2), + std::make_tuple(kExpiringColumn, 1, 5), + std::make_tuple(kTombstone, 2, 7), + std::make_tuple(kExpiringColumn, 7, 17), + }) + ); + + row_values.push_back( + CreateTestRowValue({ + std::make_tuple(kExpiringColumn, 0, 6), + std::make_tuple(kTombstone, 1, 5), + std::make_tuple(kColumn, 2, 4), + std::make_tuple(kTombstone, 11, 11), + }) + ); + + RowValue merged = RowValue::Merge(std::move(row_values)); + EXPECT_FALSE(merged.IsTombstone()); + EXPECT_EQ(merged.columns_.size(), 5); + VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6); + VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8); + VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7); + VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17); + VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11); +} + +TEST(RowValueMergeTest, MergeWithRowTombstone) { + std::vector row_values; + + // A row tombstone. + row_values.push_back( + CreateRowTombstone(11) + ); + + // This row's timestamp is smaller than tombstone. + row_values.push_back( + CreateTestRowValue({ + std::make_tuple(kColumn, 0, 5), + std::make_tuple(kColumn, 1, 6), + }) + ); + + // Some of the column's row is smaller, some is larger. + row_values.push_back( + CreateTestRowValue({ + std::make_tuple(kColumn, 2, 10), + std::make_tuple(kColumn, 3, 12), + }) + ); + + // All of the column's rows are larger than tombstone. 
+ row_values.push_back( + CreateTestRowValue({ + std::make_tuple(kColumn, 4, 13), + std::make_tuple(kColumn, 5, 14), + }) + ); + + RowValue merged = RowValue::Merge(std::move(row_values)); + EXPECT_FALSE(merged.IsTombstone()); + EXPECT_EQ(merged.columns_.size(), 3); + VerifyRowValueColumns(merged.columns_, 0, kColumn, 3, 12); + VerifyRowValueColumns(merged.columns_, 1, kColumn, 4, 13); + VerifyRowValueColumns(merged.columns_, 2, kColumn, 5, 14); + + // If the tombstone's timestamp is the latest, then it returns a + // row tombstone. + row_values.push_back( + CreateRowTombstone(15) + ); + + row_values.push_back( + CreateRowTombstone(17) + ); + + merged = RowValue::Merge(std::move(row_values)); + EXPECT_TRUE(merged.IsTombstone()); + EXPECT_EQ(merged.LastModifiedTime(), 17); +} + +} // namespace cassandra +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/utilities/merge_operators/cassandra/serialize.h b/utilities/merge_operators/cassandra/serialize.h new file mode 100644 index 000000000..55bc6bb28 --- /dev/null +++ b/utilities/merge_operators/cassandra/serialize.h @@ -0,0 +1,77 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. + +/** + * Helper functions which serialize and deserialize integers + * into bytes in big endian. 
// */

#pragma once

#include <cstddef>
#include <cstdint>
#include <string>

namespace rocksdb {
namespace cassandra {

// Low-byte mask and bits-per-byte used by the helpers below. Namespace-scope
// `const` objects already have internal linkage, so they need no anonymous
// namespace (which should not appear in a header).
const int64_t kCharMask = 0xFFLL;
const int32_t kBitsPerByte = 8;

// Appends the sizeof(UnsignedT) bytes of `bits` to *dest, most significant
// byte first. Works on an unsigned type so the result does not depend on how
// the compiler shifts negative values.
template <typename UnsignedT>
inline void AppendBigEndian(UnsignedT bits, std::string* dest) {
  for (std::size_t i = 0; i < sizeof(UnsignedT); ++i) {
    const std::size_t shift = (sizeof(UnsignedT) - 1 - i) * kBitsPerByte;
    dest->append(1, static_cast<char>((bits >> shift) & kCharMask));
  }
}

// Reads sizeof(UnsignedT) bytes starting at src[offset], most significant
// byte first, and returns them as one unsigned value. The caller must make
// sure that many bytes are readable; no bounds check is possible here.
template <typename UnsignedT>
inline UnsignedT ReadBigEndian(const char* src, std::size_t offset) {
  UnsignedT bits = 0;
  for (std::size_t i = 0; i < sizeof(UnsignedT); ++i) {
    // Go through unsigned char so bytes >= 0x80 are not sign-extended.
    bits = static_cast<UnsignedT>(
        (bits << kBitsPerByte) | static_cast<unsigned char>(src[offset + i]));
  }
  return bits;
}

// Appends `val` to *dest in big-endian byte order.
// Only the int8_t, int32_t and int64_t specializations below are defined.
template <typename T>
void Serialize(T val, std::string* dest);

// Decodes a big-endian T from the bytes starting at src[offset].
// Only the int8_t, int32_t and int64_t specializations below are defined.
template <typename T>
T Deserialize(const char* src, std::size_t offset = 0);

// Specializations
template <>
inline void Serialize<int8_t>(int8_t t, std::string* dest) {
  dest->append(1, static_cast<char>(t & kCharMask));
}

template <>
inline void Serialize<int32_t>(int32_t t, std::string* dest) {
  AppendBigEndian<uint32_t>(static_cast<uint32_t>(t), dest);
}

template <>
inline void Serialize<int64_t>(int64_t t, std::string* dest) {
  AppendBigEndian<uint64_t>(static_cast<uint64_t>(t), dest);
}

template <>
inline int8_t Deserialize<int8_t>(const char* src, std::size_t offset) {
  return static_cast<int8_t>(src[offset]);
}

template <>
inline int32_t Deserialize<int32_t>(const char* src, std::size_t offset) {
  return static_cast<int32_t>(ReadBigEndian<uint32_t>(src, offset));
}

template <>
inline int64_t Deserialize<int64_t>(const char* src, std::size_t offset) {
  return static_cast<int64_t>(ReadBigEndian<uint64_t>(src, offset));
}

} // namespace cassandra
} // namespace rocksdb
// diff --git a/utilities/merge_operators/cassandra/serialize_test.cc b/utilities/merge_operators/cassandra/serialize_test.cc
// new file mode 100644
// index 000000000..06e9eaa95
// --- /dev/null
// +++ b/utilities/merge_operators/cassandra/serialize_test.cc
// @@ -0,0 +1,190 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. + +#include "util/testharness.h" +#include "utilities/merge_operators/cassandra/serialize.h" + +using namespace rocksdb::cassandra; + +namespace rocksdb { +namespace cassandra { + +TEST(SerializeTest, SerializeI64) { + std::string dest; + Serialize(0, &dest); + EXPECT_EQ( + std::string( + {'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00'}), + dest); + + dest.clear(); + Serialize(1, &dest); + EXPECT_EQ( + std::string( + {'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x01'}), + dest); + + + dest.clear(); + Serialize(-1, &dest); + EXPECT_EQ( + std::string( + {'\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff'}), + dest); + + dest.clear(); + Serialize(9223372036854775807, &dest); + EXPECT_EQ( + std::string( + {'\x7f', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff'}), + dest); + + dest.clear(); + Serialize(-9223372036854775807, &dest); + EXPECT_EQ( + std::string( + {'\x80', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x01'}), + dest); +} + +TEST(SerializeTest, DeserializeI64) { + std::string dest; + std::size_t offset = dest.size(); + Serialize(0, &dest); + EXPECT_EQ(0, Deserialize(dest.c_str(), offset)); + + offset = dest.size(); + Serialize(1, &dest); + EXPECT_EQ(1, Deserialize(dest.c_str(), offset)); + + offset = dest.size(); + Serialize(-1, &dest); + EXPECT_EQ(-1, Deserialize(dest.c_str(), offset)); + + offset = dest.size(); + Serialize(-9223372036854775807, &dest); + EXPECT_EQ(-9223372036854775807, Deserialize(dest.c_str(), offset)); + + offset = dest.size(); + Serialize(9223372036854775807, &dest); + EXPECT_EQ(9223372036854775807, Deserialize(dest.c_str(), 
offset)); +} + +TEST(SerializeTest, SerializeI32) { + std::string dest; + Serialize(0, &dest); + EXPECT_EQ( + std::string( + {'\x00', '\x00', '\x00', '\x00'}), + dest); + + dest.clear(); + Serialize(1, &dest); + EXPECT_EQ( + std::string( + {'\x00', '\x00', '\x00', '\x01'}), + dest); + + + dest.clear(); + Serialize(-1, &dest); + EXPECT_EQ( + std::string( + {'\xff', '\xff', '\xff', '\xff'}), + dest); + + dest.clear(); + Serialize(2147483647, &dest); + EXPECT_EQ( + std::string( + {'\x7f', '\xff', '\xff', '\xff'}), + dest); + + dest.clear(); + Serialize(-2147483648LL, &dest); + EXPECT_EQ( + std::string( + {'\x80', '\x00', '\x00', '\x00'}), + dest); +} + +TEST(SerializeTest, DeserializeI32) { + std::string dest; + std::size_t offset = dest.size(); + Serialize(0, &dest); + EXPECT_EQ(0, Deserialize(dest.c_str(), offset)); + + offset = dest.size(); + Serialize(1, &dest); + EXPECT_EQ(1, Deserialize(dest.c_str(), offset)); + + offset = dest.size(); + Serialize(-1, &dest); + EXPECT_EQ(-1, Deserialize(dest.c_str(), offset)); + + offset = dest.size(); + Serialize(2147483647, &dest); + EXPECT_EQ(2147483647, Deserialize(dest.c_str(), offset)); + + offset = dest.size(); + Serialize(-2147483648LL, &dest); + EXPECT_EQ(-2147483648LL, Deserialize(dest.c_str(), offset)); +} + +TEST(SerializeTest, SerializeI8) { + std::string dest; + Serialize(0, &dest); + EXPECT_EQ(std::string({'\x00'}), dest); + + dest.clear(); + Serialize(1, &dest); + EXPECT_EQ(std::string({'\x01'}), dest); + + + dest.clear(); + Serialize(-1, &dest); + EXPECT_EQ(std::string({'\xff'}), dest); + + dest.clear(); + Serialize(127, &dest); + EXPECT_EQ(std::string({'\x7f'}), dest); + + dest.clear(); + Serialize(-128, &dest); + EXPECT_EQ(std::string({'\x80'}), dest); +} + +TEST(SerializeTest, DeserializeI8) { + std::string dest; + std::size_t offset = dest.size(); + Serialize(0, &dest); + EXPECT_EQ(0, Deserialize(dest.c_str(), offset)); + + offset = dest.size(); + Serialize(1, &dest); + EXPECT_EQ(1, Deserialize(dest.c_str(), 
offset)); + + offset = dest.size(); + Serialize(-1, &dest); + EXPECT_EQ(-1, Deserialize(dest.c_str(), offset)); + + offset = dest.size(); + Serialize(127, &dest); + EXPECT_EQ(127, Deserialize(dest.c_str(), offset)); + + offset = dest.size(); + Serialize(-128, &dest); + EXPECT_EQ(-128, Deserialize(dest.c_str(), offset)); +} + +} // namespace cassandra +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/utilities/merge_operators/cassandra/test_utils.cc b/utilities/merge_operators/cassandra/test_utils.cc new file mode 100644 index 000000000..b742e2bc7 --- /dev/null +++ b/utilities/merge_operators/cassandra/test_utils.cc @@ -0,0 +1,65 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. 
+ +#include "test_utils.h" + +namespace rocksdb { +namespace cassandra { +const char kData[] = {'d', 'a', 't', 'a'}; +const char kExpiringData[] = {'e', 'd', 'a', 't', 'a'}; +const int32_t kLocalDeletionTime = 1; +const int32_t kTtl = 100; +const int8_t kColumn = 0; +const int8_t kTombstone = 1; +const int8_t kExpiringColumn = 2; + +std::unique_ptr CreateTestColumn(int8_t mask, + int8_t index, + int64_t timestamp) { + if ((mask & ColumnTypeMask::DELETION_MASK) != 0) { + return std::unique_ptr(new Tombstone( + mask, index, kLocalDeletionTime, timestamp)); + } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) { + return std::unique_ptr(new ExpiringColumn( + mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl)); + } else { + return std::unique_ptr( + new Column(mask, index, timestamp, sizeof(kData), kData)); + } +} + +RowValue CreateTestRowValue( + std::vector> column_specs) { + std::vector> columns; + int64_t last_modified_time = 0; + for (auto spec: column_specs) { + auto c = CreateTestColumn(std::get<0>(spec), std::get<1>(spec), + std::get<2>(spec)); + last_modified_time = std::max(last_modified_time, c -> Timestamp()); + columns.push_back(std::move(c)); + } + return RowValue(std::move(columns), last_modified_time); +} + +RowValue CreateRowTombstone(int64_t timestamp) { + return RowValue(kLocalDeletionTime, timestamp); +} + +void VerifyRowValueColumns( + std::vector> &columns, + std::size_t index_of_vector, + int8_t expected_mask, + int8_t expected_index, + int64_t expected_timestamp +) { + EXPECT_EQ(expected_timestamp, columns[index_of_vector]->Timestamp()); + EXPECT_EQ(expected_mask, columns[index_of_vector]->Mask()); + EXPECT_EQ(expected_index, columns[index_of_vector]->Index()); +} + +} +} diff --git a/utilities/merge_operators/cassandra/test_utils.h b/utilities/merge_operators/cassandra/test_utils.h new file mode 100644 index 000000000..96e6ded0f --- /dev/null +++ b/utilities/merge_operators/cassandra/test_utils.h @@ -0,0 +1,43 @@ +// 
Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. + +#pragma once +#include +#include "util/testharness.h" +#include "utilities/merge_operators/cassandra/format.h" +#include "utilities/merge_operators/cassandra/serialize.h" + +namespace rocksdb { +namespace cassandra { +extern const char kData[]; +extern const char kExpiringData[]; +extern const int32_t kLocalDeletionTime; +extern const int32_t kTtl; +extern const int8_t kColumn; +extern const int8_t kTombstone; +extern const int8_t kExpiringColumn; + + +std::unique_ptr CreateTestColumn(int8_t mask, + int8_t index, + int64_t timestamp); + +RowValue CreateTestRowValue( + std::vector> column_specs); + +RowValue CreateRowTombstone(int64_t timestamp); + +void VerifyRowValueColumns( + std::vector> &columns, + std::size_t index_of_vector, + int8_t expected_mask, + int8_t expected_index, + int64_t expected_timestamp +); + +} +}