WritePrepared Txn: Refactor conf params

Summary:
Summary of changes:
- Move seq_per_batch out of Options
- Rename concurrent_prepare to two_write_queues
- Add allocate_seq_only_for_data_
Closes https://github.com/facebook/rocksdb/pull/3136

Differential Revision: D6304458

Pulled By: maysamyabandeh

fbshipit-source-id: 08e685bfa82bbc41b5b1c5eb7040a8ca6e05e58c
This commit is contained in:
Maysam Yabandeh 2017-11-10 17:18:01 -08:00 committed by Facebook Github Bot
parent 07c2738ffa
commit 857adf388f
30 changed files with 215 additions and 183 deletions

View File

@ -143,7 +143,7 @@ class CompactionJobTest : public testing::Test {
}
void SetLastSequence(const SequenceNumber sequence_number) {
versions_->SetLastToBeWrittenSequence(sequence_number + 1);
versions_->SetLastAllocatedSequence(sequence_number + 1);
versions_->SetLastSequence(sequence_number + 1);
}

View File

@ -136,7 +136,8 @@ void DumpSupportInfo(Logger* logger) {
int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024;
} // namespace
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
const bool seq_per_batch)
: env_(options.env),
dbname_(dbname),
initial_db_options_(SanitizeOptions(dbname, options)),
@ -185,18 +186,30 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
env_options_, immutable_db_options_)),
num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
wal_manager_(immutable_db_options_, env_options_),
wal_manager_(immutable_db_options_, env_options_, seq_per_batch),
#endif // ROCKSDB_LITE
event_logger_(immutable_db_options_.info_log.get()),
bg_work_paused_(0),
bg_compaction_paused_(0),
refitting_level_(false),
opened_successfully_(false),
concurrent_prepare_(options.concurrent_prepare),
two_write_queues_(options.two_write_queues),
manual_wal_flush_(options.manual_wal_flush),
seq_per_batch_(options.seq_per_batch),
// TODO(myabandeh): revise this when we change options.seq_per_batch
use_custom_gc_(options.seq_per_batch),
seq_per_batch_(seq_per_batch),
// When two_write_queues_ and seq_per_batch_ are both enabled we
// sometimes allocate a seq also to indicate the commit timestmamp of a
// transaction. In such cases last_sequence_ would not indicate the last
// visible sequence number in memtable and should not be used for
// snapshots. It should use last_allocated_sequence_ instaed but also
// needs other mechanisms to exclude the data that after last_sequence_
// and before last_allocated_sequence_ from the snapshot. In
// WritePreparedTxn this property is ensured since such data are not
// committed yet.
allocate_seq_only_for_data_(!(seq_per_batch && options.two_write_queues)),
// Since seq_per_batch_ is currently set only by WritePreparedTxn which
// requires a custom gc for compaction, we use that to set use_custom_gc_
// as well.
use_custom_gc_(seq_per_batch),
preserve_deletes_(options.preserve_deletes) {
env_->GetAbsolutePath(dbname, &db_absolute_path_);
@ -751,7 +764,7 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
}
SequenceNumber DBImpl::IncAndFetchSequenceNumber() {
return versions_->FetchAddLastToBeWrittenSequence(1ull) + 1ull;
return versions_->FetchAddLastAllocatedSequence(1ull) + 1ull;
}
bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
@ -977,9 +990,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
// super versipon because a flush happening in between may compact
// away data for the snapshot, but the snapshot is earlier than the
// data overwriting it, so users may see wrong results.
snapshot = concurrent_prepare_ && seq_per_batch_
? versions_->LastToBeWrittenSequence()
: versions_->LastSequence();
snapshot = allocate_seq_only_for_data_ ? versions_->LastSequence()
: versions_->LastAllocatedSequence();
}
TEST_SYNC_POINT("DBImpl::GetImpl:3");
TEST_SYNC_POINT("DBImpl::GetImpl:4");
@ -1070,9 +1082,8 @@ std::vector<Status> DBImpl::MultiGet(
snapshot = reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_;
} else {
snapshot = concurrent_prepare_ && seq_per_batch_
? versions_->LastToBeWrittenSequence()
: versions_->LastSequence();
snapshot = allocate_seq_only_for_data_ ? versions_->LastSequence()
: versions_->LastAllocatedSequence();
}
for (auto mgd_iter : multiget_cf_data) {
mgd_iter.second->super_version =
@ -1478,8 +1489,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
read_callback);
#endif
} else {
// Note: no need to consider the special case of concurrent_prepare_ &&
// seq_per_batch_ since NewIterator is overridden in WritePreparedTxnDB
// Note: no need to consider the special case of
// allocate_seq_only_for_data_==false since NewIterator is overridden in
// WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
@ -1595,8 +1607,9 @@ Status DBImpl::NewIterators(
}
#endif
} else {
// Note: no need to consider the special case of concurrent_prepare_ &&
// seq_per_batch_ since NewIterators is overridden in WritePreparedTxnDB
// Note: no need to consider the special case of
// allocate_seq_only_for_data_==false since NewIterators is overridden in
// WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
@ -1630,9 +1643,9 @@ const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) {
delete s;
return nullptr;
}
auto snapshot_seq = concurrent_prepare_ && seq_per_batch_
? versions_->LastToBeWrittenSequence()
: versions_->LastSequence();
auto snapshot_seq = allocate_seq_only_for_data_
? versions_->LastSequence()
: versions_->LastAllocatedSequence();
return snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
}
@ -1643,9 +1656,9 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
snapshots_.Delete(casted_s);
uint64_t oldest_snapshot;
if (snapshots_.empty()) {
oldest_snapshot = concurrent_prepare_ && seq_per_batch_
? versions_->LastToBeWrittenSequence()
: versions_->LastSequence();
oldest_snapshot = allocate_seq_only_for_data_
? versions_->LastSequence()
: versions_->LastAllocatedSequence();
} else {
oldest_snapshot = snapshots_.oldest()->number_;
}
@ -2753,7 +2766,7 @@ Status DBImpl::IngestExternalFile(
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
WriteThread::Writer nonmem_w;
if (concurrent_prepare_) {
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
@ -2796,7 +2809,7 @@ Status DBImpl::IngestExternalFile(
}
// Resume writes to the DB
if (concurrent_prepare_) {
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
write_thread_.ExitUnbatched(&w);

View File

@ -68,7 +68,8 @@ struct MemTableInfo;
class DBImpl : public DB {
public:
DBImpl(const DBOptions& options, const std::string& dbname);
DBImpl(const DBOptions& options, const std::string& dbname,
const bool seq_per_batch = false);
virtual ~DBImpl();
// Implementations of the DB interface
@ -220,10 +221,10 @@ class DBImpl : public DB {
virtual SequenceNumber GetLatestSequenceNumber() const override;
virtual SequenceNumber IncAndFetchSequenceNumber();
// Returns LastToBeWrittenSequence in concurrent_prepare_ && seq_per_batch_
// mode and LastSequence otherwise. This is useful when visiblility depends
// also on data written to the WAL but not to the memtable.
SequenceNumber TEST_GetLatestVisibleSequenceNumber() const;
// Returns LastSequence in allocate_seq_only_for_data_
// mode and LastAllocatedSequence otherwise. This is useful when visiblility
// depends also on data written to the WAL but not to the memtable.
SequenceNumber TEST_GetLastVisibleSequence() const;
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
@ -606,6 +607,12 @@ class DBImpl : public DB {
Status NewDB();
// This is to be used only by internal rocksdb classes.
static Status Open(const DBOptions& db_options, const std::string& name,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
const bool seq_per_batch);
protected:
Env* const env_;
const std::string dbname_;
@ -905,12 +912,12 @@ class DBImpl : public DB {
FileLock* db_lock_;
// In addition to mutex_, log_write_mutex_ protected writes to logs_ and
// logfile_number_. With concurrent_prepare it also protects alive_log_files_,
// logfile_number_. With two_write_queues it also protects alive_log_files_,
// and log_empty_. Refer to the definition of each variable below for more
// details.
InstrumentedMutex log_write_mutex_;
// State below is protected by mutex_
// With concurrent_prepare enabled, some of the variables that accessed during
// With two_write_queues enabled, some of the variables that accessed during
// WriteToWAL need different synchronization: log_empty_, alive_log_files_,
// logs_, logfile_number_. Refer to the definition of each variable below for
// more description.
@ -935,10 +942,10 @@ class DBImpl : public DB {
std::deque<uint64_t>
log_recycle_files; // a list of log files that we can recycle
bool log_dir_synced_;
// Without concurrent_prepare, read and writes to log_empty_ are protected by
// Without two_write_queues, read and writes to log_empty_ are protected by
// mutex_. Since it is currently updated/read only in write_thread_, it can be
// accessed from the same write_thread_ without any locks. With
// concurrent_prepare writes, where it can be updated in different threads,
// two_write_queues writes, where it can be updated in different threads,
// read and writes are protected by log_write_mutex_ instead. This is to avoid
// expesnive mutex_ lock during WAL write, which update log_empty_.
bool log_empty_;
@ -975,10 +982,10 @@ class DBImpl : public DB {
// true for some prefix of logs_
bool getting_synced = false;
};
// Without concurrent_prepare, read and writes to alive_log_files_ are
// Without two_write_queues, read and writes to alive_log_files_ are
// protected by mutex_. However since back() is never popped, and push_back()
// is done only from write_thread_, the same thread can access the item
// reffered by back() without mutex_. With concurrent_prepare_, writes
// reffered by back() without mutex_. With two_write_queues_, writes
// are protected by locking both mutex_ and log_write_mutex_, and reads must
// be under either mutex_ or log_write_mutex_.
std::deque<LogFileNumberSize> alive_log_files_;
@ -1003,7 +1010,7 @@ class DBImpl : public DB {
// memtable on normal writes and hence improving the throughput. Each new
// write of the state will replace the previous state entirely even if the
// keys in the two consecuitive states do not overlap.
// It is protected by log_write_mutex_ when concurrent_prepare_ is enabled.
// It is protected by log_write_mutex_ when two_write_queues_ is enabled.
// Otherwise only the heaad of write_thread_ can access it.
WriteBatch cached_recoverable_state_;
std::atomic<bool> cached_recoverable_state_empty_ = {true};
@ -1322,9 +1329,22 @@ class DBImpl : public DB {
// When set, we use a seprate queue for writes that dont write to memtable. In
// 2PC these are the writes at Prepare phase.
const bool concurrent_prepare_;
const bool two_write_queues_;
const bool manual_wal_flush_;
// Increase the sequence number after writing each batch, whether memtable is
// disabled for that or not. Otherwise the sequence number is increased after
// writing each key into memtable. This implies that when disable_memtable is
// set, the seq is not increased at all.
//
// Default: false
const bool seq_per_batch_;
// A sequence number is allocated only for data written to DB. Otherwise it
// could also be allocated for operational purposes such as commit timestamp
// of a transaction.
const bool allocate_seq_only_for_data_;
// It indicates that a customized gc algorithm must be used for
// flush/compaction and if it is not provided vis SnapshotChecker, we should
// disable gc to be safe.
const bool use_custom_gc_;
// Clients must periodically call SetPreserveDeletesSequenceNumber()

View File

@ -209,11 +209,11 @@ int DBImpl::TEST_BGFlushesAllowed() const {
return GetBGJobLimits().max_flushes;
}
SequenceNumber DBImpl::TEST_GetLatestVisibleSequenceNumber() const {
if (concurrent_prepare_ && seq_per_batch_) {
return versions_->LastToBeWrittenSequence();
} else {
SequenceNumber DBImpl::TEST_GetLastVisibleSequence() const {
if (allocate_seq_only_for_data_) {
return versions_->LastSequence();
} else {
return versions_->LastAllocatedSequence();
}
}

View File

@ -252,11 +252,11 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}
job_context->size_log_to_delete += earliest.size;
total_log_size_ -= earliest.size;
if (concurrent_prepare_) {
if (two_write_queues_) {
log_write_mutex_.Lock();
}
alive_log_files_.pop_front();
if (concurrent_prepare_) {
if (two_write_queues_) {
log_write_mutex_.Unlock();
}
// Current log should always stay alive since it can't have

View File

@ -592,9 +592,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// happen when we open and write to a corrupted DB, where sequence id
// will start from the last sequence id we recovered.
if (sequence == *next_sequence ||
// With seq_per_batch_, if previous run was with concurrent_prepare_
// then gap in the sequence numbers is expected by the commits
// without prepares.
// With seq_per_batch_, if previous run was with two_write_queues_
// then allocate_seq_only_for_data_ was disabled and a gap in the
// sequence numbers in the log is expected by the commits without
// prepares.
(seq_per_batch_ && sequence >= *next_sequence)) {
stop_replay_for_corruption = false;
}
@ -754,7 +755,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
auto last_sequence = *next_sequence - 1;
if ((*next_sequence != kMaxSequenceNumber) &&
(versions_->LastSequence() <= last_sequence)) {
versions_->SetLastToBeWrittenSequence(last_sequence);
versions_->SetLastAllocatedSequence(last_sequence);
versions_->SetLastSequence(last_sequence);
}
}
@ -845,13 +846,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
if (data_seen && !flushed) {
// Mark these as alive so they'll be considered for deletion later by
// FindObsoleteFiles()
if (concurrent_prepare_) {
if (two_write_queues_) {
log_write_mutex_.Lock();
}
for (auto log_number : log_numbers) {
alive_log_files_.push_back(LogFileNumberSize(log_number));
}
if (concurrent_prepare_) {
if (two_write_queues_) {
log_write_mutex_.Unlock();
}
}
@ -967,6 +968,15 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
const bool seq_per_batch = true;
return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
!seq_per_batch);
}
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
const bool seq_per_batch) {
Status s = SanitizeOptionsByTable(db_options, column_families);
if (!s.ok()) {
return s;
@ -986,7 +996,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
std::max(max_write_buffer_size, cf.options.write_buffer_size);
}
DBImpl* impl = new DBImpl(db_options, dbname);
DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch);
s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
if (s.ok()) {
for (auto db_path : impl->immutable_db_options_.db_paths) {
@ -1073,12 +1083,12 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
}
sv_context.Clean();
if (impl->concurrent_prepare_) {
if (impl->two_write_queues_) {
impl->log_write_mutex_.Lock();
}
impl->alive_log_files_.push_back(
DBImpl::LogFileNumberSize(impl->logfile_number_));
if (impl->concurrent_prepare_) {
if (impl->two_write_queues_) {
impl->log_write_mutex_.Unlock();
}
impl->DeleteObsoleteFiles();

View File

@ -67,7 +67,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (write_options.sync && write_options.disableWAL) {
return Status::InvalidArgument("Sync writes has to enable WAL.");
}
if (concurrent_prepare_ && immutable_db_options_.enable_pipelined_write) {
if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
@ -87,7 +87,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
}
if (concurrent_prepare_ && disable_memtable) {
if (two_write_queues_ && disable_memtable) {
return WriteImplWALOnly(write_options, my_batch, callback, log_used,
log_ref, seq_used);
}
@ -154,7 +154,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteThread::WriteGroup write_group;
bool in_parallel_group = false;
uint64_t last_sequence = kMaxSequenceNumber;
if (!concurrent_prepare_) {
if (!two_write_queues_) {
last_sequence = versions_->LastSequence();
}
@ -162,7 +162,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
bool need_log_sync = write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
if (!concurrent_prepare_ || !disable_memtable) {
if (!two_write_queues_ || !disable_memtable) {
// With concurrent writes we do preprocess only in the write thread that
// also does write to memtable to avoid sync issue on shared data structure
// with the other thread
@ -209,7 +209,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
size_t seq_inc = seq_per_batch_ ? write_group.size : total_count;
const bool concurrent_update = concurrent_prepare_;
const bool concurrent_update = two_write_queues_;
// Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully
@ -237,7 +237,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_STOP(write_pre_and_post_process_time);
if (!concurrent_prepare_) {
if (!two_write_queues_) {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
@ -246,13 +246,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} else {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
// LastToBeWrittenSequence is increased inside WriteToWAL under
// LastAllocatedSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
seq_inc);
} else {
// Otherwise we inc seq number for memtable writes
last_sequence = versions_->FetchAddLastToBeWrittenSequence(seq_inc);
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
}
}
assert(last_sequence != kMaxSequenceNumber);
@ -310,9 +310,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
mutex_.Unlock();
// Requesting sync with concurrent_prepare_ is expected to be very rare. We
// Requesting sync with two_write_queues_ is expected to be very rare. We
// hance provide a simple implementation that is not necessarily efficient.
if (concurrent_prepare_) {
if (two_write_queues_) {
if (manual_wal_flush_) {
status = FlushWAL(true);
} else {
@ -532,7 +532,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_wal_time);
// LastToBeWrittenSequence is increased inside WriteToWAL under
// LastAllocatedSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
size_t seq_inc = seq_per_batch_ ? write_group.size : 0 /*total_count*/;
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
@ -548,7 +548,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
}
}
if (status.ok() && write_options.sync) {
// Requesting sync with concurrent_prepare_ is expected to be very rare. We
// Requesting sync with two_write_queues_ is expected to be very rare. We
// hance provide a simple implementation that is not necessarily efficient.
if (manual_wal_flush_) {
status = FlushWAL(true);
@ -719,7 +719,7 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
return merged_batch;
}
// When concurrent_prepare_ is disabled, this function is called from the only
// When two_write_queues_ is disabled, this function is called from the only
// write thread. Otherwise this must be called holding log_write_mutex_.
Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
log::Writer* log_writer, uint64_t* log_used,
@ -828,7 +828,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
writer->log_used = logfile_number_;
}
}
*last_sequence = versions_->FetchAddLastToBeWrittenSequence(seq_inc);
*last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
auto sequence = *last_sequence + 1;
WriteBatchInternal::SetSequence(merged_batch, sequence);
@ -858,7 +858,7 @@ Status DBImpl::WriteRecoverableState() {
if (!cached_recoverable_state_empty_) {
bool dont_care_bool;
SequenceNumber next_seq;
if (concurrent_prepare_) {
if (two_write_queues_) {
log_write_mutex_.Lock();
}
SequenceNumber seq = versions_->LastSequence();
@ -869,7 +869,7 @@ Status DBImpl::WriteRecoverableState() {
false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool,
seq_per_batch_);
versions_->SetLastSequence(--next_seq);
if (concurrent_prepare_) {
if (two_write_queues_) {
log_write_mutex_.Unlock();
}
if (status.ok()) {
@ -1109,7 +1109,7 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* cfd,
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
mutex_.AssertHeld();
WriteThread::Writer nonmem_w;
if (concurrent_prepare_) {
if (two_write_queues_) {
// SwitchMemtable is a rare event. To simply the reasoning, we make sure
// that there is no concurrent thread writing to WAL.
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
@ -1135,11 +1135,11 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
// Attempt to switch to a new memtable and trigger flush of old.
// Do this without holding the dbmutex lock.
assert(versions_->prev_log_number() == 0);
if (concurrent_prepare_) {
if (two_write_queues_) {
log_write_mutex_.Lock();
}
bool creating_new_log = !log_empty_;
if (concurrent_prepare_) {
if (two_write_queues_) {
log_write_mutex_.Unlock();
}
uint64_t recycle_log_number = 0;
@ -1226,7 +1226,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
assert(creating_new_log);
assert(!new_mem);
assert(!new_log);
if (concurrent_prepare_) {
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
return s;
@ -1266,7 +1266,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
cfd->SetMemtable(new_mem);
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
mutable_cf_options);
if (concurrent_prepare_) {
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
return s;

View File

@ -486,7 +486,7 @@ Options DBTestBase::GetOptions(
}
case kConcurrentWALWrites: {
// This options optimize 2PC commit path
options.concurrent_prepare = true;
options.two_write_queues = true;
options.manual_wal_flush = true;
break;
}

View File

@ -730,7 +730,7 @@ class RecoveryTestHelper {
batch.Put(key, value);
WriteBatchInternal::SetSequence(&batch, seq);
current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
versions->SetLastToBeWrittenSequence(seq);
versions->SetLastAllocatedSequence(seq);
versions->SetLastSequence(seq);
}
}

View File

@ -164,7 +164,7 @@ Status ExternalSstFileIngestionJob::Run() {
// if the dont overlap with any ranges since we have snapshots
force_global_seqno = true;
}
// It is safe to use this instead of LastToBeWrittenSequence since we are
// It is safe to use this instead of LastAllocatedSequence since we are
// the only active writer, and hence they are equal
const SequenceNumber last_seqno = versions_->LastSequence();
SuperVersion* super_version = cfd_->GetSuperVersion();
@ -199,7 +199,7 @@ Status ExternalSstFileIngestionJob::Run() {
}
if (consumed_seqno) {
versions_->SetLastToBeWrittenSequence(last_seqno + 1);
versions_->SetLastAllocatedSequence(last_seqno + 1);
versions_->SetLastSequence(last_seqno + 1);
}

View File

@ -548,7 +548,7 @@ class Repairer {
max_sequence = tables_[i].max_sequence;
}
}
vset_.SetLastToBeWrittenSequence(max_sequence);
vset_.SetLastAllocatedSequence(max_sequence);
vset_.SetLastSequence(max_sequence);
for (const auto& cf_id_and_tables : cf_id_to_tables) {

View File

@ -19,7 +19,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dir, const ImmutableDBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions)
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
const bool seq_per_batch)
: dir_(dir),
options_(options),
read_options_(read_options),
@ -31,7 +32,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
currentFileIndex_(0),
currentBatchSeq_(0),
currentLastSeq_(0),
versions_(versions) {
versions_(versions),
seq_per_batch_(seq_per_batch) {
assert(files_ != nullptr);
assert(versions_ != nullptr);
@ -241,12 +243,12 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
}
startingSequenceNumber_ = expectedSeq;
// currentStatus_ will be set to Ok if reseek succeeds
// Note: this is still ok in seq_pre_batch_ && concurrent_preparep_ mode
// Note: this is still ok in seq_pre_batch_ && two_write_queuesp_ mode
// that allows gaps in the WAL since it will still skip over the gap.
currentStatus_ = Status::NotFound("Gap in sequence numbers");
// In seq_per_batch mode, gaps in the seq are possible so the strict mode
// In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
// should be disabled
return SeekToStartSequence(currentFileIndex_, !options_->seq_per_batch);
return SeekToStartSequence(currentFileIndex_, !seq_per_batch_);
}
struct BatchCounter : public WriteBatch::Handler {
@ -284,7 +286,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
};
currentBatchSeq_ = WriteBatchInternal::Sequence(batch.get());
if (options_->seq_per_batch) {
if (seq_per_batch_) {
BatchCounter counter(currentBatchSeq_);
batch->Iterate(&counter);
currentLastSeq_ = counter.sequence_;

View File

@ -62,7 +62,8 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
const std::string& dir, const ImmutableDBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions);
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
const bool seq_per_batch);
virtual bool Valid() override;
@ -103,7 +104,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
// Used only to get latest seq. num
// TODO(icanadi) can this be just a callback?
VersionSet const* const versions_;
const bool seq_per_batch_;
// Reads from transaction log only if the writebatch record has been written
bool RestrictedRead(Slice* record, std::string* scratch);
// Seeks to startingSequenceNumber reading from startFileIndex in files_.

View File

@ -2414,7 +2414,7 @@ VersionSet::VersionSet(const std::string& dbname,
manifest_file_number_(0), // Filled by Recover()
pending_manifest_file_number_(0),
last_sequence_(0),
last_to_be_written_sequence_(0),
last_allocated_sequence_(0),
prev_log_number_(0),
current_version_number_(0),
manifest_file_size_(0),
@ -2754,10 +2754,9 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
// updated the last_sequence_ yet. It is also possible that the log has is
// expecting some new data that is not written yet. Since LastSequence is an
// upper bound on the sequence, it is ok to record
// last_to_be_written_sequence_ as the last sequence.
edit->SetLastSequence(db_options_->concurrent_prepare
? last_to_be_written_sequence_
: last_sequence_);
// last_allocated_sequence_ as the last sequence.
edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
: last_sequence_);
if (edit->is_column_family_drop_) {
// if we drop column family, we have to make sure to save max column family,
// so that we don't reuse existing ID
@ -2784,10 +2783,9 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
// updated the last_sequence_ yet. It is also possible that the log has is
// expecting some new data that is not written yet. Since LastSequence is an
// upper bound on the sequence, it is ok to record
// last_to_be_written_sequence_ as the last sequence.
edit->SetLastSequence(db_options_->concurrent_prepare
? last_to_be_written_sequence_
: last_sequence_);
// last_allocated_sequence_ as the last sequence.
edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
: last_sequence_);
builder->Apply(edit);
}
@ -3077,7 +3075,7 @@ Status VersionSet::Recover(
manifest_file_size_ = current_manifest_file_size;
next_file_number_.store(next_file + 1);
last_to_be_written_sequence_ = last_sequence;
last_allocated_sequence_ = last_sequence;
last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number;
@ -3448,7 +3446,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
}
next_file_number_.store(next_file + 1);
last_to_be_written_sequence_ = last_sequence;
last_allocated_sequence_ = last_sequence;
last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number;

View File

@ -765,28 +765,27 @@ class VersionSet {
}
// Note: memory_order_acquire must be sufficient.
uint64_t LastToBeWrittenSequence() const {
return last_to_be_written_sequence_.load(std::memory_order_seq_cst);
uint64_t LastAllocatedSequence() const {
return last_allocated_sequence_.load(std::memory_order_seq_cst);
}
// Set the last sequence number to s.
void SetLastSequence(uint64_t s) {
assert(s >= last_sequence_);
// Last visible seqeunce must always be less than last written seq
assert(!db_options_->concurrent_prepare ||
s <= last_to_be_written_sequence_);
assert(!db_options_->two_write_queues || s <= last_allocated_sequence_);
last_sequence_.store(s, std::memory_order_release);
}
// Note: memory_order_release must be sufficient
void SetLastToBeWrittenSequence(uint64_t s) {
assert(s >= last_to_be_written_sequence_);
last_to_be_written_sequence_.store(s, std::memory_order_seq_cst);
void SetLastAllocatedSequence(uint64_t s) {
assert(s >= last_allocated_sequence_);
last_allocated_sequence_.store(s, std::memory_order_seq_cst);
}
// Note: memory_order_release must be sufficient
uint64_t FetchAddLastToBeWrittenSequence(uint64_t s) {
return last_to_be_written_sequence_.fetch_add(s, std::memory_order_seq_cst);
uint64_t FetchAddLastAllocatedSequence(uint64_t s) {
return last_allocated_sequence_.fetch_add(s, std::memory_order_seq_cst);
}
// Mark the specified file number as used.
@ -894,8 +893,9 @@ class VersionSet {
uint64_t pending_manifest_file_number_;
// The last seq visible to reads
std::atomic<uint64_t> last_sequence_;
// The last seq with which a writer has written/will write.
std::atomic<uint64_t> last_to_be_written_sequence_;
// The last seq that is already allocated. The seq might or might not have
// appreated in memtable.
std::atomic<uint64_t> last_allocated_sequence_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
// Opened lazily

View File

@ -115,7 +115,7 @@ Status WalManager::GetUpdatesSince(
}
iter->reset(new TransactionLogIteratorImpl(
db_options_.wal_dir, &db_options_, read_options, env_options_, seq,
std::move(wal_files), version_set));
std::move(wal_files), version_set, seq_per_batch_));
return (*iter)->status();
}

View File

@ -31,11 +31,12 @@ namespace rocksdb {
class WalManager {
public:
WalManager(const ImmutableDBOptions& db_options,
const EnvOptions& env_options)
const EnvOptions& env_options, const bool seq_per_batch = false)
: db_options_(db_options),
env_options_(env_options),
env_(db_options.env),
purge_wal_files_last_run_(0) {}
purge_wal_files_last_run_(0),
seq_per_batch_(seq_per_batch) {}
Status GetSortedWalFiles(VectorLogPtr& files);
@ -86,6 +87,8 @@ class WalManager {
// last time when PurgeObsoleteWALFiles ran.
uint64_t purge_wal_files_last_run_;
bool seq_per_batch_;
// obsolete files will be deleted every this seconds if ttl deletion is
// enabled and archive size_limit is disabled.
static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;

View File

@ -67,7 +67,7 @@ class WalManagerTest : public testing::Test {
batch.Put(key, value);
WriteBatchInternal::SetSequence(&batch, seq);
current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch));
versions_->SetLastToBeWrittenSequence(seq);
versions_->SetLastAllocatedSequence(seq);
versions_->SetLastSequence(seq);
}

View File

@ -136,9 +136,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
options.create_if_missing = true;
options.allow_concurrent_memtable_write = allow_parallel;
options.enable_pipelined_write = enable_pipelined_write;
options.concurrent_prepare = two_queues;
if (options.enable_pipelined_write &&
options.concurrent_prepare) {
options.two_write_queues = two_queues;
if (options.enable_pipelined_write && options.two_write_queues) {
// This combination is not supported
continue;
}

View File

@ -904,22 +904,12 @@ struct DBOptions {
// allows the memtable writes not to lag behind other writes. It can be used
// to optimize MySQL 2PC in which only the commits, which are serial, write to
// memtable.
bool concurrent_prepare = false;
bool two_write_queues = false;
// If true WAL is not flushed automatically after each write. Instead it
// relies on manual invocation of FlushWAL to write the WAL buffer to its
// file.
bool manual_wal_flush = false;
// Increase the sequence number after writing each batch, whether memtable is
// disabled for that or not. Otherwise the sequence number is increased after
// writing each key into memtable. This implies that when memtable_disable is
// set, the seq is not increased at all.
//
// Default: false
// Note: This option is experimental and meant to be used only for internal
// projects.
bool seq_per_batch = false;
};
// Options to control the behavior of a database (passed to DB::Open)

View File

@ -85,9 +85,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
avoid_flush_during_recovery(options.avoid_flush_during_recovery),
allow_ingest_behind(options.allow_ingest_behind),
preserve_deletes(options.preserve_deletes),
concurrent_prepare(options.concurrent_prepare),
manual_wal_flush(options.manual_wal_flush),
seq_per_batch(options.seq_per_batch) {
two_write_queues(options.two_write_queues),
manual_wal_flush(options.manual_wal_flush) {
}
void ImmutableDBOptions::Dump(Logger* log) const {
@ -217,11 +216,10 @@ void ImmutableDBOptions::Dump(Logger* log) const {
allow_ingest_behind);
ROCKS_LOG_HEADER(log, " Options.preserve_deletes: %d",
preserve_deletes);
ROCKS_LOG_HEADER(log, " Options.concurrent_prepare: %d",
concurrent_prepare);
ROCKS_LOG_HEADER(log, " Options.two_write_queues: %d",
two_write_queues);
ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d",
manual_wal_flush);
ROCKS_LOG_HEADER(log, " Options.seq_per_batch: %d", seq_per_batch);
}
MutableDBOptions::MutableDBOptions()

View File

@ -77,9 +77,8 @@ struct ImmutableDBOptions {
bool avoid_flush_during_recovery;
bool allow_ingest_behind;
bool preserve_deletes;
bool concurrent_prepare;
bool two_write_queues;
bool manual_wal_flush;
bool seq_per_batch;
};
struct MutableDBOptions {

View File

@ -357,21 +357,21 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, allow_ingest_behind)}},
{"preserve_deletes",
{offsetof(struct DBOptions, preserve_deletes), OptionType::kBoolean,
{offsetof(struct DBOptions, preserve_deletes), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, preserve_deletes)}},
{"concurrent_prepare",
{offsetof(struct DBOptions, concurrent_prepare), OptionType::kBoolean,
{"concurrent_prepare", // Deprecated by two_write_queues
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated, false, 0}},
{"two_write_queues",
{offsetof(struct DBOptions, two_write_queues), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, concurrent_prepare)}},
offsetof(struct ImmutableDBOptions, two_write_queues)}},
{"manual_wal_flush",
{offsetof(struct DBOptions, manual_wal_flush), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, manual_wal_flush)}},
{"seq_per_batch",
{offsetof(struct DBOptions, seq_per_batch), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, seq_per_batch)}}};
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated, false, 0}}};
// offset_of is used to get the offset of a class data member
// ex: offset_of(&ColumnFamilyOptions::num_levels)

View File

@ -284,6 +284,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"allow_ingest_behind=false;"
"preserve_deletes=false;"
"concurrent_prepare=false;"
"two_write_queues=false;"
"manual_wal_flush=false;"
"seq_per_batch=false;",
new_options));

View File

@ -189,12 +189,11 @@ Status TransactionDB::Open(
std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
std::vector<size_t> compaction_enabled_cf_indices;
DBOptions db_options_2pc = db_options;
if (txn_db_options.write_policy == WRITE_PREPARED) {
db_options_2pc.seq_per_batch = true;
}
PrepareWrap(&db_options_2pc, &column_families_copy,
&compaction_enabled_cf_indices);
s = DB::Open(db_options_2pc, dbname, column_families_copy, handles, &db);
const bool use_seq_per_batch = txn_db_options.write_policy == WRITE_PREPARED;
s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
use_seq_per_batch);
if (s.ok()) {
s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
dbptr);

View File

@ -4862,12 +4862,12 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
auto seq = db_impl->GetLatestSequenceNumber();
exp_seq = seq;
txn_t0(0);
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
}
if (branch_do(n, &branch)) {
@ -4880,16 +4880,16 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
// Doing it twice might detect some bugs
txn_t0(1);
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
txn_t1(0);
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
}
if (branch_do(n, &branch)) {
@ -4901,12 +4901,12 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
}
txn_t3(0);
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
}
if (branch_do(n, &branch)) {
@ -4918,16 +4918,16 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
}
txn_t0(0);
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
txn_t2(0);
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
}
if (branch_do(n, &branch)) {

View File

@ -54,7 +54,7 @@ class TransactionTest : public ::testing::TestWithParam<
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
env = new FaultInjectionTestEnv(Env::Default());
options.env = env;
options.concurrent_prepare = std::get<1>(GetParam());
options.two_write_queues = std::get<1>(GetParam());
dbname = test::TmpDir() + "/transaction_testdb";
DestroyDB(dbname, options);
@ -113,11 +113,10 @@ class TransactionTest : public ::testing::TestWithParam<
std::vector<ColumnFamilyHandle*> handles;
DB* root_db;
Options options_copy(options);
if (txn_db_options.write_policy == WRITE_PREPARED) {
options_copy.seq_per_batch = true;
}
Status s =
DB::Open(options_copy, dbname, column_families, &handles, &root_db);
const bool use_seq_per_batch =
txn_db_options.write_policy == WRITE_PREPARED;
Status s = DBImpl::Open(options_copy, dbname, column_families, &handles,
&root_db, use_seq_per_batch);
if (s.ok()) {
assert(handles.size() == 1);
s = TransactionDB::WrapStackableDB(
@ -144,7 +143,7 @@ class TransactionTest : public ::testing::TestWithParam<
} else {
// Consume one seq per batch
exp_seq++;
if (options.concurrent_prepare) {
if (options.two_write_queues) {
// Consume one seq for commit
exp_seq++;
}
@ -169,7 +168,7 @@ class TransactionTest : public ::testing::TestWithParam<
} else {
// Consume one seq per batch
exp_seq++;
if (options.concurrent_prepare) {
if (options.two_write_queues) {
// Consume one seq for commit
exp_seq++;
}
@ -197,7 +196,7 @@ class TransactionTest : public ::testing::TestWithParam<
} else {
// Consume one seq per batch
exp_seq++;
if (options.concurrent_prepare) {
if (options.two_write_queues) {
// Consume one seq for commit
exp_seq++;
}

View File

@ -625,7 +625,7 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) {
printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
}
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
auto seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
auto seq = db_impl->TEST_GetLastVisibleSequence();
exp_seq = seq;
// This is increased before writing the batch for commit
commit_writes = 0;
@ -693,17 +693,17 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) {
for (auto& t : threads) {
t.join();
}
if (options.concurrent_prepare) {
if (options.two_write_queues) {
// In this case none of the above scheduling tricks to deterministically
// form merged bactches works because the writes go to saparte queues.
// This would result in different write groups in each run of the test. We
// still keep the test since althgouh non-deterministic and hard to debug,
// it is still useful to have.
// TODO(myabandeh): Add a deterministic unit test for concurrent_prepare
// TODO(myabandeh): Add a deterministic unit test for two_write_queues
}
// Check if memtable inserts advanced seq number as expected
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
@ -1258,7 +1258,7 @@ TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) {
VerifyKeys({{"foo", v}});
seq++; // one for the key/value
KeyVersion kv = {"foo", v, seq, kTypeValue};
if (options.concurrent_prepare) {
if (options.two_write_queues) {
seq++; // one for the commit
}
versions.emplace_back(kv);
@ -1306,10 +1306,10 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
auto add_key = [&](std::function<Status()> func) {
ASSERT_OK(func());
expected_seq++;
if (options.concurrent_prepare) {
if (options.two_write_queues) {
expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber());
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
snapshots.push_back(db->GetSnapshot());
};
@ -1397,7 +1397,7 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
ASSERT_OK(txn1->Commit());
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
ASSERT_EQ(++expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber());
ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
delete txn1;
// Take a snapshots to avoid keys get evicted before compaction.
const Snapshot* snapshot1 = db->GetSnapshot();
@ -1410,24 +1410,24 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
// txn2 commit after snapshot2 and it is not visible.
const Snapshot* snapshot2 = db->GetSnapshot();
ASSERT_OK(txn2->Commit());
ASSERT_EQ(++expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber());
ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
delete txn2;
// Take a snapshots to avoid keys get evicted before compaction.
const Snapshot* snapshot3 = db->GetSnapshot();
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
expected_seq++; // 1 for write
SequenceNumber seq1 = expected_seq;
if (options.concurrent_prepare) {
if (options.two_write_queues) {
expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber());
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
expected_seq++; // 1 for write
SequenceNumber seq2 = expected_seq;
if (options.concurrent_prepare) {
if (options.two_write_queues) {
expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber());
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
ASSERT_OK(db->Flush(FlushOptions()));
db->ReleaseSnapshot(snapshot1);
db->ReleaseSnapshot(snapshot3);

View File

@ -90,7 +90,7 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() {
}
SequenceNumber WritePreparedTxn::GetACommitSeqNumber(SequenceNumber prep_seq) {
if (db_impl_->immutable_db_options().concurrent_prepare) {
if (db_impl_->immutable_db_options().two_write_queues) {
return db_impl_->IncAndFetchSequenceNumber();
} else {
return prep_seq;

View File

@ -46,7 +46,7 @@ class WritePreparedTxn : public PessimisticTransaction {
virtual ~WritePreparedTxn() {}
// To make WAL commit markers visible, the snapshot will be based on the last
// seq in the WAL, LastToBeWrittenSquence, as opposed to the last seq in the
// seq in the WAL, LastAllocatedSequence, as opposed to the last seq in the
// memtable.
using Transaction::Get;
virtual Status Get(const ReadOptions& options,
@ -54,7 +54,7 @@ class WritePreparedTxn : public PessimisticTransaction {
PinnableSlice* value) override;
// To make WAL commit markers visible, the snapshot will be based on the last
// seq in the WAL, LastToBeWrittenSquence, as opposed to the last seq in the
// seq in the WAL, LastAllocatedSequence, as opposed to the last seq in the
// memtable.
using Transaction::GetIterator;
virtual Iterator* GetIterator(const ReadOptions& options) override;
@ -76,7 +76,7 @@ class WritePreparedTxn : public PessimisticTransaction {
// commit entails writing only a commit marker in the WAL. The sequence number
// of the commit marker is then the commit timestamp of the transaction. To
// make the commit timestamp visible to readers, their snapshot is based on
// the last seq in the WAL, LastToBeWrittenSquence, as opposed to the last seq
// the last seq in the WAL, LastAllocatedSequence, as opposed to the last seq
// in the memtable.
Status CommitInternal() override;