2019-12-09 08:49:32 +01:00
|
|
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
//
|
|
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
|
|
|
|
#ifdef GFLAGS
|
|
|
|
#include "db_stress_tool/db_stress_common.h"
|
2020-04-11 02:18:56 +02:00
|
|
|
#ifndef NDEBUG
|
2020-07-09 23:33:42 +02:00
|
|
|
#include "utilities/fault_injection_fs.h"
|
2020-04-11 02:18:56 +02:00
|
|
|
#endif // NDEBUG
|
2019-12-09 08:49:32 +01:00
|
|
|
|
2020-02-20 21:07:53 +01:00
|
|
|
namespace ROCKSDB_NAMESPACE {
|
2019-12-09 08:49:32 +01:00
|
|
|
class NonBatchedOpsStressTest : public StressTest {
|
|
|
|
public:
|
|
|
|
NonBatchedOpsStressTest() {}
|
|
|
|
|
|
|
|
virtual ~NonBatchedOpsStressTest() {}
|
|
|
|
|
2019-12-09 23:36:10 +01:00
|
|
|
void VerifyDb(ThreadState* thread) const override {
|
2022-02-17 08:17:03 +01:00
|
|
|
// This `ReadOptions` is for validation purposes. Ignore
|
|
|
|
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
|
2019-12-09 08:49:32 +01:00
|
|
|
ReadOptions options(FLAGS_verify_checksum, true);
|
Add user-defined timestamps to db_stress (#8061)
Summary:
Add some basic test for user-defined timestamp to db_stress. Currently,
read with timestamp always tries to read using the current timestamp.
Due to the per-key timestamp-sequence ordering constraint, we only add timestamp-
related tests to the `NonBatchedOpsStressTest` since this test serializes accesses
to the same key and uses a file to cross-check data correctness.
The timestamp feature is not supported in a number of components, e.g. Merge, SingleDelete,
DeleteRange, CompactionFilter, Readonly instance, secondary instance, SST file ingestion, transaction,
etc. Therefore, db_stress should exit if user enables both timestamp and these features at the same
time. The (currently) incompatible features can be found in
`CheckAndSetOptionsForUserTimestamp`.
This PR also fixes a bug triggered when timestamp is enabled together with
`index_type=kBinarySearchWithFirstKey`. This bug fix will also be in another separate PR
with more unit tests coverage. Fixing it here because I do not want to exclude the index type
from crash test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8061
Test Plan: make crash_test_with_ts
Reviewed By: jay-zhuang
Differential Revision: D27056282
Pulled By: riversand963
fbshipit-source-id: c3e00ad1023fdb9ebbdf9601ec18270c5e2925a9
2021-03-23 13:12:04 +01:00
|
|
|
std::string ts_str;
|
|
|
|
Slice ts;
|
|
|
|
if (FLAGS_user_timestamp_size > 0) {
|
|
|
|
ts_str = GenerateTimestampForRead();
|
|
|
|
ts = ts_str;
|
|
|
|
options.timestamp = &ts;
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
auto shared = thread->shared;
|
|
|
|
const int64_t max_key = shared->GetMaxKey();
|
|
|
|
const int64_t keys_per_thread = max_key / shared->GetNumThreads();
|
|
|
|
int64_t start = keys_per_thread * thread->tid;
|
|
|
|
int64_t end = start + keys_per_thread;
|
|
|
|
uint64_t prefix_to_use =
|
|
|
|
(FLAGS_prefix_size < 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size);
|
|
|
|
if (thread->tid == shared->GetNumThreads() - 1) {
|
|
|
|
end = max_key;
|
|
|
|
}
|
|
|
|
for (size_t cf = 0; cf < column_families_.size(); ++cf) {
|
|
|
|
if (thread->shared->HasVerificationFailedYet()) {
|
|
|
|
break;
|
|
|
|
}
|
2020-05-19 21:41:43 +02:00
|
|
|
if (thread->rand.OneIn(3)) {
|
|
|
|
// 1/3 chance use iterator to verify this range
|
2020-01-10 06:25:40 +01:00
|
|
|
Slice prefix;
|
|
|
|
std::string seek_key = Key(start);
|
2019-12-09 08:49:32 +01:00
|
|
|
std::unique_ptr<Iterator> iter(
|
|
|
|
db_->NewIterator(options, column_families_[cf]));
|
2020-01-10 06:25:40 +01:00
|
|
|
iter->Seek(seek_key);
|
|
|
|
prefix = Slice(seek_key.data(), prefix_to_use);
|
2019-12-09 08:49:32 +01:00
|
|
|
for (auto i = start; i < end; i++) {
|
|
|
|
if (thread->shared->HasVerificationFailedYet()) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
std::string from_db;
|
|
|
|
std::string keystr = Key(i);
|
|
|
|
Slice k = keystr;
|
2020-01-10 06:25:40 +01:00
|
|
|
Slice pfx = Slice(keystr.data(), prefix_to_use);
|
|
|
|
// Reseek when the prefix changes
|
|
|
|
if (prefix_to_use > 0 && prefix.compare(pfx) != 0) {
|
|
|
|
iter->Seek(k);
|
|
|
|
seek_key = keystr;
|
|
|
|
prefix = Slice(seek_key.data(), prefix_to_use);
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
Status s = iter->status();
|
|
|
|
if (iter->Valid()) {
|
2020-01-10 06:25:40 +01:00
|
|
|
Slice iter_key = iter->key();
|
2019-12-09 08:49:32 +01:00
|
|
|
if (iter->key().compare(k) > 0) {
|
|
|
|
s = Status::NotFound(Slice());
|
|
|
|
} else if (iter->key().compare(k) == 0) {
|
|
|
|
from_db = iter->value().ToString();
|
|
|
|
iter->Next();
|
2020-01-10 06:25:40 +01:00
|
|
|
} else if (iter_key.compare(k) < 0) {
|
2019-12-09 08:49:32 +01:00
|
|
|
VerificationAbort(shared, "An out of range key was found",
|
|
|
|
static_cast<int>(cf), i);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// The iterator found no value for the key in question, so do not
|
|
|
|
// move to the next item in the iterator
|
2019-12-20 17:46:52 +01:00
|
|
|
s = Status::NotFound();
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
|
|
|
VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
|
|
|
|
true);
|
|
|
|
if (from_db.length()) {
|
|
|
|
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
|
|
|
|
from_db.data(), from_db.length());
|
|
|
|
}
|
|
|
|
}
|
2020-05-19 21:41:43 +02:00
|
|
|
} else if (thread->rand.OneIn(2)) {
|
|
|
|
// 1/3 chance use Get to verify this range
|
2019-12-09 08:49:32 +01:00
|
|
|
for (auto i = start; i < end; i++) {
|
|
|
|
if (thread->shared->HasVerificationFailedYet()) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
std::string from_db;
|
|
|
|
std::string keystr = Key(i);
|
|
|
|
Slice k = keystr;
|
|
|
|
Status s = db_->Get(options, column_families_[cf], k, &from_db);
|
|
|
|
VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
|
|
|
|
true);
|
|
|
|
if (from_db.length()) {
|
|
|
|
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
|
|
|
|
from_db.data(), from_db.length());
|
|
|
|
}
|
|
|
|
}
|
2020-05-19 21:41:43 +02:00
|
|
|
} else {
|
|
|
|
// 1/3 chance use MultiGet to verify this range
|
|
|
|
for (auto i = start; i < end;) {
|
|
|
|
if (thread->shared->HasVerificationFailedYet()) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
// Keep the batch size to some reasonable value
|
|
|
|
size_t batch_size = thread->rand.Uniform(128) + 1;
|
|
|
|
batch_size = std::min<size_t>(batch_size, end - i);
|
|
|
|
std::vector<std::string> keystrs(batch_size);
|
|
|
|
std::vector<Slice> keys(batch_size);
|
|
|
|
std::vector<PinnableSlice> values(batch_size);
|
|
|
|
std::vector<Status> statuses(batch_size);
|
|
|
|
for (size_t j = 0; j < batch_size; ++j) {
|
|
|
|
keystrs[j] = Key(i + j);
|
|
|
|
keys[j] = Slice(keystrs[j].data(), keystrs[j].length());
|
|
|
|
}
|
|
|
|
db_->MultiGet(options, column_families_[cf], batch_size, keys.data(),
|
|
|
|
values.data(), statuses.data());
|
|
|
|
for (size_t j = 0; j < batch_size; ++j) {
|
|
|
|
Status s = statuses[j];
|
|
|
|
std::string from_db = values[j].ToString();
|
|
|
|
VerifyValue(static_cast<int>(cf), i + j, options, shared, from_db,
|
|
|
|
s, true);
|
|
|
|
if (from_db.length()) {
|
|
|
|
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i + j),
|
|
|
|
from_db.data(), from_db.length());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
i += batch_size;
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-12-09 23:36:10 +01:00
|
|
|
void MaybeClearOneColumnFamily(ThreadState* thread) override {
|
2019-12-13 22:25:14 +01:00
|
|
|
if (FLAGS_column_families > 1) {
|
|
|
|
if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {
|
2019-12-09 08:49:32 +01:00
|
|
|
// drop column family and then create it again (can't drop default)
|
|
|
|
int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
|
|
|
|
std::string new_name = ToString(new_column_family_name_.fetch_add(1));
|
|
|
|
{
|
|
|
|
MutexLock l(thread->shared->GetMutex());
|
|
|
|
fprintf(
|
|
|
|
stdout,
|
|
|
|
"[CF %d] Dropping and recreating column family. new name: %s\n",
|
|
|
|
cf, new_name.c_str());
|
|
|
|
}
|
|
|
|
thread->shared->LockColumnFamily(cf);
|
|
|
|
Status s = db_->DropColumnFamily(column_families_[cf]);
|
|
|
|
delete column_families_[cf];
|
|
|
|
if (!s.ok()) {
|
|
|
|
fprintf(stderr, "dropping column family error: %s\n",
|
|
|
|
s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
|
|
|
|
&column_families_[cf]);
|
|
|
|
column_family_names_[cf] = new_name;
|
|
|
|
thread->shared->ClearColumnFamily(cf);
|
|
|
|
if (!s.ok()) {
|
|
|
|
fprintf(stderr, "creating column family error: %s\n",
|
|
|
|
s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
thread->shared->UnlockColumnFamily(cf);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-12-09 23:36:10 +01:00
|
|
|
bool ShouldAcquireMutexOnKey() const override { return true; }
|
2019-12-09 08:49:32 +01:00
|
|
|
|
2021-12-07 22:40:46 +01:00
|
|
|
bool IsStateTracked() const override { return true; }
|
|
|
|
|
2019-12-09 23:36:10 +01:00
|
|
|
Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys) override {
|
2019-12-09 08:49:32 +01:00
|
|
|
auto cfh = column_families_[rand_column_families[0]];
|
|
|
|
std::string key_str = Key(rand_keys[0]);
|
|
|
|
Slice key = key_str;
|
|
|
|
std::string from_db;
|
2020-04-11 02:18:56 +02:00
|
|
|
int error_count = 0;
|
|
|
|
|
|
|
|
#ifndef NDEBUG
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
fault_fs_guard->EnableErrorInjection();
|
2020-04-24 22:03:08 +02:00
|
|
|
SharedState::ignore_read_error = false;
|
2020-04-11 02:18:56 +02:00
|
|
|
}
|
|
|
|
#endif // NDEBUG
|
2019-12-09 08:49:32 +01:00
|
|
|
Status s = db_->Get(read_opts, cfh, key, &from_db);
|
2020-04-11 02:18:56 +02:00
|
|
|
#ifndef NDEBUG
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
error_count = fault_fs_guard->GetAndResetErrorCount();
|
|
|
|
}
|
|
|
|
#endif // NDEBUG
|
2019-12-09 08:49:32 +01:00
|
|
|
if (s.ok()) {
|
2020-04-11 02:18:56 +02:00
|
|
|
#ifndef NDEBUG
|
|
|
|
if (fault_fs_guard) {
|
2020-04-24 22:03:08 +02:00
|
|
|
if (error_count && !SharedState::ignore_read_error) {
|
2020-04-11 02:18:56 +02:00
|
|
|
// Grab mutex so multiple thread don't try to print the
|
|
|
|
// stack trace at the same time
|
|
|
|
MutexLock l(thread->shared->GetMutex());
|
|
|
|
fprintf(stderr, "Didn't get expected error from Get\n");
|
2020-04-14 20:04:39 +02:00
|
|
|
fprintf(stderr, "Callstack that injected the fault\n");
|
2020-04-11 02:18:56 +02:00
|
|
|
fault_fs_guard->PrintFaultBacktrace();
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif // NDEBUG
|
2019-12-09 08:49:32 +01:00
|
|
|
// found case
|
|
|
|
thread->stats.AddGets(1, 1);
|
|
|
|
} else if (s.IsNotFound()) {
|
|
|
|
// not found case
|
|
|
|
thread->stats.AddGets(1, 0);
|
|
|
|
} else {
|
2020-04-11 02:18:56 +02:00
|
|
|
if (error_count == 0) {
|
|
|
|
// errors case
|
|
|
|
thread->stats.AddErrors(1);
|
|
|
|
} else {
|
|
|
|
thread->stats.AddVerifiedErrors(1);
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
2020-04-11 02:18:56 +02:00
|
|
|
#ifndef NDEBUG
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
fault_fs_guard->DisableErrorInjection();
|
|
|
|
}
|
|
|
|
#endif // NDEBUG
|
2019-12-09 08:49:32 +01:00
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
2019-12-09 23:36:10 +01:00
|
|
|
std::vector<Status> TestMultiGet(
|
2019-12-09 08:49:32 +01:00
|
|
|
ThreadState* thread, const ReadOptions& read_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
2019-12-09 23:36:10 +01:00
|
|
|
const std::vector<int64_t>& rand_keys) override {
|
2019-12-09 08:49:32 +01:00
|
|
|
size_t num_keys = rand_keys.size();
|
|
|
|
std::vector<std::string> key_str;
|
|
|
|
std::vector<Slice> keys;
|
|
|
|
key_str.reserve(num_keys);
|
|
|
|
keys.reserve(num_keys);
|
|
|
|
std::vector<PinnableSlice> values(num_keys);
|
|
|
|
std::vector<Status> statuses(num_keys);
|
|
|
|
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
|
2020-04-11 02:18:56 +02:00
|
|
|
int error_count = 0;
|
2020-05-19 21:41:43 +02:00
|
|
|
// Do a consistency check between Get and MultiGet. Don't do it too
|
|
|
|
// often as it will slow db_stress down
|
|
|
|
bool do_consistency_check = thread->rand.OneIn(4);
|
|
|
|
|
|
|
|
ReadOptions readoptionscopy = read_opts;
|
|
|
|
if (do_consistency_check) {
|
|
|
|
readoptionscopy.snapshot = db_->GetSnapshot();
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
|
2019-12-25 03:45:11 +01:00
|
|
|
// To appease clang analyzer
|
|
|
|
const bool use_txn = FLAGS_use_txn;
|
|
|
|
|
2019-12-21 08:10:58 +01:00
|
|
|
// Create a transaction in order to write some data. The purpose is to
|
|
|
|
// exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
|
|
|
|
// will be rolled back once MultiGet returns.
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
Transaction* txn = nullptr;
|
2019-12-25 03:45:11 +01:00
|
|
|
if (use_txn) {
|
2019-12-21 08:10:58 +01:00
|
|
|
WriteOptions wo;
|
2019-12-25 03:45:11 +01:00
|
|
|
Status s = NewTxn(wo, &txn);
|
|
|
|
if (!s.ok()) {
|
|
|
|
fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
2019-12-21 08:10:58 +01:00
|
|
|
}
|
|
|
|
#endif
|
2019-12-09 08:49:32 +01:00
|
|
|
for (size_t i = 0; i < num_keys; ++i) {
|
|
|
|
key_str.emplace_back(Key(rand_keys[i]));
|
|
|
|
keys.emplace_back(key_str.back());
|
2019-12-21 08:10:58 +01:00
|
|
|
#ifndef ROCKSDB_LITE
|
2019-12-25 03:45:11 +01:00
|
|
|
if (use_txn) {
|
2019-12-21 08:10:58 +01:00
|
|
|
// With a 1 in 10 probability, insert the just added key in the batch
|
|
|
|
// into the transaction. This will create an overlap with the MultiGet
|
|
|
|
// keys and exercise some corner cases in the code
|
|
|
|
if (thread->rand.OneIn(10)) {
|
|
|
|
int op = thread->rand.Uniform(2);
|
|
|
|
Status s;
|
|
|
|
switch (op) {
|
|
|
|
case 0:
|
|
|
|
case 1: {
|
|
|
|
uint32_t value_base =
|
|
|
|
thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
|
|
|
|
char value[100];
|
|
|
|
size_t sz = GenerateValue(value_base, value, sizeof(value));
|
|
|
|
Slice v(value, sz);
|
|
|
|
if (op == 0) {
|
|
|
|
s = txn->Put(cfh, keys.back(), v);
|
|
|
|
} else {
|
|
|
|
s = txn->Merge(cfh, keys.back(), v);
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
case 2:
|
|
|
|
s = txn->Delete(cfh, keys.back());
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
assert(false);
|
|
|
|
}
|
|
|
|
if (!s.ok()) {
|
|
|
|
fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
2019-12-21 08:10:58 +01:00
|
|
|
|
2019-12-25 03:45:11 +01:00
|
|
|
if (!use_txn) {
|
2020-04-11 02:18:56 +02:00
|
|
|
#ifndef NDEBUG
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
fault_fs_guard->EnableErrorInjection();
|
2020-04-24 22:03:08 +02:00
|
|
|
SharedState::ignore_read_error = false;
|
2020-04-11 02:18:56 +02:00
|
|
|
}
|
|
|
|
#endif // NDEBUG
|
2020-05-19 21:41:43 +02:00
|
|
|
db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
|
2019-12-21 08:10:58 +01:00
|
|
|
statuses.data());
|
2020-04-11 02:18:56 +02:00
|
|
|
#ifndef NDEBUG
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
error_count = fault_fs_guard->GetAndResetErrorCount();
|
|
|
|
}
|
|
|
|
#endif // NDEBUG
|
2019-12-21 08:10:58 +01:00
|
|
|
} else {
|
|
|
|
#ifndef ROCKSDB_LITE
|
2020-05-19 21:41:43 +02:00
|
|
|
txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
|
2019-12-21 08:10:58 +01:00
|
|
|
statuses.data());
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
|
2020-04-11 02:18:56 +02:00
|
|
|
#ifndef NDEBUG
|
2020-04-24 22:03:08 +02:00
|
|
|
if (fault_fs_guard && error_count && !SharedState::ignore_read_error) {
|
2020-04-14 20:04:39 +02:00
|
|
|
int stat_nok = 0;
|
|
|
|
for (const auto& s : statuses) {
|
|
|
|
if (!s.ok() && !s.IsNotFound()) {
|
|
|
|
stat_nok++;
|
2020-04-11 02:18:56 +02:00
|
|
|
}
|
2020-04-14 20:04:39 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
if (stat_nok < error_count) {
|
|
|
|
// Grab mutex so multiple thread don't try to print the
|
|
|
|
// stack trace at the same time
|
|
|
|
MutexLock l(thread->shared->GetMutex());
|
2021-09-21 23:47:09 +02:00
|
|
|
fprintf(stderr, "Didn't get expected error from MultiGet. \n");
|
|
|
|
fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n", num_keys,
|
|
|
|
error_count, stat_nok);
|
2020-04-14 20:04:39 +02:00
|
|
|
fprintf(stderr, "Callstack that injected the fault\n");
|
|
|
|
fault_fs_guard->PrintFaultBacktrace();
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
}
|
2020-05-19 21:41:43 +02:00
|
|
|
if (fault_fs_guard) {
|
|
|
|
fault_fs_guard->DisableErrorInjection();
|
|
|
|
}
|
2020-04-11 02:18:56 +02:00
|
|
|
#endif // NDEBUG
|
2020-04-14 20:04:39 +02:00
|
|
|
|
2020-05-19 21:41:43 +02:00
|
|
|
for (size_t i = 0; i < statuses.size(); ++i) {
|
|
|
|
Status s = statuses[i];
|
|
|
|
bool is_consistent = true;
|
|
|
|
// Only do the consistency check if no error was injected and MultiGet
|
|
|
|
// didn't return an unexpected error
|
|
|
|
if (do_consistency_check && !error_count && (s.ok() || s.IsNotFound())) {
|
|
|
|
Status tmp_s;
|
|
|
|
std::string value;
|
|
|
|
|
2020-05-20 23:45:03 +02:00
|
|
|
if (use_txn) {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
tmp_s = txn->Get(readoptionscopy, cfh, keys[i], &value);
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
} else {
|
|
|
|
tmp_s = db_->Get(readoptionscopy, cfh, keys[i], &value);
|
|
|
|
}
|
2020-05-19 21:41:43 +02:00
|
|
|
if (!tmp_s.ok() && !tmp_s.IsNotFound()) {
|
|
|
|
fprintf(stderr, "Get error: %s\n", s.ToString().c_str());
|
|
|
|
is_consistent = false;
|
|
|
|
} else if (!s.ok() && tmp_s.ok()) {
|
|
|
|
fprintf(stderr, "MultiGet returned different results with key %s\n",
|
|
|
|
keys[i].ToString(true).c_str());
|
|
|
|
fprintf(stderr, "Get returned ok, MultiGet returned not found\n");
|
|
|
|
is_consistent = false;
|
|
|
|
} else if (s.ok() && tmp_s.IsNotFound()) {
|
|
|
|
fprintf(stderr, "MultiGet returned different results with key %s\n",
|
|
|
|
keys[i].ToString(true).c_str());
|
|
|
|
fprintf(stderr, "MultiGet returned ok, Get returned not found\n");
|
|
|
|
is_consistent = false;
|
|
|
|
} else if (s.ok() && value != values[i].ToString()) {
|
|
|
|
fprintf(stderr, "MultiGet returned different results with key %s\n",
|
|
|
|
keys[i].ToString(true).c_str());
|
|
|
|
fprintf(stderr, "MultiGet returned value %s\n",
|
|
|
|
values[i].ToString(true).c_str());
|
|
|
|
fprintf(stderr, "Get returned value %s\n", value.c_str());
|
|
|
|
is_consistent = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!is_consistent) {
|
|
|
|
fprintf(stderr, "TestMultiGet error: is_consistent is false\n");
|
|
|
|
thread->stats.AddErrors(1);
|
|
|
|
// Fail fast to preserve the DB state
|
|
|
|
thread->shared->SetVerificationFailure();
|
|
|
|
break;
|
|
|
|
} else if (s.ok()) {
|
2020-04-14 20:04:39 +02:00
|
|
|
// found case
|
|
|
|
thread->stats.AddGets(1, 1);
|
2019-12-09 08:49:32 +01:00
|
|
|
} else if (s.IsNotFound()) {
|
|
|
|
// not found case
|
|
|
|
thread->stats.AddGets(1, 0);
|
2020-01-03 22:05:21 +01:00
|
|
|
} else if (s.IsMergeInProgress() && use_txn) {
|
|
|
|
// With txn this is sometimes expected.
|
|
|
|
thread->stats.AddGets(1, 1);
|
2019-12-09 08:49:32 +01:00
|
|
|
} else {
|
2020-04-11 02:18:56 +02:00
|
|
|
if (error_count == 0) {
|
|
|
|
// errors case
|
|
|
|
fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
|
|
|
|
thread->stats.AddErrors(1);
|
|
|
|
} else {
|
|
|
|
thread->stats.AddVerifiedErrors(1);
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
|
|
|
}
|
2020-05-19 21:41:43 +02:00
|
|
|
|
|
|
|
if (readoptionscopy.snapshot) {
|
|
|
|
db_->ReleaseSnapshot(readoptionscopy.snapshot);
|
2020-04-11 02:18:56 +02:00
|
|
|
}
|
2020-05-25 00:25:43 +02:00
|
|
|
if (use_txn) {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
RollbackTxn(txn);
|
|
|
|
#endif
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
return statuses;
|
|
|
|
}
|
|
|
|
|
2019-12-09 23:36:10 +01:00
|
|
|
Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys) override {
|
2019-12-09 08:49:32 +01:00
|
|
|
auto cfh = column_families_[rand_column_families[0]];
|
|
|
|
std::string key_str = Key(rand_keys[0]);
|
|
|
|
Slice key = key_str;
|
|
|
|
Slice prefix = Slice(key.data(), FLAGS_prefix_size);
|
|
|
|
|
|
|
|
std::string upper_bound;
|
|
|
|
Slice ub_slice;
|
|
|
|
ReadOptions ro_copy = read_opts;
|
2020-01-10 06:25:40 +01:00
|
|
|
// Get the next prefix first and then see if we want to set upper bound.
|
|
|
|
// We'll use the next prefix in an assertion later on
|
|
|
|
if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
|
2019-12-09 08:49:32 +01:00
|
|
|
// For half of the time, set the upper bound to the next prefix
|
|
|
|
ub_slice = Slice(upper_bound);
|
|
|
|
ro_copy.iterate_upper_bound = &ub_slice;
|
|
|
|
}
|
|
|
|
|
|
|
|
Iterator* iter = db_->NewIterator(ro_copy, cfh);
|
2020-01-10 06:25:40 +01:00
|
|
|
unsigned long count = 0;
|
2019-12-09 08:49:32 +01:00
|
|
|
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
|
|
|
|
iter->Next()) {
|
|
|
|
++count;
|
|
|
|
}
|
2020-01-10 06:25:40 +01:00
|
|
|
|
|
|
|
assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
|
|
|
|
|
2019-12-09 08:49:32 +01:00
|
|
|
Status s = iter->status();
|
|
|
|
if (iter->status().ok()) {
|
|
|
|
thread->stats.AddPrefixes(1, count);
|
|
|
|
} else {
|
2020-01-03 01:43:55 +01:00
|
|
|
fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
|
2019-12-09 08:49:32 +01:00
|
|
|
thread->stats.AddErrors(1);
|
|
|
|
}
|
|
|
|
delete iter;
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
2019-12-09 23:36:10 +01:00
|
|
|
Status TestPut(ThreadState* thread, WriteOptions& write_opts,
|
|
|
|
const ReadOptions& read_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys, char (&value)[100],
|
|
|
|
std::unique_ptr<MutexLock>& lock) override {
|
2019-12-09 08:49:32 +01:00
|
|
|
auto shared = thread->shared;
|
|
|
|
int64_t max_key = shared->GetMaxKey();
|
|
|
|
int64_t rand_key = rand_keys[0];
|
|
|
|
int rand_column_family = rand_column_families[0];
|
Add user-defined timestamps to db_stress (#8061)
Summary:
Add some basic test for user-defined timestamp to db_stress. Currently,
read with timestamp always tries to read using the current timestamp.
Due to the per-key timestamp-sequence ordering constraint, we only add timestamp-
related tests to the `NonBatchedOpsStressTest` since this test serializes accesses
to the same key and uses a file to cross-check data correctness.
The timestamp feature is not supported in a number of components, e.g. Merge, SingleDelete,
DeleteRange, CompactionFilter, Readonly instance, secondary instance, SST file ingestion, transaction,
etc. Therefore, db_stress should exit if user enables both timestamp and these features at the same
time. The (currently) incompatible features can be found in
`CheckAndSetOptionsForUserTimestamp`.
This PR also fixes a bug triggered when timestamp is enabled together with
`index_type=kBinarySearchWithFirstKey`. This bug fix will also be in another separate PR
with more unit tests coverage. Fixing it here because I do not want to exclude the index type
from crash test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8061
Test Plan: make crash_test_with_ts
Reviewed By: jay-zhuang
Differential Revision: D27056282
Pulled By: riversand963
fbshipit-source-id: c3e00ad1023fdb9ebbdf9601ec18270c5e2925a9
2021-03-23 13:12:04 +01:00
|
|
|
std::string write_ts_str;
|
|
|
|
Slice write_ts;
|
2019-12-09 08:49:32 +01:00
|
|
|
while (!shared->AllowsOverwrite(rand_key) &&
|
|
|
|
(FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
|
|
|
|
lock.reset();
|
|
|
|
rand_key = thread->rand.Next() % max_key;
|
|
|
|
rand_column_family = thread->rand.Next() % FLAGS_column_families;
|
|
|
|
lock.reset(
|
|
|
|
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
|
Add user-defined timestamps to db_stress (#8061)
Summary:
Add some basic test for user-defined timestamp to db_stress. Currently,
read with timestamp always tries to read using the current timestamp.
Due to the per-key timestamp-sequence ordering constraint, we only add timestamp-
related tests to the `NonBatchedOpsStressTest` since this test serializes accesses
to the same key and uses a file to cross-check data correctness.
The timestamp feature is not supported in a number of components, e.g. Merge, SingleDelete,
DeleteRange, CompactionFilter, Readonly instance, secondary instance, SST file ingestion, transaction,
etc. Therefore, db_stress should exit if user enables both timestamp and these features at the same
time. The (currently) incompatible features can be found in
`CheckAndSetOptionsForUserTimestamp`.
This PR also fixes a bug triggered when timestamp is enabled together with
`index_type=kBinarySearchWithFirstKey`. This bug fix will also be in another separate PR
with more unit tests coverage. Fixing it here because I do not want to exclude the index type
from crash test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8061
Test Plan: make crash_test_with_ts
Reviewed By: jay-zhuang
Differential Revision: D27056282
Pulled By: riversand963
fbshipit-source-id: c3e00ad1023fdb9ebbdf9601ec18270c5e2925a9
2021-03-23 13:12:04 +01:00
|
|
|
if (FLAGS_user_timestamp_size > 0) {
|
|
|
|
write_ts_str = NowNanosStr();
|
|
|
|
write_ts = write_ts_str;
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
Revise APIs related to user-defined timestamp (#8946)
Summary:
ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`.
Namely, `WriteOptions` should not include information about "what-to-write", but should just
include information about "how-to-write".
According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore,
this PR removes `WriteOptions::timestamp` for compliance.
After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set
of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and
`SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity
made me reconsider doing it in another PR (maybe).
For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take
extra `timestamp` information when writing to `WriteBatch`es.
These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list.
Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to
`WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps
allocated already and multiple timestamps can be updated.
The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp
size of the default column family. This will be used to allocate space when calling APIs that do not
specify a column family handle.
Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing
some assertions about timestamp to returning Status code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946
Test Plan:
make check
./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8
./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0
Make sure there is no perf regression by running the following
```
./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom
```
Before this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s
```
After this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s
```
Reviewed By: ltamasi
Differential Revision: D33721359
Pulled By: riversand963
fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
2022-02-02 07:17:46 +01:00
|
|
|
if (write_ts.size() == 0 && FLAGS_user_timestamp_size) {
|
|
|
|
write_ts_str = NowNanosStr();
|
|
|
|
write_ts = write_ts_str;
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
|
|
|
|
std::string key_str = Key(rand_key);
|
|
|
|
Slice key = key_str;
|
|
|
|
ColumnFamilyHandle* cfh = column_families_[rand_column_family];
|
|
|
|
|
|
|
|
if (FLAGS_verify_before_write) {
|
|
|
|
std::string key_str2 = Key(rand_key);
|
|
|
|
Slice k = key_str2;
|
|
|
|
std::string from_db;
|
|
|
|
Status s = db_->Get(read_opts, cfh, k, &from_db);
|
|
|
|
if (!VerifyValue(rand_column_family, rand_key, read_opts, shared, from_db,
|
|
|
|
s, true)) {
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
|
|
|
|
size_t sz = GenerateValue(value_base, value, sizeof(value));
|
|
|
|
Slice v(value, sz);
|
|
|
|
shared->Put(rand_column_family, rand_key, value_base, true /* pending */);
|
|
|
|
Status s;
|
|
|
|
if (FLAGS_use_merge) {
|
|
|
|
if (!FLAGS_use_txn) {
|
|
|
|
s = db_->Merge(write_opts, cfh, key, v);
|
|
|
|
} else {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
Transaction* txn;
|
|
|
|
s = NewTxn(write_opts, &txn);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = txn->Merge(cfh, key, v);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = CommitTxn(txn);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if (!FLAGS_use_txn) {
|
Revise APIs related to user-defined timestamp (#8946)
Summary:
ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`.
Namely, `WriteOptions` should not include information about "what-to-write", but should just
include information about "how-to-write".
According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore,
this PR removes `WriteOptions::timestamp` for compliance.
After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set
of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and
`SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity
made me reconsider doing it in another PR (maybe).
For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take
extra `timestamp` information when writing to `WriteBatch`es.
These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list.
Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to
`WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps
allocated already and multiple timestamps can be updated.
The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp
size of the default column family. This will be used to allocate space when calling APIs that do not
specify a column family handle.
Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing
some assertions about timestamp to returning Status code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946
Test Plan:
make check
./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8
./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0
Make sure there is no perf regression by running the following
```
./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom
```
Before this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s
```
After this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s
```
Reviewed By: ltamasi
Differential Revision: D33721359
Pulled By: riversand963
fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
2022-02-02 07:17:46 +01:00
|
|
|
if (FLAGS_user_timestamp_size == 0) {
|
|
|
|
s = db_->Put(write_opts, cfh, key, v);
|
|
|
|
} else {
|
|
|
|
s = db_->Put(write_opts, cfh, key, write_ts, v);
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
} else {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
Transaction* txn;
|
|
|
|
s = NewTxn(write_opts, &txn);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = txn->Put(cfh, key, v);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = CommitTxn(txn);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
}
|
|
|
|
shared->Put(rand_column_family, rand_key, value_base, false /* pending */);
|
|
|
|
if (!s.ok()) {
|
2021-07-01 23:15:49 +02:00
|
|
|
if (FLAGS_injest_error_severity >= 2) {
|
|
|
|
if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
|
|
|
|
is_db_stopped_ = true;
|
|
|
|
} else if (!is_db_stopped_ ||
|
|
|
|
s.severity() < Status::Severity::kFatalError) {
|
|
|
|
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
|
|
|
thread->stats.AddBytesForWrites(1, sz);
|
|
|
|
PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value,
|
|
|
|
sz);
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
2019-12-09 23:36:10 +01:00
|
|
|
Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys,
|
|
|
|
std::unique_ptr<MutexLock>& lock) override {
|
2019-12-09 08:49:32 +01:00
|
|
|
int64_t rand_key = rand_keys[0];
|
|
|
|
int rand_column_family = rand_column_families[0];
|
|
|
|
auto shared = thread->shared;
|
|
|
|
int64_t max_key = shared->GetMaxKey();
|
|
|
|
|
|
|
|
// OPERATION delete
|
|
|
|
// If the chosen key does not allow overwrite and it does not exist,
|
|
|
|
// choose another key.
|
Add user-defined timestamps to db_stress (#8061)
Summary:
Add some basic test for user-defined timestamp to db_stress. Currently,
read with timestamp always tries to read using the current timestamp.
Due to the per-key timestamp-sequence ordering constraint, we only add timestamp-
related tests to the `NonBatchedOpsStressTest` since this test serializes accesses
to the same key and uses a file to cross-check data correctness.
The timestamp feature is not supported in a number of components, e.g. Merge, SingleDelete,
DeleteRange, CompactionFilter, Readonly instance, secondary instance, SST file ingestion, transaction,
etc. Therefore, db_stress should exit if user enables both timestamp and these features at the same
time. The (currently) incompatible features can be found in
`CheckAndSetOptionsForUserTimestamp`.
This PR also fixes a bug triggered when timestamp is enabled together with
`index_type=kBinarySearchWithFirstKey`. This bug fix will also be in another separate PR
with more unit tests coverage. Fixing it here because I do not want to exclude the index type
from crash test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8061
Test Plan: make crash_test_with_ts
Reviewed By: jay-zhuang
Differential Revision: D27056282
Pulled By: riversand963
fbshipit-source-id: c3e00ad1023fdb9ebbdf9601ec18270c5e2925a9
2021-03-23 13:12:04 +01:00
|
|
|
std::string write_ts_str;
|
|
|
|
Slice write_ts;
|
2019-12-09 08:49:32 +01:00
|
|
|
while (!shared->AllowsOverwrite(rand_key) &&
|
|
|
|
!shared->Exists(rand_column_family, rand_key)) {
|
|
|
|
lock.reset();
|
|
|
|
rand_key = thread->rand.Next() % max_key;
|
|
|
|
rand_column_family = thread->rand.Next() % FLAGS_column_families;
|
|
|
|
lock.reset(
|
|
|
|
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
|
Add user-defined timestamps to db_stress (#8061)
Summary:
Add some basic test for user-defined timestamp to db_stress. Currently,
read with timestamp always tries to read using the current timestamp.
Due to the per-key timestamp-sequence ordering constraint, we only add timestamp-
related tests to the `NonBatchedOpsStressTest` since this test serializes accesses
to the same key and uses a file to cross-check data correctness.
The timestamp feature is not supported in a number of components, e.g. Merge, SingleDelete,
DeleteRange, CompactionFilter, Readonly instance, secondary instance, SST file ingestion, transaction,
etc. Therefore, db_stress should exit if user enables both timestamp and these features at the same
time. The (currently) incompatible features can be found in
`CheckAndSetOptionsForUserTimestamp`.
This PR also fixes a bug triggered when timestamp is enabled together with
`index_type=kBinarySearchWithFirstKey`. This bug fix will also be in another separate PR
with more unit tests coverage. Fixing it here because I do not want to exclude the index type
from crash test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8061
Test Plan: make crash_test_with_ts
Reviewed By: jay-zhuang
Differential Revision: D27056282
Pulled By: riversand963
fbshipit-source-id: c3e00ad1023fdb9ebbdf9601ec18270c5e2925a9
2021-03-23 13:12:04 +01:00
|
|
|
if (FLAGS_user_timestamp_size > 0) {
|
|
|
|
write_ts_str = NowNanosStr();
|
|
|
|
write_ts = write_ts_str;
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
Revise APIs related to user-defined timestamp (#8946)
Summary:
ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`.
Namely, `WriteOptions` should not include information about "what-to-write", but should just
include information about "how-to-write".
According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore,
this PR removes `WriteOptions::timestamp` for compliance.
After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set
of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and
`SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity
made me reconsider doing it in another PR (maybe).
For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take
extra `timestamp` information when writing to `WriteBatch`es.
These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list.
Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to
`WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps
allocated already and multiple timestamps can be updated.
The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp
size of the default column family. This will be used to allocate space when calling APIs that do not
specify a column family handle.
Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing
some assertions about timestamp to returning Status code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946
Test Plan:
make check
./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8
./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0
Make sure there is no perf regression by running the following
```
./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom
```
Before this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s
```
After this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s
```
Reviewed By: ltamasi
Differential Revision: D33721359
Pulled By: riversand963
fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
2022-02-02 07:17:46 +01:00
|
|
|
if (write_ts.size() == 0 && FLAGS_user_timestamp_size) {
|
|
|
|
write_ts_str = NowNanosStr();
|
|
|
|
write_ts = write_ts_str;
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
|
|
|
|
std::string key_str = Key(rand_key);
|
|
|
|
Slice key = key_str;
|
|
|
|
auto cfh = column_families_[rand_column_family];
|
|
|
|
|
|
|
|
// Use delete if the key may be overwritten and a single deletion
|
|
|
|
// otherwise.
|
|
|
|
Status s;
|
|
|
|
if (shared->AllowsOverwrite(rand_key)) {
|
|
|
|
shared->Delete(rand_column_family, rand_key, true /* pending */);
|
|
|
|
if (!FLAGS_use_txn) {
|
Revise APIs related to user-defined timestamp (#8946)
Summary:
ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`.
Namely, `WriteOptions` should not include information about "what-to-write", but should just
include information about "how-to-write".
According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore,
this PR removes `WriteOptions::timestamp` for compliance.
After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set
of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and
`SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity
made me reconsider doing it in another PR (maybe).
For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take
extra `timestamp` information when writing to `WriteBatch`es.
These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list.
Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to
`WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps
allocated already and multiple timestamps can be updated.
The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp
size of the default column family. This will be used to allocate space when calling APIs that do not
specify a column family handle.
Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing
some assertions about timestamp to returning Status code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946
Test Plan:
make check
./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8
./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0
Make sure there is no perf regression by running the following
```
./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom
```
Before this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s
```
After this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s
```
Reviewed By: ltamasi
Differential Revision: D33721359
Pulled By: riversand963
fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
2022-02-02 07:17:46 +01:00
|
|
|
if (FLAGS_user_timestamp_size == 0) {
|
|
|
|
s = db_->Delete(write_opts, cfh, key);
|
|
|
|
} else {
|
|
|
|
s = db_->Delete(write_opts, cfh, key, write_ts);
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
} else {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
Transaction* txn;
|
|
|
|
s = NewTxn(write_opts, &txn);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = txn->Delete(cfh, key);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = CommitTxn(txn);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
shared->Delete(rand_column_family, rand_key, false /* pending */);
|
|
|
|
thread->stats.AddDeletes(1);
|
|
|
|
if (!s.ok()) {
|
2021-07-01 23:15:49 +02:00
|
|
|
if (FLAGS_injest_error_severity >= 2) {
|
|
|
|
if (!is_db_stopped_ &&
|
|
|
|
s.severity() >= Status::Severity::kFatalError) {
|
|
|
|
is_db_stopped_ = true;
|
|
|
|
} else if (!is_db_stopped_ ||
|
|
|
|
s.severity() < Status::Severity::kFatalError) {
|
|
|
|
fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
shared->SingleDelete(rand_column_family, rand_key, true /* pending */);
|
|
|
|
if (!FLAGS_use_txn) {
|
Revise APIs related to user-defined timestamp (#8946)
Summary:
ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`.
Namely, `WriteOptions` should not include information about "what-to-write", but should just
include information about "how-to-write".
According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore,
this PR removes `WriteOptions::timestamp` for compliance.
After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set
of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and
`SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity
made me reconsider doing it in another PR (maybe).
For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take
extra `timestamp` information when writing to `WriteBatch`es.
These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list.
Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to
`WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps
allocated already and multiple timestamps can be updated.
The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp
size of the default column family. This will be used to allocate space when calling APIs that do not
specify a column family handle.
Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing
some assertions about timestamp to returning Status code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946
Test Plan:
make check
./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8
./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0
Make sure there is no perf regression by running the following
```
./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom
```
Before this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s
```
After this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s
```
Reviewed By: ltamasi
Differential Revision: D33721359
Pulled By: riversand963
fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
2022-02-02 07:17:46 +01:00
|
|
|
if (FLAGS_user_timestamp_size == 0) {
|
|
|
|
s = db_->SingleDelete(write_opts, cfh, key);
|
|
|
|
} else {
|
|
|
|
s = db_->SingleDelete(write_opts, cfh, key, write_ts);
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
} else {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
Transaction* txn;
|
|
|
|
s = NewTxn(write_opts, &txn);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = txn->SingleDelete(cfh, key);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = CommitTxn(txn);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
shared->SingleDelete(rand_column_family, rand_key, false /* pending */);
|
|
|
|
thread->stats.AddSingleDeletes(1);
|
|
|
|
if (!s.ok()) {
|
2021-07-01 23:15:49 +02:00
|
|
|
if (FLAGS_injest_error_severity >= 2) {
|
|
|
|
if (!is_db_stopped_ &&
|
|
|
|
s.severity() >= Status::Severity::kFatalError) {
|
|
|
|
is_db_stopped_ = true;
|
|
|
|
} else if (!is_db_stopped_ ||
|
|
|
|
s.severity() < Status::Severity::kFatalError) {
|
|
|
|
fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
2019-12-09 23:36:10 +01:00
|
|
|
Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys,
|
|
|
|
std::unique_ptr<MutexLock>& lock) override {
|
2019-12-09 08:49:32 +01:00
|
|
|
// OPERATION delete range
|
|
|
|
std::vector<std::unique_ptr<MutexLock>> range_locks;
|
|
|
|
// delete range does not respect disallowed overwrites. the keys for
|
|
|
|
// which overwrites are disallowed are randomly distributed so it
|
|
|
|
// could be expensive to find a range where each key allows
|
|
|
|
// overwrites.
|
|
|
|
int64_t rand_key = rand_keys[0];
|
|
|
|
int rand_column_family = rand_column_families[0];
|
|
|
|
auto shared = thread->shared;
|
|
|
|
int64_t max_key = shared->GetMaxKey();
|
|
|
|
if (rand_key > max_key - FLAGS_range_deletion_width) {
|
|
|
|
lock.reset();
|
|
|
|
rand_key =
|
|
|
|
thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
|
|
|
|
range_locks.emplace_back(
|
|
|
|
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
|
|
|
|
} else {
|
|
|
|
range_locks.emplace_back(std::move(lock));
|
|
|
|
}
|
|
|
|
for (int j = 1; j < FLAGS_range_deletion_width; ++j) {
|
|
|
|
if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
|
|
|
|
range_locks.emplace_back(new MutexLock(
|
|
|
|
shared->GetMutexForKey(rand_column_family, rand_key + j)));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
shared->DeleteRange(rand_column_family, rand_key,
|
|
|
|
rand_key + FLAGS_range_deletion_width,
|
|
|
|
true /* pending */);
|
|
|
|
|
|
|
|
std::string keystr = Key(rand_key);
|
|
|
|
Slice key = keystr;
|
|
|
|
auto cfh = column_families_[rand_column_family];
|
|
|
|
std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
|
|
|
|
Slice end_key = end_keystr;
|
|
|
|
Status s = db_->DeleteRange(write_opts, cfh, key, end_key);
|
|
|
|
if (!s.ok()) {
|
2021-07-01 23:15:49 +02:00
|
|
|
if (FLAGS_injest_error_severity >= 2) {
|
|
|
|
if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
|
|
|
|
is_db_stopped_ = true;
|
|
|
|
} else if (!is_db_stopped_ ||
|
|
|
|
s.severity() < Status::Severity::kFatalError) {
|
|
|
|
fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
|
|
|
int covered = shared->DeleteRange(rand_column_family, rand_key,
|
|
|
|
rand_key + FLAGS_range_deletion_width,
|
|
|
|
false /* pending */);
|
|
|
|
thread->stats.AddRangeDeletions(1);
|
|
|
|
thread->stats.AddCoveredByRangeDeletions(covered);
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
|
|
|
#ifdef ROCKSDB_LITE
|
2019-12-09 23:36:10 +01:00
|
|
|
void TestIngestExternalFile(
|
2019-12-09 08:49:32 +01:00
|
|
|
ThreadState* /* thread */,
|
|
|
|
const std::vector<int>& /* rand_column_families */,
|
|
|
|
const std::vector<int64_t>& /* rand_keys */,
|
2019-12-09 23:36:10 +01:00
|
|
|
std::unique_ptr<MutexLock>& /* lock */) override {
|
2019-12-09 08:49:32 +01:00
|
|
|
assert(false);
|
|
|
|
fprintf(stderr,
|
|
|
|
"RocksDB lite does not support "
|
|
|
|
"TestIngestExternalFile\n");
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
#else
|
2019-12-09 23:36:10 +01:00
|
|
|
void TestIngestExternalFile(ThreadState* thread,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys,
|
|
|
|
std::unique_ptr<MutexLock>& lock) override {
|
2019-12-09 08:49:32 +01:00
|
|
|
const std::string sst_filename =
|
|
|
|
FLAGS_db + "/." + ToString(thread->tid) + ".sst";
|
|
|
|
Status s;
|
2019-12-21 01:13:19 +01:00
|
|
|
if (db_stress_env->FileExists(sst_filename).ok()) {
|
2019-12-09 08:49:32 +01:00
|
|
|
// Maybe we terminated abnormally before, so cleanup to give this file
|
|
|
|
// ingestion a clean slate
|
2019-12-21 01:13:19 +01:00
|
|
|
s = db_stress_env->DeleteFile(sst_filename);
|
2019-12-09 08:49:32 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
SstFileWriter sst_file_writer(EnvOptions(options_), options_);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = sst_file_writer.Open(sst_filename);
|
|
|
|
}
|
|
|
|
int64_t key_base = rand_keys[0];
|
|
|
|
int column_family = rand_column_families[0];
|
|
|
|
std::vector<std::unique_ptr<MutexLock>> range_locks;
|
|
|
|
std::vector<uint32_t> values;
|
|
|
|
SharedState* shared = thread->shared;
|
|
|
|
|
|
|
|
// Grab locks, set pending state on expected values, and add keys
|
|
|
|
for (int64_t key = key_base;
|
|
|
|
s.ok() && key < std::min(key_base + FLAGS_ingest_external_file_width,
|
|
|
|
shared->GetMaxKey());
|
|
|
|
++key) {
|
|
|
|
if (key == key_base) {
|
|
|
|
range_locks.emplace_back(std::move(lock));
|
|
|
|
} else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
|
|
|
|
range_locks.emplace_back(
|
|
|
|
new MutexLock(shared->GetMutexForKey(column_family, key)));
|
|
|
|
}
|
|
|
|
|
|
|
|
uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
|
|
|
|
values.push_back(value_base);
|
|
|
|
shared->Put(column_family, key, value_base, true /* pending */);
|
|
|
|
|
|
|
|
char value[100];
|
|
|
|
size_t value_len = GenerateValue(value_base, value, sizeof(value));
|
|
|
|
auto key_str = Key(key);
|
|
|
|
s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
|
|
|
|
}
|
|
|
|
|
|
|
|
if (s.ok()) {
|
|
|
|
s = sst_file_writer.Finish();
|
|
|
|
}
|
|
|
|
if (s.ok()) {
|
|
|
|
s = db_->IngestExternalFile(column_families_[column_family],
|
|
|
|
{sst_filename}, IngestExternalFileOptions());
|
|
|
|
}
|
|
|
|
if (!s.ok()) {
|
|
|
|
fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
int64_t key = key_base;
|
|
|
|
for (int32_t value : values) {
|
|
|
|
shared->Put(column_family, key, value, false /* pending */);
|
|
|
|
++key;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
|
|
|
|
bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/,
|
|
|
|
SharedState* shared, const std::string& value_from_db,
|
2019-12-20 17:46:52 +01:00
|
|
|
const Status& s, bool strict = false) const {
|
2019-12-09 08:49:32 +01:00
|
|
|
if (shared->HasVerificationFailedYet()) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
// compare value_from_db with the value in the shared state
|
|
|
|
char value[kValueMaxLen];
|
|
|
|
uint32_t value_base = shared->Get(cf, key);
|
|
|
|
if (value_base == SharedState::UNKNOWN_SENTINEL) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
if (value_base == SharedState::DELETION_SENTINEL && !strict) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (s.ok()) {
|
|
|
|
if (value_base == SharedState::DELETION_SENTINEL) {
|
|
|
|
VerificationAbort(shared, "Unexpected value found", cf, key);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
size_t sz = GenerateValue(value_base, value, sizeof(value));
|
|
|
|
if (value_from_db.length() != sz) {
|
|
|
|
VerificationAbort(shared, "Length of value read is not equal", cf, key);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
if (memcmp(value_from_db.data(), value, sz) != 0) {
|
|
|
|
VerificationAbort(shared, "Contents of value read don't match", cf,
|
|
|
|
key);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if (value_base != SharedState::DELETION_SENTINEL) {
|
|
|
|
VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
StressTest* CreateNonBatchedOpsStressTest() {
|
|
|
|
return new NonBatchedOpsStressTest();
|
|
|
|
}
|
|
|
|
|
2020-02-20 21:07:53 +01:00
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
2019-12-09 08:49:32 +01:00
|
|
|
#endif // GFLAGS
|