Add basic support for user-defined timestamp to db_bench (#7389)

Summary:
Update db_bench so that it can be run with user-defined timestamps.
Currently, only 64-bit timestamps are supported; other timestamp sizes are rejected by an assertion.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7389

Test Plan: ./db_bench -benchmarks=fillseq,fillrandom,readrandom,readsequential,....., -user_timestamp_size=8

Reviewed By: ltamasi

Differential Revision: D23720830

Pulled By: riversand963

fbshipit-source-id: 486eacbb82de9a5441e79a61bfa9beef6581608a
This commit is contained in:
Yanqin Jin 2020-09-15 20:04:58 -07:00 committed by Facebook GitHub Bot
parent 9d3b2db9b5
commit a28df7a75a

View File

@ -292,6 +292,9 @@ static bool ValidateUint32Range(const char* flagname, uint64_t value) {
DEFINE_int32(key_size, 16, "size of each key"); DEFINE_int32(key_size, 16, "size of each key");
DEFINE_int32(user_timestamp_size, 0,
"number of bytes in a user-defined timestamp");
DEFINE_int32(num_multi_db, 0, DEFINE_int32(num_multi_db, 0,
"Number of DBs used in the benchmark. 0 means single DB."); "Number of DBs used in the benchmark. 0 means single DB.");
@ -1339,6 +1342,10 @@ DEFINE_bool(report_file_operations, false, "if report number of file "
"operations"); "operations");
DEFINE_int32(readahead_size, 0, "Iterator readahead size"); DEFINE_int32(readahead_size, 0, "Iterator readahead size");
DEFINE_bool(read_with_latest_user_timestamp, true,
"If true, always use the current latest timestamp for read. If "
"false, choose a random timestamp from the past.");
static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) = static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit); RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);
@ -2246,6 +2253,25 @@ class TimestampEmulator {
TimestampEmulator() : timestamp_(0) {} TimestampEmulator() : timestamp_(0) {}
uint64_t Get() const { return timestamp_.load(); } uint64_t Get() const { return timestamp_.load(); }
void Inc() { timestamp_++; } void Inc() { timestamp_++; }
// Takes the next value of the emulated clock, advancing the clock by one, and
// writes it into `scratch` via EncodeFixed64 (presumably a fixed-width 64-bit
// encoding -- helper is defined outside this view).
//
// `scratch` must be non-null and must have room for
// FLAGS_user_timestamp_size bytes; the returned Slice points into `scratch`
// and does not own it.
Slice Allocate(char* scratch) {
  // TODO: support larger timestamp sizes
  assert(FLAGS_user_timestamp_size == 8);
  assert(scratch != nullptr);
  // fetch_add yields the value held before the increment, so the timestamp
  // handed out is the pre-increment clock value.
  EncodeFixed64(scratch, timestamp_.fetch_add(1));
  const Slice encoded_ts(scratch, FLAGS_user_timestamp_size);
  return encoded_ts;
}
// Picks the timestamp a read operation should use and encodes it into
// `scratch`.
//
// - If FLAGS_read_with_latest_user_timestamp is set, this defers to
//   Allocate(), i.e. it takes (and advances) the current clock value.
// - Otherwise a timestamp is drawn from `rand` in the range [0, Get()).
//
// `scratch` must be non-null and must have room for
// FLAGS_user_timestamp_size bytes; the returned Slice points into `scratch`
// and does not own it.
Slice GetTimestampForRead(Random64& rand, char* scratch) {
  assert(FLAGS_user_timestamp_size == 8);
  assert(scratch);
  if (FLAGS_read_with_latest_user_timestamp) {
    return Allocate(scratch);
  }
  // Choose a random timestamp from the past.
  // The clock starts at 0 and only moves when a timestamp is allocated, so a
  // read issued before any write would otherwise evaluate `x % 0`, which is
  // undefined behavior. Fall back to timestamp 0 in that case.
  // Load the clock once so the zero check and the modulo see the same value.
  const uint64_t latest = Get();
  const uint64_t ts = (latest > 0) ? rand.Next() % latest : 0;
  EncodeFixed64(scratch, ts);
  return Slice(scratch, FLAGS_user_timestamp_size);
}
}; };
// State shared by all concurrent executions of the same benchmark. // State shared by all concurrent executions of the same benchmark.
@ -2277,10 +2303,8 @@ struct ThreadState {
Stats stats; Stats stats;
SharedState* shared; SharedState* shared;
/* implicit */ ThreadState(int index) explicit ThreadState(int index)
: tid(index), : tid(index), rand((FLAGS_seed ? FLAGS_seed : 1000) + index) {}
rand((FLAGS_seed ? FLAGS_seed : 1000) + index) {
}
}; };
class Duration { class Duration {
@ -2331,6 +2355,7 @@ class Benchmark {
std::vector<DBWithColumnFamilies> multi_dbs_; std::vector<DBWithColumnFamilies> multi_dbs_;
int64_t num_; int64_t num_;
int key_size_; int key_size_;
int user_timestamp_size_;
int prefix_size_; int prefix_size_;
int64_t keys_per_prefix_; int64_t keys_per_prefix_;
int64_t entries_per_batch_; int64_t entries_per_batch_;
@ -2406,6 +2431,8 @@ class Benchmark {
std::shared_ptr<ErrorHandlerListener> listener_; std::shared_ptr<ErrorHandlerListener> listener_;
std::unique_ptr<TimestampEmulator> mock_app_clock_;
bool SanityCheck() { bool SanityCheck() {
if (FLAGS_compression_ratio > 1) { if (FLAGS_compression_ratio > 1) {
fprintf(stderr, "compression_ratio should be between 0 and 1\n"); fprintf(stderr, "compression_ratio should be between 0 and 1\n");
@ -2454,7 +2481,9 @@ class Benchmark {
void PrintHeader() { void PrintHeader() {
PrintEnvironment(); PrintEnvironment();
fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size); fprintf(stdout,
"Keys: %d bytes each (+ %d bytes user-defined timestamp)\n",
FLAGS_key_size, FLAGS_user_timestamp_size);
auto avg_value_size = FLAGS_value_size; auto avg_value_size = FLAGS_value_size;
if (FLAGS_value_size_distribution_type_e == kFixed) { if (FLAGS_value_size_distribution_type_e == kFixed) {
fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n", fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
@ -2694,6 +2723,7 @@ class Benchmark {
prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)), prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
num_(FLAGS_num), num_(FLAGS_num),
key_size_(FLAGS_key_size), key_size_(FLAGS_key_size),
user_timestamp_size_(FLAGS_user_timestamp_size),
prefix_size_(FLAGS_prefix_size), prefix_size_(FLAGS_prefix_size),
keys_per_prefix_(FLAGS_keys_per_prefix), keys_per_prefix_(FLAGS_keys_per_prefix),
entries_per_batch_(1), entries_per_batch_(1),
@ -2769,6 +2799,9 @@ class Benchmark {
} }
listener_.reset(new ErrorHandlerListener()); listener_.reset(new ErrorHandlerListener());
if (user_timestamp_size_ > 0) {
mock_app_clock_.reset(new TimestampEmulator());
}
} }
~Benchmark() { ~Benchmark() {
@ -2788,17 +2821,19 @@ class Benchmark {
} }
// Generate key according to the given specification and random number. // Generate key according to the given specification and random number.
// The resulting key will have the following format (if keys_per_prefix_ // The resulting key will have the following format:
// is positive), extra trailing bytes are either cut off or padded with '0'. // - If keys_per_prefix_ is positive, extra trailing bytes are either cut
// The prefix value is derived from key value. // off or padded with '0'.
// ---------------------------- // The prefix value is derived from key value.
// | prefix 00000 | key 00000 | // ----------------------------
// ---------------------------- // | prefix 00000 | key 00000 |
// If keys_per_prefix_ is 0, the key is simply a binary representation of // ----------------------------
// random number followed by trailing '0's //
// ---------------------------- // - If keys_per_prefix_ is 0, the key is simply a binary representation of
// | key 00000 | // random number followed by trailing '0's
// ---------------------------- // ----------------------------
// | key 00000 |
// ----------------------------
void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) { void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) {
if (!keys_.empty()) { if (!keys_.empty()) {
assert(FLAGS_use_existing_keys); assert(FLAGS_use_existing_keys);
@ -4037,6 +4072,14 @@ class Benchmark {
options.enable_thread_tracking = true; options.enable_thread_tracking = true;
} }
if (FLAGS_user_timestamp_size > 0) {
if (FLAGS_user_timestamp_size != 8) {
fprintf(stderr, "Only 64 bits timestamps are supported.\n");
exit(1);
}
options.comparator = ROCKSDB_NAMESPACE::test::ComparatorWithU64Ts();
}
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (FLAGS_readonly && FLAGS_transaction_db) { if (FLAGS_readonly && FLAGS_transaction_db) {
fprintf(stderr, "Cannot use readonly flag with transaction_db\n"); fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
@ -4430,7 +4473,8 @@ class Benchmark {
} }
RandomGenerator gen; RandomGenerator gen;
WriteBatch batch; WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
user_timestamp_size_);
Status s; Status s;
int64_t bytes = 0; int64_t bytes = 0;
@ -4449,6 +4493,11 @@ class Benchmark {
} }
} }
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
int64_t stage = 0; int64_t stage = 0;
int64_t num_written = 0; int64_t num_written = 0;
while (!duration.Done(entries_per_batch_)) { while (!duration.Done(entries_per_batch_)) {
@ -4492,8 +4541,8 @@ class Benchmark {
batch.Put(db_with_cfh->GetCfh(rand_num), key, batch.Put(db_with_cfh->GetCfh(rand_num), key,
val); val);
} }
batch_bytes += val.size() + key_size_; batch_bytes += val.size() + key_size_ + user_timestamp_size_;
bytes += val.size() + key_size_; bytes += val.size() + key_size_ + user_timestamp_size_;
++num_written; ++num_written;
if (writes_per_range_tombstone_ > 0 && if (writes_per_range_tombstone_ > 0 &&
num_written > writes_before_delete_range_ && num_written > writes_before_delete_range_ &&
@ -4549,6 +4598,15 @@ class Benchmark {
// once per write. // once per write.
thread->stats.ResetLastOpTime(); thread->stats.ResetLastOpTime();
} }
if (user_timestamp_size_ > 0) {
Slice user_ts = mock_app_clock_->Allocate(ts_guard.get());
s = batch.AssignTimestamp(user_ts);
if (!s.ok()) {
fprintf(stderr, "assign timestamp to write batch: %s\n",
s.ToString().c_str());
ErrorExit();
}
}
if (!use_blob_db_) { if (!use_blob_db_) {
s = db_with_cfh->db->Write(write_options_, &batch); s = db_with_cfh->db->Write(write_options_, &batch);
} }
@ -4905,6 +4963,13 @@ class Benchmark {
void ReadSequential(ThreadState* thread, DB* db) { void ReadSequential(ThreadState* thread, DB* db) {
ReadOptions options(FLAGS_verify_checksum, true); ReadOptions options(FLAGS_verify_checksum, true);
options.tailing = FLAGS_use_tailing_iterator; options.tailing = FLAGS_use_tailing_iterator;
std::unique_ptr<char[]> ts_guard;
Slice ts;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get());
options.timestamp = &ts;
}
Iterator* iter = db->NewIterator(options); Iterator* iter = db->NewIterator(options);
int64_t i = 0; int64_t i = 0;
@ -5026,6 +5091,11 @@ class Benchmark {
std::unique_ptr<const char[]> key_guard; std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard); Slice key = AllocateKey(&key_guard);
std::string value; std::string value;
Slice ts;
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
DB* db = SelectDBWithCfh(thread)->db; DB* db = SelectDBWithCfh(thread)->db;
int64_t pot = 1; int64_t pot = 1;
@ -5039,7 +5109,15 @@ class Benchmark {
int64_t key_rand = thread->rand.Next() & (pot - 1); int64_t key_rand = thread->rand.Next() & (pot - 1);
GenerateKeyFromInt(key_rand, FLAGS_num, &key); GenerateKeyFromInt(key_rand, FLAGS_num, &key);
++read; ++read;
auto status = db->Get(options, key, &value); std::string ts_ret;
std::string* ts_ptr = nullptr;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->GetTimestampForRead(thread->rand,
ts_guard.get());
options.timestamp = &ts;
ts_ptr = &ts_ret;
}
auto status = db->Get(options, key, &value, ts_ptr);
if (status.ok()) { if (status.ok()) {
++found; ++found;
} else if (!status.IsNotFound()) { } else if (!status.IsNotFound()) {
@ -5103,6 +5181,11 @@ class Benchmark {
std::unique_ptr<const char[]> key_guard; std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard); Slice key = AllocateKey(&key_guard);
PinnableSlice pinnable_val; PinnableSlice pinnable_val;
std::unique_ptr<char[]> ts_guard;
Slice ts;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
Duration duration(FLAGS_duration, reads_); Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) { while (!duration.Done(1)) {
@ -5126,19 +5209,26 @@ class Benchmark {
} }
GenerateKeyFromInt(key_rand, FLAGS_num, &key); GenerateKeyFromInt(key_rand, FLAGS_num, &key);
read++; read++;
std::string ts_ret;
std::string* ts_ptr = nullptr;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get());
options.timestamp = &ts;
ts_ptr = &ts_ret;
}
Status s; Status s;
if (FLAGS_num_column_families > 1) { if (FLAGS_num_column_families > 1) {
s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key, s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
&pinnable_val); &pinnable_val, ts_ptr);
} else { } else {
pinnable_val.Reset(); pinnable_val.Reset();
s = db_with_cfh->db->Get(options, s = db_with_cfh->db->Get(options,
db_with_cfh->db->DefaultColumnFamily(), key, db_with_cfh->db->DefaultColumnFamily(), key,
&pinnable_val); &pinnable_val, ts_ptr);
} }
if (s.ok()) { if (s.ok()) {
found++; found++;
bytes += key.size() + pinnable_val.size(); bytes += key.size() + pinnable_val.size() + user_timestamp_size_;
} else if (!s.IsNotFound()) { } else if (!s.IsNotFound()) {
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str()); fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
abort(); abort();
@ -5184,6 +5274,11 @@ class Benchmark {
keys.push_back(AllocateKey(&key_guards.back())); keys.push_back(AllocateKey(&key_guards.back()));
} }
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
Duration duration(FLAGS_duration, reads_); Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) { while (!duration.Done(1)) {
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
@ -5202,6 +5297,11 @@ class Benchmark {
GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]); GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
} }
} }
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get());
options.timestamp = &ts;
}
if (!FLAGS_multiread_batched) { if (!FLAGS_multiread_batched) {
std::vector<Status> statuses = db->MultiGet(options, keys, &values); std::vector<Status> statuses = db->MultiGet(options, keys, &values);
assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_); assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
@ -5718,8 +5818,17 @@ class Benchmark {
void IteratorCreation(ThreadState* thread) { void IteratorCreation(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_); Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true); ReadOptions options(FLAGS_verify_checksum, true);
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
while (!duration.Done(1)) { while (!duration.Done(1)) {
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get());
options.timestamp = &ts;
}
Iterator* iter = db->NewIterator(options); Iterator* iter = db->NewIterator(options);
delete iter; delete iter;
thread->stats.FinishedOps(nullptr, db, 1, kOthers); thread->stats.FinishedOps(nullptr, db, 1, kOthers);
@ -5743,6 +5852,13 @@ class Benchmark {
options.prefix_same_as_start = FLAGS_prefix_same_as_start; options.prefix_same_as_start = FLAGS_prefix_same_as_start;
options.tailing = FLAGS_use_tailing_iterator; options.tailing = FLAGS_use_tailing_iterator;
options.readahead_size = FLAGS_readahead_size; options.readahead_size = FLAGS_readahead_size;
std::unique_ptr<char[]> ts_guard;
Slice ts;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get());
options.timestamp = &ts;
}
Iterator* single_iter = nullptr; Iterator* single_iter = nullptr;
std::vector<Iterator*> multi_iters; std::vector<Iterator*> multi_iters;
@ -5866,11 +5982,17 @@ class Benchmark {
} }
void DoDelete(ThreadState* thread, bool seq) { void DoDelete(ThreadState* thread, bool seq) {
WriteBatch batch; WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
user_timestamp_size_);
Duration duration(seq ? 0 : FLAGS_duration, deletes_); Duration duration(seq ? 0 : FLAGS_duration, deletes_);
int64_t i = 0; int64_t i = 0;
std::unique_ptr<const char[]> key_guard; std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard); Slice key = AllocateKey(&key_guard);
std::unique_ptr<char[]> ts_guard;
Slice ts;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
while (!duration.Done(entries_per_batch_)) { while (!duration.Done(entries_per_batch_)) {
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
@ -5880,7 +6002,16 @@ class Benchmark {
GenerateKeyFromInt(k, FLAGS_num, &key); GenerateKeyFromInt(k, FLAGS_num, &key);
batch.Delete(key); batch.Delete(key);
} }
auto s = db->Write(write_options_, &batch); Status s;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
s = batch.AssignTimestamp(ts);
if (!s.ok()) {
fprintf(stderr, "assign timestamp: %s\n", s.ToString().c_str());
ErrorExit();
}
}
s = db->Write(write_options_, &batch);
thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kDelete); thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kDelete);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "del error: %s\n", s.ToString().c_str()); fprintf(stderr, "del error: %s\n", s.ToString().c_str());
@ -5930,6 +6061,10 @@ class Benchmark {
std::unique_ptr<const char[]> key_guard; std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard); Slice key = AllocateKey(&key_guard);
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
uint32_t written = 0; uint32_t written = 0;
bool hint_printed = false; bool hint_printed = false;
@ -5961,18 +6096,27 @@ class Benchmark {
Status s; Status s;
Slice val = gen.Generate(); Slice val = gen.Generate();
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
write_options_.timestamp = &ts;
}
if (write_merge == kWrite) { if (write_merge == kWrite) {
s = db->Put(write_options_, key, val); s = db->Put(write_options_, key, val);
} else { } else {
s = db->Merge(write_options_, key, val); s = db->Merge(write_options_, key, val);
} }
// Restore write_options_
if (user_timestamp_size_ > 0) {
write_options_.timestamp = nullptr;
}
written++; written++;
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
exit(1); exit(1);
} }
bytes += key.size() + val.size(); bytes += key.size() + val.size() + user_timestamp_size_;
thread->stats.FinishedOps(&db_, db_.db, 1, kWrite); thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
if (FLAGS_benchmark_write_rate_limit > 0) { if (FLAGS_benchmark_write_rate_limit > 0) {
@ -5999,6 +6143,13 @@ class Benchmark {
} }
assert(db_.db != nullptr); assert(db_.db != nullptr);
ReadOptions read_options; ReadOptions read_options;
std::unique_ptr<char[]> ts_guard;
Slice ts;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get());
read_options.timestamp = &ts;
}
Iterator* iter = db_.db->NewIterator(read_options); Iterator* iter = db_.db->NewIterator(read_options);
fprintf(stderr, "num reads to do %" PRIu64 "\n", reads_); fprintf(stderr, "num reads to do %" PRIu64 "\n", reads_);
@ -6030,13 +6181,26 @@ class Benchmark {
std::string suffixes[3] = {"2", "1", "0"}; std::string suffixes[3] = {"2", "1", "0"};
std::string keys[3]; std::string keys[3];
WriteBatch batch; WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
user_timestamp_size_);
Status s; Status s;
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
keys[i] = key.ToString() + suffixes[i]; keys[i] = key.ToString() + suffixes[i];
batch.Put(keys[i], value); batch.Put(keys[i], value);
} }
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
Slice ts = mock_app_clock_->Allocate(ts_guard.get());
s = batch.AssignTimestamp(ts);
if (!s.ok()) {
fprintf(stderr, "assign timestamp to batch: %s\n",
s.ToString().c_str());
ErrorExit();
}
}
s = db->Write(writeoptions, &batch); s = db->Write(writeoptions, &batch);
return s; return s;
} }
@ -6049,13 +6213,25 @@ class Benchmark {
std::string suffixes[3] = {"1", "2", "0"}; std::string suffixes[3] = {"1", "2", "0"};
std::string keys[3]; std::string keys[3];
WriteBatch batch; WriteBatch batch(0, 0, user_timestamp_size_);
Status s; Status s;
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
keys[i] = key.ToString() + suffixes[i]; keys[i] = key.ToString() + suffixes[i];
batch.Delete(keys[i]); batch.Delete(keys[i]);
} }
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
Slice ts = mock_app_clock_->Allocate(ts_guard.get());
s = batch.AssignTimestamp(ts);
if (!s.ok()) {
fprintf(stderr, "assign timestamp to batch: %s\n",
s.ToString().c_str());
ErrorExit();
}
}
s = db->Write(writeoptions, &batch); s = db->Write(writeoptions, &batch);
return s; return s;
} }
@ -6070,6 +6246,15 @@ class Benchmark {
Slice key_slices[3]; Slice key_slices[3];
std::string values[3]; std::string values[3];
ReadOptions readoptionscopy = readoptions; ReadOptions readoptionscopy = readoptions;
std::unique_ptr<char[]> ts_guard;
Slice ts;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
ts = mock_app_clock_->Allocate(ts_guard.get());
readoptionscopy.timestamp = &ts;
}
readoptionscopy.snapshot = db->GetSnapshot(); readoptionscopy.snapshot = db->GetSnapshot();
Status s; Status s;
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
@ -6192,6 +6377,11 @@ class Benchmark {
std::unique_ptr<const char[]> key_guard; std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard); Slice key = AllocateKey(&key_guard);
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
// the number of iterations is the larger of read_ or write_ // the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) { while (!duration.Done(1)) {
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
@ -6203,6 +6393,12 @@ class Benchmark {
} }
if (get_weight > 0) { if (get_weight > 0) {
// do all the gets first // do all the gets first
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->GetTimestampForRead(thread->rand,
ts_guard.get());
options.timestamp = &ts;
}
Status s = db->Get(options, key, &value); Status s = db->Get(options, key, &value);
if (!s.ok() && !s.IsNotFound()) { if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "get error: %s\n", s.ToString().c_str()); fprintf(stderr, "get error: %s\n", s.ToString().c_str());
@ -6217,6 +6413,11 @@ class Benchmark {
} else if (put_weight > 0) { } else if (put_weight > 0) {
// then do all the corresponding number of puts // then do all the corresponding number of puts
// for all the gets we have done earlier // for all the gets we have done earlier
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
write_options_.timestamp = &ts;
}
Status s = db->Put(write_options_, key, gen.Generate()); Status s = db->Put(write_options_, key, gen.Generate());
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str()); fprintf(stderr, "put error: %s\n", s.ToString().c_str());
@ -6246,15 +6447,25 @@ class Benchmark {
std::unique_ptr<const char[]> key_guard; std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard); Slice key = AllocateKey(&key_guard);
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
// the number of iterations is the larger of read_ or write_ // the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) { while (!duration.Done(1)) {
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
Slice ts;
if (user_timestamp_size_ > 0) {
// Read with newest timestamp because we are doing rmw.
ts = mock_app_clock_->Allocate(ts_guard.get());
options.timestamp = &ts;
}
auto status = db->Get(options, key, &value); auto status = db->Get(options, key, &value);
if (status.ok()) { if (status.ok()) {
++found; ++found;
bytes += key.size() + value.size(); bytes += key.size() + value.size() + user_timestamp_size_;
} else if (!status.IsNotFound()) { } else if (!status.IsNotFound()) {
fprintf(stderr, "Get returned an error: %s\n", fprintf(stderr, "Get returned an error: %s\n",
status.ToString().c_str()); status.ToString().c_str());
@ -6268,12 +6479,16 @@ class Benchmark {
} }
Slice val = gen.Generate(); Slice val = gen.Generate();
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
write_options_.timestamp = &ts;
}
Status s = db->Put(write_options_, key, val); Status s = db->Put(write_options_, key, val);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str()); fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1); exit(1);
} }
bytes += key.size() + val.size(); bytes += key.size() + val.size() + user_timestamp_size_;
thread->stats.FinishedOps(nullptr, db, 1, kUpdate); thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
} }
char msg[100]; char msg[100];
@ -6298,10 +6513,19 @@ class Benchmark {
std::unique_ptr<const char[]> key_guard; std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard); Slice key = AllocateKey(&key_guard);
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
// the number of iterations is the larger of read_ or write_ // the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) { while (!duration.Done(1)) {
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
options.timestamp = &ts;
}
auto status = db->Get(options, key, &existing_value); auto status = db->Get(options, key, &existing_value);
if (status.ok()) { if (status.ok()) {
@ -6322,6 +6546,11 @@ class Benchmark {
xor_operator.XOR(nullptr, value, &new_value); xor_operator.XOR(nullptr, value, &new_value);
} }
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
write_options_.timestamp = &ts;
}
Status s = db->Put(write_options_, key, Slice(new_value)); Status s = db->Put(write_options_, key, Slice(new_value));
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str()); fprintf(stderr, "put error: %s\n", s.ToString().c_str());
@ -6347,16 +6576,25 @@ class Benchmark {
std::unique_ptr<const char[]> key_guard; std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard); Slice key = AllocateKey(&key_guard);
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
// The number of iterations is the larger of read_ or write_ // The number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_); Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) { while (!duration.Done(1)) {
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
options.timestamp = &ts;
}
auto status = db->Get(options, key, &value); auto status = db->Get(options, key, &value);
if (status.ok()) { if (status.ok()) {
++found; ++found;
bytes += key.size() + value.size(); bytes += key.size() + value.size() + user_timestamp_size_;
} else if (!status.IsNotFound()) { } else if (!status.IsNotFound()) {
fprintf(stderr, "Get returned an error: %s\n", fprintf(stderr, "Get returned an error: %s\n",
status.ToString().c_str()); status.ToString().c_str());
@ -6374,13 +6612,18 @@ class Benchmark {
} }
value.append(operand.data(), operand.size()); value.append(operand.data(), operand.size());
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
write_options_.timestamp = &ts;
}
// Write back to the database // Write back to the database
Status s = db->Put(write_options_, key, value); Status s = db->Put(write_options_, key, value);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str()); fprintf(stderr, "put error: %s\n", s.ToString().c_str());
ErrorExit(); ErrorExit();
} }
bytes += key.size() + value.size(); bytes += key.size() + value.size() + user_timestamp_size_;
thread->stats.FinishedOps(nullptr, db, 1, kUpdate); thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
} }
@ -6506,8 +6749,15 @@ class Benchmark {
thread->stats.Start(thread->tid); thread->stats.Start(thread->tid);
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
std::unique_ptr<Iterator> iter( ReadOptions read_opts(FLAGS_verify_checksum, true);
db->NewIterator(ReadOptions(FLAGS_verify_checksum, true))); std::unique_ptr<char[]> ts_guard;
Slice ts;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get());
read_opts.timestamp = &ts;
}
std::unique_ptr<Iterator> iter(db->NewIterator(read_opts));
std::unique_ptr<const char[]> key_guard; std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard); Slice key = AllocateKey(&key_guard);
@ -6735,6 +6985,10 @@ class Benchmark {
void RandomReplaceKeys(ThreadState* thread) { void RandomReplaceKeys(ThreadState* thread) {
std::unique_ptr<const char[]> key_guard; std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard); Slice key = AllocateKey(&key_guard);
std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}
std::vector<uint32_t> counters(FLAGS_numdistinct, 0); std::vector<uint32_t> counters(FLAGS_numdistinct, 0);
size_t max_counter = 50; size_t max_counter = 50;
RandomGenerator gen; RandomGenerator gen;
@ -6743,6 +6997,11 @@ class Benchmark {
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
for (int64_t i = 0; i < FLAGS_numdistinct; i++) { for (int64_t i = 0; i < FLAGS_numdistinct; i++) {
GenerateKeyFromInt(i * max_counter, FLAGS_num, &key); GenerateKeyFromInt(i * max_counter, FLAGS_num, &key);
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
write_options_.timestamp = &ts;
}
s = db->Put(write_options_, key, gen.Generate()); s = db->Put(write_options_, key, gen.Generate());
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str()); fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
@ -6762,12 +7021,21 @@ class Benchmark {
static_cast<int64_t>(0)); static_cast<int64_t>(0));
GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num, GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
&key); &key);
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
write_options_.timestamp = &ts;
}
s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key) s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key)
: db->Delete(write_options_, key); : db->Delete(write_options_, key);
if (s.ok()) { if (s.ok()) {
counters[key_id] = (counters[key_id] + 1) % max_counter; counters[key_id] = (counters[key_id] + 1) % max_counter;
GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num, GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
&key); &key);
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
write_options_.timestamp = &ts;
}
s = db->Put(write_options_, key, Slice()); s = db->Put(write_options_, key, Slice());
} }