11526252cc
Summary: PinnableSlice Summary: Currently the point lookup values are copied to a string provided by the user. This incurs an extra memcpy cost. This patch allows doing point lookup via a PinnableSlice which pins the source memory location (instead of copying their content) and releases them after the content is consumed by the user. The old API of Get(string) is translated to the new API underneath. Here is the summary for improvements: value 100 byte: 1.8% regular, 1.2% merge values value 1k byte: 11.5% regular, 7.5% merge values value 10k byte: 26% regular, 29.9% merge values The improvement for merge could be more if we extend this approach to pin the merge output and delay the full merge operation until the user actually needs it. We have put that for future work. PS: Sometimes we observe a small decrease in performance when switching from t5452014 to this patch but with the old Get(string) API. The d Closes https://github.com/facebook/rocksdb/pull/1756 Differential Revision: D4391738 Pulled By: maysamyabandeh fbshipit-source-id: 6f3edd3
314 lines
10 KiB
C++
314 lines
10 KiB
C++
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#include "utilities/ttl/db_ttl_impl.h"
|
|
|
|
#include "db/filename.h"
|
|
#include "db/write_batch_internal.h"
|
|
#include "rocksdb/convenience.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/iterator.h"
|
|
#include "rocksdb/utilities/db_ttl.h"
|
|
#include "util/coding.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
|
|
Env* env) {
|
|
if (options->compaction_filter) {
|
|
options->compaction_filter =
|
|
new TtlCompactionFilter(ttl, env, options->compaction_filter);
|
|
} else {
|
|
options->compaction_filter_factory =
|
|
std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
|
|
ttl, env, options->compaction_filter_factory));
|
|
}
|
|
|
|
if (options->merge_operator) {
|
|
options->merge_operator.reset(
|
|
new TtlMergeOperator(options->merge_operator, env));
|
|
}
|
|
}
|
|
|
|
// Open the db inside DBWithTTLImpl because options needs pointer to its ttl
|
|
DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db) {}
|
|
|
|
// Stops all background work and then frees the compaction filter reported by
// GetOptions().
DBWithTTLImpl::~DBWithTTLImpl() {
  // Background compaction must be stopped (and waited for) before the filter
  // is destroyed, otherwise it could still be in use.
  const bool wait_for_completion = true;
  CancelAllBackgroundWork(db_, wait_for_completion);

  auto* filter_to_free = GetOptions().compaction_filter;
  delete filter_to_free;
}
|
|
|
|
// Opens a TTL database via DBWithTTL::Open() and returns it through `dbptr`
// as a StackableDB.
//
// On success `*dbptr` points at the opened database; on failure it is set to
// nullptr. The status of the underlying open call is returned unchanged.
Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
                            StackableDB** dbptr, int32_t ttl, bool read_only) {
  DBWithTTL* ttl_db;
  Status open_status = DBWithTTL::Open(options, dbname, &ttl_db, ttl, read_only);
  // `ttl_db` is only read when the open succeeded.
  *dbptr = open_status.ok() ? ttl_db : nullptr;
  return open_status;
}
|
|
|
|
// Convenience overload: opens `dbname` with only the default column family,
// using `ttl` for it, by delegating to the multi-column-family overload.
//
// The result is returned through `dbptr`; the returned Status is that of the
// delegated call.
Status DBWithTTL::Open(const Options& options, const std::string& dbname,
                       DBWithTTL** dbptr, int32_t ttl, bool read_only) {
  // Split the combined Options into its DB-wide and per-column-family parts.
  DBOptions db_opts(options);
  ColumnFamilyOptions default_cf_opts(options);

  std::vector<ColumnFamilyDescriptor> cf_descriptors(
      1, ColumnFamilyDescriptor(kDefaultColumnFamilyName, default_cf_opts));
  std::vector<ColumnFamilyHandle*> cf_handles;

  Status status =
      DBWithTTL::Open(db_opts, dbname, cf_descriptors, &cf_handles, dbptr,
                      std::vector<int32_t>(1, ttl), read_only);
  if (!status.ok()) {
    return status;
  }

  assert(cf_handles.size() == 1);
  // The handle can be deleted here since DBImpl always holds a reference to
  // the default column family.
  delete cf_handles[0];
  return status;
}
|
|
|
|
// Opens `dbname` with one TTL per column family and returns the TTL-aware
// database through `dbptr`.
//
// db_options:      DB-wide options; when db_options.env is null,
//                  Env::Default() is used for the TTL wrappers.
// column_families: descriptors to open; their options are copied and
//                  sanitized, the caller's descriptors are left untouched.
// handles:         receives the column family handles from the underlying
//                  open call.
// dbptr:           set to the new DBWithTTLImpl on success and to nullptr on
//                  every failure path, including the argument check below.
// ttls:            ttls[i] is applied to column_families[i]; the sizes must
//                  match.
// read_only:       selects DB::OpenForReadOnly() instead of DB::Open().
//
// Returns InvalidArgument when the sizes of `ttls` and `column_families`
// differ, otherwise the status of the underlying open call.
Status DBWithTTL::Open(
    const DBOptions& db_options, const std::string& dbname,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
    std::vector<int32_t> ttls, bool read_only) {
  if (ttls.size() != column_families.size()) {
    // Keep the out-parameter well defined on this failure path too, so that
    // callers never observe an indeterminate pointer.
    *dbptr = nullptr;
    return Status::InvalidArgument(
        "ttls size has to be the same as number of column families");
  }

  // The env does not depend on the column family, so pick it once.
  Env* const env =
      db_options.env == nullptr ? Env::Default() : db_options.env;

  std::vector<ColumnFamilyDescriptor> column_families_sanitized =
      column_families;
  for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
    DBWithTTLImpl::SanitizeOptions(
        ttls[i], &column_families_sanitized[i].options, env);
  }

  DB* db = nullptr;
  Status st;
  if (read_only) {
    st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
                             handles, &db);
  } else {
    st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
  }

  if (st.ok()) {
    *dbptr = new DBWithTTLImpl(db);
  } else {
    *dbptr = nullptr;
  }
  return st;
}
|
|
|
|
// Creates a column family whose options have been sanitized for `ttl`.
//
// A private copy of `options` is passed through SanitizeOptions(); the
// caller's options are not modified. Creation itself is delegated to the
// DBWithTTL base implementation, whose status is returned.
Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    ColumnFamilyHandle** handle, int ttl) {
  ColumnFamilyOptions ttl_options(options);
  SanitizeOptions(ttl, &ttl_options, GetEnv());

  // Explicitly call the base class version: the override in this class
  // forwards back to this function.
  return DBWithTTL::CreateColumnFamily(ttl_options, column_family_name, handle);
}
|
|
|
|
// Creates a column family with a TTL of 0 by forwarding to
// CreateColumnFamilyWithTtl(). IsStale() treats a non-positive TTL as
// "never stale".
Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
                                         const std::string& column_family_name,
                                         ColumnFamilyHandle** handle) {
  const int no_ttl = 0;
  return CreateColumnFamilyWithTtl(options, column_family_name, handle, no_ttl);
}
|
|
|
|
// Appends the current timestamp to the string.
|
|
// Returns false if could not get the current_time, true if append succeeds
|
|
Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
|
|
Env* env) {
|
|
val_with_ts->reserve(kTSLength + val.size());
|
|
char ts_string[kTSLength];
|
|
int64_t curtime;
|
|
Status st = env->GetCurrentTime(&curtime);
|
|
if (!st.ok()) {
|
|
return st;
|
|
}
|
|
EncodeFixed32(ts_string, (int32_t)curtime);
|
|
val_with_ts->append(val.data(), val.size());
|
|
val_with_ts->append(ts_string, kTSLength);
|
|
return st;
|
|
}
|
|
|
|
// Returns corruption if the length of the string is lesser than timestamp, or
|
|
// timestamp refers to a time lesser than ttl-feature release time
|
|
Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
|
|
if (str.size() < kTSLength) {
|
|
return Status::Corruption("Error: value's length less than timestamp's\n");
|
|
}
|
|
// Checks that TS is not lesser than kMinTimestamp
|
|
// Gaurds against corruption & normal database opened incorrectly in ttl mode
|
|
int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
|
|
if (timestamp_value < kMinTimestamp) {
|
|
return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
// Checks if the string is stale or not according to TTl provided
|
|
bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
|
|
if (ttl <= 0) { // Data is fresh if TTL is non-positive
|
|
return false;
|
|
}
|
|
int64_t curtime;
|
|
if (!env->GetCurrentTime(&curtime).ok()) {
|
|
return false; // Treat the data as fresh if could not get current time
|
|
}
|
|
int32_t timestamp_value =
|
|
DecodeFixed32(value.data() + value.size() - kTSLength);
|
|
return (timestamp_value + ttl) < curtime;
|
|
}
|
|
|
|
// Strips the TS from the end of the slice
|
|
Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
|
|
Status st;
|
|
if (pinnable_val->size() < kTSLength) {
|
|
return Status::Corruption("Bad timestamp in key-value");
|
|
}
|
|
// Erasing characters which hold the TS
|
|
pinnable_val->remove_suffix(kTSLength);
|
|
return st;
|
|
}
|
|
|
|
// Strips the TS from the end of the string
|
|
Status DBWithTTLImpl::StripTS(std::string* str) {
|
|
Status st;
|
|
if (str->length() < kTSLength) {
|
|
return Status::Corruption("Bad timestamp in key-value");
|
|
}
|
|
// Erasing characters which hold the TS
|
|
str->erase(str->length() - kTSLength, kTSLength);
|
|
return st;
|
|
}
|
|
|
|
// Stores `val` under `key` in `column_family`.
//
// The put is wrapped in a one-entry WriteBatch and routed through Write(),
// which is where the timestamp gets appended to the value.
Status DBWithTTLImpl::Put(const WriteOptions& options,
                          ColumnFamilyHandle* column_family, const Slice& key,
                          const Slice& val) {
  WriteBatch single_put;
  single_put.Put(column_family, key, val);
  return Write(options, &single_put);
}
|
|
|
|
// Reads `key` from the wrapped DB into `value`, then validates and strips
// the trailing timestamp.
//
// Returns the first non-OK status among: the underlying Get(),
// SanityCheckTimestamp(), and StripTS(). On an error after the read,
// `value` still holds whatever the underlying Get() produced.
Status DBWithTTLImpl::Get(const ReadOptions& options,
                          ColumnFamilyHandle* column_family, const Slice& key,
                          PinnableSlice* value) {
  Status st = db_->Get(options, column_family, key, value);
  if (st.ok()) {
    st = SanityCheckTimestamp(*value);
    if (st.ok()) {
      st = StripTS(value);
    }
  }
  return st;
}
|
|
|
|
std::vector<Status> DBWithTTLImpl::MultiGet(
|
|
const ReadOptions& options,
|
|
const std::vector<ColumnFamilyHandle*>& column_family,
|
|
const std::vector<Slice>& keys, std::vector<std::string>* values) {
|
|
auto statuses = db_->MultiGet(options, column_family, keys, values);
|
|
for (size_t i = 0; i < keys.size(); ++i) {
|
|
if (!statuses[i].ok()) {
|
|
continue;
|
|
}
|
|
statuses[i] = SanityCheckTimestamp((*values)[i]);
|
|
if (!statuses[i].ok()) {
|
|
continue;
|
|
}
|
|
statuses[i] = StripTS(&(*values)[i]);
|
|
}
|
|
return statuses;
|
|
}
|
|
|
|
bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
|
|
ColumnFamilyHandle* column_family,
|
|
const Slice& key, std::string* value,
|
|
bool* value_found) {
|
|
bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
|
|
if (ret && value != nullptr && value_found != nullptr && *value_found) {
|
|
if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
|
|
return false;
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// Merges `value` into `key` in `column_family`.
//
// The merge is wrapped in a one-entry WriteBatch and routed through Write(),
// which is where the timestamp gets appended to the operand.
Status DBWithTTLImpl::Merge(const WriteOptions& options,
                            ColumnFamilyHandle* column_family, const Slice& key,
                            const Slice& value) {
  WriteBatch single_merge;
  single_merge.Merge(column_family, key, value);
  return Write(options, &single_merge);
}
|
|
|
|
// Applies `updates` to the wrapped DB after appending the current timestamp
// to every Put and Merge value.
//
// The incoming batch is replayed through a local handler that builds a second
// batch (`updates_ttl`): Put/Merge values get a timestamp via AppendTS(),
// Deletes and log data are copied as they are. The rewritten batch is only
// written when the whole replay succeeded.
//
// Returns the timestamp failure if AppendTS() failed for any entry, else the
// failure reported by WriteBatch::Iterate() if the replay itself failed, else
// the status of the underlying Write().
//
// NOTE(review): record types without an override below fall back to the
// WriteBatch::Handler defaults — confirm whether any of them need to be
// forwarded to `updates_ttl` as well.
Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
  class Handler : public WriteBatch::Handler {
   public:
    explicit Handler(Env* env) : env_(env) {}

    // Batch being rebuilt with timestamped values.
    WriteBatch updates_ttl;
    // Most recent AppendTS() failure seen during the replay, if any.
    Status batch_rewrite_status;

    Status PutCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
      std::string value_with_ts;
      Status st = AppendTS(value, &value_with_ts, env_);
      if (!st.ok()) {
        batch_rewrite_status = st;
      } else {
        WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
                                value_with_ts);
      }
      return Status::OK();
    }

    Status MergeCF(uint32_t column_family_id, const Slice& key,
                   const Slice& value) override {
      std::string value_with_ts;
      Status st = AppendTS(value, &value_with_ts, env_);
      if (!st.ok()) {
        batch_rewrite_status = st;
      } else {
        WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
                                  value_with_ts);
      }
      return Status::OK();
    }

    Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
      WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
      return Status::OK();
    }

    void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); }

   private:
    Env* env_;
  };

  Handler handler(GetEnv());
  // Do not discard the replay status: a failed iteration means `updates_ttl`
  // is incomplete and must not be committed.
  const Status iterate_status = updates->Iterate(&handler);
  if (!handler.batch_rewrite_status.ok()) {
    return handler.batch_rewrite_status;
  }
  if (!iterate_status.ok()) {
    return iterate_status;
  }
  return db_->Write(opts, &(handler.updates_ttl));
}
|
|
|
|
// Returns a newly allocated TtlIterator wrapping the wrapped DB's iterator
// for `column_family`.
Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
                                     ColumnFamilyHandle* column_family) {
  Iterator* underlying = db_->NewIterator(opts, column_family);
  return new TtlIterator(underlying);
}
|
|
|
|
} // namespace rocksdb
|
|
#endif // ROCKSDB_LITE
|