WritePrepared Txn: make db_stress transactional

Summary:
Add "--use_txn" option to use transactional API in db_stress, default being WRITE_PREPARED policy, which is the main intention of modifying db_stress. It also extend the existing snapshots to verify that before releasing a snapshot a read from it returns the same value as before.
Closes https://github.com/facebook/rocksdb/pull/3243

Differential Revision: D6556912

Pulled By: maysamyabandeh

fbshipit-source-id: 1ae31465be362d44bd06e635e2e9e49a1da11268
This commit is contained in:
Maysam Yabandeh 2017-12-13 11:51:12 -08:00 committed by Facebook Github Bot
parent 546a63272f
commit cd2e5cae7f

View File

@ -51,6 +51,8 @@ int main() {
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/compression.h"
@ -350,6 +352,10 @@ DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
DEFINE_bool(rate_limit_bg_reads, false,
"Use options.rate_limiter on compaction reads");
DEFINE_bool(use_txn, false,
"Use TransactionDB. Currently the default write policy is "
"TxnDBWritePolicy::WRITE_PREPARED");
// Temporarily disable this to allows it to detect new bugs
DEFINE_int32(compact_files_one_in, 0,
"If non-zero, then CompactFiles() will be called one for every N "
@ -981,11 +987,22 @@ const uint32_t SharedState::SENTINEL = 0xffffffff;
// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
uint32_t tid; // 0..n-1
Random rand; // Has different seeds for different threads
uint32_t tid; // 0..n-1
Random rand; // Has different seeds for different threads
SharedState* shared;
Stats stats;
std::queue<std::pair<uint64_t, const Snapshot*> > snapshot_queue;
struct SnapshotState {
const Snapshot* snapshot;
// The cf from which we did a Get at this stapshot
int cf_at;
// The key with which we did a Get at this stapshot
std::string key;
// The status of the Get
Status status;
// The value of the Get
std::string value;
};
std::queue<std::pair<uint64_t, SnapshotState> > snapshot_queue;
ThreadState(uint32_t index, SharedState* _shared)
: tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
@ -1109,6 +1126,9 @@ class StressTest {
: NewBloomFilterPolicy(FLAGS_bloom_bits, false)
: nullptr),
db_(nullptr),
#ifndef ROCKSDB_LITE
txn_db_(nullptr),
#endif
new_column_family_name_(1),
num_times_reopened_(0) {
if (FLAGS_destroy_db_initially) {
@ -1316,6 +1336,32 @@ class StressTest {
}
private:
Status AssertSame(DB* db, ColumnFamilyHandle* cf,
ThreadState::SnapshotState& snap_state) {
ReadOptions ropt;
ropt.snapshot = snap_state.snapshot;
PinnableSlice exp_v(&snap_state.value);
exp_v.PinSelf();
Status s;
PinnableSlice v;
s = db->Get(ropt, cf, snap_state.key, &v);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (snap_state.status != s) {
return Status::Corruption("The snapshot gave inconsistent results: (" +
snap_state.status.ToString() + ") vs. (" +
s.ToString() + ")");
}
if (s.ok()) {
if (exp_v != v) {
return Status::Corruption("The snapshot gave inconsistent values: (" +
exp_v.ToString() + ") vs. (" + v.ToString() +
")");
}
}
return Status::OK();
}
static void ThreadBody(void* v) {
ThreadState* thread = reinterpret_cast<ThreadState*>(v);
@ -1641,6 +1687,32 @@ class StressTest {
return db_->SetOptions(cfh, opts);
}
#ifndef ROCKSDB_LITE
Status NewTxn(WriteOptions& write_opts, Transaction** txn) {
if (!FLAGS_use_txn) {
return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
}
static std::atomic<uint64_t> txn_id = {0};
TransactionOptions txn_options;
*txn = txn_db_->BeginTransaction(write_opts, txn_options);
auto istr = std::to_string(txn_id.fetch_add(1));
Status s = (*txn)->SetName("xid" + istr);
return s;
}
Status CommitTxn(Transaction* txn) {
if (!FLAGS_use_txn) {
return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
}
Status s = txn->Prepare();
if (s.ok()) {
s = txn->Commit();
}
delete txn;
return s;
}
#endif
void OperateDb(ThreadState* thread) {
ReadOptions read_opts(FLAGS_verify_checksum, true);
WriteOptions write_opts;
@ -1667,7 +1739,8 @@ class StressTest {
thread->stats.FinishedSingleOp();
MutexLock l(thread->shared->GetMutex());
while (!thread->snapshot_queue.empty()) {
db_->ReleaseSnapshot(thread->snapshot_queue.front().second);
db_->ReleaseSnapshot(
thread->snapshot_queue.front().second.snapshot);
thread->snapshot_queue.pop();
}
thread->shared->IncVotedReopen();
@ -1781,19 +1854,6 @@ class StressTest {
}
}
#endif // !ROCKSDB_LITE
if (FLAGS_acquire_snapshot_one_in > 0 &&
thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) {
thread->snapshot_queue.emplace(
std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops),
db_->GetSnapshot());
}
if (!thread->snapshot_queue.empty()) {
while (i == thread->snapshot_queue.front().first) {
db_->ReleaseSnapshot(thread->snapshot_queue.front().second);
thread->snapshot_queue.pop();
}
}
const double completed_ratio =
static_cast<double>(i) / FLAGS_ops_per_thread;
const int64_t base_key = static_cast<int64_t>(
@ -1807,8 +1867,38 @@ class StressTest {
l.reset(new MutexLock(
shared->GetMutexForKey(rand_column_family, rand_key)));
}
auto column_family = column_families_[rand_column_family];
if (FLAGS_acquire_snapshot_one_in > 0 &&
thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) {
auto snapshot = db_->GetSnapshot();
ReadOptions ropt;
ropt.snapshot = snapshot;
std::string value_at;
// When taking a snapshot, we also read a key from that snapshot. We
// will later read the same key before releasing the snapshot and verify
// that the results are the same.
auto status_at = db_->Get(ropt, column_family, key, &value_at);
ThreadState::SnapshotState snap_state = {snapshot, rand_column_family,
keystr, status_at, value_at};
thread->snapshot_queue.emplace(
std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops),
snap_state);
}
while (!thread->snapshot_queue.empty() &&
i == thread->snapshot_queue.front().first) {
auto snap_state = thread->snapshot_queue.front().second;
assert(snap_state.snapshot);
Status s =
AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
if (!s.ok()) {
VerificationAbort(shared, "Snapshot gave inconsistent state", s);
}
db_->ReleaseSnapshot(snap_state.snapshot);
thread->snapshot_queue.pop();
}
int prob_op = thread->rand.Uniform(100);
if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
// OPERATION read
@ -1885,9 +1975,35 @@ class StressTest {
shared->Put(rand_column_family, rand_key, value_base);
Status s;
if (FLAGS_use_merge) {
s = db_->Merge(write_opts, column_family, key, v);
if (!FLAGS_use_txn) {
s = db_->Merge(write_opts, column_family, key, v);
} else {
#ifndef ROCKSDB_LITE
Transaction* txn;
s = NewTxn(write_opts, &txn);
if (s.ok()) {
s = txn->Merge(column_family, key, v);
if (s.ok()) {
s = CommitTxn(txn);
}
}
#endif
}
} else {
s = db_->Put(write_opts, column_family, key, v);
if (!FLAGS_use_txn) {
s = db_->Put(write_opts, column_family, key, v);
} else {
#ifndef ROCKSDB_LITE
Transaction* txn;
s = NewTxn(write_opts, &txn);
if (s.ok()) {
s = txn->Put(column_family, key, v);
if (s.ok()) {
s = CommitTxn(txn);
}
}
#endif
}
}
if (!s.ok()) {
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
@ -1921,7 +2037,21 @@ class StressTest {
// otherwise.
if (shared->AllowsOverwrite(rand_column_family, rand_key)) {
shared->Delete(rand_column_family, rand_key);
Status s = db_->Delete(write_opts, column_family, key);
Status s;
if (!FLAGS_use_txn) {
s = db_->Delete(write_opts, column_family, key);
} else {
#ifndef ROCKSDB_LITE
Transaction* txn;
s = NewTxn(write_opts, &txn);
if (s.ok()) {
s = txn->Delete(column_family, key);
if (s.ok()) {
s = CommitTxn(txn);
}
}
#endif
}
thread->stats.AddDeletes(1);
if (!s.ok()) {
fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
@ -1929,7 +2059,21 @@ class StressTest {
}
} else {
shared->SingleDelete(rand_column_family, rand_key);
Status s = db_->SingleDelete(write_opts, column_family, key);
Status s;
if (!FLAGS_use_txn) {
s = db_->SingleDelete(write_opts, column_family, key);
} else {
#ifndef ROCKSDB_LITE
Transaction* txn;
s = NewTxn(write_opts, &txn);
if (s.ok()) {
s = txn->SingleDelete(column_family, key);
if (s.ok()) {
s = CommitTxn(txn);
}
}
#endif
}
thread->stats.AddSingleDeletes(1);
if (!s.ok()) {
fprintf(stderr, "single delete error: %s\n",
@ -2067,6 +2211,12 @@ class StressTest {
}
}
void VerificationAbort(SharedState* shared, std::string msg, Status s) const {
printf("Verification failed: %s. Status is %s\n", msg.c_str(),
s.ToString().c_str());
shared->SetVerificationFailure();
}
void VerificationAbort(SharedState* shared, std::string msg, int cf,
int64_t key) const {
printf("Verification failed for column family %d key %" PRIi64 ": %s\n", cf, key,
@ -2138,6 +2288,8 @@ class StressTest {
void PrintEnv() const {
fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion,
kMinorVersion);
fprintf(stdout, "TransactionDB : %s\n",
FLAGS_use_txn ? "true" : "false");
fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
if (!FLAGS_test_batches_snapshots) {
fprintf(stdout, "Clear CFs one in : %d\n",
@ -2210,6 +2362,9 @@ class StressTest {
void Open() {
assert(db_ == nullptr);
#ifndef ROCKSDB_LITE
assert(txn_db_ == nullptr);
#endif
BlockBasedTableOptions block_based_options;
block_based_options.block_cache = cache_;
block_based_options.block_cache_compressed = compressed_cache_;
@ -2382,8 +2537,19 @@ class StressTest {
options_.listeners.emplace_back(
new DbStressListener(FLAGS_db, options_.db_paths));
options_.create_missing_column_families = true;
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
if (!FLAGS_use_txn) {
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
} else {
#ifndef ROCKSDB_LITE
TransactionDBOptions txn_db_options;
// For the moment it is sufficient to test WRITE_PREPARED policy
txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED;
s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
cf_descriptors, &column_families_, &txn_db_);
db_ = txn_db_;
#endif
}
assert(!s.ok() || column_families_.size() ==
static_cast<size_t>(FLAGS_column_families));
} else {
@ -2409,6 +2575,9 @@ class StressTest {
column_families_.clear();
delete db_;
db_ = nullptr;
#ifndef ROCKSDB_LITE
txn_db_ = nullptr;
#endif
num_times_reopened_++;
auto now = FLAGS_env->NowMicros();
@ -2429,6 +2598,9 @@ class StressTest {
std::shared_ptr<Cache> compressed_cache_;
std::shared_ptr<const FilterPolicy> filter_policy_;
DB* db_;
#ifndef ROCKSDB_LITE
TransactionDB* txn_db_;
#endif
Options options_;
std::vector<ColumnFamilyHandle*> column_families_;
std::vector<std::string> column_family_names_;