Compare commits
3 Commits
main
...
6.1.fb.pro
Author | SHA1 | Date | |
---|---|---|---|
|
8c9ac08735 | ||
|
f7e0545692 | ||
|
2c3eaeb4db |
@ -477,20 +477,6 @@ void CompactionIterator::NextFromInput() {
|
||||
// in this snapshot.
|
||||
assert(last_sequence >= current_user_key_sequence_);
|
||||
|
||||
// Note2: if last_snapshot < current_user_key_snapshot, it can only
|
||||
// mean last_snapshot is released between we process last value and
|
||||
// this value, and findEarliestVisibleSnapshot returns the next snapshot
|
||||
// as current_user_key_snapshot. In this case last value and current
|
||||
// value are both in current_user_key_snapshot currently.
|
||||
// Although last_snapshot is released we might still get a definitive
|
||||
// response when key sequence number changes, e.g., when seq is determined
|
||||
// too old and visible in all snapshots.
|
||||
assert(last_snapshot == current_user_key_snapshot_ ||
|
||||
(snapshot_checker_ != nullptr &&
|
||||
snapshot_checker_->CheckInSnapshot(current_user_key_sequence_,
|
||||
last_snapshot) !=
|
||||
SnapshotCheckerResult::kNotInSnapshot));
|
||||
|
||||
++iter_stats_.num_record_drop_hidden; // (A)
|
||||
input_->Next();
|
||||
} else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
|
||||
|
@ -1026,10 +1026,13 @@ int DBImpl::FindMinimumEmptyLevelFitting(
|
||||
|
||||
Status DBImpl::FlushWAL(bool sync) {
|
||||
if (manual_wal_flush_) {
|
||||
Status s;
|
||||
{
|
||||
// We need to lock log_write_mutex_ since logs_ might change concurrently
|
||||
InstrumentedMutexLock wl(&log_write_mutex_);
|
||||
log::Writer* cur_log_writer = logs_.back().writer;
|
||||
auto s = cur_log_writer->WriteBuffer();
|
||||
s = cur_log_writer->WriteBuffer();
|
||||
}
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
|
||||
s.ToString().c_str());
|
||||
|
@ -1261,6 +1261,8 @@ class DBImpl : public DB {
|
||||
// logfile_number_. With two_write_queues it also protects alive_log_files_,
|
||||
// and log_empty_. Refer to the definition of each variable below for more
|
||||
// details.
|
||||
// Note: to avoid dealock, if needed to acquire both log_write_mutex_ and
|
||||
// mutex_, the order should be first mutex_ and then log_write_mutex_.
|
||||
InstrumentedMutex log_write_mutex_;
|
||||
|
||||
std::atomic<bool> shutting_down_;
|
||||
|
@ -178,9 +178,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
||||
WriteThread::WriteGroup write_group;
|
||||
bool in_parallel_group = false;
|
||||
uint64_t last_sequence = kMaxSequenceNumber;
|
||||
if (!two_write_queues_) {
|
||||
last_sequence = versions_->LastSequence();
|
||||
}
|
||||
|
||||
mutex_.Lock();
|
||||
|
||||
@ -195,6 +192,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
||||
PERF_TIMER_STOP(write_pre_and_post_process_time);
|
||||
|
||||
status = PreprocessWrite(write_options, &need_log_sync, &write_context);
|
||||
if (!two_write_queues_) {
|
||||
// Assign it after ::PreprocessWrite since the sequence might advance
|
||||
// inside it by WriteRecoverableState
|
||||
last_sequence = versions_->LastSequence();
|
||||
}
|
||||
|
||||
PERF_TIMER_START(write_pre_and_post_process_time);
|
||||
}
|
||||
@ -994,8 +996,12 @@ Status DBImpl::WriteRecoverableState() {
|
||||
for (uint64_t sub_batch_seq = seq + 1;
|
||||
sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
|
||||
uint64_t const no_log_num = 0;
|
||||
// Unlock it since the callback might end up locking mutex. e.g.,
|
||||
// AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB
|
||||
mutex_.Unlock();
|
||||
status = recoverable_state_pre_release_callback_->Callback(
|
||||
sub_batch_seq, !DISABLE_MEMTABLE, no_log_num);
|
||||
mutex_.Lock();
|
||||
}
|
||||
}
|
||||
if (status.ok()) {
|
||||
|
@ -5,8 +5,8 @@
|
||||
#pragma once
|
||||
|
||||
#define ROCKSDB_MAJOR 6
|
||||
#define ROCKSDB_MINOR 1
|
||||
#define ROCKSDB_PATCH 1
|
||||
#define ROCKSDB_MINOR 2
|
||||
#define ROCKSDB_PATCH 0
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -205,6 +205,12 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
|
||||
ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
|
||||
"Prepare of %" PRIu64 " %s (%s)", txn->GetId(),
|
||||
s.ToString().c_str(), txn->GetName().c_str());
|
||||
if (rand_->OneIn(20)) {
|
||||
// This currently only tests the mechanics of writing commit time
|
||||
// write batch so the exact values would not matter.
|
||||
s = txn_->GetCommitTimeWriteBatch()->Put("cat", "dog");
|
||||
assert(s.ok());
|
||||
}
|
||||
db->GetDBOptions().env->SleepForMicroseconds(
|
||||
static_cast<int>(cmt_delay_ms_ * 1000));
|
||||
}
|
||||
|
@ -5003,6 +5003,9 @@ Status TransactionStressTestInserter(
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
TransactionOptions txn_options;
|
||||
if (rand->OneIn(2)) {
|
||||
txn_options.use_only_the_last_commit_time_batch_for_recovery = true;
|
||||
}
|
||||
// Inside the inserter we might also retake the snapshot. We do both since two
|
||||
// separte functions are engaged for each.
|
||||
txn_options.set_snapshot = rand->OneIn(2);
|
||||
|
@ -466,6 +466,7 @@ class MySQLStyleTransactionTest
|
||||
// structures.
|
||||
txn_db_options.wp_snapshot_cache_bits = 1;
|
||||
txn_db_options.wp_commit_cache_bits = 10;
|
||||
options.write_buffer_size = 1024;
|
||||
EXPECT_OK(ReOpen());
|
||||
}
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user