WritePrepared Txn: Refactor conf params
Summary: Summary of changes: - Move seq_per_batch out of Options - Rename concurrent_prepare to two_write_queues - Add allocate_seq_only_for_data_ Closes https://github.com/facebook/rocksdb/pull/3136 Differential Revision: D6304458 Pulled By: maysamyabandeh fbshipit-source-id: 08e685bfa82bbc41b5b1c5eb7040a8ca6e05e58c
This commit is contained in:
parent
d1939b0dca
commit
4d06d2862d
@ -143,7 +143,7 @@ class CompactionJobTest : public testing::Test {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void SetLastSequence(const SequenceNumber sequence_number) {
|
void SetLastSequence(const SequenceNumber sequence_number) {
|
||||||
versions_->SetLastToBeWrittenSequence(sequence_number + 1);
|
versions_->SetLastAllocatedSequence(sequence_number + 1);
|
||||||
versions_->SetLastSequence(sequence_number + 1);
|
versions_->SetLastSequence(sequence_number + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,7 +136,8 @@ void DumpSupportInfo(Logger* logger) {
|
|||||||
int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024;
|
int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024;
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
|
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
|
||||||
|
const bool seq_per_batch)
|
||||||
: env_(options.env),
|
: env_(options.env),
|
||||||
dbname_(dbname),
|
dbname_(dbname),
|
||||||
initial_db_options_(SanitizeOptions(dbname, options)),
|
initial_db_options_(SanitizeOptions(dbname, options)),
|
||||||
@ -185,18 +186,30 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
|
|||||||
env_options_, immutable_db_options_)),
|
env_options_, immutable_db_options_)),
|
||||||
num_running_ingest_file_(0),
|
num_running_ingest_file_(0),
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
wal_manager_(immutable_db_options_, env_options_),
|
wal_manager_(immutable_db_options_, env_options_, seq_per_batch),
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
event_logger_(immutable_db_options_.info_log.get()),
|
event_logger_(immutable_db_options_.info_log.get()),
|
||||||
bg_work_paused_(0),
|
bg_work_paused_(0),
|
||||||
bg_compaction_paused_(0),
|
bg_compaction_paused_(0),
|
||||||
refitting_level_(false),
|
refitting_level_(false),
|
||||||
opened_successfully_(false),
|
opened_successfully_(false),
|
||||||
concurrent_prepare_(options.concurrent_prepare),
|
two_write_queues_(options.two_write_queues),
|
||||||
manual_wal_flush_(options.manual_wal_flush),
|
manual_wal_flush_(options.manual_wal_flush),
|
||||||
seq_per_batch_(options.seq_per_batch),
|
seq_per_batch_(seq_per_batch),
|
||||||
// TODO(myabandeh): revise this when we change options.seq_per_batch
|
// When two_write_queues_ and seq_per_batch_ are both enabled we
|
||||||
use_custom_gc_(options.seq_per_batch),
|
// sometimes allocate a seq also to indicate the commit timestmamp of a
|
||||||
|
// transaction. In such cases last_sequence_ would not indicate the last
|
||||||
|
// visible sequence number in memtable and should not be used for
|
||||||
|
// snapshots. It should use last_allocated_sequence_ instaed but also
|
||||||
|
// needs other mechanisms to exclude the data that after last_sequence_
|
||||||
|
// and before last_allocated_sequence_ from the snapshot. In
|
||||||
|
// WritePreparedTxn this property is ensured since such data are not
|
||||||
|
// committed yet.
|
||||||
|
allocate_seq_only_for_data_(!(seq_per_batch && options.two_write_queues)),
|
||||||
|
// Since seq_per_batch_ is currently set only by WritePreparedTxn which
|
||||||
|
// requires a custom gc for compaction, we use that to set use_custom_gc_
|
||||||
|
// as well.
|
||||||
|
use_custom_gc_(seq_per_batch),
|
||||||
preserve_deletes_(options.preserve_deletes) {
|
preserve_deletes_(options.preserve_deletes) {
|
||||||
env_->GetAbsolutePath(dbname, &db_absolute_path_);
|
env_->GetAbsolutePath(dbname, &db_absolute_path_);
|
||||||
|
|
||||||
@ -751,7 +764,7 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
|
|||||||
}
|
}
|
||||||
|
|
||||||
SequenceNumber DBImpl::IncAndFetchSequenceNumber() {
|
SequenceNumber DBImpl::IncAndFetchSequenceNumber() {
|
||||||
return versions_->FetchAddLastToBeWrittenSequence(1ull) + 1ull;
|
return versions_->FetchAddLastAllocatedSequence(1ull) + 1ull;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
|
bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
|
||||||
@ -977,9 +990,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
|
|||||||
// super versipon because a flush happening in between may compact
|
// super versipon because a flush happening in between may compact
|
||||||
// away data for the snapshot, but the snapshot is earlier than the
|
// away data for the snapshot, but the snapshot is earlier than the
|
||||||
// data overwriting it, so users may see wrong results.
|
// data overwriting it, so users may see wrong results.
|
||||||
snapshot = concurrent_prepare_ && seq_per_batch_
|
snapshot = allocate_seq_only_for_data_ ? versions_->LastSequence()
|
||||||
? versions_->LastToBeWrittenSequence()
|
: versions_->LastAllocatedSequence();
|
||||||
: versions_->LastSequence();
|
|
||||||
}
|
}
|
||||||
TEST_SYNC_POINT("DBImpl::GetImpl:3");
|
TEST_SYNC_POINT("DBImpl::GetImpl:3");
|
||||||
TEST_SYNC_POINT("DBImpl::GetImpl:4");
|
TEST_SYNC_POINT("DBImpl::GetImpl:4");
|
||||||
@ -1070,9 +1082,8 @@ std::vector<Status> DBImpl::MultiGet(
|
|||||||
snapshot = reinterpret_cast<const SnapshotImpl*>(
|
snapshot = reinterpret_cast<const SnapshotImpl*>(
|
||||||
read_options.snapshot)->number_;
|
read_options.snapshot)->number_;
|
||||||
} else {
|
} else {
|
||||||
snapshot = concurrent_prepare_ && seq_per_batch_
|
snapshot = allocate_seq_only_for_data_ ? versions_->LastSequence()
|
||||||
? versions_->LastToBeWrittenSequence()
|
: versions_->LastAllocatedSequence();
|
||||||
: versions_->LastSequence();
|
|
||||||
}
|
}
|
||||||
for (auto mgd_iter : multiget_cf_data) {
|
for (auto mgd_iter : multiget_cf_data) {
|
||||||
mgd_iter.second->super_version =
|
mgd_iter.second->super_version =
|
||||||
@ -1478,8 +1489,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
|
|||||||
read_callback);
|
read_callback);
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
// Note: no need to consider the special case of concurrent_prepare_ &&
|
// Note: no need to consider the special case of
|
||||||
// seq_per_batch_ since NewIterator is overridden in WritePreparedTxnDB
|
// allocate_seq_only_for_data_==false since NewIterator is overridden in
|
||||||
|
// WritePreparedTxnDB
|
||||||
auto snapshot = read_options.snapshot != nullptr
|
auto snapshot = read_options.snapshot != nullptr
|
||||||
? read_options.snapshot->GetSequenceNumber()
|
? read_options.snapshot->GetSequenceNumber()
|
||||||
: versions_->LastSequence();
|
: versions_->LastSequence();
|
||||||
@ -1595,8 +1607,9 @@ Status DBImpl::NewIterators(
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
// Note: no need to consider the special case of concurrent_prepare_ &&
|
// Note: no need to consider the special case of
|
||||||
// seq_per_batch_ since NewIterators is overridden in WritePreparedTxnDB
|
// allocate_seq_only_for_data_==false since NewIterators is overridden in
|
||||||
|
// WritePreparedTxnDB
|
||||||
auto snapshot = read_options.snapshot != nullptr
|
auto snapshot = read_options.snapshot != nullptr
|
||||||
? read_options.snapshot->GetSequenceNumber()
|
? read_options.snapshot->GetSequenceNumber()
|
||||||
: versions_->LastSequence();
|
: versions_->LastSequence();
|
||||||
@ -1630,9 +1643,9 @@ const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) {
|
|||||||
delete s;
|
delete s;
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
auto snapshot_seq = concurrent_prepare_ && seq_per_batch_
|
auto snapshot_seq = allocate_seq_only_for_data_
|
||||||
? versions_->LastToBeWrittenSequence()
|
? versions_->LastSequence()
|
||||||
: versions_->LastSequence();
|
: versions_->LastAllocatedSequence();
|
||||||
return snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
|
return snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1643,9 +1656,9 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
|
|||||||
snapshots_.Delete(casted_s);
|
snapshots_.Delete(casted_s);
|
||||||
uint64_t oldest_snapshot;
|
uint64_t oldest_snapshot;
|
||||||
if (snapshots_.empty()) {
|
if (snapshots_.empty()) {
|
||||||
oldest_snapshot = concurrent_prepare_ && seq_per_batch_
|
oldest_snapshot = allocate_seq_only_for_data_
|
||||||
? versions_->LastToBeWrittenSequence()
|
? versions_->LastSequence()
|
||||||
: versions_->LastSequence();
|
: versions_->LastAllocatedSequence();
|
||||||
} else {
|
} else {
|
||||||
oldest_snapshot = snapshots_.oldest()->number_;
|
oldest_snapshot = snapshots_.oldest()->number_;
|
||||||
}
|
}
|
||||||
@ -2753,7 +2766,7 @@ Status DBImpl::IngestExternalFile(
|
|||||||
WriteThread::Writer w;
|
WriteThread::Writer w;
|
||||||
write_thread_.EnterUnbatched(&w, &mutex_);
|
write_thread_.EnterUnbatched(&w, &mutex_);
|
||||||
WriteThread::Writer nonmem_w;
|
WriteThread::Writer nonmem_w;
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
|
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2796,7 +2809,7 @@ Status DBImpl::IngestExternalFile(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Resume writes to the DB
|
// Resume writes to the DB
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
|
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
|
||||||
}
|
}
|
||||||
write_thread_.ExitUnbatched(&w);
|
write_thread_.ExitUnbatched(&w);
|
||||||
|
46
db/db_impl.h
46
db/db_impl.h
@ -68,7 +68,8 @@ struct MemTableInfo;
|
|||||||
|
|
||||||
class DBImpl : public DB {
|
class DBImpl : public DB {
|
||||||
public:
|
public:
|
||||||
DBImpl(const DBOptions& options, const std::string& dbname);
|
DBImpl(const DBOptions& options, const std::string& dbname,
|
||||||
|
const bool seq_per_batch = false);
|
||||||
virtual ~DBImpl();
|
virtual ~DBImpl();
|
||||||
|
|
||||||
// Implementations of the DB interface
|
// Implementations of the DB interface
|
||||||
@ -220,10 +221,10 @@ class DBImpl : public DB {
|
|||||||
|
|
||||||
virtual SequenceNumber GetLatestSequenceNumber() const override;
|
virtual SequenceNumber GetLatestSequenceNumber() const override;
|
||||||
virtual SequenceNumber IncAndFetchSequenceNumber();
|
virtual SequenceNumber IncAndFetchSequenceNumber();
|
||||||
// Returns LastToBeWrittenSequence in concurrent_prepare_ && seq_per_batch_
|
// Returns LastSequence in allocate_seq_only_for_data_
|
||||||
// mode and LastSequence otherwise. This is useful when visiblility depends
|
// mode and LastAllocatedSequence otherwise. This is useful when visiblility
|
||||||
// also on data written to the WAL but not to the memtable.
|
// depends also on data written to the WAL but not to the memtable.
|
||||||
SequenceNumber TEST_GetLatestVisibleSequenceNumber() const;
|
SequenceNumber TEST_GetLastVisibleSequence() const;
|
||||||
|
|
||||||
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
|
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
|
||||||
|
|
||||||
@ -606,6 +607,12 @@ class DBImpl : public DB {
|
|||||||
|
|
||||||
Status NewDB();
|
Status NewDB();
|
||||||
|
|
||||||
|
// This is to be used only by internal rocksdb classes.
|
||||||
|
static Status Open(const DBOptions& db_options, const std::string& name,
|
||||||
|
const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||||
|
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
|
||||||
|
const bool seq_per_batch);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
Env* const env_;
|
Env* const env_;
|
||||||
const std::string dbname_;
|
const std::string dbname_;
|
||||||
@ -905,12 +912,12 @@ class DBImpl : public DB {
|
|||||||
FileLock* db_lock_;
|
FileLock* db_lock_;
|
||||||
|
|
||||||
// In addition to mutex_, log_write_mutex_ protected writes to logs_ and
|
// In addition to mutex_, log_write_mutex_ protected writes to logs_ and
|
||||||
// logfile_number_. With concurrent_prepare it also protects alive_log_files_,
|
// logfile_number_. With two_write_queues it also protects alive_log_files_,
|
||||||
// and log_empty_. Refer to the definition of each variable below for more
|
// and log_empty_. Refer to the definition of each variable below for more
|
||||||
// details.
|
// details.
|
||||||
InstrumentedMutex log_write_mutex_;
|
InstrumentedMutex log_write_mutex_;
|
||||||
// State below is protected by mutex_
|
// State below is protected by mutex_
|
||||||
// With concurrent_prepare enabled, some of the variables that accessed during
|
// With two_write_queues enabled, some of the variables that accessed during
|
||||||
// WriteToWAL need different synchronization: log_empty_, alive_log_files_,
|
// WriteToWAL need different synchronization: log_empty_, alive_log_files_,
|
||||||
// logs_, logfile_number_. Refer to the definition of each variable below for
|
// logs_, logfile_number_. Refer to the definition of each variable below for
|
||||||
// more description.
|
// more description.
|
||||||
@ -935,10 +942,10 @@ class DBImpl : public DB {
|
|||||||
std::deque<uint64_t>
|
std::deque<uint64_t>
|
||||||
log_recycle_files; // a list of log files that we can recycle
|
log_recycle_files; // a list of log files that we can recycle
|
||||||
bool log_dir_synced_;
|
bool log_dir_synced_;
|
||||||
// Without concurrent_prepare, read and writes to log_empty_ are protected by
|
// Without two_write_queues, read and writes to log_empty_ are protected by
|
||||||
// mutex_. Since it is currently updated/read only in write_thread_, it can be
|
// mutex_. Since it is currently updated/read only in write_thread_, it can be
|
||||||
// accessed from the same write_thread_ without any locks. With
|
// accessed from the same write_thread_ without any locks. With
|
||||||
// concurrent_prepare writes, where it can be updated in different threads,
|
// two_write_queues writes, where it can be updated in different threads,
|
||||||
// read and writes are protected by log_write_mutex_ instead. This is to avoid
|
// read and writes are protected by log_write_mutex_ instead. This is to avoid
|
||||||
// expesnive mutex_ lock during WAL write, which update log_empty_.
|
// expesnive mutex_ lock during WAL write, which update log_empty_.
|
||||||
bool log_empty_;
|
bool log_empty_;
|
||||||
@ -975,10 +982,10 @@ class DBImpl : public DB {
|
|||||||
// true for some prefix of logs_
|
// true for some prefix of logs_
|
||||||
bool getting_synced = false;
|
bool getting_synced = false;
|
||||||
};
|
};
|
||||||
// Without concurrent_prepare, read and writes to alive_log_files_ are
|
// Without two_write_queues, read and writes to alive_log_files_ are
|
||||||
// protected by mutex_. However since back() is never popped, and push_back()
|
// protected by mutex_. However since back() is never popped, and push_back()
|
||||||
// is done only from write_thread_, the same thread can access the item
|
// is done only from write_thread_, the same thread can access the item
|
||||||
// reffered by back() without mutex_. With concurrent_prepare_, writes
|
// reffered by back() without mutex_. With two_write_queues_, writes
|
||||||
// are protected by locking both mutex_ and log_write_mutex_, and reads must
|
// are protected by locking both mutex_ and log_write_mutex_, and reads must
|
||||||
// be under either mutex_ or log_write_mutex_.
|
// be under either mutex_ or log_write_mutex_.
|
||||||
std::deque<LogFileNumberSize> alive_log_files_;
|
std::deque<LogFileNumberSize> alive_log_files_;
|
||||||
@ -1003,7 +1010,7 @@ class DBImpl : public DB {
|
|||||||
// memtable on normal writes and hence improving the throughput. Each new
|
// memtable on normal writes and hence improving the throughput. Each new
|
||||||
// write of the state will replace the previous state entirely even if the
|
// write of the state will replace the previous state entirely even if the
|
||||||
// keys in the two consecuitive states do not overlap.
|
// keys in the two consecuitive states do not overlap.
|
||||||
// It is protected by log_write_mutex_ when concurrent_prepare_ is enabled.
|
// It is protected by log_write_mutex_ when two_write_queues_ is enabled.
|
||||||
// Otherwise only the heaad of write_thread_ can access it.
|
// Otherwise only the heaad of write_thread_ can access it.
|
||||||
WriteBatch cached_recoverable_state_;
|
WriteBatch cached_recoverable_state_;
|
||||||
std::atomic<bool> cached_recoverable_state_empty_ = {true};
|
std::atomic<bool> cached_recoverable_state_empty_ = {true};
|
||||||
@ -1319,9 +1326,22 @@ class DBImpl : public DB {
|
|||||||
|
|
||||||
// When set, we use a seprate queue for writes that dont write to memtable. In
|
// When set, we use a seprate queue for writes that dont write to memtable. In
|
||||||
// 2PC these are the writes at Prepare phase.
|
// 2PC these are the writes at Prepare phase.
|
||||||
const bool concurrent_prepare_;
|
const bool two_write_queues_;
|
||||||
const bool manual_wal_flush_;
|
const bool manual_wal_flush_;
|
||||||
|
// Increase the sequence number after writing each batch, whether memtable is
|
||||||
|
// disabled for that or not. Otherwise the sequence number is increased after
|
||||||
|
// writing each key into memtable. This implies that when disable_memtable is
|
||||||
|
// set, the seq is not increased at all.
|
||||||
|
//
|
||||||
|
// Default: false
|
||||||
const bool seq_per_batch_;
|
const bool seq_per_batch_;
|
||||||
|
// A sequence number is allocated only for data written to DB. Otherwise it
|
||||||
|
// could also be allocated for operational purposes such as commit timestamp
|
||||||
|
// of a transaction.
|
||||||
|
const bool allocate_seq_only_for_data_;
|
||||||
|
// It indicates that a customized gc algorithm must be used for
|
||||||
|
// flush/compaction and if it is not provided vis SnapshotChecker, we should
|
||||||
|
// disable gc to be safe.
|
||||||
const bool use_custom_gc_;
|
const bool use_custom_gc_;
|
||||||
|
|
||||||
// Clients must periodically call SetPreserveDeletesSequenceNumber()
|
// Clients must periodically call SetPreserveDeletesSequenceNumber()
|
||||||
|
@ -209,11 +209,11 @@ int DBImpl::TEST_BGFlushesAllowed() const {
|
|||||||
return GetBGJobLimits().max_flushes;
|
return GetBGJobLimits().max_flushes;
|
||||||
}
|
}
|
||||||
|
|
||||||
SequenceNumber DBImpl::TEST_GetLatestVisibleSequenceNumber() const {
|
SequenceNumber DBImpl::TEST_GetLastVisibleSequence() const {
|
||||||
if (concurrent_prepare_ && seq_per_batch_) {
|
if (allocate_seq_only_for_data_) {
|
||||||
return versions_->LastToBeWrittenSequence();
|
|
||||||
} else {
|
|
||||||
return versions_->LastSequence();
|
return versions_->LastSequence();
|
||||||
|
} else {
|
||||||
|
return versions_->LastAllocatedSequence();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,11 +252,11 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
|||||||
}
|
}
|
||||||
job_context->size_log_to_delete += earliest.size;
|
job_context->size_log_to_delete += earliest.size;
|
||||||
total_log_size_ -= earliest.size;
|
total_log_size_ -= earliest.size;
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
log_write_mutex_.Lock();
|
log_write_mutex_.Lock();
|
||||||
}
|
}
|
||||||
alive_log_files_.pop_front();
|
alive_log_files_.pop_front();
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
log_write_mutex_.Unlock();
|
log_write_mutex_.Unlock();
|
||||||
}
|
}
|
||||||
// Current log should always stay alive since it can't have
|
// Current log should always stay alive since it can't have
|
||||||
|
@ -592,9 +592,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|||||||
// happen when we open and write to a corrupted DB, where sequence id
|
// happen when we open and write to a corrupted DB, where sequence id
|
||||||
// will start from the last sequence id we recovered.
|
// will start from the last sequence id we recovered.
|
||||||
if (sequence == *next_sequence ||
|
if (sequence == *next_sequence ||
|
||||||
// With seq_per_batch_, if previous run was with concurrent_prepare_
|
// With seq_per_batch_, if previous run was with two_write_queues_
|
||||||
// then gap in the sequence numbers is expected by the commits
|
// then allocate_seq_only_for_data_ was disabled and a gap in the
|
||||||
// without prepares.
|
// sequence numbers in the log is expected by the commits without
|
||||||
|
// prepares.
|
||||||
(seq_per_batch_ && sequence >= *next_sequence)) {
|
(seq_per_batch_ && sequence >= *next_sequence)) {
|
||||||
stop_replay_for_corruption = false;
|
stop_replay_for_corruption = false;
|
||||||
}
|
}
|
||||||
@ -754,7 +755,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|||||||
auto last_sequence = *next_sequence - 1;
|
auto last_sequence = *next_sequence - 1;
|
||||||
if ((*next_sequence != kMaxSequenceNumber) &&
|
if ((*next_sequence != kMaxSequenceNumber) &&
|
||||||
(versions_->LastSequence() <= last_sequence)) {
|
(versions_->LastSequence() <= last_sequence)) {
|
||||||
versions_->SetLastToBeWrittenSequence(last_sequence);
|
versions_->SetLastAllocatedSequence(last_sequence);
|
||||||
versions_->SetLastSequence(last_sequence);
|
versions_->SetLastSequence(last_sequence);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -845,13 +846,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|||||||
if (data_seen && !flushed) {
|
if (data_seen && !flushed) {
|
||||||
// Mark these as alive so they'll be considered for deletion later by
|
// Mark these as alive so they'll be considered for deletion later by
|
||||||
// FindObsoleteFiles()
|
// FindObsoleteFiles()
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
log_write_mutex_.Lock();
|
log_write_mutex_.Lock();
|
||||||
}
|
}
|
||||||
for (auto log_number : log_numbers) {
|
for (auto log_number : log_numbers) {
|
||||||
alive_log_files_.push_back(LogFileNumberSize(log_number));
|
alive_log_files_.push_back(LogFileNumberSize(log_number));
|
||||||
}
|
}
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
log_write_mutex_.Unlock();
|
log_write_mutex_.Unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -966,6 +967,15 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
|
|||||||
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
||||||
const std::vector<ColumnFamilyDescriptor>& column_families,
|
const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||||
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
|
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
|
||||||
|
const bool seq_per_batch = true;
|
||||||
|
return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
|
||||||
|
!seq_per_batch);
|
||||||
|
}
|
||||||
|
|
||||||
|
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
||||||
|
const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||||
|
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
|
||||||
|
const bool seq_per_batch) {
|
||||||
Status s = SanitizeOptionsByTable(db_options, column_families);
|
Status s = SanitizeOptionsByTable(db_options, column_families);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
@ -985,7 +995,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
std::max(max_write_buffer_size, cf.options.write_buffer_size);
|
std::max(max_write_buffer_size, cf.options.write_buffer_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
DBImpl* impl = new DBImpl(db_options, dbname);
|
DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch);
|
||||||
s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
|
s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
for (auto db_path : impl->immutable_db_options_.db_paths) {
|
for (auto db_path : impl->immutable_db_options_.db_paths) {
|
||||||
@ -1070,12 +1080,12 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
|
cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
|
||||||
}
|
}
|
||||||
sv_context.Clean();
|
sv_context.Clean();
|
||||||
if (impl->concurrent_prepare_) {
|
if (impl->two_write_queues_) {
|
||||||
impl->log_write_mutex_.Lock();
|
impl->log_write_mutex_.Lock();
|
||||||
}
|
}
|
||||||
impl->alive_log_files_.push_back(
|
impl->alive_log_files_.push_back(
|
||||||
DBImpl::LogFileNumberSize(impl->logfile_number_));
|
DBImpl::LogFileNumberSize(impl->logfile_number_));
|
||||||
if (impl->concurrent_prepare_) {
|
if (impl->two_write_queues_) {
|
||||||
impl->log_write_mutex_.Unlock();
|
impl->log_write_mutex_.Unlock();
|
||||||
}
|
}
|
||||||
impl->DeleteObsoleteFiles();
|
impl->DeleteObsoleteFiles();
|
||||||
|
@ -67,7 +67,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
if (write_options.sync && write_options.disableWAL) {
|
if (write_options.sync && write_options.disableWAL) {
|
||||||
return Status::InvalidArgument("Sync writes has to enable WAL.");
|
return Status::InvalidArgument("Sync writes has to enable WAL.");
|
||||||
}
|
}
|
||||||
if (concurrent_prepare_ && immutable_db_options_.enable_pipelined_write) {
|
if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
|
||||||
return Status::NotSupported(
|
return Status::NotSupported(
|
||||||
"pipelined_writes is not compatible with concurrent prepares");
|
"pipelined_writes is not compatible with concurrent prepares");
|
||||||
}
|
}
|
||||||
@ -87,7 +87,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (concurrent_prepare_ && disable_memtable) {
|
if (two_write_queues_ && disable_memtable) {
|
||||||
return WriteImplWALOnly(write_options, my_batch, callback, log_used,
|
return WriteImplWALOnly(write_options, my_batch, callback, log_used,
|
||||||
log_ref, seq_used);
|
log_ref, seq_used);
|
||||||
}
|
}
|
||||||
@ -154,7 +154,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
WriteThread::WriteGroup write_group;
|
WriteThread::WriteGroup write_group;
|
||||||
bool in_parallel_group = false;
|
bool in_parallel_group = false;
|
||||||
uint64_t last_sequence = kMaxSequenceNumber;
|
uint64_t last_sequence = kMaxSequenceNumber;
|
||||||
if (!concurrent_prepare_) {
|
if (!two_write_queues_) {
|
||||||
last_sequence = versions_->LastSequence();
|
last_sequence = versions_->LastSequence();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -162,7 +162,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
|
|
||||||
bool need_log_sync = write_options.sync;
|
bool need_log_sync = write_options.sync;
|
||||||
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
|
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
|
||||||
if (!concurrent_prepare_ || !disable_memtable) {
|
if (!two_write_queues_ || !disable_memtable) {
|
||||||
// With concurrent writes we do preprocess only in the write thread that
|
// With concurrent writes we do preprocess only in the write thread that
|
||||||
// also does write to memtable to avoid sync issue on shared data structure
|
// also does write to memtable to avoid sync issue on shared data structure
|
||||||
// with the other thread
|
// with the other thread
|
||||||
@ -209,7 +209,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
}
|
}
|
||||||
size_t seq_inc = seq_per_batch_ ? write_group.size : total_count;
|
size_t seq_inc = seq_per_batch_ ? write_group.size : total_count;
|
||||||
|
|
||||||
const bool concurrent_update = concurrent_prepare_;
|
const bool concurrent_update = two_write_queues_;
|
||||||
// Update stats while we are an exclusive group leader, so we know
|
// Update stats while we are an exclusive group leader, so we know
|
||||||
// that nobody else can be writing to these particular stats.
|
// that nobody else can be writing to these particular stats.
|
||||||
// We're optimistic, updating the stats before we successfully
|
// We're optimistic, updating the stats before we successfully
|
||||||
@ -237,7 +237,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
|
|
||||||
PERF_TIMER_STOP(write_pre_and_post_process_time);
|
PERF_TIMER_STOP(write_pre_and_post_process_time);
|
||||||
|
|
||||||
if (!concurrent_prepare_) {
|
if (!two_write_queues_) {
|
||||||
if (status.ok() && !write_options.disableWAL) {
|
if (status.ok() && !write_options.disableWAL) {
|
||||||
PERF_TIMER_GUARD(write_wal_time);
|
PERF_TIMER_GUARD(write_wal_time);
|
||||||
status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
|
status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
|
||||||
@ -246,13 +246,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
} else {
|
} else {
|
||||||
if (status.ok() && !write_options.disableWAL) {
|
if (status.ok() && !write_options.disableWAL) {
|
||||||
PERF_TIMER_GUARD(write_wal_time);
|
PERF_TIMER_GUARD(write_wal_time);
|
||||||
// LastToBeWrittenSequence is increased inside WriteToWAL under
|
// LastAllocatedSequence is increased inside WriteToWAL under
|
||||||
// wal_write_mutex_ to ensure ordered events in WAL
|
// wal_write_mutex_ to ensure ordered events in WAL
|
||||||
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
|
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
|
||||||
seq_inc);
|
seq_inc);
|
||||||
} else {
|
} else {
|
||||||
// Otherwise we inc seq number for memtable writes
|
// Otherwise we inc seq number for memtable writes
|
||||||
last_sequence = versions_->FetchAddLastToBeWrittenSequence(seq_inc);
|
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert(last_sequence != kMaxSequenceNumber);
|
assert(last_sequence != kMaxSequenceNumber);
|
||||||
@ -310,9 +310,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
|
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
|
||||||
mutex_.Unlock();
|
mutex_.Unlock();
|
||||||
// Requesting sync with concurrent_prepare_ is expected to be very rare. We
|
// Requesting sync with two_write_queues_ is expected to be very rare. We
|
||||||
// hance provide a simple implementation that is not necessarily efficient.
|
// hance provide a simple implementation that is not necessarily efficient.
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
if (manual_wal_flush_) {
|
if (manual_wal_flush_) {
|
||||||
status = FlushWAL(true);
|
status = FlushWAL(true);
|
||||||
} else {
|
} else {
|
||||||
@ -532,7 +532,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
|
|||||||
PERF_TIMER_STOP(write_pre_and_post_process_time);
|
PERF_TIMER_STOP(write_pre_and_post_process_time);
|
||||||
|
|
||||||
PERF_TIMER_GUARD(write_wal_time);
|
PERF_TIMER_GUARD(write_wal_time);
|
||||||
// LastToBeWrittenSequence is increased inside WriteToWAL under
|
// LastAllocatedSequence is increased inside WriteToWAL under
|
||||||
// wal_write_mutex_ to ensure ordered events in WAL
|
// wal_write_mutex_ to ensure ordered events in WAL
|
||||||
size_t seq_inc = seq_per_batch_ ? write_group.size : 0 /*total_count*/;
|
size_t seq_inc = seq_per_batch_ ? write_group.size : 0 /*total_count*/;
|
||||||
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
|
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
|
||||||
@ -548,7 +548,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (status.ok() && write_options.sync) {
|
if (status.ok() && write_options.sync) {
|
||||||
// Requesting sync with concurrent_prepare_ is expected to be very rare. We
|
// Requesting sync with two_write_queues_ is expected to be very rare. We
|
||||||
// hance provide a simple implementation that is not necessarily efficient.
|
// hance provide a simple implementation that is not necessarily efficient.
|
||||||
if (manual_wal_flush_) {
|
if (manual_wal_flush_) {
|
||||||
status = FlushWAL(true);
|
status = FlushWAL(true);
|
||||||
@ -719,7 +719,7 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
|
|||||||
return merged_batch;
|
return merged_batch;
|
||||||
}
|
}
|
||||||
|
|
||||||
// When concurrent_prepare_ is disabled, this function is called from the only
|
// When two_write_queues_ is disabled, this function is called from the only
|
||||||
// write thread. Otherwise this must be called holding log_write_mutex_.
|
// write thread. Otherwise this must be called holding log_write_mutex_.
|
||||||
Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
|
Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
|
||||||
log::Writer* log_writer, uint64_t* log_used,
|
log::Writer* log_writer, uint64_t* log_used,
|
||||||
@ -828,7 +828,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
|
|||||||
writer->log_used = logfile_number_;
|
writer->log_used = logfile_number_;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*last_sequence = versions_->FetchAddLastToBeWrittenSequence(seq_inc);
|
*last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
|
||||||
auto sequence = *last_sequence + 1;
|
auto sequence = *last_sequence + 1;
|
||||||
WriteBatchInternal::SetSequence(merged_batch, sequence);
|
WriteBatchInternal::SetSequence(merged_batch, sequence);
|
||||||
|
|
||||||
@ -858,7 +858,7 @@ Status DBImpl::WriteRecoverableState() {
|
|||||||
if (!cached_recoverable_state_empty_) {
|
if (!cached_recoverable_state_empty_) {
|
||||||
bool dont_care_bool;
|
bool dont_care_bool;
|
||||||
SequenceNumber next_seq;
|
SequenceNumber next_seq;
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
log_write_mutex_.Lock();
|
log_write_mutex_.Lock();
|
||||||
}
|
}
|
||||||
SequenceNumber seq = versions_->LastSequence();
|
SequenceNumber seq = versions_->LastSequence();
|
||||||
@ -869,7 +869,7 @@ Status DBImpl::WriteRecoverableState() {
|
|||||||
false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool,
|
false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool,
|
||||||
seq_per_batch_);
|
seq_per_batch_);
|
||||||
versions_->SetLastSequence(--next_seq);
|
versions_->SetLastSequence(--next_seq);
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
log_write_mutex_.Unlock();
|
log_write_mutex_.Unlock();
|
||||||
}
|
}
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
@ -1109,7 +1109,7 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* cfd,
|
|||||||
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
||||||
mutex_.AssertHeld();
|
mutex_.AssertHeld();
|
||||||
WriteThread::Writer nonmem_w;
|
WriteThread::Writer nonmem_w;
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
// SwitchMemtable is a rare event. To simply the reasoning, we make sure
|
// SwitchMemtable is a rare event. To simply the reasoning, we make sure
|
||||||
// that there is no concurrent thread writing to WAL.
|
// that there is no concurrent thread writing to WAL.
|
||||||
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
|
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
|
||||||
@ -1135,11 +1135,11 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
|||||||
// Attempt to switch to a new memtable and trigger flush of old.
|
// Attempt to switch to a new memtable and trigger flush of old.
|
||||||
// Do this without holding the dbmutex lock.
|
// Do this without holding the dbmutex lock.
|
||||||
assert(versions_->prev_log_number() == 0);
|
assert(versions_->prev_log_number() == 0);
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
log_write_mutex_.Lock();
|
log_write_mutex_.Lock();
|
||||||
}
|
}
|
||||||
bool creating_new_log = !log_empty_;
|
bool creating_new_log = !log_empty_;
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
log_write_mutex_.Unlock();
|
log_write_mutex_.Unlock();
|
||||||
}
|
}
|
||||||
uint64_t recycle_log_number = 0;
|
uint64_t recycle_log_number = 0;
|
||||||
@ -1224,7 +1224,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
|||||||
assert(creating_new_log);
|
assert(creating_new_log);
|
||||||
assert(!new_mem);
|
assert(!new_mem);
|
||||||
assert(!new_log);
|
assert(!new_log);
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
|
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
|
||||||
}
|
}
|
||||||
return s;
|
return s;
|
||||||
@ -1264,7 +1264,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
|||||||
cfd->SetMemtable(new_mem);
|
cfd->SetMemtable(new_mem);
|
||||||
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
|
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
|
||||||
mutable_cf_options);
|
mutable_cf_options);
|
||||||
if (concurrent_prepare_) {
|
if (two_write_queues_) {
|
||||||
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
|
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
|
||||||
}
|
}
|
||||||
return s;
|
return s;
|
||||||
|
@ -486,7 +486,7 @@ Options DBTestBase::GetOptions(
|
|||||||
}
|
}
|
||||||
case kConcurrentWALWrites: {
|
case kConcurrentWALWrites: {
|
||||||
// This options optimize 2PC commit path
|
// This options optimize 2PC commit path
|
||||||
options.concurrent_prepare = true;
|
options.two_write_queues = true;
|
||||||
options.manual_wal_flush = true;
|
options.manual_wal_flush = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -730,7 +730,7 @@ class RecoveryTestHelper {
|
|||||||
batch.Put(key, value);
|
batch.Put(key, value);
|
||||||
WriteBatchInternal::SetSequence(&batch, seq);
|
WriteBatchInternal::SetSequence(&batch, seq);
|
||||||
current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
|
current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
|
||||||
versions->SetLastToBeWrittenSequence(seq);
|
versions->SetLastAllocatedSequence(seq);
|
||||||
versions->SetLastSequence(seq);
|
versions->SetLastSequence(seq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -164,7 +164,7 @@ Status ExternalSstFileIngestionJob::Run() {
|
|||||||
// if the dont overlap with any ranges since we have snapshots
|
// if the dont overlap with any ranges since we have snapshots
|
||||||
force_global_seqno = true;
|
force_global_seqno = true;
|
||||||
}
|
}
|
||||||
// It is safe to use this instead of LastToBeWrittenSequence since we are
|
// It is safe to use this instead of LastAllocatedSequence since we are
|
||||||
// the only active writer, and hence they are equal
|
// the only active writer, and hence they are equal
|
||||||
const SequenceNumber last_seqno = versions_->LastSequence();
|
const SequenceNumber last_seqno = versions_->LastSequence();
|
||||||
SuperVersion* super_version = cfd_->GetSuperVersion();
|
SuperVersion* super_version = cfd_->GetSuperVersion();
|
||||||
@ -199,7 +199,7 @@ Status ExternalSstFileIngestionJob::Run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (consumed_seqno) {
|
if (consumed_seqno) {
|
||||||
versions_->SetLastToBeWrittenSequence(last_seqno + 1);
|
versions_->SetLastAllocatedSequence(last_seqno + 1);
|
||||||
versions_->SetLastSequence(last_seqno + 1);
|
versions_->SetLastSequence(last_seqno + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -546,7 +546,7 @@ class Repairer {
|
|||||||
max_sequence = tables_[i].max_sequence;
|
max_sequence = tables_[i].max_sequence;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
vset_.SetLastToBeWrittenSequence(max_sequence);
|
vset_.SetLastAllocatedSequence(max_sequence);
|
||||||
vset_.SetLastSequence(max_sequence);
|
vset_.SetLastSequence(max_sequence);
|
||||||
|
|
||||||
for (const auto& cf_id_and_tables : cf_id_to_tables) {
|
for (const auto& cf_id_and_tables : cf_id_to_tables) {
|
||||||
|
@ -19,7 +19,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
|
|||||||
const std::string& dir, const ImmutableDBOptions* options,
|
const std::string& dir, const ImmutableDBOptions* options,
|
||||||
const TransactionLogIterator::ReadOptions& read_options,
|
const TransactionLogIterator::ReadOptions& read_options,
|
||||||
const EnvOptions& soptions, const SequenceNumber seq,
|
const EnvOptions& soptions, const SequenceNumber seq,
|
||||||
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions)
|
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
|
||||||
|
const bool seq_per_batch)
|
||||||
: dir_(dir),
|
: dir_(dir),
|
||||||
options_(options),
|
options_(options),
|
||||||
read_options_(read_options),
|
read_options_(read_options),
|
||||||
@ -31,7 +32,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
|
|||||||
currentFileIndex_(0),
|
currentFileIndex_(0),
|
||||||
currentBatchSeq_(0),
|
currentBatchSeq_(0),
|
||||||
currentLastSeq_(0),
|
currentLastSeq_(0),
|
||||||
versions_(versions) {
|
versions_(versions),
|
||||||
|
seq_per_batch_(seq_per_batch) {
|
||||||
assert(files_ != nullptr);
|
assert(files_ != nullptr);
|
||||||
assert(versions_ != nullptr);
|
assert(versions_ != nullptr);
|
||||||
|
|
||||||
@ -241,12 +243,12 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
|
|||||||
}
|
}
|
||||||
startingSequenceNumber_ = expectedSeq;
|
startingSequenceNumber_ = expectedSeq;
|
||||||
// currentStatus_ will be set to Ok if reseek succeeds
|
// currentStatus_ will be set to Ok if reseek succeeds
|
||||||
// Note: this is still ok in seq_pre_batch_ && concurrent_preparep_ mode
|
// Note: this is still ok in seq_pre_batch_ && two_write_queuesp_ mode
|
||||||
// that allows gaps in the WAL since it will still skip over the gap.
|
// that allows gaps in the WAL since it will still skip over the gap.
|
||||||
currentStatus_ = Status::NotFound("Gap in sequence numbers");
|
currentStatus_ = Status::NotFound("Gap in sequence numbers");
|
||||||
// In seq_per_batch mode, gaps in the seq are possible so the strict mode
|
// In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
|
||||||
// should be disabled
|
// should be disabled
|
||||||
return SeekToStartSequence(currentFileIndex_, !options_->seq_per_batch);
|
return SeekToStartSequence(currentFileIndex_, !seq_per_batch_);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct BatchCounter : public WriteBatch::Handler {
|
struct BatchCounter : public WriteBatch::Handler {
|
||||||
@ -284,7 +286,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
currentBatchSeq_ = WriteBatchInternal::Sequence(batch.get());
|
currentBatchSeq_ = WriteBatchInternal::Sequence(batch.get());
|
||||||
if (options_->seq_per_batch) {
|
if (seq_per_batch_) {
|
||||||
BatchCounter counter(currentBatchSeq_);
|
BatchCounter counter(currentBatchSeq_);
|
||||||
batch->Iterate(&counter);
|
batch->Iterate(&counter);
|
||||||
currentLastSeq_ = counter.sequence_;
|
currentLastSeq_ = counter.sequence_;
|
||||||
|
@ -62,7 +62,8 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
|
|||||||
const std::string& dir, const ImmutableDBOptions* options,
|
const std::string& dir, const ImmutableDBOptions* options,
|
||||||
const TransactionLogIterator::ReadOptions& read_options,
|
const TransactionLogIterator::ReadOptions& read_options,
|
||||||
const EnvOptions& soptions, const SequenceNumber seqNum,
|
const EnvOptions& soptions, const SequenceNumber seqNum,
|
||||||
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions);
|
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
|
||||||
|
const bool seq_per_batch);
|
||||||
|
|
||||||
virtual bool Valid() override;
|
virtual bool Valid() override;
|
||||||
|
|
||||||
@ -103,7 +104,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
|
|||||||
// Used only to get latest seq. num
|
// Used only to get latest seq. num
|
||||||
// TODO(icanadi) can this be just a callback?
|
// TODO(icanadi) can this be just a callback?
|
||||||
VersionSet const* const versions_;
|
VersionSet const* const versions_;
|
||||||
|
const bool seq_per_batch_;
|
||||||
// Reads from transaction log only if the writebatch record has been written
|
// Reads from transaction log only if the writebatch record has been written
|
||||||
bool RestrictedRead(Slice* record, std::string* scratch);
|
bool RestrictedRead(Slice* record, std::string* scratch);
|
||||||
// Seeks to startingSequenceNumber reading from startFileIndex in files_.
|
// Seeks to startingSequenceNumber reading from startFileIndex in files_.
|
||||||
|
@ -2414,7 +2414,7 @@ VersionSet::VersionSet(const std::string& dbname,
|
|||||||
manifest_file_number_(0), // Filled by Recover()
|
manifest_file_number_(0), // Filled by Recover()
|
||||||
pending_manifest_file_number_(0),
|
pending_manifest_file_number_(0),
|
||||||
last_sequence_(0),
|
last_sequence_(0),
|
||||||
last_to_be_written_sequence_(0),
|
last_allocated_sequence_(0),
|
||||||
prev_log_number_(0),
|
prev_log_number_(0),
|
||||||
current_version_number_(0),
|
current_version_number_(0),
|
||||||
manifest_file_size_(0),
|
manifest_file_size_(0),
|
||||||
@ -2754,10 +2754,9 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
|
|||||||
// updated the last_sequence_ yet. It is also possible that the log has is
|
// updated the last_sequence_ yet. It is also possible that the log has is
|
||||||
// expecting some new data that is not written yet. Since LastSequence is an
|
// expecting some new data that is not written yet. Since LastSequence is an
|
||||||
// upper bound on the sequence, it is ok to record
|
// upper bound on the sequence, it is ok to record
|
||||||
// last_to_be_written_sequence_ as the last sequence.
|
// last_allocated_sequence_ as the last sequence.
|
||||||
edit->SetLastSequence(db_options_->concurrent_prepare
|
edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
|
||||||
? last_to_be_written_sequence_
|
: last_sequence_);
|
||||||
: last_sequence_);
|
|
||||||
if (edit->is_column_family_drop_) {
|
if (edit->is_column_family_drop_) {
|
||||||
// if we drop column family, we have to make sure to save max column family,
|
// if we drop column family, we have to make sure to save max column family,
|
||||||
// so that we don't reuse existing ID
|
// so that we don't reuse existing ID
|
||||||
@ -2784,10 +2783,9 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
|
|||||||
// updated the last_sequence_ yet. It is also possible that the log has is
|
// updated the last_sequence_ yet. It is also possible that the log has is
|
||||||
// expecting some new data that is not written yet. Since LastSequence is an
|
// expecting some new data that is not written yet. Since LastSequence is an
|
||||||
// upper bound on the sequence, it is ok to record
|
// upper bound on the sequence, it is ok to record
|
||||||
// last_to_be_written_sequence_ as the last sequence.
|
// last_allocated_sequence_ as the last sequence.
|
||||||
edit->SetLastSequence(db_options_->concurrent_prepare
|
edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
|
||||||
? last_to_be_written_sequence_
|
: last_sequence_);
|
||||||
: last_sequence_);
|
|
||||||
|
|
||||||
builder->Apply(edit);
|
builder->Apply(edit);
|
||||||
}
|
}
|
||||||
@ -3077,7 +3075,7 @@ Status VersionSet::Recover(
|
|||||||
|
|
||||||
manifest_file_size_ = current_manifest_file_size;
|
manifest_file_size_ = current_manifest_file_size;
|
||||||
next_file_number_.store(next_file + 1);
|
next_file_number_.store(next_file + 1);
|
||||||
last_to_be_written_sequence_ = last_sequence;
|
last_allocated_sequence_ = last_sequence;
|
||||||
last_sequence_ = last_sequence;
|
last_sequence_ = last_sequence;
|
||||||
prev_log_number_ = previous_log_number;
|
prev_log_number_ = previous_log_number;
|
||||||
|
|
||||||
@ -3448,7 +3446,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
|
|||||||
}
|
}
|
||||||
|
|
||||||
next_file_number_.store(next_file + 1);
|
next_file_number_.store(next_file + 1);
|
||||||
last_to_be_written_sequence_ = last_sequence;
|
last_allocated_sequence_ = last_sequence;
|
||||||
last_sequence_ = last_sequence;
|
last_sequence_ = last_sequence;
|
||||||
prev_log_number_ = previous_log_number;
|
prev_log_number_ = previous_log_number;
|
||||||
|
|
||||||
|
@ -765,28 +765,27 @@ class VersionSet {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Note: memory_order_acquire must be sufficient.
|
// Note: memory_order_acquire must be sufficient.
|
||||||
uint64_t LastToBeWrittenSequence() const {
|
uint64_t LastAllocatedSequence() const {
|
||||||
return last_to_be_written_sequence_.load(std::memory_order_seq_cst);
|
return last_allocated_sequence_.load(std::memory_order_seq_cst);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the last sequence number to s.
|
// Set the last sequence number to s.
|
||||||
void SetLastSequence(uint64_t s) {
|
void SetLastSequence(uint64_t s) {
|
||||||
assert(s >= last_sequence_);
|
assert(s >= last_sequence_);
|
||||||
// Last visible seqeunce must always be less than last written seq
|
// Last visible seqeunce must always be less than last written seq
|
||||||
assert(!db_options_->concurrent_prepare ||
|
assert(!db_options_->two_write_queues || s <= last_allocated_sequence_);
|
||||||
s <= last_to_be_written_sequence_);
|
|
||||||
last_sequence_.store(s, std::memory_order_release);
|
last_sequence_.store(s, std::memory_order_release);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: memory_order_release must be sufficient
|
// Note: memory_order_release must be sufficient
|
||||||
void SetLastToBeWrittenSequence(uint64_t s) {
|
void SetLastAllocatedSequence(uint64_t s) {
|
||||||
assert(s >= last_to_be_written_sequence_);
|
assert(s >= last_allocated_sequence_);
|
||||||
last_to_be_written_sequence_.store(s, std::memory_order_seq_cst);
|
last_allocated_sequence_.store(s, std::memory_order_seq_cst);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: memory_order_release must be sufficient
|
// Note: memory_order_release must be sufficient
|
||||||
uint64_t FetchAddLastToBeWrittenSequence(uint64_t s) {
|
uint64_t FetchAddLastAllocatedSequence(uint64_t s) {
|
||||||
return last_to_be_written_sequence_.fetch_add(s, std::memory_order_seq_cst);
|
return last_allocated_sequence_.fetch_add(s, std::memory_order_seq_cst);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark the specified file number as used.
|
// Mark the specified file number as used.
|
||||||
@ -894,8 +893,9 @@ class VersionSet {
|
|||||||
uint64_t pending_manifest_file_number_;
|
uint64_t pending_manifest_file_number_;
|
||||||
// The last seq visible to reads
|
// The last seq visible to reads
|
||||||
std::atomic<uint64_t> last_sequence_;
|
std::atomic<uint64_t> last_sequence_;
|
||||||
// The last seq with which a writer has written/will write.
|
// The last seq that is already allocated. The seq might or might not have
|
||||||
std::atomic<uint64_t> last_to_be_written_sequence_;
|
// appreated in memtable.
|
||||||
|
std::atomic<uint64_t> last_allocated_sequence_;
|
||||||
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
|
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
|
||||||
|
|
||||||
// Opened lazily
|
// Opened lazily
|
||||||
|
@ -115,7 +115,7 @@ Status WalManager::GetUpdatesSince(
|
|||||||
}
|
}
|
||||||
iter->reset(new TransactionLogIteratorImpl(
|
iter->reset(new TransactionLogIteratorImpl(
|
||||||
db_options_.wal_dir, &db_options_, read_options, env_options_, seq,
|
db_options_.wal_dir, &db_options_, read_options, env_options_, seq,
|
||||||
std::move(wal_files), version_set));
|
std::move(wal_files), version_set, seq_per_batch_));
|
||||||
return (*iter)->status();
|
return (*iter)->status();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,11 +31,12 @@ namespace rocksdb {
|
|||||||
class WalManager {
|
class WalManager {
|
||||||
public:
|
public:
|
||||||
WalManager(const ImmutableDBOptions& db_options,
|
WalManager(const ImmutableDBOptions& db_options,
|
||||||
const EnvOptions& env_options)
|
const EnvOptions& env_options, const bool seq_per_batch = false)
|
||||||
: db_options_(db_options),
|
: db_options_(db_options),
|
||||||
env_options_(env_options),
|
env_options_(env_options),
|
||||||
env_(db_options.env),
|
env_(db_options.env),
|
||||||
purge_wal_files_last_run_(0) {}
|
purge_wal_files_last_run_(0),
|
||||||
|
seq_per_batch_(seq_per_batch) {}
|
||||||
|
|
||||||
Status GetSortedWalFiles(VectorLogPtr& files);
|
Status GetSortedWalFiles(VectorLogPtr& files);
|
||||||
|
|
||||||
@ -86,6 +87,8 @@ class WalManager {
|
|||||||
// last time when PurgeObsoleteWALFiles ran.
|
// last time when PurgeObsoleteWALFiles ran.
|
||||||
uint64_t purge_wal_files_last_run_;
|
uint64_t purge_wal_files_last_run_;
|
||||||
|
|
||||||
|
bool seq_per_batch_;
|
||||||
|
|
||||||
// obsolete files will be deleted every this seconds if ttl deletion is
|
// obsolete files will be deleted every this seconds if ttl deletion is
|
||||||
// enabled and archive size_limit is disabled.
|
// enabled and archive size_limit is disabled.
|
||||||
static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;
|
static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;
|
||||||
|
@ -67,7 +67,7 @@ class WalManagerTest : public testing::Test {
|
|||||||
batch.Put(key, value);
|
batch.Put(key, value);
|
||||||
WriteBatchInternal::SetSequence(&batch, seq);
|
WriteBatchInternal::SetSequence(&batch, seq);
|
||||||
current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch));
|
current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch));
|
||||||
versions_->SetLastToBeWrittenSequence(seq);
|
versions_->SetLastAllocatedSequence(seq);
|
||||||
versions_->SetLastSequence(seq);
|
versions_->SetLastSequence(seq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,9 +136,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
|
|||||||
options.create_if_missing = true;
|
options.create_if_missing = true;
|
||||||
options.allow_concurrent_memtable_write = allow_parallel;
|
options.allow_concurrent_memtable_write = allow_parallel;
|
||||||
options.enable_pipelined_write = enable_pipelined_write;
|
options.enable_pipelined_write = enable_pipelined_write;
|
||||||
options.concurrent_prepare = two_queues;
|
options.two_write_queues = two_queues;
|
||||||
if (options.enable_pipelined_write &&
|
if (options.enable_pipelined_write && options.two_write_queues) {
|
||||||
options.concurrent_prepare) {
|
|
||||||
// This combination is not supported
|
// This combination is not supported
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -905,22 +905,12 @@ struct DBOptions {
|
|||||||
// allows the memtable writes not to lag behind other writes. It can be used
|
// allows the memtable writes not to lag behind other writes. It can be used
|
||||||
// to optimize MySQL 2PC in which only the commits, which are serial, write to
|
// to optimize MySQL 2PC in which only the commits, which are serial, write to
|
||||||
// memtable.
|
// memtable.
|
||||||
bool concurrent_prepare = false;
|
bool two_write_queues = false;
|
||||||
|
|
||||||
// If true WAL is not flushed automatically after each write. Instead it
|
// If true WAL is not flushed automatically after each write. Instead it
|
||||||
// relies on manual invocation of FlushWAL to write the WAL buffer to its
|
// relies on manual invocation of FlushWAL to write the WAL buffer to its
|
||||||
// file.
|
// file.
|
||||||
bool manual_wal_flush = false;
|
bool manual_wal_flush = false;
|
||||||
|
|
||||||
// Increase the sequence number after writing each batch, whether memtable is
|
|
||||||
// disabled for that or not. Otherwise the sequence number is increased after
|
|
||||||
// writing each key into memtable. This implies that when memtable_disable is
|
|
||||||
// set, the seq is not increased at all.
|
|
||||||
//
|
|
||||||
// Default: false
|
|
||||||
// Note: This option is experimental and meant to be used only for internal
|
|
||||||
// projects.
|
|
||||||
bool seq_per_batch = false;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Options to control the behavior of a database (passed to DB::Open)
|
// Options to control the behavior of a database (passed to DB::Open)
|
||||||
|
@ -85,9 +85,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
|
|||||||
avoid_flush_during_recovery(options.avoid_flush_during_recovery),
|
avoid_flush_during_recovery(options.avoid_flush_during_recovery),
|
||||||
allow_ingest_behind(options.allow_ingest_behind),
|
allow_ingest_behind(options.allow_ingest_behind),
|
||||||
preserve_deletes(options.preserve_deletes),
|
preserve_deletes(options.preserve_deletes),
|
||||||
concurrent_prepare(options.concurrent_prepare),
|
two_write_queues(options.two_write_queues),
|
||||||
manual_wal_flush(options.manual_wal_flush),
|
manual_wal_flush(options.manual_wal_flush) {
|
||||||
seq_per_batch(options.seq_per_batch) {
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ImmutableDBOptions::Dump(Logger* log) const {
|
void ImmutableDBOptions::Dump(Logger* log) const {
|
||||||
@ -217,11 +216,10 @@ void ImmutableDBOptions::Dump(Logger* log) const {
|
|||||||
allow_ingest_behind);
|
allow_ingest_behind);
|
||||||
ROCKS_LOG_HEADER(log, " Options.preserve_deletes: %d",
|
ROCKS_LOG_HEADER(log, " Options.preserve_deletes: %d",
|
||||||
preserve_deletes);
|
preserve_deletes);
|
||||||
ROCKS_LOG_HEADER(log, " Options.concurrent_prepare: %d",
|
ROCKS_LOG_HEADER(log, " Options.two_write_queues: %d",
|
||||||
concurrent_prepare);
|
two_write_queues);
|
||||||
ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d",
|
ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d",
|
||||||
manual_wal_flush);
|
manual_wal_flush);
|
||||||
ROCKS_LOG_HEADER(log, " Options.seq_per_batch: %d", seq_per_batch);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MutableDBOptions::MutableDBOptions()
|
MutableDBOptions::MutableDBOptions()
|
||||||
|
@ -77,9 +77,8 @@ struct ImmutableDBOptions {
|
|||||||
bool avoid_flush_during_recovery;
|
bool avoid_flush_during_recovery;
|
||||||
bool allow_ingest_behind;
|
bool allow_ingest_behind;
|
||||||
bool preserve_deletes;
|
bool preserve_deletes;
|
||||||
bool concurrent_prepare;
|
bool two_write_queues;
|
||||||
bool manual_wal_flush;
|
bool manual_wal_flush;
|
||||||
bool seq_per_batch;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct MutableDBOptions {
|
struct MutableDBOptions {
|
||||||
|
@ -357,21 +357,21 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
|
|||||||
OptionVerificationType::kNormal, false,
|
OptionVerificationType::kNormal, false,
|
||||||
offsetof(struct ImmutableDBOptions, allow_ingest_behind)}},
|
offsetof(struct ImmutableDBOptions, allow_ingest_behind)}},
|
||||||
{"preserve_deletes",
|
{"preserve_deletes",
|
||||||
{offsetof(struct DBOptions, preserve_deletes), OptionType::kBoolean,
|
{offsetof(struct DBOptions, preserve_deletes), OptionType::kBoolean,
|
||||||
OptionVerificationType::kNormal, false,
|
OptionVerificationType::kNormal, false,
|
||||||
offsetof(struct ImmutableDBOptions, preserve_deletes)}},
|
offsetof(struct ImmutableDBOptions, preserve_deletes)}},
|
||||||
{"concurrent_prepare",
|
{"concurrent_prepare", // Deprecated by two_write_queues
|
||||||
{offsetof(struct DBOptions, concurrent_prepare), OptionType::kBoolean,
|
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated, false, 0}},
|
||||||
|
{"two_write_queues",
|
||||||
|
{offsetof(struct DBOptions, two_write_queues), OptionType::kBoolean,
|
||||||
OptionVerificationType::kNormal, false,
|
OptionVerificationType::kNormal, false,
|
||||||
offsetof(struct ImmutableDBOptions, concurrent_prepare)}},
|
offsetof(struct ImmutableDBOptions, two_write_queues)}},
|
||||||
{"manual_wal_flush",
|
{"manual_wal_flush",
|
||||||
{offsetof(struct DBOptions, manual_wal_flush), OptionType::kBoolean,
|
{offsetof(struct DBOptions, manual_wal_flush), OptionType::kBoolean,
|
||||||
OptionVerificationType::kNormal, false,
|
OptionVerificationType::kNormal, false,
|
||||||
offsetof(struct ImmutableDBOptions, manual_wal_flush)}},
|
offsetof(struct ImmutableDBOptions, manual_wal_flush)}},
|
||||||
{"seq_per_batch",
|
{"seq_per_batch",
|
||||||
{offsetof(struct DBOptions, seq_per_batch), OptionType::kBoolean,
|
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated, false, 0}}};
|
||||||
OptionVerificationType::kNormal, false,
|
|
||||||
offsetof(struct ImmutableDBOptions, seq_per_batch)}}};
|
|
||||||
|
|
||||||
// offset_of is used to get the offset of a class data member
|
// offset_of is used to get the offset of a class data member
|
||||||
// ex: offset_of(&ColumnFamilyOptions::num_levels)
|
// ex: offset_of(&ColumnFamilyOptions::num_levels)
|
||||||
|
@ -284,6 +284,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
|
|||||||
"allow_ingest_behind=false;"
|
"allow_ingest_behind=false;"
|
||||||
"preserve_deletes=false;"
|
"preserve_deletes=false;"
|
||||||
"concurrent_prepare=false;"
|
"concurrent_prepare=false;"
|
||||||
|
"two_write_queues=false;"
|
||||||
"manual_wal_flush=false;"
|
"manual_wal_flush=false;"
|
||||||
"seq_per_batch=false;",
|
"seq_per_batch=false;",
|
||||||
new_options));
|
new_options));
|
||||||
|
@ -189,12 +189,11 @@ Status TransactionDB::Open(
|
|||||||
std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
|
std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
|
||||||
std::vector<size_t> compaction_enabled_cf_indices;
|
std::vector<size_t> compaction_enabled_cf_indices;
|
||||||
DBOptions db_options_2pc = db_options;
|
DBOptions db_options_2pc = db_options;
|
||||||
if (txn_db_options.write_policy == WRITE_PREPARED) {
|
|
||||||
db_options_2pc.seq_per_batch = true;
|
|
||||||
}
|
|
||||||
PrepareWrap(&db_options_2pc, &column_families_copy,
|
PrepareWrap(&db_options_2pc, &column_families_copy,
|
||||||
&compaction_enabled_cf_indices);
|
&compaction_enabled_cf_indices);
|
||||||
s = DB::Open(db_options_2pc, dbname, column_families_copy, handles, &db);
|
const bool use_seq_per_batch = txn_db_options.write_policy == WRITE_PREPARED;
|
||||||
|
s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
|
||||||
|
use_seq_per_batch);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
|
s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
|
||||||
dbptr);
|
dbptr);
|
||||||
|
@ -4856,12 +4856,12 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
|
|||||||
auto seq = db_impl->GetLatestSequenceNumber();
|
auto seq = db_impl->GetLatestSequenceNumber();
|
||||||
exp_seq = seq;
|
exp_seq = seq;
|
||||||
txn_t0(0);
|
txn_t0(0);
|
||||||
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
ASSERT_EQ(exp_seq, seq);
|
ASSERT_EQ(exp_seq, seq);
|
||||||
|
|
||||||
if (branch_do(n, &branch)) {
|
if (branch_do(n, &branch)) {
|
||||||
db_impl->Flush(fopt);
|
db_impl->Flush(fopt);
|
||||||
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
ASSERT_EQ(exp_seq, seq);
|
ASSERT_EQ(exp_seq, seq);
|
||||||
}
|
}
|
||||||
if (branch_do(n, &branch)) {
|
if (branch_do(n, &branch)) {
|
||||||
@ -4874,16 +4874,16 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
|
|||||||
|
|
||||||
// Doing it twice might detect some bugs
|
// Doing it twice might detect some bugs
|
||||||
txn_t0(1);
|
txn_t0(1);
|
||||||
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
ASSERT_EQ(exp_seq, seq);
|
ASSERT_EQ(exp_seq, seq);
|
||||||
|
|
||||||
txn_t1(0);
|
txn_t1(0);
|
||||||
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
ASSERT_EQ(exp_seq, seq);
|
ASSERT_EQ(exp_seq, seq);
|
||||||
|
|
||||||
if (branch_do(n, &branch)) {
|
if (branch_do(n, &branch)) {
|
||||||
db_impl->Flush(fopt);
|
db_impl->Flush(fopt);
|
||||||
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
ASSERT_EQ(exp_seq, seq);
|
ASSERT_EQ(exp_seq, seq);
|
||||||
}
|
}
|
||||||
if (branch_do(n, &branch)) {
|
if (branch_do(n, &branch)) {
|
||||||
@ -4895,12 +4895,12 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
txn_t3(0);
|
txn_t3(0);
|
||||||
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
ASSERT_EQ(exp_seq, seq);
|
ASSERT_EQ(exp_seq, seq);
|
||||||
|
|
||||||
if (branch_do(n, &branch)) {
|
if (branch_do(n, &branch)) {
|
||||||
db_impl->Flush(fopt);
|
db_impl->Flush(fopt);
|
||||||
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
ASSERT_EQ(exp_seq, seq);
|
ASSERT_EQ(exp_seq, seq);
|
||||||
}
|
}
|
||||||
if (branch_do(n, &branch)) {
|
if (branch_do(n, &branch)) {
|
||||||
@ -4912,16 +4912,16 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
txn_t0(0);
|
txn_t0(0);
|
||||||
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
ASSERT_EQ(exp_seq, seq);
|
ASSERT_EQ(exp_seq, seq);
|
||||||
|
|
||||||
txn_t2(0);
|
txn_t2(0);
|
||||||
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
ASSERT_EQ(exp_seq, seq);
|
ASSERT_EQ(exp_seq, seq);
|
||||||
|
|
||||||
if (branch_do(n, &branch)) {
|
if (branch_do(n, &branch)) {
|
||||||
db_impl->Flush(fopt);
|
db_impl->Flush(fopt);
|
||||||
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
ASSERT_EQ(exp_seq, seq);
|
ASSERT_EQ(exp_seq, seq);
|
||||||
}
|
}
|
||||||
if (branch_do(n, &branch)) {
|
if (branch_do(n, &branch)) {
|
||||||
|
@ -54,7 +54,7 @@ class TransactionTest : public ::testing::TestWithParam<
|
|||||||
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
|
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
|
||||||
env = new FaultInjectionTestEnv(Env::Default());
|
env = new FaultInjectionTestEnv(Env::Default());
|
||||||
options.env = env;
|
options.env = env;
|
||||||
options.concurrent_prepare = std::get<1>(GetParam());
|
options.two_write_queues = std::get<1>(GetParam());
|
||||||
dbname = test::TmpDir() + "/transaction_testdb";
|
dbname = test::TmpDir() + "/transaction_testdb";
|
||||||
|
|
||||||
DestroyDB(dbname, options);
|
DestroyDB(dbname, options);
|
||||||
@ -113,11 +113,10 @@ class TransactionTest : public ::testing::TestWithParam<
|
|||||||
std::vector<ColumnFamilyHandle*> handles;
|
std::vector<ColumnFamilyHandle*> handles;
|
||||||
DB* root_db;
|
DB* root_db;
|
||||||
Options options_copy(options);
|
Options options_copy(options);
|
||||||
if (txn_db_options.write_policy == WRITE_PREPARED) {
|
const bool use_seq_per_batch =
|
||||||
options_copy.seq_per_batch = true;
|
txn_db_options.write_policy == WRITE_PREPARED;
|
||||||
}
|
Status s = DBImpl::Open(options_copy, dbname, column_families, &handles,
|
||||||
Status s =
|
&root_db, use_seq_per_batch);
|
||||||
DB::Open(options_copy, dbname, column_families, &handles, &root_db);
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
assert(handles.size() == 1);
|
assert(handles.size() == 1);
|
||||||
s = TransactionDB::WrapStackableDB(
|
s = TransactionDB::WrapStackableDB(
|
||||||
@ -144,7 +143,7 @@ class TransactionTest : public ::testing::TestWithParam<
|
|||||||
} else {
|
} else {
|
||||||
// Consume one seq per batch
|
// Consume one seq per batch
|
||||||
exp_seq++;
|
exp_seq++;
|
||||||
if (options.concurrent_prepare) {
|
if (options.two_write_queues) {
|
||||||
// Consume one seq for commit
|
// Consume one seq for commit
|
||||||
exp_seq++;
|
exp_seq++;
|
||||||
}
|
}
|
||||||
@ -169,7 +168,7 @@ class TransactionTest : public ::testing::TestWithParam<
|
|||||||
} else {
|
} else {
|
||||||
// Consume one seq per batch
|
// Consume one seq per batch
|
||||||
exp_seq++;
|
exp_seq++;
|
||||||
if (options.concurrent_prepare) {
|
if (options.two_write_queues) {
|
||||||
// Consume one seq for commit
|
// Consume one seq for commit
|
||||||
exp_seq++;
|
exp_seq++;
|
||||||
}
|
}
|
||||||
@ -197,7 +196,7 @@ class TransactionTest : public ::testing::TestWithParam<
|
|||||||
} else {
|
} else {
|
||||||
// Consume one seq per batch
|
// Consume one seq per batch
|
||||||
exp_seq++;
|
exp_seq++;
|
||||||
if (options.concurrent_prepare) {
|
if (options.two_write_queues) {
|
||||||
// Consume one seq for commit
|
// Consume one seq for commit
|
||||||
exp_seq++;
|
exp_seq++;
|
||||||
}
|
}
|
||||||
|
@ -625,7 +625,7 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) {
|
|||||||
printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
|
printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
|
||||||
}
|
}
|
||||||
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
|
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
|
||||||
auto seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
auto seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
exp_seq = seq;
|
exp_seq = seq;
|
||||||
// This is increased before writing the batch for commit
|
// This is increased before writing the batch for commit
|
||||||
commit_writes = 0;
|
commit_writes = 0;
|
||||||
@ -693,17 +693,17 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) {
|
|||||||
for (auto& t : threads) {
|
for (auto& t : threads) {
|
||||||
t.join();
|
t.join();
|
||||||
}
|
}
|
||||||
if (options.concurrent_prepare) {
|
if (options.two_write_queues) {
|
||||||
// In this case none of the above scheduling tricks to deterministically
|
// In this case none of the above scheduling tricks to deterministically
|
||||||
// form merged bactches works because the writes go to saparte queues.
|
// form merged bactches works because the writes go to saparte queues.
|
||||||
// This would result in different write groups in each run of the test. We
|
// This would result in different write groups in each run of the test. We
|
||||||
// still keep the test since althgouh non-deterministic and hard to debug,
|
// still keep the test since althgouh non-deterministic and hard to debug,
|
||||||
// it is still useful to have.
|
// it is still useful to have.
|
||||||
// TODO(myabandeh): Add a deterministic unit test for concurrent_prepare
|
// TODO(myabandeh): Add a deterministic unit test for two_write_queues
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if memtable inserts advanced seq number as expected
|
// Check if memtable inserts advanced seq number as expected
|
||||||
seq = db_impl->TEST_GetLatestVisibleSequenceNumber();
|
seq = db_impl->TEST_GetLastVisibleSequence();
|
||||||
ASSERT_EQ(exp_seq, seq);
|
ASSERT_EQ(exp_seq, seq);
|
||||||
|
|
||||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
@ -1258,7 +1258,7 @@ TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) {
|
|||||||
VerifyKeys({{"foo", v}});
|
VerifyKeys({{"foo", v}});
|
||||||
seq++; // one for the key/value
|
seq++; // one for the key/value
|
||||||
KeyVersion kv = {"foo", v, seq, kTypeValue};
|
KeyVersion kv = {"foo", v, seq, kTypeValue};
|
||||||
if (options.concurrent_prepare) {
|
if (options.two_write_queues) {
|
||||||
seq++; // one for the commit
|
seq++; // one for the commit
|
||||||
}
|
}
|
||||||
versions.emplace_back(kv);
|
versions.emplace_back(kv);
|
||||||
@ -1306,10 +1306,10 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
|
|||||||
auto add_key = [&](std::function<Status()> func) {
|
auto add_key = [&](std::function<Status()> func) {
|
||||||
ASSERT_OK(func());
|
ASSERT_OK(func());
|
||||||
expected_seq++;
|
expected_seq++;
|
||||||
if (options.concurrent_prepare) {
|
if (options.two_write_queues) {
|
||||||
expected_seq++; // 1 for commit
|
expected_seq++; // 1 for commit
|
||||||
}
|
}
|
||||||
ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber());
|
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
|
||||||
snapshots.push_back(db->GetSnapshot());
|
snapshots.push_back(db->GetSnapshot());
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1397,7 +1397,7 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
|
|||||||
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
|
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
|
||||||
ASSERT_OK(txn1->Commit());
|
ASSERT_OK(txn1->Commit());
|
||||||
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
|
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
|
||||||
ASSERT_EQ(++expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber());
|
ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
|
||||||
delete txn1;
|
delete txn1;
|
||||||
// Take a snapshots to avoid keys get evicted before compaction.
|
// Take a snapshots to avoid keys get evicted before compaction.
|
||||||
const Snapshot* snapshot1 = db->GetSnapshot();
|
const Snapshot* snapshot1 = db->GetSnapshot();
|
||||||
@ -1410,24 +1410,24 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
|
|||||||
// txn2 commit after snapshot2 and it is not visible.
|
// txn2 commit after snapshot2 and it is not visible.
|
||||||
const Snapshot* snapshot2 = db->GetSnapshot();
|
const Snapshot* snapshot2 = db->GetSnapshot();
|
||||||
ASSERT_OK(txn2->Commit());
|
ASSERT_OK(txn2->Commit());
|
||||||
ASSERT_EQ(++expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber());
|
ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
|
||||||
delete txn2;
|
delete txn2;
|
||||||
// Take a snapshots to avoid keys get evicted before compaction.
|
// Take a snapshots to avoid keys get evicted before compaction.
|
||||||
const Snapshot* snapshot3 = db->GetSnapshot();
|
const Snapshot* snapshot3 = db->GetSnapshot();
|
||||||
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
|
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
|
||||||
expected_seq++; // 1 for write
|
expected_seq++; // 1 for write
|
||||||
SequenceNumber seq1 = expected_seq;
|
SequenceNumber seq1 = expected_seq;
|
||||||
if (options.concurrent_prepare) {
|
if (options.two_write_queues) {
|
||||||
expected_seq++; // 1 for commit
|
expected_seq++; // 1 for commit
|
||||||
}
|
}
|
||||||
ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber());
|
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
|
||||||
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
|
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
|
||||||
expected_seq++; // 1 for write
|
expected_seq++; // 1 for write
|
||||||
SequenceNumber seq2 = expected_seq;
|
SequenceNumber seq2 = expected_seq;
|
||||||
if (options.concurrent_prepare) {
|
if (options.two_write_queues) {
|
||||||
expected_seq++; // 1 for commit
|
expected_seq++; // 1 for commit
|
||||||
}
|
}
|
||||||
ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber());
|
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
|
||||||
ASSERT_OK(db->Flush(FlushOptions()));
|
ASSERT_OK(db->Flush(FlushOptions()));
|
||||||
db->ReleaseSnapshot(snapshot1);
|
db->ReleaseSnapshot(snapshot1);
|
||||||
db->ReleaseSnapshot(snapshot3);
|
db->ReleaseSnapshot(snapshot3);
|
||||||
|
@ -90,7 +90,7 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
SequenceNumber WritePreparedTxn::GetACommitSeqNumber(SequenceNumber prep_seq) {
|
SequenceNumber WritePreparedTxn::GetACommitSeqNumber(SequenceNumber prep_seq) {
|
||||||
if (db_impl_->immutable_db_options().concurrent_prepare) {
|
if (db_impl_->immutable_db_options().two_write_queues) {
|
||||||
return db_impl_->IncAndFetchSequenceNumber();
|
return db_impl_->IncAndFetchSequenceNumber();
|
||||||
} else {
|
} else {
|
||||||
return prep_seq;
|
return prep_seq;
|
||||||
|
@ -46,7 +46,7 @@ class WritePreparedTxn : public PessimisticTransaction {
|
|||||||
virtual ~WritePreparedTxn() {}
|
virtual ~WritePreparedTxn() {}
|
||||||
|
|
||||||
// To make WAL commit markers visible, the snapshot will be based on the last
|
// To make WAL commit markers visible, the snapshot will be based on the last
|
||||||
// seq in the WAL, LastToBeWrittenSquence, as opposed to the last seq in the
|
// seq in the WAL, LastAllocatedSequence, as opposed to the last seq in the
|
||||||
// memtable.
|
// memtable.
|
||||||
using Transaction::Get;
|
using Transaction::Get;
|
||||||
virtual Status Get(const ReadOptions& options,
|
virtual Status Get(const ReadOptions& options,
|
||||||
@ -54,7 +54,7 @@ class WritePreparedTxn : public PessimisticTransaction {
|
|||||||
PinnableSlice* value) override;
|
PinnableSlice* value) override;
|
||||||
|
|
||||||
// To make WAL commit markers visible, the snapshot will be based on the last
|
// To make WAL commit markers visible, the snapshot will be based on the last
|
||||||
// seq in the WAL, LastToBeWrittenSquence, as opposed to the last seq in the
|
// seq in the WAL, LastAllocatedSequence, as opposed to the last seq in the
|
||||||
// memtable.
|
// memtable.
|
||||||
using Transaction::GetIterator;
|
using Transaction::GetIterator;
|
||||||
virtual Iterator* GetIterator(const ReadOptions& options) override;
|
virtual Iterator* GetIterator(const ReadOptions& options) override;
|
||||||
@ -76,7 +76,7 @@ class WritePreparedTxn : public PessimisticTransaction {
|
|||||||
// commit entails writing only a commit marker in the WAL. The sequence number
|
// commit entails writing only a commit marker in the WAL. The sequence number
|
||||||
// of the commit marker is then the commit timestamp of the transaction. To
|
// of the commit marker is then the commit timestamp of the transaction. To
|
||||||
// make the commit timestamp visible to readers, their snapshot is based on
|
// make the commit timestamp visible to readers, their snapshot is based on
|
||||||
// the last seq in the WAL, LastToBeWrittenSquence, as opposed to the last seq
|
// the last seq in the WAL, LastAllocatedSequence, as opposed to the last seq
|
||||||
// in the memtable.
|
// in the memtable.
|
||||||
Status CommitInternal() override;
|
Status CommitInternal() override;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user