Use PinnableSlice in Transactions
Summary: The ::Get from DB is not augmented with an overload method that takes a PinnableSlice instead of a string. Transactions however are not yet upgraded to use the new API. As a result, transaction users such as MyRocks cannot benefit from it. This patch updates the transactional API with a PinnableSlice overload. Closes https://github.com/facebook/rocksdb/pull/2736 Differential Revision: D5645770 Pulled By: maysamyabandeh fbshipit-source-id: f6af520df902f842de1bcf99bed3e8dfc43ad96d
This commit is contained in:
parent
1dfcdb15f9
commit
ccf7f833e3
@ -169,8 +169,26 @@ class Transaction {
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
std::string* value) = 0;
|
||||
|
||||
// An overload of the the above method that receives a PinnableSlice
|
||||
// For backward compatiblity a default implementation is provided
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* pinnable_val) {
|
||||
assert(pinnable_val != nullptr);
|
||||
auto s = Get(options, column_family, key, pinnable_val->GetSelf());
|
||||
pinnable_val->PinSelf();
|
||||
return s;
|
||||
}
|
||||
|
||||
virtual Status Get(const ReadOptions& options, const Slice& key,
|
||||
std::string* value) = 0;
|
||||
virtual Status Get(const ReadOptions& options, const Slice& key,
|
||||
PinnableSlice* pinnable_val) {
|
||||
assert(pinnable_val != nullptr);
|
||||
auto s = Get(options, key, pinnable_val->GetSelf());
|
||||
pinnable_val->PinSelf();
|
||||
return s;
|
||||
}
|
||||
|
||||
virtual std::vector<Status> MultiGet(
|
||||
const ReadOptions& options,
|
||||
@ -212,6 +230,22 @@ class Transaction {
|
||||
const Slice& key, std::string* value,
|
||||
bool exclusive = true) = 0;
|
||||
|
||||
// An overload of the the above method that receives a PinnableSlice
|
||||
// For backward compatiblity a default implementation is provided
|
||||
virtual Status GetForUpdate(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family,
|
||||
const Slice& key, PinnableSlice* pinnable_val,
|
||||
bool exclusive = true) {
|
||||
if (pinnable_val == nullptr) {
|
||||
std::string* null_str = nullptr;
|
||||
return GetForUpdate(options, key, null_str);
|
||||
} else {
|
||||
auto s = GetForUpdate(options, key, pinnable_val->GetSelf());
|
||||
pinnable_val->PinSelf();
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
virtual Status GetForUpdate(const ReadOptions& options, const Slice& key,
|
||||
std::string* value, bool exclusive = true) = 0;
|
||||
|
||||
|
@ -186,10 +186,20 @@ class WriteBatchWithIndex : public WriteBatchBase {
|
||||
// regardless).
|
||||
Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
|
||||
const Slice& key, std::string* value);
|
||||
|
||||
// An overload of the the above method that receives a PinnableSlice
|
||||
Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
|
||||
const Slice& key, PinnableSlice* value);
|
||||
|
||||
Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
std::string* value);
|
||||
|
||||
// An overload of the the above method that receives a PinnableSlice
|
||||
Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* value);
|
||||
|
||||
// Records the state of the batch for future calls to RollbackToSavePoint().
|
||||
// May be called multiple times to set multiple save points.
|
||||
void SetSavePoint() override;
|
||||
|
@ -181,8 +181,21 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
|
||||
Status TransactionBaseImpl::Get(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family,
|
||||
const Slice& key, std::string* value) {
|
||||
assert(value != nullptr);
|
||||
PinnableSlice pinnable_val(value);
|
||||
assert(!pinnable_val.IsPinned());
|
||||
auto s = Get(read_options, column_family, key, &pinnable_val);
|
||||
if (s.ok() && pinnable_val.IsPinned()) {
|
||||
value->assign(pinnable_val.data(), pinnable_val.size());
|
||||
} // else value is already assigned
|
||||
return s;
|
||||
}
|
||||
|
||||
Status TransactionBaseImpl::Get(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family,
|
||||
const Slice& key, PinnableSlice* pinnable_val) {
|
||||
return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
|
||||
value);
|
||||
pinnable_val);
|
||||
}
|
||||
|
||||
Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
|
||||
@ -192,7 +205,26 @@ Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
|
||||
Status s = TryLock(column_family, key, true /* read_only */, exclusive);
|
||||
|
||||
if (s.ok() && value != nullptr) {
|
||||
s = Get(read_options, column_family, key, value);
|
||||
assert(value != nullptr);
|
||||
PinnableSlice pinnable_val(value);
|
||||
assert(!pinnable_val.IsPinned());
|
||||
s = Get(read_options, column_family, key, &pinnable_val);
|
||||
if (s.ok() && pinnable_val.IsPinned()) {
|
||||
value->assign(pinnable_val.data(), pinnable_val.size());
|
||||
} // else value is already assigned
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family,
|
||||
const Slice& key,
|
||||
PinnableSlice* pinnable_val,
|
||||
bool exclusive) {
|
||||
Status s = TryLock(column_family, key, true /* read_only */, exclusive);
|
||||
|
||||
if (s.ok() && pinnable_val != nullptr) {
|
||||
s = Get(read_options, column_family, key, pinnable_val);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -46,18 +46,27 @@ class TransactionBaseImpl : public Transaction {
|
||||
|
||||
Status RollbackToSavePoint() override;
|
||||
|
||||
using Transaction::Get;
|
||||
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
|
||||
const Slice& key, std::string* value) override;
|
||||
|
||||
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
|
||||
const Slice& key, PinnableSlice* value) override;
|
||||
|
||||
Status Get(const ReadOptions& options, const Slice& key,
|
||||
std::string* value) override {
|
||||
return Get(options, db_->DefaultColumnFamily(), key, value);
|
||||
}
|
||||
|
||||
using Transaction::GetForUpdate;
|
||||
Status GetForUpdate(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
std::string* value, bool exclusive) override;
|
||||
|
||||
Status GetForUpdate(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableSlice* pinnable_val, bool exclusive) override;
|
||||
|
||||
Status GetForUpdate(const ReadOptions& options, const Slice& key,
|
||||
std::string* value, bool exclusive) override {
|
||||
return GetForUpdate(options, db_->DefaultColumnFamily(), key, value,
|
||||
|
@ -385,8 +385,8 @@ class WBWIIteratorImpl : public WBWIIterator {
|
||||
};
|
||||
|
||||
struct WriteBatchWithIndex::Rep {
|
||||
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
|
||||
size_t max_bytes = 0, bool _overwrite_key = false)
|
||||
explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
|
||||
size_t max_bytes = 0, bool _overwrite_key = false)
|
||||
: write_batch(reserved_bytes, max_bytes),
|
||||
comparator(index_comparator, &write_batch),
|
||||
skip_list(comparator, &arena),
|
||||
@ -743,8 +743,23 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
|
||||
const ReadOptions& read_options,
|
||||
const Slice& key,
|
||||
std::string* value) {
|
||||
assert(value != nullptr);
|
||||
PinnableSlice pinnable_val(value);
|
||||
assert(!pinnable_val.IsPinned());
|
||||
auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
|
||||
&pinnable_val);
|
||||
if (s.ok() && pinnable_val.IsPinned()) {
|
||||
value->assign(pinnable_val.data(), pinnable_val.size());
|
||||
} // else value is already assigned
|
||||
return s;
|
||||
}
|
||||
|
||||
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
|
||||
const ReadOptions& read_options,
|
||||
const Slice& key,
|
||||
PinnableSlice* pinnable_val) {
|
||||
return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
|
||||
value);
|
||||
pinnable_val);
|
||||
}
|
||||
|
||||
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
|
||||
@ -752,19 +767,38 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
|
||||
ColumnFamilyHandle* column_family,
|
||||
const Slice& key,
|
||||
std::string* value) {
|
||||
assert(value != nullptr);
|
||||
PinnableSlice pinnable_val(value);
|
||||
assert(!pinnable_val.IsPinned());
|
||||
auto s =
|
||||
GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val);
|
||||
if (s.ok() && pinnable_val.IsPinned()) {
|
||||
value->assign(pinnable_val.data(), pinnable_val.size());
|
||||
} // else value is already assigned
|
||||
return s;
|
||||
}
|
||||
|
||||
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
|
||||
const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family,
|
||||
const Slice& key,
|
||||
PinnableSlice* pinnable_val) {
|
||||
Status s;
|
||||
MergeContext merge_context;
|
||||
const ImmutableDBOptions& immuable_db_options =
|
||||
reinterpret_cast<DBImpl*>(db)->immutable_db_options();
|
||||
|
||||
std::string batch_value;
|
||||
// Since the lifetime of the WriteBatch is the same as that of the transaction
|
||||
// we cannot pin it as otherwise the returned value will not be available
|
||||
// after the transaction finishes.
|
||||
std::string& batch_value = *pinnable_val->GetSelf();
|
||||
WriteBatchWithIndexInternal::Result result =
|
||||
WriteBatchWithIndexInternal::GetFromBatch(
|
||||
immuable_db_options, this, column_family, key, &merge_context,
|
||||
&rep->comparator, &batch_value, rep->overwrite_key, &s);
|
||||
|
||||
if (result == WriteBatchWithIndexInternal::Result::kFound) {
|
||||
value->assign(batch_value.data(), batch_value.size());
|
||||
pinnable_val->PinSelf();
|
||||
return s;
|
||||
}
|
||||
if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
|
||||
@ -785,7 +819,7 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
|
||||
result == WriteBatchWithIndexInternal::Result::kNotFound);
|
||||
|
||||
// Did not find key in batch OR could not resolve Merges. Try DB.
|
||||
s = db->Get(read_options, column_family, key, value);
|
||||
s = db->Get(read_options, column_family, key, pinnable_val);
|
||||
|
||||
if (s.ok() || s.IsNotFound()) { // DB Get Succeeded
|
||||
if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
|
||||
@ -797,18 +831,18 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
|
||||
Env* env = immuable_db_options.env;
|
||||
Logger* logger = immuable_db_options.info_log.get();
|
||||
|
||||
Slice db_slice(*value);
|
||||
Slice* merge_data;
|
||||
if (s.ok()) {
|
||||
merge_data = &db_slice;
|
||||
merge_data = pinnable_val;
|
||||
} else { // Key not present in db (s.IsNotFound())
|
||||
merge_data = nullptr;
|
||||
}
|
||||
|
||||
if (merge_operator) {
|
||||
s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
|
||||
merge_context.GetOperands(), value,
|
||||
logger, statistics, env);
|
||||
s = MergeHelper::TimedFullMerge(
|
||||
merge_operator, key, merge_data, merge_context.GetOperands(),
|
||||
pinnable_val->GetSelf(), logger, statistics, env);
|
||||
pinnable_val->PinSelf();
|
||||
} else {
|
||||
s = Status::InvalidArgument("Options::merge_operator must be set");
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user