rocksdb/utilities/cassandra/format.cc
Pengchao Wang 534c255c7a Cassandra compaction filter for purge expired columns and rows
Summary:
Major changes in this PR:
* Implement CassandraCompactionFilter to remove expired columns and rows (if all column expired)
* Move cassandra related code from utilities/merge_operators/cassandra to utilities/cassandra/*
* Switch to use shared_ptr<> from uniqu_ptr for Column membership management in RowValue. Since columns do have multiple owners in Merge and GC process, use shared_ptr helps make RowValue immutable.
* Rename cassandra_merge_test to cassandra_functional_test and add two TTL compaction related tests there.
Closes https://github.com/facebook/rocksdb/pull/2588

Differential Revision: D5430010

Pulled By: wpc

fbshipit-source-id: 9566c21e06de17491d486a68c70f52d501f27687
2017-07-21 14:57:44 -07:00

363 lines
11 KiB
C++

// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "format.h"
#include <algorithm>
#include <map>
#include <memory>
#include "utilities/cassandra/serialize.h"
namespace rocksdb {
namespace cassandra {
namespace {
const int32_t kDefaultLocalDeletionTime =
std::numeric_limits<int32_t>::max();
const int64_t kDefaultMarkedForDeleteAt =
std::numeric_limits<int64_t>::min();
}
ColumnBase::ColumnBase(int8_t mask, int8_t index)
: mask_(mask), index_(index) {}
std::size_t ColumnBase::Size() const {
return sizeof(mask_) + sizeof(index_);
}
int8_t ColumnBase::Mask() const {
return mask_;
}
int8_t ColumnBase::Index() const {
return index_;
}
void ColumnBase::Serialize(std::string* dest) const {
rocksdb::cassandra::Serialize<int8_t>(mask_, dest);
rocksdb::cassandra::Serialize<int8_t>(index_, dest);
}
std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
return Tombstone::Deserialize(src, offset);
} else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
return ExpiringColumn::Deserialize(src, offset);
} else {
return Column::Deserialize(src, offset);
}
}
Column::Column(
int8_t mask,
int8_t index,
int64_t timestamp,
int32_t value_size,
const char* value
) : ColumnBase(mask, index), timestamp_(timestamp),
value_size_(value_size), value_(value) {}
int64_t Column::Timestamp() const {
return timestamp_;
}
std::size_t Column::Size() const {
return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_)
+ value_size_;
}
void Column::Serialize(std::string* dest) const {
ColumnBase::Serialize(dest);
rocksdb::cassandra::Serialize<int64_t>(timestamp_, dest);
rocksdb::cassandra::Serialize<int32_t>(value_size_, dest);
dest->append(value_, value_size_);
}
std::shared_ptr<Column> Column::Deserialize(const char *src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(index);
int64_t timestamp = rocksdb::cassandra::Deserialize<int64_t>(src, offset);
offset += sizeof(timestamp);
int32_t value_size = rocksdb::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(value_size);
return std::make_shared<Column>(
mask, index, timestamp, value_size, src + offset);
}
ExpiringColumn::ExpiringColumn(
int8_t mask,
int8_t index,
int64_t timestamp,
int32_t value_size,
const char* value,
int32_t ttl
) : Column(mask, index, timestamp, value_size, value),
ttl_(ttl) {}
std::size_t ExpiringColumn::Size() const {
return Column::Size() + sizeof(ttl_);
}
void ExpiringColumn::Serialize(std::string* dest) const {
Column::Serialize(dest);
rocksdb::cassandra::Serialize<int32_t>(ttl_, dest);
}
std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint() const {
return std::chrono::time_point<std::chrono::system_clock>(std::chrono::microseconds(Timestamp()));
}
std::chrono::seconds ExpiringColumn::Ttl() const {
return std::chrono::seconds(ttl_);
}
bool ExpiringColumn::Expired() const {
return TimePoint() + Ttl() < std::chrono::system_clock::now();
}
std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const {
auto expired_at = (TimePoint() + Ttl()).time_since_epoch();
int32_t local_deletion_time = static_cast<int32_t>(
std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
int64_t marked_for_delete_at =
std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
return std::make_shared<Tombstone>(
ColumnTypeMask::DELETION_MASK,
Index(),
local_deletion_time,
marked_for_delete_at);
}
std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
const char *src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(index);
int64_t timestamp = rocksdb::cassandra::Deserialize<int64_t>(src, offset);
offset += sizeof(timestamp);
int32_t value_size = rocksdb::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(value_size);
const char* value = src + offset;
offset += value_size;
int32_t ttl = rocksdb::cassandra::Deserialize<int32_t>(src, offset);
return std::make_shared<ExpiringColumn>(
mask, index, timestamp, value_size, value, ttl);
}
Tombstone::Tombstone(
int8_t mask,
int8_t index,
int32_t local_deletion_time,
int64_t marked_for_delete_at
) : ColumnBase(mask, index), local_deletion_time_(local_deletion_time),
marked_for_delete_at_(marked_for_delete_at) {}
int64_t Tombstone::Timestamp() const {
return marked_for_delete_at_;
}
std::size_t Tombstone::Size() const {
return ColumnBase::Size() + sizeof(local_deletion_time_)
+ sizeof(marked_for_delete_at_);
}
void Tombstone::Serialize(std::string* dest) const {
ColumnBase::Serialize(dest);
rocksdb::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
}
std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(index);
int32_t local_deletion_time =
rocksdb::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(int32_t);
int64_t marked_for_delete_at =
rocksdb::cassandra::Deserialize<int64_t>(src, offset);
return std::make_shared<Tombstone>(
mask, index, local_deletion_time, marked_for_delete_at);
}
RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
: local_deletion_time_(local_deletion_time),
marked_for_delete_at_(marked_for_delete_at), columns_(),
last_modified_time_(0) {}
RowValue::RowValue(Columns columns,
int64_t last_modified_time)
: local_deletion_time_(kDefaultLocalDeletionTime),
marked_for_delete_at_(kDefaultMarkedForDeleteAt),
columns_(std::move(columns)), last_modified_time_(last_modified_time) {}
std::size_t RowValue::Size() const {
std::size_t size = sizeof(local_deletion_time_)
+ sizeof(marked_for_delete_at_);
for (const auto& column : columns_) {
size += column -> Size();
}
return size;
}
int64_t RowValue::LastModifiedTime() const {
if (IsTombstone()) {
return marked_for_delete_at_;
} else {
return last_modified_time_;
}
}
bool RowValue::IsTombstone() const {
return marked_for_delete_at_ > kDefaultMarkedForDeleteAt;
}
void RowValue::Serialize(std::string* dest) const {
rocksdb::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
for (const auto& column : columns_) {
column -> Serialize(dest);
}
}
RowValue RowValue::PurgeTtl(bool* changed) const {
*changed = false;
Columns new_columns;
for (auto& column : columns_) {
if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
std::shared_ptr<ExpiringColumn> expiring_column =
std::static_pointer_cast<ExpiringColumn>(column);
if(expiring_column->Expired()){
*changed = true;
continue;
}
}
new_columns.push_back(column);
}
return RowValue(std::move(new_columns), last_modified_time_);
}
RowValue RowValue::ExpireTtl(bool* changed) const {
*changed = false;
Columns new_columns;
for (auto& column : columns_) {
if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
std::shared_ptr<ExpiringColumn> expiring_column =
std::static_pointer_cast<ExpiringColumn>(column);
if(expiring_column->Expired()) {
shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone();
new_columns.push_back(tombstone);
*changed = true;
continue;
}
}
new_columns.push_back(column);
}
return RowValue(std::move(new_columns), last_modified_time_);
}
bool RowValue::Empty() const {
return columns_.empty();
}
RowValue RowValue::Deserialize(const char *src, std::size_t size) {
std::size_t offset = 0;
assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));
int32_t local_deletion_time =
rocksdb::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(int32_t);
int64_t marked_for_delete_at =
rocksdb::cassandra::Deserialize<int64_t>(src, offset);
offset += sizeof(int64_t);
if (offset == size) {
return RowValue(local_deletion_time, marked_for_delete_at);
}
assert(local_deletion_time == kDefaultLocalDeletionTime);
assert(marked_for_delete_at == kDefaultMarkedForDeleteAt);
Columns columns;
int64_t last_modified_time = 0;
while (offset < size) {
auto c = ColumnBase::Deserialize(src, offset);
offset += c -> Size();
assert(offset <= size);
last_modified_time = std::max(last_modified_time, c -> Timestamp());
columns.push_back(std::move(c));
}
return RowValue(std::move(columns), last_modified_time);
}
// Merge multiple row values into one.
// For each column in rows with same index, we pick the one with latest
// timestamp. And we also take row tombstone into consideration, by iterating
// each row from reverse timestamp order, and stop once we hit the first
// row tombstone.
RowValue RowValue::Merge(std::vector<RowValue>&& values) {
assert(values.size() > 0);
if (values.size() == 1) {
return std::move(values[0]);
}
// Merge columns by their last modified time, and skip once we hit
// a row tombstone.
std::sort(values.begin(), values.end(),
[](const RowValue& r1, const RowValue& r2) {
return r1.LastModifiedTime() > r2.LastModifiedTime();
});
std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns;
int64_t tombstone_timestamp = 0;
for (auto& value : values) {
if (value.IsTombstone()) {
if (merged_columns.size() == 0) {
return std::move(value);
}
tombstone_timestamp = value.LastModifiedTime();
break;
}
for (auto& column : value.columns_) {
int8_t index = column->Index();
if (merged_columns.find(index) == merged_columns.end()) {
merged_columns[index] = column;
} else {
if (column->Timestamp() > merged_columns[index]->Timestamp()) {
merged_columns[index] = column;
}
}
}
}
int64_t last_modified_time = 0;
Columns columns;
for (auto& pair: merged_columns) {
// For some row, its last_modified_time > row tombstone_timestamp, but
// it might have rows whose timestamp is ealier than tombstone, so we
// ned to filter these rows.
if (pair.second->Timestamp() <= tombstone_timestamp) {
continue;
}
last_modified_time = std::max(last_modified_time, pair.second->Timestamp());
columns.push_back(std::move(pair.second));
}
return RowValue(std::move(columns), last_modified_time);
}
} // namepsace cassandrda
} // namespace rocksdb