rocksdb/utilities/transactions/write_unprepared_txn.h
Maysam Yabandeh 7441a0ecba WriteUnPrepared: fix ubsan complaint (#5148)
Summary:
UBSan complains that, while WriteUnpreparedTxnReadCallback is being initialized, a method of the child class is called before the parent class has been constructed. The patch fixes that by making the aforementioned method static.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5148

Differential Revision: D14760098

Pulled By: maysamyabandeh

fbshipit-source-id: cf19b7c1fdb5de0a54e62c1deebe09a0fa048ded
2019-04-03 15:51:30 -07:00

167 lines
6.3 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <set>
#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
namespace rocksdb {
// Forward declarations. WriteUnpreparedTxn is defined further down in this
// header, but WriteUnpreparedTxnReadCallback (declared first) already refers
// to it by pointer.
class WriteUnpreparedTxnDB;
class WriteUnpreparedTxn;
class WriteUnpreparedTxnReadCallback : public ReadCallback {
public:
WriteUnpreparedTxnReadCallback(WritePreparedTxnDB* db,
SequenceNumber snapshot,
SequenceNumber min_uncommitted,
WriteUnpreparedTxn* txn)
// Pass our last uncommitted seq as the snapshot to the parent class to
// ensure that the parent will not prematurely filter out own writes. We
// will do the exact comparison agaisnt snapshots in IsVisibleFullCheck
// override.
: ReadCallback(CalcMaxVisibleSeq(txn, snapshot), min_uncommitted),
db_(db),
txn_(txn),
wup_snapshot_(snapshot) {}
virtual bool IsVisibleFullCheck(SequenceNumber seq) override;
bool CanReseekToSkip() override {
return wup_snapshot_ == max_visible_seq_;
// Otherwise our own writes uncommitted are in db, and the assumptions
// behind reseek optimizations are no longer valid.
}
// TODO(myabandeh): override Refresh when Iterator::Refresh is supported
private:
static SequenceNumber CalcMaxVisibleSeq(WriteUnpreparedTxn* txn,
SequenceNumber snapshot_seq) {
SequenceNumber max_unprepared = CalcMaxUnpreparedSequenceNumber(txn);
assert(snapshot_seq < max_unprepared || max_unprepared == 0 ||
snapshot_seq == kMaxSequenceNumber);
return std::max(max_unprepared, snapshot_seq);
}
static SequenceNumber CalcMaxUnpreparedSequenceNumber(
WriteUnpreparedTxn* txn);
WritePreparedTxnDB* db_;
WriteUnpreparedTxn* txn_;
SequenceNumber wup_snapshot_;
};
// Transaction type for the write-unprepared policy, layered on top of
// WritePreparedTxn. Write batches may be flushed to the DB before the
// transaction is prepared (see MaybeFlushWriteBatchToDB below), and reads go
// through a ReadCallback so the transaction can see its own writes.
class WriteUnpreparedTxn : public WritePreparedTxn {
 public:
  WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
                     const WriteOptions& write_options,
                     const TransactionOptions& txn_options);

  virtual ~WriteUnpreparedTxn();

  // Write operations. Each using-declaration keeps the remaining
  // TransactionBaseImpl overloads of that name visible alongside the
  // overrides declared here.
  using TransactionBaseImpl::Put;
  Status Put(ColumnFamilyHandle* column_family, const Slice& key,
             const Slice& value, const bool assume_tracked = false) override;
  Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
             const SliceParts& value,
             const bool assume_tracked = false) override;

  using TransactionBaseImpl::Merge;
  Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
               const Slice& value, const bool assume_tracked = false) override;

  using TransactionBaseImpl::Delete;
  Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
                const bool assume_tracked = false) override;
  Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
                const bool assume_tracked = false) override;

  using TransactionBaseImpl::SingleDelete;
  Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
                      const bool assume_tracked = false) override;
  Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
                      const bool assume_tracked = false) override;

  // Always returns Status::NotSupported.
  //
  // Rebuilding from a write batch was only useful for recovering prepared
  // transactions. It is not supported here because a transaction may consist
  // of multiple write batches. Support could be added if a use case outside
  // of recovery turns up.
  Status RebuildFromWriteBatch(WriteBatch*) override {
    return Status::NotSupported("Not supported for WriteUnprepared");
  }

  // Defined out of line; presumably exposes unprep_seqs_ - confirm in the .cc.
  const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();

  // Defined out of line; presumably records `key` under `cfid` in
  // write_set_keys_ - confirm in the .cc.
  void UpdateWriteKeySet(uint32_t cfid, const Slice& key);

 protected:
  void Initialize(const TransactionOptions& txn_options) override;

  Status PrepareInternal() override;

  Status CommitWithoutPrepareInternal() override;

  Status CommitInternal() override;

  Status RollbackInternal() override;

  // Get and GetIterator are overridden so that reads use a ReadCallback that
  // handles read-your-own-write.
  using Transaction::Get;
  Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
             const Slice& key, PinnableSlice* value) override;

  using Transaction::GetIterator;
  Iterator* GetIterator(const ReadOptions& options) override;
  Iterator* GetIterator(const ReadOptions& options,
                        ColumnFamilyHandle* column_family) override;

 private:
  friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
  friend class WriteUnpreparedTxnDB;

  // In write unprepared, every write batch append checks whether
  // max_write_batch_size_ has been exceeded and, if so, calls
  // FlushWriteBatchToDB. MaybeFlushWriteBatchToDB encapsulates that check.
  Status MaybeFlushWriteBatchToDB();
  Status FlushWriteBatchToDB(bool prepared);

  // Threshold used by MaybeFlushWriteBatchToDB (see above).
  size_t max_write_batch_size_;
  WriteUnpreparedTxnDB* wupt_db_;

  // Ordered collection of the unprep_seq sequence numbers this transaction
  // has already written to the DB, mapping
  //   unprep_seq => prepare_batch_cnt
  // for each batch.
  //
  // Both prepared and unprepared batches are stored here: they are treated
  // similarly in the prepare heap / commit map, so keeping them together
  // simplifies the commit callbacks.
  std::map<SequenceNumber, size_t> unprep_seqs_;

  // Keys this transaction has written that have already been written to the
  // DB (i.e. they are not in write_batch_).
  // NOTE(review): the uint32_t map key is presumably the column family id
  // (cf. UpdateWriteKeySet's cfid parameter) - confirm.
  std::map<uint32_t, std::vector<std::string>> write_set_keys_;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE