Merge branch 'main' into sst-partitioner

This commit is contained in:
Oron Sharabi 2022-05-02 09:19:34 +03:00 committed by GitHub
commit b8ae2ecb13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 654 additions and 163 deletions

View File

@ -3,6 +3,7 @@
### Bug Fixes ### Bug Fixes
* Fixed a bug where manual flush would block forever even though flush options had wait=false. * Fixed a bug where manual flush would block forever even though flush options had wait=false.
* Fixed a bug where RocksDB could corrupt DBs with `avoid_flush_during_recovery == true` by removing valid WALs, leading to `Status::Corruption` with message like "SST file is ahead of WALs" when attempting to reopen. * Fixed a bug where RocksDB could corrupt DBs with `avoid_flush_during_recovery == true` by removing valid WALs, leading to `Status::Corruption` with message like "SST file is ahead of WALs" when attempting to reopen.
* Fixed a bug in async_io path where incorrect length of data is read by FilePrefetchBuffer if data is consumed from two populated buffers and request for more data is sent.
### New Features ### New Features
* DB::GetLiveFilesStorageInfo is ready for production use. * DB::GetLiveFilesStorageInfo is ready for production use.
@ -16,6 +17,9 @@
* RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue. * RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue.
* Fixed unnecessary block cache contention when queries within a MultiGet batch and across parallel batches access the same data block, which previously could cause severely degraded performance in this unusual case. (In more typical MultiGet cases, this fix is expected to yield a small or negligible performance improvement.) * Fixed unnecessary block cache contention when queries within a MultiGet batch and across parallel batches access the same data block, which previously could cause severely degraded performance in this unusual case. (In more typical MultiGet cases, this fix is expected to yield a small or negligible performance improvement.)
### Behavior changes
* Enforce the existing contract of SingleDelete so that SingleDelete cannot be mixed with Delete because it leads to undefined behavior. Fix a number of unit tests that violate the contract but happen to pass.
## 7.2.0 (04/15/2022) ## 7.2.0 (04/15/2022)
### Bug Fixes ### Bug Fixes
* Fixed bug which caused rocksdb failure in the situation when rocksdb was accessible using UNC path * Fixed bug which caused rocksdb failure in the situation when rocksdb was accessible using UNC path

View File

@ -625,24 +625,34 @@ void CompactionIterator::NextFromInput() {
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"CompactionIterator::NextFromInput:SingleDelete:2", nullptr); "CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
if (next_ikey.type == kTypeSingleDeletion || if (next_ikey.type == kTypeSingleDeletion) {
next_ikey.type == kTypeDeletion) {
// We encountered two SingleDeletes for same key in a row. This // We encountered two SingleDeletes for same key in a row. This
// could be due to unexpected user input. If write-(un)prepared // could be due to unexpected user input. If write-(un)prepared
// transaction is used, this could also be due to releasing an old // transaction is used, this could also be due to releasing an old
// snapshot between a Put and its matching SingleDelete. // snapshot between a Put and its matching SingleDelete.
// Furthermore, if write-(un)prepared transaction is rolled back
// after prepare, we will write a Delete to cancel a prior Put. If
// old snapshot is released between a later Put and its matching
// SingleDelete, we will end up with a Delete followed by
// SingleDelete.
// Skip the first SingleDelete and let the next iteration decide // Skip the first SingleDelete and let the next iteration decide
// how to handle the second SingleDelete or Delete. // how to handle the second SingleDelete.
// First SingleDelete has been skipped since we already called // First SingleDelete has been skipped since we already called
// input_.Next(). // input_.Next().
++iter_stats_.num_record_drop_obsolete; ++iter_stats_.num_record_drop_obsolete;
++iter_stats_.num_single_del_mismatch; ++iter_stats_.num_single_del_mismatch;
} else if (next_ikey.type == kTypeDeletion) {
std::ostringstream oss;
oss << "Found SD and type: " << static_cast<int>(next_ikey.type)
<< " on the same key, violating the contract "
"of SingleDelete. Check your application to make sure the "
"application does not mix SingleDelete and Delete for "
"the same key. If you are using "
"write-prepared/write-unprepared transactions, and use "
"SingleDelete to delete certain keys, then make sure "
"TransactionDBOptions::rollback_deletion_type_callback is "
"configured properly. Mixing SD and DEL can lead to "
"undefined behaviors";
ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str());
valid_ = false;
status_ = Status::Corruption(oss.str());
return;
} else if (!is_timestamp_eligible_for_gc) { } else if (!is_timestamp_eligible_for_gc) {
// We cannot drop the SingleDelete as timestamp is enabled, and // We cannot drop the SingleDelete as timestamp is enabled, and
// timestamp of this key is greater than or equal to // timestamp of this key is greater than or equal to

View File

@ -890,10 +890,10 @@ TEST_F(CompactionJobTest, MultiSingleDelete) {
// -> Snapshot Put // -> Snapshot Put
// K: SDel SDel Put SDel Put Put Snapshot SDel Put SDel SDel Put SDel // K: SDel SDel Put SDel Put Put Snapshot SDel Put SDel SDel Put SDel
// -> Snapshot Put Snapshot SDel // -> Snapshot Put Snapshot SDel
// L: SDel Put Del Put SDel Snapshot Del Put Del SDel Put SDel // L: SDel Put SDel Put SDel Snapshot SDel Put SDel SDel Put SDel
// -> Snapshot SDel // -> Snapshot SDel Put SDel
// M: (Put) SDel Put Del Put SDel Snapshot Put Del SDel Put SDel Del // M: (Put) SDel Put SDel Put SDel Snapshot Put SDel SDel Put SDel SDel
// -> SDel Snapshot Del // -> SDel Snapshot Put SDel
NewDB(); NewDB();
auto file1 = mock::MakeMockFile({ auto file1 = mock::MakeMockFile({
@ -924,14 +924,14 @@ TEST_F(CompactionJobTest, MultiSingleDelete) {
{KeyStr("L", 16U, kTypeSingleDeletion), ""}, {KeyStr("L", 16U, kTypeSingleDeletion), ""},
{KeyStr("L", 15U, kTypeValue), "val"}, {KeyStr("L", 15U, kTypeValue), "val"},
{KeyStr("L", 14U, kTypeSingleDeletion), ""}, {KeyStr("L", 14U, kTypeSingleDeletion), ""},
{KeyStr("L", 13U, kTypeDeletion), ""}, {KeyStr("L", 13U, kTypeSingleDeletion), ""},
{KeyStr("L", 12U, kTypeValue), "val"}, {KeyStr("L", 12U, kTypeValue), "val"},
{KeyStr("L", 11U, kTypeDeletion), ""}, {KeyStr("L", 11U, kTypeSingleDeletion), ""},
{KeyStr("M", 16U, kTypeDeletion), ""}, {KeyStr("M", 16U, kTypeSingleDeletion), ""},
{KeyStr("M", 15U, kTypeSingleDeletion), ""}, {KeyStr("M", 15U, kTypeSingleDeletion), ""},
{KeyStr("M", 14U, kTypeValue), "val"}, {KeyStr("M", 14U, kTypeValue), "val"},
{KeyStr("M", 13U, kTypeSingleDeletion), ""}, {KeyStr("M", 13U, kTypeSingleDeletion), ""},
{KeyStr("M", 12U, kTypeDeletion), ""}, {KeyStr("M", 12U, kTypeSingleDeletion), ""},
{KeyStr("M", 11U, kTypeValue), "val"}, {KeyStr("M", 11U, kTypeValue), "val"},
}); });
AddMockFile(file1); AddMockFile(file1);
@ -972,12 +972,12 @@ TEST_F(CompactionJobTest, MultiSingleDelete) {
{KeyStr("K", 1U, kTypeSingleDeletion), ""}, {KeyStr("K", 1U, kTypeSingleDeletion), ""},
{KeyStr("L", 5U, kTypeSingleDeletion), ""}, {KeyStr("L", 5U, kTypeSingleDeletion), ""},
{KeyStr("L", 4U, kTypeValue), "val"}, {KeyStr("L", 4U, kTypeValue), "val"},
{KeyStr("L", 3U, kTypeDeletion), ""}, {KeyStr("L", 3U, kTypeSingleDeletion), ""},
{KeyStr("L", 2U, kTypeValue), "val"}, {KeyStr("L", 2U, kTypeValue), "val"},
{KeyStr("L", 1U, kTypeSingleDeletion), ""}, {KeyStr("L", 1U, kTypeSingleDeletion), ""},
{KeyStr("M", 10U, kTypeSingleDeletion), ""}, {KeyStr("M", 10U, kTypeSingleDeletion), ""},
{KeyStr("M", 7U, kTypeValue), "val"}, {KeyStr("M", 7U, kTypeValue), "val"},
{KeyStr("M", 5U, kTypeDeletion), ""}, {KeyStr("M", 5U, kTypeSingleDeletion), ""},
{KeyStr("M", 4U, kTypeValue), "val"}, {KeyStr("M", 4U, kTypeValue), "val"},
{KeyStr("M", 3U, kTypeSingleDeletion), ""}, {KeyStr("M", 3U, kTypeSingleDeletion), ""},
}); });
@ -1019,7 +1019,9 @@ TEST_F(CompactionJobTest, MultiSingleDelete) {
{KeyStr("K", 8U, kTypeValue), "val3"}, {KeyStr("K", 8U, kTypeValue), "val3"},
{KeyStr("L", 16U, kTypeSingleDeletion), ""}, {KeyStr("L", 16U, kTypeSingleDeletion), ""},
{KeyStr("L", 15U, kTypeValue), ""}, {KeyStr("L", 15U, kTypeValue), ""},
{KeyStr("M", 16U, kTypeDeletion), ""}, {KeyStr("L", 11U, kTypeSingleDeletion), ""},
{KeyStr("M", 15U, kTypeSingleDeletion), ""},
{KeyStr("M", 14U, kTypeValue), ""},
{KeyStr("M", 3U, kTypeSingleDeletion), ""}}); {KeyStr("M", 3U, kTypeSingleDeletion), ""}});
SetLastSequence(22U); SetLastSequence(22U);

View File

@ -947,7 +947,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// Read all the records and add to a memtable // Read all the records and add to a memtable
std::string scratch; std::string scratch;
Slice record; Slice record;
WriteBatch batch;
TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal", TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
/*arg=*/nullptr); /*arg=*/nullptr);
@ -961,10 +960,15 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
continue; continue;
} }
// We create a new batch and initialize with a valid prot_info_ to store
// the data checksums
WriteBatch batch(0, 0, 8, 0);
status = WriteBatchInternal::SetContents(&batch, record); status = WriteBatchInternal::SetContents(&batch, record);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
SequenceNumber sequence = WriteBatchInternal::Sequence(&batch); SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
if (immutable_db_options_.wal_recovery_mode == if (immutable_db_options_.wal_recovery_mode ==

View File

@ -152,14 +152,6 @@ struct SavePoints {
std::stack<SavePoint, autovector<SavePoint>> stack; std::stack<SavePoint, autovector<SavePoint>> stack;
}; };
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
: content_flags_(0), max_bytes_(max_bytes), rep_() {
rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
? reserved_bytes
: WriteBatchInternal::kHeader);
rep_.resize(WriteBatchInternal::kHeader);
}
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes,
size_t protection_bytes_per_key, size_t default_cf_ts_sz) size_t protection_bytes_per_key, size_t default_cf_ts_sz)
: content_flags_(0), : content_flags_(0),
@ -580,14 +572,16 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
s = handler->MarkBeginPrepare(); s = handler->MarkBeginPrepare();
assert(s.ok()); assert(s.ok());
empty_batch = false; empty_batch = false;
if (!handler->WriteAfterCommit()) { if (handler->WriteAfterCommit() ==
WriteBatch::Handler::OptionState::kDisabled) {
s = Status::NotSupported( s = Status::NotSupported(
"WriteCommitted txn tag when write_after_commit_ is disabled (in " "WriteCommitted txn tag when write_after_commit_ is disabled (in "
"WritePrepared/WriteUnprepared mode). If it is not due to " "WritePrepared/WriteUnprepared mode). If it is not due to "
"corruption, the WAL must be emptied before changing the " "corruption, the WAL must be emptied before changing the "
"WritePolicy."); "WritePolicy.");
} }
if (handler->WriteBeforePrepare()) { if (handler->WriteBeforePrepare() ==
WriteBatch::Handler::OptionState::kEnabled) {
s = Status::NotSupported( s = Status::NotSupported(
"WriteCommitted txn tag when write_before_prepare_ is enabled " "WriteCommitted txn tag when write_before_prepare_ is enabled "
"(in WriteUnprepared mode). If it is not due to corruption, the " "(in WriteUnprepared mode). If it is not due to corruption, the "
@ -600,7 +594,8 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
s = handler->MarkBeginPrepare(); s = handler->MarkBeginPrepare();
assert(s.ok()); assert(s.ok());
empty_batch = false; empty_batch = false;
if (handler->WriteAfterCommit()) { if (handler->WriteAfterCommit() ==
WriteBatch::Handler::OptionState::kEnabled) {
s = Status::NotSupported( s = Status::NotSupported(
"WritePrepared/WriteUnprepared txn tag when write_after_commit_ " "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
"is enabled (in default WriteCommitted mode). If it is not due " "is enabled (in default WriteCommitted mode). If it is not due "
@ -614,13 +609,15 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
s = handler->MarkBeginPrepare(true /* unprepared */); s = handler->MarkBeginPrepare(true /* unprepared */);
assert(s.ok()); assert(s.ok());
empty_batch = false; empty_batch = false;
if (handler->WriteAfterCommit()) { if (handler->WriteAfterCommit() ==
WriteBatch::Handler::OptionState::kEnabled) {
s = Status::NotSupported( s = Status::NotSupported(
"WriteUnprepared txn tag when write_after_commit_ is enabled (in " "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
"default WriteCommitted mode). If it is not due to corruption, " "default WriteCommitted mode). If it is not due to corruption, "
"the WAL must be emptied before changing the WritePolicy."); "the WAL must be emptied before changing the WritePolicy.");
} }
if (!handler->WriteBeforePrepare()) { if (handler->WriteBeforePrepare() ==
WriteBatch::Handler::OptionState::kDisabled) {
s = Status::NotSupported( s = Status::NotSupported(
"WriteUnprepared txn tag when write_before_prepare_ is disabled " "WriteUnprepared txn tag when write_before_prepare_ is disabled "
"(in WriteCommitted/WritePrepared mode). If it is not due to " "(in WriteCommitted/WritePrepared mode). If it is not due to "
@ -1494,6 +1491,8 @@ Status WriteBatch::UpdateTimestamps(
return s; return s;
} }
namespace {
class MemTableInserter : public WriteBatch::Handler { class MemTableInserter : public WriteBatch::Handler {
SequenceNumber sequence_; SequenceNumber sequence_;
@ -1581,9 +1580,24 @@ class MemTableInserter : public WriteBatch::Handler {
return res; return res;
} }
void DecrementProtectionInfoIdxForTryAgain() {
if (prot_info_ != nullptr) --prot_info_idx_;
}
void ResetProtectionInfo() {
prot_info_idx_ = 0;
prot_info_ = nullptr;
}
protected: protected:
bool WriteBeforePrepare() const override { return write_before_prepare_; } Handler::OptionState WriteBeforePrepare() const override {
bool WriteAfterCommit() const override { return write_after_commit_; } return write_before_prepare_ ? Handler::OptionState::kEnabled
: Handler::OptionState::kDisabled;
}
Handler::OptionState WriteAfterCommit() const override {
return write_after_commit_ ? Handler::OptionState::kEnabled
: Handler::OptionState::kDisabled;
}
public: public:
// cf_mems should not be shared with concurrent inserters // cf_mems should not be shared with concurrent inserters
@ -1871,16 +1885,26 @@ class MemTableInserter : public WriteBatch::Handler {
Status PutCF(uint32_t column_family_id, const Slice& key, Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override { const Slice& value) override {
const auto* kv_prot_info = NextProtectionInfo(); const auto* kv_prot_info = NextProtectionInfo();
Status ret_status;
if (kv_prot_info != nullptr) { if (kv_prot_info != nullptr) {
// Memtable needs seqno, doesn't need CF ID // Memtable needs seqno, doesn't need CF ID
auto mem_kv_prot_info = auto mem_kv_prot_info =
kv_prot_info->StripC(column_family_id).ProtectS(sequence_); kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
return PutCFImpl(column_family_id, key, value, kTypeValue, ret_status = PutCFImpl(column_family_id, key, value, kTypeValue,
&mem_kv_prot_info); &mem_kv_prot_info);
} } else {
return PutCFImpl(column_family_id, key, value, kTypeValue, ret_status = PutCFImpl(column_family_id, key, value, kTypeValue,
nullptr /* kv_prot_info */); nullptr /* kv_prot_info */);
} }
// TODO: this assumes that if TryAgain status is returned to the caller,
// the operation is actually tried again. The proper way to do this is to
// pass a `try_again` parameter to the operation itself and decrement
// prot_info_idx_ based on that
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key, Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
const Slice& value, ValueType delete_type, const Slice& value, ValueType delete_type,
@ -1926,6 +1950,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) { } else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */); MaybeAdvanceSeq(false /* batch_boundary */);
} }
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status; return ret_status;
} }
@ -1957,6 +1984,9 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status = ret_status =
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
} }
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status; return ret_status;
} }
@ -1985,6 +2015,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) { } else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */); MaybeAdvanceSeq(false /* batch_boundary */);
} }
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status; return ret_status;
} }
assert(ret_status.ok()); assert(ret_status.ok());
@ -2009,6 +2042,9 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_, ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
column_family_id, key); column_family_id, key);
} }
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status; return ret_status;
} }
@ -2038,6 +2074,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) { } else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */); MaybeAdvanceSeq(false /* batch_boundary */);
} }
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status; return ret_status;
} }
assert(ret_status.ok()); assert(ret_status.ok());
@ -2092,6 +2131,9 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status = WriteBatchInternal::DeleteRange( ret_status = WriteBatchInternal::DeleteRange(
rebuilding_trx_, column_family_id, begin_key, end_key); rebuilding_trx_, column_family_id, begin_key, end_key);
} }
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status; return ret_status;
} }
@ -2121,6 +2163,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) { } else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */); MaybeAdvanceSeq(false /* batch_boundary */);
} }
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status; return ret_status;
} }
assert(ret_status.ok()); assert(ret_status.ok());
@ -2242,23 +2287,31 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
key, value); key, value);
} }
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status; return ret_status;
} }
Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key, Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override { const Slice& value) override {
const auto* kv_prot_info = NextProtectionInfo(); const auto* kv_prot_info = NextProtectionInfo();
Status ret_status;
if (kv_prot_info != nullptr) { if (kv_prot_info != nullptr) {
// Memtable needs seqno, doesn't need CF ID // Memtable needs seqno, doesn't need CF ID
auto mem_kv_prot_info = auto mem_kv_prot_info =
kv_prot_info->StripC(column_family_id).ProtectS(sequence_); kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
// Same as PutCF except for value type. // Same as PutCF except for value type.
return PutCFImpl(column_family_id, key, value, kTypeBlobIndex, ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
&mem_kv_prot_info); &mem_kv_prot_info);
} else { } else {
return PutCFImpl(column_family_id, key, value, kTypeBlobIndex, ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
nullptr /* kv_prot_info */); nullptr /* kv_prot_info */);
} }
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
} }
void CheckMemtableFull() { void CheckMemtableFull() {
@ -2401,6 +2454,7 @@ class MemTableInserter : public WriteBatch::Handler {
const auto& batch_info = trx->batches_.begin()->second; const auto& batch_info = trx->batches_.begin()->second;
// all inserts must reference this trx log number // all inserts must reference this trx log number
log_number_ref_ = batch_info.log_number_; log_number_ref_ = batch_info.log_number_;
ResetProtectionInfo();
s = batch_info.batch_->Iterate(this); s = batch_info.batch_->Iterate(this);
log_number_ref_ = 0; log_number_ref_ = 0;
} }
@ -2422,6 +2476,10 @@ class MemTableInserter : public WriteBatch::Handler {
const bool batch_boundry = true; const bool batch_boundry = true;
MaybeAdvanceSeq(batch_boundry); MaybeAdvanceSeq(batch_boundry);
if (UNLIKELY(s.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return s; return s;
} }
@ -2466,6 +2524,7 @@ class MemTableInserter : public WriteBatch::Handler {
return ucmp->timestamp_size(); return ucmp->timestamp_size();
}); });
if (s.ok()) { if (s.ok()) {
ResetProtectionInfo();
s = batch_info.batch_->Iterate(this); s = batch_info.batch_->Iterate(this);
log_number_ref_ = 0; log_number_ref_ = 0;
} }
@ -2488,6 +2547,10 @@ class MemTableInserter : public WriteBatch::Handler {
constexpr bool batch_boundary = true; constexpr bool batch_boundary = true;
MaybeAdvanceSeq(batch_boundary); MaybeAdvanceSeq(batch_boundary);
if (UNLIKELY(s.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return s; return s;
} }
@ -2523,6 +2586,8 @@ class MemTableInserter : public WriteBatch::Handler {
} }
}; };
} // namespace
// This function can only be called in these conditions: // This function can only be called in these conditions:
// 1) During Recovery() // 1) During Recovery()
// 2) During Write(), in a single-threaded write thread // 2) During Write(), in a single-threaded write thread
@ -2613,11 +2678,94 @@ Status WriteBatchInternal::InsertInto(
return s; return s;
} }
namespace {
// This class updates protection info for a WriteBatch.
class ProtectionInfoUpdater : public WriteBatch::Handler {
public:
explicit ProtectionInfoUpdater(WriteBatch::ProtectionInfo* prot_info)
: prot_info_(prot_info) {}
~ProtectionInfoUpdater() override {}
Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
return UpdateProtInfo(cf, key, val, kTypeValue);
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
return UpdateProtInfo(cf, key, "", kTypeDeletion);
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
return UpdateProtInfo(cf, key, "", kTypeSingleDeletion);
}
Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
const Slice& end_key) override {
return UpdateProtInfo(cf, begin_key, end_key, kTypeRangeDeletion);
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
return UpdateProtInfo(cf, key, val, kTypeMerge);
}
Status PutBlobIndexCF(uint32_t cf, const Slice& key,
const Slice& val) override {
return UpdateProtInfo(cf, key, val, kTypeBlobIndex);
}
Status MarkBeginPrepare(bool /* unprepare */) override {
return Status::OK();
}
Status MarkEndPrepare(const Slice& /* xid */) override {
return Status::OK();
}
Status MarkCommit(const Slice& /* xid */) override { return Status::OK(); }
Status MarkCommitWithTimestamp(const Slice& /* xid */,
const Slice& /* ts */) override {
return Status::OK();
}
Status MarkRollback(const Slice& /* xid */) override { return Status::OK(); }
Status MarkNoop(bool /* empty_batch */) override { return Status::OK(); }
private:
Status UpdateProtInfo(uint32_t cf, const Slice& key, const Slice& val,
const ValueType op_type) {
if (prot_info_) {
prot_info_->entries_.emplace_back(
ProtectionInfo64().ProtectKVO(key, val, op_type).ProtectC(cf));
}
return Status::OK();
}
// No copy or move.
ProtectionInfoUpdater(const ProtectionInfoUpdater&) = delete;
ProtectionInfoUpdater(ProtectionInfoUpdater&&) = delete;
ProtectionInfoUpdater& operator=(const ProtectionInfoUpdater&) = delete;
ProtectionInfoUpdater& operator=(ProtectionInfoUpdater&&) = delete;
WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
};
} // namespace
Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= WriteBatchInternal::kHeader); assert(contents.size() >= WriteBatchInternal::kHeader);
assert(b->prot_info_ == nullptr);
b->rep_.assign(contents.data(), contents.size()); b->rep_.assign(contents.data(), contents.size());
b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
// If we have a prot_info_, update protection info entries for the batch.
if (b->prot_info_) {
ProtectionInfoUpdater prot_info_updater(b->prot_info_.get());
return b->Iterate(&prot_info_updater);
}
return Status::OK(); return Status::OK();
} }

View File

@ -2613,6 +2613,7 @@ void StressTest::Open() {
options_.listeners.emplace_back(new DbStressListener( options_.listeners.emplace_back(new DbStressListener(
FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env)); FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env));
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
RegisterAdditionalListeners();
options_.create_missing_column_families = true; options_.create_missing_column_families = true;
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
#ifndef NDEBUG #ifndef NDEBUG
@ -2751,6 +2752,7 @@ void StressTest::Open() {
static_cast<size_t>(FLAGS_wp_snapshot_cache_bits); static_cast<size_t>(FLAGS_wp_snapshot_cache_bits);
txn_db_options.wp_commit_cache_bits = txn_db_options.wp_commit_cache_bits =
static_cast<size_t>(FLAGS_wp_commit_cache_bits); static_cast<size_t>(FLAGS_wp_commit_cache_bits);
PrepareTxnDbOptions(txn_db_options);
s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
cf_descriptors, &column_families_, &txn_db_); cf_descriptors, &column_families_, &txn_db_);
if (!s.ok()) { if (!s.ok()) {

View File

@ -16,6 +16,7 @@ namespace ROCKSDB_NAMESPACE {
class SystemClock; class SystemClock;
class Transaction; class Transaction;
class TransactionDB; class TransactionDB;
struct TransactionDBOptions;
class StressTest { class StressTest {
public: public:
@ -224,6 +225,12 @@ class StressTest {
void CheckAndSetOptionsForUserTimestamp(); void CheckAndSetOptionsForUserTimestamp();
virtual void RegisterAdditionalListeners() {}
#ifndef ROCKSDB_LITE
virtual void PrepareTxnDbOptions(TransactionDBOptions& /*txn_db_opts*/) {}
#endif
std::shared_ptr<Cache> cache_; std::shared_ptr<Cache> cache_;
std::shared_ptr<Cache> compressed_cache_; std::shared_ptr<Cache> compressed_cache_;
std::shared_ptr<const FilterPolicy> filter_policy_; std::shared_ptr<const FilterPolicy> filter_policy_;

View File

@ -15,6 +15,7 @@
#ifndef NDEBUG #ifndef NDEBUG
#include "utilities/fault_injection_fs.h" #include "utilities/fault_injection_fs.h"
#endif // NDEBUG #endif // NDEBUG
#include "utilities/transactions/write_prepared_txn_db.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -31,6 +32,21 @@ DEFINE_int32(delay_snapshot_read_one_in, 0,
"With a chance of 1/N, inject a random delay between taking " "With a chance of 1/N, inject a random delay between taking "
"snapshot and read."); "snapshot and read.");
DEFINE_int32(rollback_one_in, 0,
"If non-zero, rollback non-read-only transactions with a "
"probability of 1/N.");
DEFINE_int32(clear_wp_commit_cache_one_in, 0,
"If non-zero, evict all commit entries from commit cache with a "
"probability of 1/N. This options applies to write-prepared and "
"write-unprepared transactions.");
extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void) {
static Random rand(static_cast<uint32_t>(db_stress_env->NowMicros()));
return FLAGS_clear_wp_commit_cache_one_in > 0 &&
rand.OneIn(FLAGS_clear_wp_commit_cache_one_in);
}
// MultiOpsTxnsStressTest can either operate on a database with pre-populated // MultiOpsTxnsStressTest can either operate on a database with pre-populated
// data (possibly from previous ones), or create a new db and preload it with // data (possibly from previous ones), or create a new db and preload it with
// data specified via `-lb_a`, `-ub_a`, `-lb_c`, `-ub_c`, etc. Among these, we // data specified via `-lb_a`, `-ub_a`, `-lb_c`, `-ub_c`, etc. Among these, we
@ -75,8 +91,9 @@ void MultiOpsTxnsStressTest::KeyGenerator::FinishInit() {
"Cannot allocate key in [%u, %u)\nStart with a new DB or try change " "Cannot allocate key in [%u, %u)\nStart with a new DB or try change "
"the number of threads for testing via -threads=<#threads>\n", "the number of threads for testing via -threads=<#threads>\n",
static_cast<unsigned int>(low_), static_cast<unsigned int>(high_)); static_cast<unsigned int>(low_), static_cast<unsigned int>(high_));
fflush(stdout);
fflush(stderr); fflush(stderr);
std::terminate(); assert(false);
} }
initialized_ = true; initialized_ = true;
} }
@ -131,33 +148,43 @@ void MultiOpsTxnsStressTest::KeyGenerator::UndoAllocation(uint32_t new_val) {
} }
std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey(uint32_t a) { std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey(uint32_t a) {
char buf[8]; std::string ret;
EncodeFixed32(buf, kPrimaryIndexId); PutFixed32(&ret, kPrimaryIndexId);
std::reverse(buf, buf + 4); PutFixed32(&ret, a);
EncodeFixed32(buf + 4, a);
std::reverse(buf + 4, buf + 8); char* const buf = &ret[0];
return std::string(buf, sizeof(buf)); std::reverse(buf, buf + sizeof(kPrimaryIndexId));
std::reverse(buf + sizeof(kPrimaryIndexId),
buf + sizeof(kPrimaryIndexId) + sizeof(a));
return ret;
} }
std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c) { std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c) {
char buf[8]; std::string ret;
EncodeFixed32(buf, kSecondaryIndexId); PutFixed32(&ret, kSecondaryIndexId);
std::reverse(buf, buf + 4); PutFixed32(&ret, c);
EncodeFixed32(buf + 4, c);
std::reverse(buf + 4, buf + 8); char* const buf = &ret[0];
return std::string(buf, sizeof(buf)); std::reverse(buf, buf + sizeof(kSecondaryIndexId));
std::reverse(buf + sizeof(kSecondaryIndexId),
buf + sizeof(kSecondaryIndexId) + sizeof(c));
return ret;
} }
std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c, std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c,
uint32_t a) { uint32_t a) {
char buf[12]; std::string ret;
EncodeFixed32(buf, kSecondaryIndexId); PutFixed32(&ret, kSecondaryIndexId);
std::reverse(buf, buf + 4); PutFixed32(&ret, c);
EncodeFixed32(buf + 4, c); PutFixed32(&ret, a);
EncodeFixed32(buf + 8, a);
std::reverse(buf + 4, buf + 8); char* const buf = &ret[0];
std::reverse(buf + 8, buf + 12); std::reverse(buf, buf + sizeof(kSecondaryIndexId));
return std::string(buf, sizeof(buf)); std::reverse(buf + sizeof(kSecondaryIndexId),
buf + sizeof(kSecondaryIndexId) + sizeof(c));
std::reverse(buf + sizeof(kSecondaryIndexId) + sizeof(c),
buf + sizeof(kSecondaryIndexId) + sizeof(c) + sizeof(a));
return ret;
} }
std::tuple<Status, uint32_t, uint32_t> std::tuple<Status, uint32_t, uint32_t>
@ -201,40 +228,26 @@ std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey() const {
} }
std::string MultiOpsTxnsStressTest::Record::EncodePrimaryIndexValue() const { std::string MultiOpsTxnsStressTest::Record::EncodePrimaryIndexValue() const {
char buf[8]; std::string ret;
EncodeFixed32(buf, b_); PutFixed32(&ret, b_);
EncodeFixed32(buf + 4, c_); PutFixed32(&ret, c_);
return std::string(buf, sizeof(buf)); return ret;
} }
std::pair<std::string, std::string> std::pair<std::string, std::string>
MultiOpsTxnsStressTest::Record::EncodeSecondaryIndexEntry() const { MultiOpsTxnsStressTest::Record::EncodeSecondaryIndexEntry() const {
std::string secondary_index_key; std::string secondary_index_key = EncodeSecondaryKey(c_, a_);
char buf[12];
EncodeFixed32(buf, kSecondaryIndexId);
std::reverse(buf, buf + 4);
EncodeFixed32(buf + 4, c_);
EncodeFixed32(buf + 8, a_);
std::reverse(buf + 4, buf + 8);
std::reverse(buf + 8, buf + 12);
secondary_index_key.assign(buf, sizeof(buf));
// Secondary index value is always 4-byte crc32 of the secondary key // Secondary index value is always 4-byte crc32 of the secondary key
std::string secondary_index_value; std::string secondary_index_value;
uint32_t crc = crc32c::Value(buf, sizeof(buf)); uint32_t crc =
crc32c::Value(secondary_index_key.data(), secondary_index_key.size());
PutFixed32(&secondary_index_value, crc); PutFixed32(&secondary_index_value, crc);
return std::make_pair(secondary_index_key, secondary_index_value); return std::make_pair(std::move(secondary_index_key), secondary_index_value);
} }
std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey() const { std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey() const {
char buf[12]; return EncodeSecondaryKey(c_, a_);
EncodeFixed32(buf, kSecondaryIndexId);
std::reverse(buf, buf + 4);
EncodeFixed32(buf + 4, c_);
EncodeFixed32(buf + 8, a_);
std::reverse(buf + 4, buf + 8);
std::reverse(buf + 8, buf + 12);
return std::string(buf, sizeof(buf));
} }
Status MultiOpsTxnsStressTest::Record::DecodePrimaryIndexEntry( Status MultiOpsTxnsStressTest::Record::DecodePrimaryIndexEntry(
@ -244,27 +257,22 @@ Status MultiOpsTxnsStressTest::Record::DecodePrimaryIndexEntry(
return Status::Corruption("Primary index key length is not 8"); return Status::Corruption("Primary index key length is not 8");
} }
const char* const index_id_buf = primary_index_key.data(); uint32_t index_id = 0;
uint32_t index_id =
static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[0])) << 24; [[maybe_unused]] bool res = GetFixed32(&primary_index_key, &index_id);
index_id += static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[1])) assert(res);
<< 16; index_id = EndianSwapValue(index_id);
index_id += static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[2]))
<< 8;
index_id +=
static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[3]));
primary_index_key.remove_prefix(sizeof(uint32_t));
if (index_id != kPrimaryIndexId) { if (index_id != kPrimaryIndexId) {
std::ostringstream oss; std::ostringstream oss;
oss << "Unexpected primary index id: " << index_id; oss << "Unexpected primary index id: " << index_id;
return Status::Corruption(oss.str()); return Status::Corruption(oss.str());
} }
const char* const buf = primary_index_key.data(); res = GetFixed32(&primary_index_key, &a_);
a_ = static_cast<uint32_t>(static_cast<unsigned char>(buf[0])) << 24; assert(res);
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[1])) << 16; a_ = EndianSwapValue(a_);
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[2])) << 8; assert(primary_index_key.empty());
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[3]));
if (primary_index_value.size() != 8) { if (primary_index_value.size() != 8) {
return Status::Corruption("Primary index value length is not 8"); return Status::Corruption("Primary index value length is not 8");
@ -282,33 +290,28 @@ Status MultiOpsTxnsStressTest::Record::DecodeSecondaryIndexEntry(
uint32_t crc = uint32_t crc =
crc32c::Value(secondary_index_key.data(), secondary_index_key.size()); crc32c::Value(secondary_index_key.data(), secondary_index_key.size());
const char* const index_id_buf = secondary_index_key.data(); uint32_t index_id = 0;
uint32_t index_id =
static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[0])) << 24; [[maybe_unused]] bool res = GetFixed32(&secondary_index_key, &index_id);
index_id += static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[1])) assert(res);
<< 16; index_id = EndianSwapValue(index_id);
index_id += static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[2]))
<< 8;
index_id +=
static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[3]));
secondary_index_key.remove_prefix(sizeof(uint32_t));
if (index_id != kSecondaryIndexId) { if (index_id != kSecondaryIndexId) {
std::ostringstream oss; std::ostringstream oss;
oss << "Unexpected secondary index id: " << index_id; oss << "Unexpected secondary index id: " << index_id;
return Status::Corruption(oss.str()); return Status::Corruption(oss.str());
} }
const char* const buf = secondary_index_key.data();
assert(secondary_index_key.size() == 8); assert(secondary_index_key.size() == 8);
c_ = static_cast<uint32_t>(static_cast<unsigned char>(buf[0])) << 24; res = GetFixed32(&secondary_index_key, &c_);
c_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[1])) << 16; assert(res);
c_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[2])) << 8; c_ = EndianSwapValue(c_);
c_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[3]));
a_ = static_cast<uint32_t>(static_cast<unsigned char>(buf[4])) << 24; assert(secondary_index_key.size() == 4);
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[5])) << 16; res = GetFixed32(&secondary_index_key, &a_);
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[6])) << 8; assert(res);
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[7])); a_ = EndianSwapValue(a_);
assert(secondary_index_key.empty());
if (secondary_index_value.size() != 4) { if (secondary_index_value.size() != 4) {
return Status::Corruption("Secondary index value length is not 4"); return Status::Corruption("Secondary index value length is not 4");
@ -520,9 +523,35 @@ Status MultiOpsTxnsStressTest::TestCustomOperations(
// Should never reach here. // Should never reach here.
assert(false); assert(false);
} }
return s; return s;
} }
void MultiOpsTxnsStressTest::RegisterAdditionalListeners() {
options_.listeners.emplace_back(new MultiOpsTxnsStressListener(this));
}
#ifndef ROCKSDB_LITE
void MultiOpsTxnsStressTest::PrepareTxnDbOptions(
TransactionDBOptions& txn_db_opts) {
// MultiOpsTxnStressTest uses SingleDelete to delete secondary keys, thus we
// register this callback to let TxnDb know that when rolling back
// a transaction, use only SingleDelete to cancel prior Put from the same
// transaction if applicable.
txn_db_opts.rollback_deletion_type_callback =
[](TransactionDB* /*db*/, ColumnFamilyHandle* /*column_family*/,
const Slice& key) {
Slice ks = key;
uint32_t index_id = 0;
[[maybe_unused]] bool res = GetFixed32(&ks, &index_id);
assert(res);
index_id = EndianSwapValue(index_id);
assert(index_id <= Record::kSecondaryIndexId);
return index_id == Record::kSecondaryIndexId;
};
}
#endif // !ROCKSDB_LITE
Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
uint32_t old_a, uint32_t old_a,
uint32_t old_a_pos, uint32_t old_a_pos,
@ -561,8 +590,10 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
} }
if (s.IsNotFound()) { if (s.IsNotFound()) {
thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0); thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0);
} else if (s.IsBusy()) { } else if (s.IsBusy() || s.IsIncomplete()) {
// ignore. // ignore.
// Incomplete also means rollback by application. See the transaction
// implementations.
} else { } else {
thread->stats.AddErrors(1); thread->stats.AddErrors(1);
} }
@ -631,6 +662,16 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
return s; return s;
} }
if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) {
s = Status::Incomplete();
return s;
}
s = WriteToCommitTimeWriteBatch(*txn);
if (!s.ok()) {
return s;
}
s = txn->Commit(); s = txn->Commit();
auto& key_gen = key_gen_for_a_.at(thread->tid); auto& key_gen = key_gen_for_a_.at(thread->tid);
@ -677,11 +718,12 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
Record::kPrimaryIndexEntrySize + Record::kSecondaryIndexEntrySize); Record::kPrimaryIndexEntrySize + Record::kSecondaryIndexEntrySize);
return; return;
} else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() || } else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() ||
s.IsMergeInProgress()) { s.IsMergeInProgress() || s.IsIncomplete()) {
// ww-conflict detected, or // ww-conflict detected, or
// lock cannot be acquired, or // lock cannot be acquired, or
// memtable history is not large enough for conflict checking, or // memtable history is not large enough for conflict checking, or
// Merge operation cannot be resolved. // Merge operation cannot be resolved, or
// application rollback.
// TODO (yanqin) add stats for other cases? // TODO (yanqin) add stats for other cases?
} else if (s.IsNotFound()) { } else if (s.IsNotFound()) {
// ignore. // ignore.
@ -727,8 +769,9 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
Record record; Record record;
s = record.DecodeSecondaryIndexEntry(it->key(), it->value()); s = record.DecodeSecondaryIndexEntry(it->key(), it->value());
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Cannot decode secondary key: %s\n", fprintf(stderr, "Cannot decode secondary key (%s => %s): %s\n",
s.ToString().c_str()); it->key().ToString(true).c_str(),
it->value().ToString(true).c_str(), s.ToString().c_str());
assert(false); assert(false);
break; break;
} }
@ -749,21 +792,31 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
} else if (s.IsNotFound()) { } else if (s.IsNotFound()) {
// We can also fail verification here. // We can also fail verification here.
std::ostringstream oss; std::ostringstream oss;
oss << "pk should exist: " << Slice(pk).ToString(true); auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
assert(dbimpl);
oss << "snap " << read_opts.snapshot->GetSequenceNumber()
<< " (published " << dbimpl->GetLastPublishedSequence()
<< "), pk should exist: " << Slice(pk).ToString(true);
fprintf(stderr, "%s\n", oss.str().c_str()); fprintf(stderr, "%s\n", oss.str().c_str());
assert(false); assert(false);
break; break;
} }
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "%s\n", s.ToString().c_str()); std::ostringstream oss;
auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
assert(dbimpl);
oss << "snap " << read_opts.snapshot->GetSequenceNumber()
<< " (published " << dbimpl->GetLastPublishedSequence() << "), "
<< s.ToString();
fprintf(stderr, "%s\n", oss.str().c_str());
assert(false); assert(false);
break; break;
} }
auto result = Record::DecodePrimaryIndexValue(value); auto result = Record::DecodePrimaryIndexValue(value);
s = std::get<0>(result); s = std::get<0>(result);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Cannot decode primary index value: %s\n", fprintf(stderr, "Cannot decode primary index value %s: %s\n",
s.ToString().c_str()); Slice(value).ToString(true).c_str(), s.ToString().c_str());
assert(false); assert(false);
break; break;
} }
@ -771,8 +824,12 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
uint32_t c = std::get<2>(result); uint32_t c = std::get<2>(result);
if (c != old_c) { if (c != old_c) {
std::ostringstream oss; std::ostringstream oss;
oss << "c in primary index does not match secondary index: " << c auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
<< " != " << old_c; assert(dbimpl);
oss << "snap " << read_opts.snapshot->GetSequenceNumber()
<< " (published " << dbimpl->GetLastPublishedSequence()
<< "), pk/sk mismatch. pk: (a=" << record.a_value() << ", "
<< "c=" << c << "), sk: (c=" << old_c << ")";
s = Status::Corruption(); s = Status::Corruption();
fprintf(stderr, "%s\n", oss.str().c_str()); fprintf(stderr, "%s\n", oss.str().c_str());
assert(false); assert(false);
@ -811,6 +868,16 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
return s; return s;
} }
if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) {
s = Status::Incomplete();
return s;
}
s = WriteToCommitTimeWriteBatch(*txn);
if (!s.ok()) {
return s;
}
s = txn->Commit(); s = txn->Commit();
if (s.ok()) { if (s.ok()) {
@ -856,7 +923,7 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
} else if (s.IsInvalidArgument()) { } else if (s.IsInvalidArgument()) {
// ignored. // ignored.
} else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() || } else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() ||
s.IsMergeInProgress()) { s.IsMergeInProgress() || s.IsIncomplete()) {
// ignored. // ignored.
} else { } else {
thread->stats.AddErrors(1); thread->stats.AddErrors(1);
@ -874,8 +941,8 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
auto result = Record::DecodePrimaryIndexValue(value); auto result = Record::DecodePrimaryIndexValue(value);
if (!std::get<0>(result).ok()) { if (!std::get<0>(result).ok()) {
s = std::get<0>(result); s = std::get<0>(result);
fprintf(stderr, "Cannot decode primary index value: %s\n", fprintf(stderr, "Cannot decode primary index value %s: %s\n",
s.ToString().c_str()); Slice(value).ToString(true).c_str(), s.ToString().c_str());
assert(false); assert(false);
return s; return s;
} }
@ -892,6 +959,17 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) {
s = Status::Incomplete();
return s;
}
s = WriteToCommitTimeWriteBatch(*txn);
if (!s.ok()) {
return s;
}
s = txn->Commit(); s = txn->Commit();
if (s.ok()) { if (s.ok()) {
delete txn; delete txn;
@ -1050,12 +1128,15 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
// First, iterate primary index. // First, iterate primary index.
size_t primary_index_entries_count = 0; size_t primary_index_entries_count = 0;
{ {
char buf[4]; std::string iter_ub_str;
EncodeFixed32(buf, Record::kPrimaryIndexId + 1); PutFixed32(&iter_ub_str, Record::kPrimaryIndexId + 1);
std::reverse(buf, buf + sizeof(buf)); std::reverse(iter_ub_str.begin(), iter_ub_str.end());
std::string iter_ub_str(buf, sizeof(buf));
Slice iter_ub = iter_ub_str; Slice iter_ub = iter_ub_str;
std::string start_key;
PutFixed32(&start_key, Record::kPrimaryIndexId);
std::reverse(start_key.begin(), start_key.end());
// This `ReadOptions` is for validation purposes. Ignore // This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation. // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts; ReadOptions ropts;
@ -1064,7 +1145,7 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
ropts.iterate_upper_bound = &iter_ub; ropts.iterate_upper_bound = &iter_ub;
std::unique_ptr<Iterator> it(db_->NewIterator(ropts)); std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
for (it->SeekToFirst(); it->Valid(); it->Next()) { for (it->Seek(start_key); it->Valid(); it->Next()) {
Record record; Record record;
Status s = record.DecodePrimaryIndexEntry(it->key(), it->value()); Status s = record.DecodePrimaryIndexEntry(it->key(), it->value());
if (!s.ok()) { if (!s.ok()) {
@ -1101,10 +1182,9 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
// Second, iterate secondary index. // Second, iterate secondary index.
size_t secondary_index_entries_count = 0; size_t secondary_index_entries_count = 0;
{ {
char buf[4]; std::string start_key;
EncodeFixed32(buf, Record::kSecondaryIndexId); PutFixed32(&start_key, Record::kSecondaryIndexId);
std::reverse(buf, buf + sizeof(buf)); std::reverse(start_key.begin(), start_key.end());
const std::string start_key(buf, sizeof(buf));
// This `ReadOptions` is for validation purposes. Ignore // This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation. // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
@ -1118,7 +1198,8 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
Record record; Record record;
Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value()); Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value());
if (!s.ok()) { if (!s.ok()) {
oss << "Cannot decode secondary index entry"; oss << "Cannot decode secondary index entry "
<< it->key().ToString(true) << "=>" << it->value().ToString(true);
VerificationAbort(thread->shared, oss.str(), s); VerificationAbort(thread->shared, oss.str(), s);
assert(false); assert(false);
return; return;
@ -1132,7 +1213,7 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
s = db_->Get(ropts, pk, &value); s = db_->Get(ropts, pk, &value);
if (!s.ok()) { if (!s.ok()) {
oss << "Error searching pk " << Slice(pk).ToString(true) << ". " oss << "Error searching pk " << Slice(pk).ToString(true) << ". "
<< s.ToString(); << s.ToString() << ". sk " << it->key().ToString(true);
VerificationAbort(thread->shared, oss.str(), s); VerificationAbort(thread->shared, oss.str(), s);
assert(false); assert(false);
return; return;
@ -1148,8 +1229,10 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
} }
uint32_t c_in_primary = std::get<2>(result); uint32_t c_in_primary = std::get<2>(result);
if (c_in_primary != record.c_value()) { if (c_in_primary != record.c_value()) {
oss << "Pk/sk mismatch. pk: (c=" << c_in_primary oss << "Pk/sk mismatch. pk: " << Slice(pk).ToString(true) << "=>"
<< "), sk: (c=" << record.c_value() << ")"; << Slice(value).ToString(true) << " (a=" << record.a_value()
<< ", c=" << c_in_primary << "), sk: " << it->key().ToString(true)
<< " (c=" << record.c_value() << ")";
VerificationAbort(thread->shared, oss.str(), s); VerificationAbort(thread->shared, oss.str(), s);
assert(false); assert(false);
return; return;
@ -1167,6 +1250,75 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
} }
} }
void MultiOpsTxnsStressTest::VerifyPkSkFast(int job_id) {
const Snapshot* const snapshot = db_->GetSnapshot();
assert(snapshot);
ManagedSnapshot snapshot_guard(db_, snapshot);
std::ostringstream oss;
auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
assert(dbimpl);
oss << "Job " << job_id << ": [" << snapshot->GetSequenceNumber() << ","
<< dbimpl->GetLastPublishedSequence() << "] ";
std::string start_key;
PutFixed32(&start_key, Record::kSecondaryIndexId);
std::reverse(start_key.begin(), start_key.end());
// This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts;
ropts.snapshot = snapshot;
ropts.total_order_seek = true;
std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
for (it->Seek(start_key); it->Valid(); it->Next()) {
Record record;
Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value());
if (!s.ok()) {
oss << "Cannot decode secondary index entry " << it->key().ToString(true)
<< "=>" << it->value().ToString(true);
fprintf(stderr, "%s\n", oss.str().c_str());
fflush(stderr);
assert(false);
}
// After decoding secondary index entry, we know a and c. Crc is verified
// in decoding phase.
//
// Form a primary key and search in the primary index.
std::string pk = Record::EncodePrimaryKey(record.a_value());
std::string value;
s = db_->Get(ropts, pk, &value);
if (!s.ok()) {
oss << "Error searching pk " << Slice(pk).ToString(true) << ". "
<< s.ToString() << ". sk " << it->key().ToString(true);
fprintf(stderr, "%s\n", oss.str().c_str());
fflush(stderr);
assert(false);
}
auto result = Record::DecodePrimaryIndexValue(value);
s = std::get<0>(result);
if (!s.ok()) {
oss << "Error decoding primary index value "
<< Slice(value).ToString(true) << ". " << s.ToString();
fprintf(stderr, "%s\n", oss.str().c_str());
fflush(stderr);
assert(false);
}
uint32_t c_in_primary = std::get<2>(result);
if (c_in_primary != record.c_value()) {
oss << "Pk/sk mismatch. pk: " << Slice(pk).ToString(true) << "=>"
<< Slice(value).ToString(true) << " (a=" << record.a_value()
<< ", c=" << c_in_primary << "), sk: " << it->key().ToString(true)
<< " (c=" << record.c_value() << ")";
fprintf(stderr, "%s\n", oss.str().c_str());
fflush(stderr);
assert(false);
}
}
}
std::pair<uint32_t, uint32_t> MultiOpsTxnsStressTest::ChooseExistingA( std::pair<uint32_t, uint32_t> MultiOpsTxnsStressTest::ChooseExistingA(
ThreadState* thread) { ThreadState* thread) {
uint32_t tid = thread->tid; uint32_t tid = thread->tid;
@ -1193,6 +1345,22 @@ uint32_t MultiOpsTxnsStressTest::GenerateNextC(ThreadState* thread) {
return key_gen->Allocate(); return key_gen->Allocate();
} }
#ifndef ROCKSDB_LITE
Status MultiOpsTxnsStressTest::WriteToCommitTimeWriteBatch(Transaction& txn) {
WriteBatch* ctwb = txn.GetCommitTimeWriteBatch();
assert(ctwb);
// Do not change the content in key_buf.
static constexpr char key_buf[sizeof(Record::kMetadataPrefix) + 4] = {
'\0', '\0', '\0', '\0', '\0', '\0', '\0', '\xff'};
uint64_t counter_val = counter_.Next();
char val_buf[sizeof(counter_val)];
EncodeFixed64(val_buf, counter_val);
return ctwb->Put(Slice(key_buf, sizeof(key_buf)),
Slice(val_buf, sizeof(val_buf)));
}
#endif // !ROCKSDB_LITE
std::string MultiOpsTxnsStressTest::KeySpaces::EncodeTo() const { std::string MultiOpsTxnsStressTest::KeySpaces::EncodeTo() const {
std::string result; std::string result;
PutFixed32(&result, lb_a); PutFixed32(&result, lb_a);
@ -1428,8 +1596,9 @@ void MultiOpsTxnsStressTest::ScanExistingDb(SharedState* shared, int threads) {
Record record; Record record;
Status s = record.DecodePrimaryIndexEntry(it->key(), it->value()); Status s = record.DecodePrimaryIndexEntry(it->key(), it->value());
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Cannot decode primary index entry: %s\n", fprintf(stderr, "Cannot decode primary index entry (%s => %s): %s\n",
s.ToString().c_str()); it->key().ToString(true).c_str(),
it->value().ToString(true).c_str(), s.ToString().c_str());
assert(false); assert(false);
} }
uint32_t a = record.a_value(); uint32_t a = record.a_value();

View File

@ -111,6 +111,7 @@ class MultiOpsTxnsStressTest : public StressTest {
public: public:
class Record { class Record {
public: public:
static constexpr uint32_t kMetadataPrefix = 0;
static constexpr uint32_t kPrimaryIndexId = 1; static constexpr uint32_t kPrimaryIndexId = 1;
static constexpr uint32_t kSecondaryIndexId = 2; static constexpr uint32_t kSecondaryIndexId = 2;
@ -261,6 +262,12 @@ class MultiOpsTxnsStressTest : public StressTest {
ThreadState* thread, ThreadState* thread,
const std::vector<int>& rand_column_families) override; const std::vector<int>& rand_column_families) override;
void RegisterAdditionalListeners() override;
#ifndef ROCKSDB_LITE
void PrepareTxnDbOptions(TransactionDBOptions& txn_db_opts) override;
#endif // !ROCKSDB_LITE
Status PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a, Status PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a,
uint32_t old_a_pos, uint32_t new_a); uint32_t old_a_pos, uint32_t new_a);
@ -280,7 +287,17 @@ class MultiOpsTxnsStressTest : public StressTest {
VerifyDb(thread); VerifyDb(thread);
} }
void VerifyPkSkFast(int job_id);
protected: protected:
class Counter {
public:
uint64_t Next() { return value_.fetch_add(1); }
private:
std::atomic<uint64_t> value_ = Env::Default()->NowNanos();
};
using KeySet = std::set<uint32_t>; using KeySet = std::set<uint32_t>;
class KeyGenerator { class KeyGenerator {
public: public:
@ -330,9 +347,21 @@ class MultiOpsTxnsStressTest : public StressTest {
uint32_t GenerateNextC(ThreadState* thread); uint32_t GenerateNextC(ThreadState* thread);
#ifndef ROCKSDB_LITE
// Some applications, e.g. MyRocks writes a KV pair to the database via
// commit-time-write-batch (ctwb) in additional to the transaction's regular
// write batch. The key is usually constant representing some system
// metadata, while the value is monoticailly increasing which represents the
// actual value of the metadata. Method WriteToCommitTimeWriteBatch()
// emulates this scenario.
Status WriteToCommitTimeWriteBatch(Transaction& txn);
#endif //! ROCKSDB_LITE
std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_a_; std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_a_;
std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_c_; std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_c_;
Counter counter_{};
private: private:
struct KeySpaces { struct KeySpaces {
uint32_t lb_a = 0; uint32_t lb_a = 0;
@ -370,5 +399,38 @@ class InvariantChecker {
"MultiOpsTxnsStressTest::Record::c_ must be 4 bytes"); "MultiOpsTxnsStressTest::Record::c_ must be 4 bytes");
}; };
class MultiOpsTxnsStressListener : public EventListener {
public:
explicit MultiOpsTxnsStressListener(MultiOpsTxnsStressTest* stress_test)
: stress_test_(stress_test) {
assert(stress_test_);
}
#ifndef ROCKSDB_LITE
~MultiOpsTxnsStressListener() override {}
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
assert(db);
#ifdef NDEBUG
(void)db;
#endif
assert(info.cf_id == 0);
stress_test_->VerifyPkSkFast(info.job_id);
}
void OnCompactionCompleted(DB* db, const CompactionJobInfo& info) override {
assert(db);
#ifdef NDEBUG
(void)db;
#endif
assert(info.cf_id == 0);
stress_test_->VerifyPkSkFast(info.job_id);
}
#endif //! ROCKSDB_LITE
private:
MultiOpsTxnsStressTest* const stress_test_ = nullptr;
};
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS #endif // GFLAGS

8
env/fs_posix.cc vendored
View File

@ -1122,7 +1122,13 @@ class PosixFileSystem : public FileSystem {
// in posix for IOUring requests. Currently it calls Poll to wait for requests // in posix for IOUring requests. Currently it calls Poll to wait for requests
// to complete the request. // to complete the request.
virtual IOStatus AbortIO(std::vector<void*>& io_handles) override { virtual IOStatus AbortIO(std::vector<void*>& io_handles) override {
return Poll(io_handles, io_handles.size()); IOStatus s = Poll(io_handles, io_handles.size());
// If Poll is not supported then it didn't submit any request and it should
// return OK.
if (s.IsNotSupported()) {
return IOStatus::OK();
}
return s;
} }
#if defined(ROCKSDB_IOURING_PRESENT) #if defined(ROCKSDB_IOURING_PRESENT)

View File

@ -282,7 +282,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
offset += length; offset += length;
length = 0; length = 0;
prefetch_size -= length; prefetch_size = readahead_size;
} }
// Data is overlapping i.e. some of the data is in curr_ buffer and remaining // Data is overlapping i.e. some of the data is in curr_ buffer and remaining
// in second buffer. // in second buffer.
@ -311,7 +311,8 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// sync prefetching and copy the remaining data to third buffer in the end. // sync prefetching and copy the remaining data to third buffer in the end.
// swap the buffers. // swap the buffers.
curr_ = curr_ ^ 1; curr_ = curr_ ^ 1;
prefetch_size -= length; // Update prefetch_size as length has been updated in CopyDataToBuffer.
prefetch_size = length + readahead_size;
} }
// Update second again if swap happened. // Update second again if swap happened.

View File

@ -63,7 +63,9 @@ struct SavePoint {
class WriteBatch : public WriteBatchBase { class WriteBatch : public WriteBatchBase {
public: public:
explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0); explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0)
: WriteBatch(reserved_bytes, max_bytes, 0, 0) {}
// `protection_bytes_per_key` is the number of bytes used to store // `protection_bytes_per_key` is the number of bytes used to store
// protection information for each key entry. Currently supported values are // protection information for each key entry. Currently supported values are
// zero (disabled) and eight. // zero (disabled) and eight.
@ -318,8 +320,17 @@ class WriteBatch : public WriteBatchBase {
protected: protected:
friend class WriteBatchInternal; friend class WriteBatchInternal;
virtual bool WriteAfterCommit() const { return true; } enum class OptionState {
virtual bool WriteBeforePrepare() const { return false; } kUnknown,
kDisabled,
kEnabled,
};
virtual OptionState WriteAfterCommit() const {
return OptionState::kUnknown;
}
virtual OptionState WriteBeforePrepare() const {
return OptionState::kUnknown;
}
}; };
Status Iterate(Handler* handler) const; Status Iterate(Handler* handler) const;
@ -402,6 +413,9 @@ class WriteBatch : public WriteBatchBase {
struct ProtectionInfo; struct ProtectionInfo;
size_t GetProtectionBytesPerKey() const; size_t GetProtectionBytesPerKey() const;
// Clears prot_info_ if there are no entries.
void ClearProtectionInfoIfEmpty();
private: private:
friend class WriteBatchInternal; friend class WriteBatchInternal;
friend class LocalSavePoint; friend class LocalSavePoint;

View File

@ -91,7 +91,7 @@ Status RandomTransactionInserter::DBGet(
Status s; Status s;
// Five digits (since the largest uint16_t is 65535) plus the NUL // Five digits (since the largest uint16_t is 65535) plus the NUL
// end char. // end char.
char prefix_buf[6]; char prefix_buf[6] = {0};
// Pad prefix appropriately so we can iterate over each set // Pad prefix appropriately so we can iterate over each set
assert(set_i + 1 <= 9999); assert(set_i + 1 <= 9999);
snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1); snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1);
@ -165,7 +165,11 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
// Increment key // Increment key
std::string sum = ToString(int_value + incr); std::string sum = ToString(int_value + incr);
if (txn != nullptr) { if (txn != nullptr) {
if ((set_i % 4) != 0) {
s = txn->SingleDelete(key); s = txn->SingleDelete(key);
} else {
s = txn->Delete(key);
}
if (!get_for_update && (s.IsBusy() || s.IsTimedOut())) { if (!get_for_update && (s.IsBusy() || s.IsTimedOut())) {
// If the initial get was not for update, then the key is not locked // If the initial get was not for update, then the key is not locked
// before put and put could fail due to concurrent writes. // before put and put could fail due to concurrent writes.

View File

@ -33,6 +33,23 @@ class Random64;
// RandomTransactionInserter with similar arguments using the same DB. // RandomTransactionInserter with similar arguments using the same DB.
class RandomTransactionInserter { class RandomTransactionInserter {
public: public:
static bool RollbackDeletionTypeCallback(const Slice& key) {
// These are hard-coded atm. See how RandomTransactionInserter::DoInsert()
// determines whether to use SingleDelete or Delete for a key.
assert(key.size() >= 4);
const char* ptr = key.data();
assert(ptr);
while (ptr && ptr < key.data() + 4 && *ptr == '0') {
++ptr;
}
std::string prefix(ptr, 4 - (ptr - key.data()));
unsigned long set_i = std::stoul(prefix);
assert(set_i > 0);
assert(set_i <= 9999);
--set_i;
return ((set_i % 4) != 0);
}
// num_keys is the number of keys in each set. // num_keys is the number of keys in each set.
// num_sets is the number of sets of keys. // num_sets is the number of sets of keys.
// cmt_delay_ms is the delay between prepare (if there is any) and commit // cmt_delay_ms is the delay between prepare (if there is any) and commit

View File

@ -383,6 +383,7 @@ multiops_txn_default_params = {
# compactions. # compactions.
"flush_one_in": 1000, "flush_one_in": 1000,
"key_spaces_path": setup_multiops_txn_key_spaces_file(), "key_spaces_path": setup_multiops_txn_key_spaces_file(),
"rollback_one_in": 4,
} }
multiops_wc_txn_params = { multiops_wc_txn_params = {
@ -401,6 +402,10 @@ multiops_wp_txn_params = {
"enable_pipelined_write": 0, "enable_pipelined_write": 0,
# OpenReadOnly after checkpoint is not currnetly compatible with WritePrepared txns # OpenReadOnly after checkpoint is not currnetly compatible with WritePrepared txns
"checkpoint_one_in": 0, "checkpoint_one_in": 0,
# Required to be 1 in order to use commit-time-batch
"use_only_the_last_commit_time_batch_for_recovery": 1,
"recycle_log_file_num": 0,
"clear_wp_commit_cache_one_in": 10,
} }
def finalize_and_sanitize(src_params): def finalize_and_sanitize(src_params):

View File

@ -2538,7 +2538,10 @@ class InMemoryHandler : public WriteBatch::Handler {
~InMemoryHandler() override {} ~InMemoryHandler() override {}
protected: protected:
bool WriteAfterCommit() const override { return write_after_commit_; } Handler::OptionState WriteAfterCommit() const override {
return write_after_commit_ ? Handler::OptionState::kEnabled
: Handler::OptionState::kDisabled;
}
private: private:
std::stringstream& row_; std::stringstream& row_;

View File

@ -5469,6 +5469,10 @@ Status TransactionStressTestInserter(
TEST_P(MySQLStyleTransactionTest, TransactionStressTest) { TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
// Small write buffer to trigger more compactions // Small write buffer to trigger more compactions
options.write_buffer_size = 1024; options.write_buffer_size = 1024;
txn_db_options.rollback_deletion_type_callback =
[](TransactionDB*, ColumnFamilyHandle*, const Slice& key) {
return RandomTransactionInserter::RollbackDeletionTypeCallback(key);
};
ASSERT_OK(ReOpenNoDelete()); ASSERT_OK(ReOpenNoDelete());
constexpr size_t num_workers = 4; // worker threads count constexpr size_t num_workers = 4; // worker threads count
constexpr size_t num_checkers = 2; // checker threads count constexpr size_t num_checkers = 2; // checker threads count

View File

@ -3202,6 +3202,8 @@ TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing2) {
TEST_P(WritePreparedTransactionTest, SingleDeleteAfterRollback) { TEST_P(WritePreparedTransactionTest, SingleDeleteAfterRollback) {
constexpr size_t kSnapshotCacheBits = 7; // same as default constexpr size_t kSnapshotCacheBits = 7; // same as default
constexpr size_t kCommitCacheBits = 0; // minimum commit cache constexpr size_t kCommitCacheBits = 0; // minimum commit cache
txn_db_options.rollback_deletion_type_callback =
[](TransactionDB*, ColumnFamilyHandle*, const Slice&) { return true; };
UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits); UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
ASSERT_OK(ReOpen()); ASSERT_OK(ReOpen());

View File

@ -374,7 +374,9 @@ Status WritePreparedTxn::RollbackInternal() {
} }
protected: protected:
bool WriteAfterCommit() const override { return false; } Handler::OptionState WriteAfterCommit() const override {
return Handler::OptionState::kDisabled;
}
} rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch, } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
*cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(), *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
wpt_db_->txn_db_options_.rollback_merge_operands, wpt_db_->txn_db_options_.rollback_merge_operands,

View File

@ -26,6 +26,18 @@
#include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h" #include "utilities/transactions/transaction_db_mutex_impl.h"
// This function is for testing only. If it returns true, then all entries in
// the commit cache will be evicted. Unit and/or stress tests (db_stress)
// can implement this function and customize how frequently commit cache
// eviction occurs.
// TODO: remove this function once we can configure commit cache to be very
// small so that eviction occurs very frequently. This requires the commit
// cache entry to be able to encode prepare and commit sequence numbers so that
// the commit sequence number does not have to be within a certain range of
// prepare sequence number.
extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void)
__attribute__((__weak__));
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
Status WritePreparedTxnDB::Initialize( Status WritePreparedTxnDB::Initialize(
@ -505,6 +517,12 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
// legit when a commit entry in a write batch overwrite the previous one // legit when a commit entry in a write batch overwrite the previous one
max_evicted_seq = evicted.commit_seq; max_evicted_seq = evicted.commit_seq;
} }
#ifdef OS_LINUX
if (rocksdb_write_prepared_TEST_ShouldClearCommitCache &&
rocksdb_write_prepared_TEST_ShouldClearCommitCache()) {
max_evicted_seq = last;
}
#endif // OS_LINUX
ROCKS_LOG_DETAILS(info_log_, ROCKS_LOG_DETAILS(info_log_,
"%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64 "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
" => %lu", " => %lu",

View File

@ -513,6 +513,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WriteUnpreparedTxn; friend class WriteUnpreparedTxn;
friend class WriteUnpreparedTxnDB; friend class WriteUnpreparedTxnDB;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
friend class MultiOpsTxnsStressTest;
void Init(const TransactionDBOptions& txn_db_opts); void Init(const TransactionDBOptions& txn_db_opts);
@ -1081,7 +1082,9 @@ struct SubBatchCounter : public WriteBatch::Handler {
} }
Status MarkBeginPrepare(bool) override { return Status::OK(); } Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); } Status MarkRollback(const Slice&) override { return Status::OK(); }
bool WriteAfterCommit() const override { return false; } Handler::OptionState WriteAfterCommit() const override {
return Handler::OptionState::kDisabled;
}
}; };
SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot, SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,

View File

@ -675,7 +675,11 @@ Status WriteUnpreparedTxn::WriteRollbackKeys(
s = rollback_batch->Put(cf_handle, key, pinnable_val); s = rollback_batch->Put(cf_handle, key, pinnable_val);
assert(s.ok()); assert(s.ok());
} else if (s.IsNotFound()) { } else if (s.IsNotFound()) {
if (wupt_db_->ShouldRollbackWithSingleDelete(cf_handle, key)) {
s = rollback_batch->SingleDelete(cf_handle, key);
} else {
s = rollback_batch->Delete(cf_handle, key); s = rollback_batch->Delete(cf_handle, key);
}
assert(s.ok()); assert(s.ok());
} else { } else {
return s; return s;