From 3122cb435875d720fc3d23a48eb7c0fa89d869aa Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 1 Feb 2022 22:17:46 -0800 Subject: [PATCH] Revise APIs related to user-defined timestamp (#8946) Summary: ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`. Namely, `WriteOptions` should not include information about "what-to-write", but should just include information about "how-to-write". According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore, this PR removes `WriteOptions::timestamp` for compliance. After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set of overloaded functions `Put(write_opts, key, ts, value)`, `Delete(write_opts, key, ts)`, and `SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity made me defer it to another PR (maybe). For better checking and to return errors early, we also add a new set of APIs to `WriteBatch` that take extra `timestamp` information when writing to `WriteBatch`es. This set of APIs in `WriteBatchWithIndex` is currently not supported, and is on our TODO list. Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to `WriteBatch::UpdateTimestamps()` since this method requires that all keys have space for timestamps allocated already and multiple timestamps can be updated. The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp size of the default column family. This will be used to allocate space when calling APIs that do not specify a column family handle. Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing some assertions about timestamps with returned Status codes. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946 Test Plan: make check ./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8 ./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0 Make sure there is no perf regression by running the following ``` ./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom ``` Before this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s ``` After this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s ``` Reviewed By: ltamasi Differential Revision: D33721359 Pulled By: riversand963 fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09 --- HISTORY.md | 1 + db/db_impl/compacted_db_impl.cc | 11 + db/db_impl/db_impl.cc | 129 +++- db/db_impl/db_impl.h | 81 ++- db/db_impl/db_impl_readonly.cc | 31 + db/db_impl/db_impl_secondary.cc | 31 + db/db_impl/db_impl_write.cc | 191 +++--- db/db_kv_checksum_test.cc | 2 +- db/db_test.cc | 13 + db/db_test2.cc | 9 +- db/db_with_timestamp_basic_test.cc | 574 +++++++++--------- db/db_with_timestamp_compaction_test.cc | 15 +- db/write_batch.cc | 350 +++++++++-- db/write_batch_internal.h | 152 ++--- db/write_batch_test.cc | 108 ++-- db_stress_tool/batched_ops_stress.cc | 6 +- db_stress_tool/db_stress_test_base.cc | 6 +- db_stress_tool/no_batched_ops_stress.cc | 28 +- include/rocksdb/db.h | 35 ++ include/rocksdb/options.h | 14 +- include/rocksdb/utilities/stackable_db.h | 20 + include/rocksdb/utilities/transaction_db.h | 1 + .../utilities/write_batch_with_index.h | 18 + include/rocksdb/write_batch.h | 88 ++- include/rocksdb/write_batch_base.h | 11 + tools/db_bench_tool.cc | 88 +-- utilities/blob_db/blob_db.h | 1 + 
utilities/blob_db/blob_db_impl.h | 1 + .../optimistic_transaction_db_impl.h | 1 + .../transactions/write_prepared_txn_db.cc | 1 + .../write_batch_with_index.cc | 29 + 31 files changed, 1293 insertions(+), 753 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index cd5ffe43d..3875eaff0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -21,6 +21,7 @@ * Remove ReadOptions::iter_start_seqnum which has been deprecated. * Remove DBOptions::preserved_deletes and DB::SetPreserveDeletesSequenceNumber(). * Remove deprecated API AdvancedColumnFamilyOptions::rate_limit_delay_max_milliseconds. +* Removed timestamp from WriteOptions. Accordingly, added to DB APIs Put, Delete, SingleDelete, etc. accepting an additional argument 'timestamp'. Added Put, Delete, SingleDelete, etc to WriteBatch accepting an additional argument 'timestamp'. Removed WriteBatch::AssignTimestamps(vector) API. Renamed WriteBatch::AssignTimestamp() to WriteBatch::UpdateTimestamps() with clarified comments. ### Behavior Changes * Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO. 
diff --git a/db/db_impl/compacted_db_impl.cc b/db/db_impl/compacted_db_impl.cc index 72d9c57c0..b65455437 100644 --- a/db/db_impl/compacted_db_impl.cc +++ b/db/db_impl/compacted_db_impl.cc @@ -40,6 +40,11 @@ size_t CompactedDBImpl::FindFile(const Slice& key) { Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*, const Slice& key, PinnableSlice* value) { + assert(user_comparator_); + if (options.timestamp || user_comparator_->timestamp_size()) { + // TODO: support timestamp + return Status::NotSupported(); + } GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound, key, value, nullptr, nullptr, nullptr, true, nullptr, nullptr); @@ -58,6 +63,11 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*, std::vector CompactedDBImpl::MultiGet(const ReadOptions& options, const std::vector&, const std::vector& keys, std::vector* values) { + assert(user_comparator_); + if (user_comparator_->timestamp_size() || options.timestamp) { + // TODO: support timestamp + return std::vector(keys.size(), Status::NotSupported()); + } autovector reader_list; for (const auto& key : keys) { const FdWithKeyRange& f = files_.files[FindFile(key)]; @@ -69,6 +79,7 @@ std::vector CompactedDBImpl::MultiGet(const ReadOptions& options, reader_list.push_back(f.fd.table_reader); } } + std::vector statuses(keys.size(), Status::NotFound()); values->resize(keys.size()); int idx = 0; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 38bac895c..93c70276d 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1730,19 +1730,21 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, get_impl_options.merge_operands != nullptr); assert(get_impl_options.column_family); - const Comparator* ucmp = get_impl_options.column_family->GetComparator(); - assert(ucmp); - size_t ts_sz = ucmp->timestamp_size(); - GetWithTimestampReadCallback read_cb(0); // Will call Refresh -#ifndef NDEBUG - if 
(ts_sz > 0) { - assert(read_options.timestamp); - assert(read_options.timestamp->size() == ts_sz); + if (read_options.timestamp) { + const Status s = FailIfTsSizesMismatch(get_impl_options.column_family, + *(read_options.timestamp)); + if (!s.ok()) { + return s; + } } else { - assert(!read_options.timestamp); + const Status s = FailIfCfHasTs(get_impl_options.column_family); + if (!s.ok()) { + return s; + } } -#endif // NDEBUG + + GetWithTimestampReadCallback read_cb(0); // Will call Refresh PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock); StopWatch sw(immutable_db_options_.clock, stats_, DB_GET); @@ -1811,7 +1813,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, // only if t <= read_opts.timestamp and s <= snapshot. // HACK: temporarily overwrite input struct field but restore SaveAndRestore restore_callback(&get_impl_options.callback); - if (ts_sz > 0) { + const Comparator* ucmp = get_impl_options.column_family->GetComparator(); + assert(ucmp); + if (ucmp->timestamp_size() > 0) { assert(!get_impl_options .callback); // timestamp with callback is not supported read_cb.Refresh(snapshot); @@ -1834,7 +1838,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, bool skip_memtable = (read_options.read_tier == kPersistedTier && has_unpersisted_data_.load(std::memory_order_relaxed)); bool done = false; - std::string* timestamp = ts_sz > 0 ? get_impl_options.timestamp : nullptr; + std::string* timestamp = + ucmp->timestamp_size() > 0 ? 
get_impl_options.timestamp : nullptr; if (!skip_memtable) { // Get value associated with key if (get_impl_options.get_value) { @@ -1941,19 +1946,36 @@ std::vector DBImpl::MultiGet( StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET); PERF_TIMER_GUARD(get_snapshot_time); -#ifndef NDEBUG - for (const auto* cfh : column_family) { - assert(cfh); - const Comparator* const ucmp = cfh->GetComparator(); - assert(ucmp); - if (ucmp->timestamp_size() > 0) { - assert(read_options.timestamp); - assert(ucmp->timestamp_size() == read_options.timestamp->size()); + size_t num_keys = keys.size(); + assert(column_family.size() == num_keys); + std::vector stat_list(num_keys); + + bool should_fail = false; + for (size_t i = 0; i < num_keys; ++i) { + assert(column_family[i]); + if (read_options.timestamp) { + stat_list[i] = + FailIfTsSizesMismatch(column_family[i], *(read_options.timestamp)); + if (!stat_list[i].ok()) { + should_fail = true; + } } else { - assert(!read_options.timestamp); + stat_list[i] = FailIfCfHasTs(column_family[i]); + if (!stat_list[i].ok()) { + should_fail = true; + } } } -#endif // NDEBUG + + if (should_fail) { + for (auto& s : stat_list) { + if (s.ok()) { + s = Status::Incomplete( + "DB not queried due to invalid argument(s) in the same MultiGet"); + } + } + return stat_list; + } if (tracer_) { // TODO: This mutex should be removed later, to improve performance when @@ -1996,8 +2018,6 @@ std::vector DBImpl::MultiGet( MergeContext merge_context; // Note: this always resizes the values array - size_t num_keys = keys.size(); - std::vector stat_list(num_keys); values->resize(num_keys); if (timestamps) { timestamps->resize(num_keys); @@ -2262,20 +2282,31 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, return; } -#ifndef NDEBUG + bool should_fail = false; for (size_t i = 0; i < num_keys; ++i) { ColumnFamilyHandle* cfh = column_families[i]; assert(cfh); - const Comparator* const ucmp = cfh->GetComparator(); - assert(ucmp); - 
if (ucmp->timestamp_size() > 0) { - assert(read_options.timestamp); - assert(read_options.timestamp->size() == ucmp->timestamp_size()); + if (read_options.timestamp) { + statuses[i] = FailIfTsSizesMismatch(cfh, *(read_options.timestamp)); + if (!statuses[i].ok()) { + should_fail = true; + } } else { - assert(!read_options.timestamp); + statuses[i] = FailIfCfHasTs(cfh); + if (!statuses[i].ok()) { + should_fail = true; + } } } -#endif // NDEBUG + if (should_fail) { + for (size_t i = 0; i < num_keys; ++i) { + if (statuses[i].ok()) { + statuses[i] = Status::Incomplete( + "DB not queried due to invalid argument(s) in the same MultiGet"); + } + } + return; + } if (tracer_) { // TODO: This mutex should be removed later, to improve performance when @@ -2903,6 +2934,21 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, "ReadTier::kPersistedData is not yet supported in iterators.")); } + assert(column_family); + + if (read_options.timestamp) { + const Status s = + FailIfTsSizesMismatch(column_family, *(read_options.timestamp)); + if (!s.ok()) { + return NewErrorIterator(s); + } + } else { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return NewErrorIterator(s); + } + } + auto cfh = static_cast_with_check(column_family); ColumnFamilyData* cfd = cfh->cfd(); assert(cfd != nullptr); @@ -3029,6 +3075,25 @@ Status DBImpl::NewIterators( return Status::NotSupported( "ReadTier::kPersistedData is not yet supported in iterators."); } + + if (read_options.timestamp) { + for (auto* cf : column_families) { + assert(cf); + const Status s = FailIfTsSizesMismatch(cf, *(read_options.timestamp)); + if (!s.ok()) { + return s; + } + } + } else { + for (auto* cf : column_families) { + assert(cf); + const Status s = FailIfCfHasTs(cf); + if (!s.ok()) { + return s; + } + } + } + ReadCallback* read_callback = nullptr; // No read callback provided. 
iterators->clear(); iterators->reserve(column_families.size()); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index c7328b0ac..af4b1dd77 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -146,24 +146,36 @@ class DBImpl : public DB { // ---- Implementations of the DB interface ---- using DB::Resume; - virtual Status Resume() override; + Status Resume() override; using DB::Put; - virtual Status Put(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; + Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) override; + Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& value) override; + using DB::Merge; - virtual Status Merge(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; + Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) override; using DB::Delete; - virtual Status Delete(const WriteOptions& options, - ColumnFamilyHandle* column_family, - const Slice& key) override; + Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family, + const Slice& key) override; + Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts) override; + using DB::SingleDelete; - virtual Status SingleDelete(const WriteOptions& options, - ColumnFamilyHandle* column_family, - const Slice& key) override; + Status SingleDelete(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key) override; + Status SingleDelete(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) override; + + using DB::DeleteRange; + Status DeleteRange(const WriteOptions& options, + ColumnFamilyHandle* column_family, const 
Slice& begin_key, + const Slice& end_key) override; + using DB::Write; virtual Status Write(const WriteOptions& options, WriteBatch* updates) override; @@ -1368,6 +1380,10 @@ class DBImpl : public DB { // to ensure that db_session_id_ gets updated every time the DB is opened void SetDbSessionId(); + Status FailIfCfHasTs(const ColumnFamilyHandle* column_family) const; + Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family, + const Slice& ts) const; + private: friend class DB; friend class ErrorHandler; @@ -2396,4 +2412,43 @@ static void ClipToRange(T* ptr, V minvalue, V maxvalue) { if (static_cast(*ptr) < minvalue) *ptr = minvalue; } +inline Status DBImpl::FailIfCfHasTs( + const ColumnFamilyHandle* column_family) const { + column_family = column_family ? column_family : DefaultColumnFamily(); + assert(column_family); + const Comparator* const ucmp = column_family->GetComparator(); + assert(ucmp); + if (ucmp->timestamp_size() > 0) { + std::ostringstream oss; + oss << "cannot call this method on column family " + << column_family->GetName() << " that enables timestamp"; + return Status::InvalidArgument(oss.str()); + } + return Status::OK(); +} + +inline Status DBImpl::FailIfTsSizesMismatch( + const ColumnFamilyHandle* column_family, const Slice& ts) const { + if (!column_family) { + return Status::InvalidArgument("column family handle cannot be null"); + } + assert(column_family); + const Comparator* const ucmp = column_family->GetComparator(); + assert(ucmp); + if (0 == ucmp->timestamp_size()) { + std::stringstream oss; + oss << "cannot call this method on column family " + << column_family->GetName() << " that does not enable timestamp"; + return Status::InvalidArgument(oss.str()); + } + const size_t ts_sz = ts.size(); + if (ts_sz != ucmp->timestamp_size()) { + std::stringstream oss; + oss << "Timestamp sizes mismatch: expect " << ucmp->timestamp_size() << ", " + << ts_sz << " given"; + return Status::InvalidArgument(oss.str()); + } + return 
Status::OK(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_readonly.cc b/db/db_impl/db_impl_readonly.cc index 6970dcefd..d4a92db08 100644 --- a/db/db_impl/db_impl_readonly.cc +++ b/db/db_impl/db_impl_readonly.cc @@ -36,6 +36,15 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options, assert(pinnable_val != nullptr); // TODO: stopwatch DB_GET needed?, perf timer needed? PERF_TIMER_GUARD(get_snapshot_time); + + assert(column_family); + const Comparator* ucmp = column_family->GetComparator(); + assert(ucmp); + if (ucmp->timestamp_size() || read_options.timestamp) { + // TODO: support timestamp + return Status::NotSupported(); + } + Status s; SequenceNumber snapshot = versions_->LastSequence(); auto cfh = static_cast_with_check(column_family); @@ -73,6 +82,13 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options, Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options, ColumnFamilyHandle* column_family) { + assert(column_family); + const Comparator* ucmp = column_family->GetComparator(); + assert(ucmp); + if (ucmp->timestamp_size() || read_options.timestamp) { + // TODO: support timestamp + return NewErrorIterator(Status::NotSupported()); + } auto cfh = static_cast_with_check(column_family); auto cfd = cfh->cfd(); SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); @@ -100,6 +116,21 @@ Status DBImplReadOnly::NewIterators( const ReadOptions& read_options, const std::vector& column_families, std::vector* iterators) { + if (read_options.timestamp) { + // TODO: support timestamp + return Status::NotSupported(); + } else { + for (auto* cf : column_families) { + assert(cf); + const Comparator* ucmp = cf->GetComparator(); + assert(ucmp); + if (ucmp->timestamp_size()) { + // TODO: support timestamp + return Status::NotSupported(); + } + } + } + ReadCallback* read_callback = nullptr; // No read callback provided. 
if (iterators == nullptr) { return Status::InvalidArgument("iterators not allowed to be nullptr"); diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 9a0b8af9f..39250ecfa 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -339,6 +339,14 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options, StopWatch sw(immutable_db_options_.clock, stats_, DB_GET); PERF_TIMER_GUARD(get_snapshot_time); + assert(column_family); + const Comparator* ucmp = column_family->GetComparator(); + assert(ucmp); + if (ucmp->timestamp_size() || read_options.timestamp) { + // TODO: support timestamp + return Status::NotSupported(); + } + auto cfh = static_cast(column_family); ColumnFamilyData* cfd = cfh->cfd(); if (tracer_) { @@ -404,6 +412,15 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options, return NewErrorIterator(Status::NotSupported( "ReadTier::kPersistedData is not yet supported in iterators.")); } + + assert(column_family); + const Comparator* ucmp = column_family->GetComparator(); + assert(ucmp); + if (ucmp->timestamp_size() || read_options.timestamp) { + // TODO: support timestamp + return NewErrorIterator(Status::NotSupported()); + } + Iterator* result = nullptr; auto cfh = static_cast_with_check(column_family); auto cfd = cfh->cfd(); @@ -460,6 +477,20 @@ Status DBImplSecondary::NewIterators( if (iterators == nullptr) { return Status::InvalidArgument("iterators not allowed to be nullptr"); } + if (read_options.timestamp) { + // TODO: support timestamp + return Status::NotSupported(); + } else { + for (auto* cf : column_families) { + assert(cf); + const Comparator* ucmp = cf->GetComparator(); + assert(ucmp); + if (ucmp->timestamp_size()) { + // TODO: support timestamp + return Status::NotSupported(); + } + } + } iterators->clear(); iterators->reserve(column_families.size()); if (read_options.tailing) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 
72a47d83a..a5bd89a7e 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -21,11 +21,28 @@ namespace ROCKSDB_NAMESPACE { // Convenience methods Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } return DB::Put(o, column_family, key, val); } +Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& val) { + const Status s = FailIfTsSizesMismatch(column_family, ts); + if (!s.ok()) { + return s; + } + return DB::Put(o, column_family, key, ts, val); +} + Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } auto cfh = static_cast_with_check(column_family); if (!cfh->cfd()->ioptions()->merge_operator) { return Status::NotSupported("Provide a merge_operator when opening DB"); @@ -36,22 +53,61 @@ Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, Status DBImpl::Delete(const WriteOptions& write_options, ColumnFamilyHandle* column_family, const Slice& key) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } return DB::Delete(write_options, column_family, key); } +Status DBImpl::Delete(const WriteOptions& write_options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) { + const Status s = FailIfTsSizesMismatch(column_family, ts); + if (!s.ok()) { + return s; + } + return DB::Delete(write_options, column_family, key, ts); +} + Status DBImpl::SingleDelete(const WriteOptions& write_options, ColumnFamilyHandle* column_family, const Slice& key) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } return DB::SingleDelete(write_options, column_family, key); } +Status 
DBImpl::SingleDelete(const WriteOptions& write_options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) { + const Status s = FailIfTsSizesMismatch(column_family, ts); + if (!s.ok()) { + return s; + } + return DB::SingleDelete(write_options, column_family, key, ts); +} + +Status DBImpl::DeleteRange(const WriteOptions& write_options, + ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } + return DB::DeleteRange(write_options, column_family, begin_key, end_key); +} + void DBImpl::SetRecoverableStatePreReleaseCallback( PreReleaseCallback* callback) { recoverable_state_pre_release_callback_.reset(callback); } Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { - return WriteImpl(write_options, my_batch, nullptr, nullptr); + return WriteImpl(write_options, my_batch, /*callback=*/nullptr, + /*log_used=*/nullptr); } #ifndef ROCKSDB_LITE @@ -74,6 +130,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, assert(!seq_per_batch_ || batch_cnt != 0); if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); + } else if (WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) { + return Status::InvalidArgument("write batch must have timestamp(s) set"); } // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock // grabs but does not seem thread-safe. 
@@ -283,6 +341,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, size_t total_byte_size = 0; size_t pre_release_callback_cnt = 0; for (auto* writer : write_group) { + assert(writer); if (writer->CheckCallback(this)) { valid_batches += writer->batch_cnt; if (writer->ShouldWriteToMemtable()) { @@ -487,7 +546,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, WriteContext write_context; WriteThread::Writer w(write_options, my_batch, callback, log_ref, - disable_memtable); + disable_memtable, /*_batch_cnt=*/0, + /*_pre_release_callback=*/nullptr); write_thread_.JoinBatchGroup(&w); TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"); if (w.state == WriteThread::STATE_GROUP_LEADER) { @@ -526,7 +586,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, } } SequenceNumber next_sequence = current_sequence; - for (auto writer : wal_write_group) { + for (auto* writer : wal_write_group) { + assert(writer); if (writer->CheckCallback(this)) { if (writer->ShouldWriteToMemtable()) { writer->sequence = next_sequence; @@ -764,6 +825,7 @@ Status DBImpl::WriteImplWALOnly( size_t pre_release_callback_cnt = 0; size_t total_byte_size = 0; for (auto* writer : write_group) { + assert(writer); if (writer->CheckCallback(this)) { total_byte_size = WriteBatchInternal::AppendedByteSize( total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); @@ -2019,34 +2081,25 @@ size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const { // can call if they wish Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - if (nullptr == opt.timestamp) { - // Pre-allocate size of write batch conservatively. - // 8 bytes are taken by header, 4 bytes for count, 1 byte for type, - // and we allocate 11 extra bytes for key length, as well as value length. 
- WriteBatch batch(key.size() + value.size() + 24); - Status s = batch.Put(column_family, key, value); - if (!s.ok()) { - return s; - } - return Write(opt, &batch); - } - const Slice* ts = opt.timestamp; - assert(nullptr != ts); - size_t ts_sz = ts->size(); - assert(column_family->GetComparator()); - assert(ts_sz == column_family->GetComparator()->timestamp_size()); - WriteBatch batch; - Status s; - if (key.data() + key.size() == ts->data()) { - Slice key_with_ts = Slice(key.data(), key.size() + ts_sz); - s = batch.Put(column_family, key_with_ts, value); - } else { - std::array key_with_ts_slices{{key, *ts}}; - SliceParts key_with_ts(key_with_ts_slices.data(), 2); - std::array value_slices{{value}}; - SliceParts values(value_slices.data(), 1); - s = batch.Put(column_family, key_with_ts, values); + // Pre-allocate size of write batch conservatively. + // 8 bytes are taken by header, 4 bytes for count, 1 byte for type, + // and we allocate 11 extra bytes for key length, as well as value length. 
+ WriteBatch batch(key.size() + value.size() + 24); + Status s = batch.Put(column_family, key, value); + if (!s.ok()) { + return s; } + return Write(opt, &batch); +} + +Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& value) { + ColumnFamilyHandle* default_cf = DefaultColumnFamily(); + assert(default_cf); + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + WriteBatch batch(0, 0, 0, default_cf_ucmp->timestamp_size()); + Status s = batch.Put(column_family, key, ts, value); if (!s.ok()) { return s; } @@ -2055,29 +2108,22 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key) { - if (nullptr == opt.timestamp) { - WriteBatch batch; - Status s = batch.Delete(column_family, key); - if (!s.ok()) { - return s; - } - return Write(opt, &batch); - } - const Slice* ts = opt.timestamp; - assert(ts != nullptr); - size_t ts_sz = ts->size(); - assert(column_family->GetComparator()); - assert(ts_sz == column_family->GetComparator()->timestamp_size()); WriteBatch batch; - Status s; - if (key.data() + key.size() == ts->data()) { - Slice key_with_ts = Slice(key.data(), key.size() + ts_sz); - s = batch.Delete(column_family, key_with_ts); - } else { - std::array key_with_ts_slices{{key, *ts}}; - SliceParts key_with_ts(key_with_ts_slices.data(), 2); - s = batch.Delete(column_family, key_with_ts); + Status s = batch.Delete(column_family, key); + if (!s.ok()) { + return s; } + return Write(opt, &batch); +} + +Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts) { + ColumnFamilyHandle* default_cf = DefaultColumnFamily(); + assert(default_cf); + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + WriteBatch batch(0, 0, 0, 
default_cf_ucmp->timestamp_size()); + Status s = batch.Delete(column_family, key, ts); if (!s.ok()) { return s; } @@ -2086,36 +2132,27 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, Status DB::SingleDelete(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key) { - Status s; - if (opt.timestamp == nullptr) { - WriteBatch batch; - s = batch.SingleDelete(column_family, key); - if (!s.ok()) { - return s; - } - s = Write(opt, &batch); - return s; - } - - const Slice* ts = opt.timestamp; - assert(ts != nullptr); - size_t ts_sz = ts->size(); - assert(column_family->GetComparator()); - assert(ts_sz == column_family->GetComparator()->timestamp_size()); WriteBatch batch; - if (key.data() + key.size() == ts->data()) { - Slice key_with_ts = Slice(key.data(), key.size() + ts_sz); - s = batch.SingleDelete(column_family, key_with_ts); - } else { - std::array key_with_ts_slices{{key, *ts}}; - SliceParts key_with_ts(key_with_ts_slices.data(), 2); - s = batch.SingleDelete(column_family, key_with_ts); - } + Status s = batch.SingleDelete(column_family, key); if (!s.ok()) { return s; } - s = Write(opt, &batch); - return s; + return Write(opt, &batch); +} + +Status DB::SingleDelete(const WriteOptions& opt, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) { + ColumnFamilyHandle* default_cf = DefaultColumnFamily(); + assert(default_cf); + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + WriteBatch batch(0, 0, 0, default_cf_ucmp->timestamp_size()); + Status s = batch.SingleDelete(column_family, key, ts); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); } Status DB::DeleteRange(const WriteOptions& opt, diff --git a/db/db_kv_checksum_test.cc b/db/db_kv_checksum_test.cc index 48f50422f..0375dd18c 100644 --- a/db/db_kv_checksum_test.cc +++ b/db/db_kv_checksum_test.cc @@ -37,7 +37,7 @@ class DbKvChecksumTest std::pair GetWriteBatch(ColumnFamilyHandle* 
cf_handle) { Status s; WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */, - 8 /* protection_bytes_per_entry */); + 8 /* protection_bytes_per_entry */, 0 /* default_cf_ts_sz */); switch (op_type_) { case WriteBatchOpType::kPut: s = wb.Put(cf_handle, "key", "val"); diff --git a/db/db_test.cc b/db/db_test.cc index 8f6609141..6dfc0d245 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2862,6 +2862,11 @@ class ModelDB : public DB { } return Write(o, &batch); } + Status Put(const WriteOptions& /*o*/, ColumnFamilyHandle* /*cf*/, + const Slice& /*k*/, const Slice& /*ts*/, + const Slice& /*v*/) override { + return Status::NotSupported(); + } using DB::Close; Status Close() override { return Status::OK(); } using DB::Delete; @@ -2874,6 +2879,10 @@ class ModelDB : public DB { } return Write(o, &batch); } + Status Delete(const WriteOptions& /*o*/, ColumnFamilyHandle* /*cf*/, + const Slice& /*key*/, const Slice& /*ts*/) override { + return Status::NotSupported(); + } using DB::SingleDelete; Status SingleDelete(const WriteOptions& o, ColumnFamilyHandle* cf, const Slice& key) override { @@ -2884,6 +2893,10 @@ class ModelDB : public DB { } return Write(o, &batch); } + Status SingleDelete(const WriteOptions& /*o*/, ColumnFamilyHandle* /*cf*/, + const Slice& /*key*/, const Slice& /*ts*/) override { + return Status::NotSupported(); + } using DB::Merge; Status Merge(const WriteOptions& o, ColumnFamilyHandle* cf, const Slice& k, const Slice& v) override { diff --git a/db/db_test2.cc b/db/db_test2.cc index a61add461..d3b909cbf 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -6845,16 +6845,13 @@ TEST_F(DBTest2, GetLatestSeqAndTsForKey) { constexpr uint64_t kTsU64Value = 12; for (uint64_t key = 0; key < 100; ++key) { - std::string ts_str; - PutFixed64(&ts_str, kTsU64Value); - Slice ts = ts_str; - WriteOptions write_opts; - write_opts.timestamp = &ts; + std::string ts; + PutFixed64(&ts, kTsU64Value); std::string key_str; PutFixed64(&key_str, key); std::reverse(key_str.begin(), 
key_str.end()); - ASSERT_OK(Put(key_str, "value", write_opts)); + ASSERT_OK(db_->Put(WriteOptions(), key_str, ts, "value")); } ASSERT_OK(Flush()); diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 82ebb53fc..d5e5f65eb 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -196,6 +196,83 @@ class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase { : DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {} }; +TEST_F(DBBasicTestWithTimestamp, SanityChecks) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + options.avoid_flush_during_shutdown = true; + options.merge_operator = MergeOperators::CreateStringAppendTESTOperator(); + DestroyAndReopen(options); + + Options options1 = CurrentOptions(); + options1.env = env_; + options1.comparator = test::ComparatorWithU64Ts(); + options1.merge_operator = MergeOperators::CreateStringAppendTESTOperator(); + assert(options1.comparator && + options1.comparator->timestamp_size() == sizeof(uint64_t)); + ColumnFamilyHandle* handle = nullptr; + Status s = db_->CreateColumnFamily(options1, "data", &handle); + ASSERT_OK(s); + + std::string dummy_ts(sizeof(uint64_t), '\0'); + // Perform timestamp operations on default cf. + ASSERT_TRUE( + db_->Put(WriteOptions(), "key", dummy_ts, "value").IsInvalidArgument()); + ASSERT_TRUE(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), "key", + dummy_ts, "value") + .IsNotSupported()); + ASSERT_TRUE(db_->Delete(WriteOptions(), "key", dummy_ts).IsInvalidArgument()); + ASSERT_TRUE( + db_->SingleDelete(WriteOptions(), "key", dummy_ts).IsInvalidArgument()); + ASSERT_TRUE(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), + "begin_key", "end_key", dummy_ts) + .IsNotSupported()); + + // Perform non-timestamp operations on "data" cf. 
+ ASSERT_TRUE( + db_->Put(WriteOptions(), handle, "key", "value").IsInvalidArgument()); + ASSERT_TRUE(db_->Delete(WriteOptions(), handle, "key").IsInvalidArgument()); + ASSERT_TRUE( + db_->SingleDelete(WriteOptions(), handle, "key").IsInvalidArgument()); + + ASSERT_TRUE( + db_->Merge(WriteOptions(), handle, "key", "value").IsInvalidArgument()); + ASSERT_TRUE(db_->DeleteRange(WriteOptions(), handle, "begin_key", "end_key") + .IsInvalidArgument()); + + { + WriteBatch wb; + ASSERT_OK(wb.Put(handle, "key", "value")); + ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument()); + } + { + WriteBatch wb; + ASSERT_OK(wb.Delete(handle, "key")); + ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument()); + } + { + WriteBatch wb; + ASSERT_OK(wb.SingleDelete(handle, "key")); + ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument()); + } + + // Perform timestamp operations with timestamps of incorrect size. + const std::string wrong_ts(sizeof(uint32_t), '\0'); + ASSERT_TRUE(db_->Put(WriteOptions(), handle, "key", wrong_ts, "value") + .IsInvalidArgument()); + ASSERT_TRUE(db_->Merge(WriteOptions(), handle, "key", wrong_ts, "value") + .IsNotSupported()); + ASSERT_TRUE( + db_->Delete(WriteOptions(), handle, "key", wrong_ts).IsInvalidArgument()); + ASSERT_TRUE(db_->SingleDelete(WriteOptions(), handle, "key", wrong_ts) + .IsInvalidArgument()); + ASSERT_TRUE( + db_->DeleteRange(WriteOptions(), handle, "begin_key", "end_key", wrong_ts) + .IsNotSupported()); + + delete handle; +} + TEST_F(DBBasicTestWithTimestamp, MixedCfs) { Options options = CurrentOptions(); options.env = env_; @@ -214,35 +291,37 @@ TEST_F(DBBasicTestWithTimestamp, MixedCfs) { WriteBatch wb; ASSERT_OK(wb.Put("a", "value")); - { - std::string key("a"); - std::string ts(kTimestampSize, '\0'); - std::array key_with_ts_slices{{key, ts}}; - SliceParts key_with_ts(key_with_ts_slices.data(), 2); - std::string value_str("value"); - Slice value_slice(value_str.data(), value_str.size()); - SliceParts 
value(&value_slice, 1); - ASSERT_OK(wb.Put(handle, key_with_ts, value)); - } + ASSERT_OK(wb.Put(handle, "a", "value")); { std::string ts = Timestamp(1, 0); - std::vector ts_list({Slice(), ts}); - ASSERT_OK(wb.AssignTimestamps(ts_list)); + const auto ts_sz_func = [kTimestampSize, handle](uint32_t cf_id) { + assert(handle); + if (cf_id == 0) { + return static_cast(0); + } else if (cf_id == handle->GetID()) { + return kTimestampSize; + } else { + assert(false); + return std::numeric_limits::max(); + } + }; + ASSERT_OK(wb.UpdateTimestamps(ts, ts_sz_func)); ASSERT_OK(db_->Write(WriteOptions(), &wb)); } - const auto verify_db = [this](ColumnFamilyHandle* h) { - ASSERT_EQ("value", Get("a")); - std::string ts = Timestamp(1, 0); + const auto verify_db = [this](ColumnFamilyHandle* h, const std::string& key, + const std::string& ts, + const std::string& expected_value) { + ASSERT_EQ(expected_value, Get(key)); Slice read_ts_slice(ts); ReadOptions read_opts; read_opts.timestamp = &read_ts_slice; std::string value; - ASSERT_OK(db_->Get(read_opts, h, "a", &value)); - ASSERT_EQ("value", value); + ASSERT_OK(db_->Get(read_opts, h, key, &value)); + ASSERT_EQ(expected_value, value); }; - verify_db(handle); + verify_db(handle, "a", Timestamp(1, 0), "value"); delete handle; Close(); @@ -254,7 +333,7 @@ TEST_F(DBBasicTestWithTimestamp, MixedCfs) { s = DB::Open(options, dbname_, cf_descs, &handles_, &db_); ASSERT_OK(s); - verify_db(handles_[1]); + verify_db(handles_[1], "a", Timestamp(1, 0), "value"); Close(); } @@ -269,14 +348,12 @@ TEST_F(DBBasicTestWithTimestamp, CompactRangeWithSpecifiedRange) { DestroyAndReopen(options); WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; + std::string ts = Timestamp(1, 0); - ASSERT_OK(db_->Put(write_opts, "foo1", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar")); ASSERT_OK(Flush()); - ASSERT_OK(db_->Put(write_opts, "foo2", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo2", 
ts, "bar")); ASSERT_OK(Flush()); std::string start_str = "foo"; @@ -298,24 +375,18 @@ TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) { std::string ts_str = Timestamp(1, 0); WriteOptions wopts; - Slice ts = ts_str; - wopts.timestamp = &ts; - ASSERT_OK(db_->Put(wopts, "k1", "v1")); - ASSERT_OK(db_->Put(wopts, "k2", "v2")); - ASSERT_OK(db_->Put(wopts, "k3", "v3")); + ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v1")); + ASSERT_OK(db_->Put(wopts, "k2", ts_str, "v2")); + ASSERT_OK(db_->Put(wopts, "k3", ts_str, "v3")); ts_str = Timestamp(2, 0); - ts = ts_str; - wopts.timestamp = &ts; - ASSERT_OK(db_->Delete(wopts, "k3")); + ASSERT_OK(db_->Delete(wopts, "k3", ts_str)); ts_str = Timestamp(4, 0); - ts = ts_str; - wopts.timestamp = &ts; - ASSERT_OK(db_->Put(wopts, "k1", "v5")); + ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v5")); ts_str = Timestamp(3, 0); - ts = ts_str; + Slice ts = ts_str; CompactRangeOptions cro; cro.full_history_ts_low = &ts; ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); @@ -362,10 +433,8 @@ TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLow) { for (int i = 0; i < 10; i++) { WriteOptions write_opts; - std::string ts_str = Timestamp(i, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - ASSERT_OK(db_->Put(write_opts, kKey, Key(i))); + std::string ts = Timestamp(i, 0); + ASSERT_OK(db_->Put(write_opts, kKey, ts, Key(i))); } ASSERT_OK(Flush()); @@ -388,10 +457,8 @@ TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLow) { // Test set ts_low and then trigger compaction for (int i = 10; i < 20; i++) { WriteOptions write_opts; - std::string ts_str = Timestamp(i, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - ASSERT_OK(db_->Put(write_opts, kKey, Key(i))); + std::string ts = Timestamp(i, 0); + ASSERT_OK(db_->Put(write_opts, kKey, ts, Key(i))); } ASSERT_OK(Flush()); @@ -492,14 +559,12 @@ TEST_F(DBBasicTestWithTimestamp, GetApproximateSizes) { auto default_cf = db_->DefaultColumnFamily(); WriteOptions write_opts; - 
std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; + std::string ts = Timestamp(1, 0); const int N = 128; Random rnd(301); for (int i = 0; i < N; i++) { - ASSERT_OK(db_->Put(write_opts, Key(i), rnd.RandomString(1024))); + ASSERT_OK(db_->Put(write_opts, Key(i), ts, rnd.RandomString(1024))); } uint64_t size; @@ -538,7 +603,7 @@ TEST_F(DBBasicTestWithTimestamp, GetApproximateSizes) { ASSERT_EQ(size, 0); // Test range boundaries - ASSERT_OK(db_->Put(write_opts, Key(1000), rnd.RandomString(1024))); + ASSERT_OK(db_->Put(write_opts, Key(1000), ts, rnd.RandomString(1024))); // Should include start key start = Key(1000); end = Key(1100); @@ -577,10 +642,9 @@ TEST_F(DBBasicTestWithTimestamp, SimpleIterate) { Timestamp(4, 0)}; for (size_t i = 0; i < write_timestamps.size(); ++i) { WriteOptions write_opts; - Slice write_ts = write_timestamps[i]; - write_opts.timestamp = &write_ts; for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) { - Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i)); + Status s = db_->Put(write_opts, Key1(key), write_timestamps[i], + "value" + std::to_string(i)); ASSERT_OK(s); } } @@ -650,11 +714,8 @@ TEST_F(DBBasicTestWithTimestamp, GetTimestampTableProperties) { // Create 2 tables for (int table = 0; table < 2; ++table) { for (int i = 0; i < 10; i++) { - WriteOptions write_opts; - std::string ts_str = Timestamp(i, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - ASSERT_OK(db_->Put(write_opts, "key", Key(i))); + std::string ts = Timestamp(i, 0); + ASSERT_OK(db_->Put(WriteOptions(), "key", ts, Key(i))); } ASSERT_OK(Flush()); } @@ -708,10 +769,8 @@ TEST_P(DBBasicTestWithTimestampTableOptions, GetAndMultiGet) { constexpr uint64_t kNumKeys = 1024; for (uint64_t k = 0; k < kNumKeys; ++k) { WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - ASSERT_OK(db_->Put(write_opts, Key1(k), "value" + std::to_string(k))); + 
ASSERT_OK(db_->Put(write_opts, Key1(k), Timestamp(1, 0), + "value" + std::to_string(k))); } ASSERT_OK(Flush()); { @@ -780,26 +839,23 @@ TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithPrefixLessThanKey) { DestroyAndReopen(options); WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; + std::string ts = Timestamp(1, 0); - ASSERT_OK(db_->Put(write_opts, "foo1", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar")); ASSERT_OK(Flush()); - ASSERT_OK(db_->Put(write_opts, "foo2", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar")); ASSERT_OK(Flush()); // Move sst file to next level ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - ASSERT_OK(db_->Put(write_opts, "foo3", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo3", ts, "bar")); ASSERT_OK(Flush()); ReadOptions read_opts; - std::string read_ts = Timestamp(1, 0); - ts = read_ts; - read_opts.timestamp = &ts; + Slice read_ts = ts; + read_opts.timestamp = &read_ts; { std::unique_ptr iter(db_->NewIterator(read_opts)); iter->Seek("foo"); @@ -838,26 +894,24 @@ TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithCappedPrefix) { DestroyAndReopen(options); WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; + std::string ts = Timestamp(1, 0); - ASSERT_OK(db_->Put(write_opts, "foo1", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar")); ASSERT_OK(Flush()); - ASSERT_OK(db_->Put(write_opts, "foo2", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar")); ASSERT_OK(Flush()); // Move sst file to next level ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - ASSERT_OK(db_->Put(write_opts, "foo3", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo3", ts, "bar")); ASSERT_OK(Flush()); ReadOptions read_opts; - std::string read_ts = Timestamp(2, 0); - ts = read_ts; - read_opts.timestamp = &ts; + ts = Timestamp(2, 0); + Slice read_ts = ts; 
+ read_opts.timestamp = &read_ts; { std::unique_ptr iter(db_->NewIterator(read_opts)); // Make sure the prefix extractor doesn't include timestamp, otherwise it @@ -890,29 +944,27 @@ TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithBound) { DestroyAndReopen(options); WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; + std::string ts = Timestamp(1, 0); - ASSERT_OK(db_->Put(write_opts, "foo1", "bar1")); + ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar1")); ASSERT_OK(Flush()); - ASSERT_OK(db_->Put(write_opts, "foo2", "bar2")); + ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar2")); ASSERT_OK(Flush()); // Move sst file to next level ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); for (int i = 3; i < 9; ++i) { - ASSERT_OK(db_->Put(write_opts, "foo" + std::to_string(i), + ASSERT_OK(db_->Put(write_opts, "foo" + std::to_string(i), ts, "bar" + std::to_string(i))); } ASSERT_OK(Flush()); ReadOptions read_opts; - std::string read_ts = Timestamp(2, 0); - ts = read_ts; - read_opts.timestamp = &ts; + ts = Timestamp(2, 0); + Slice read_ts = ts; + read_opts.timestamp = &read_ts; std::string up_bound = "foo5"; // exclusive Slice up_bound_slice = up_bound; std::string lo_bound = "foo2"; // inclusive @@ -954,18 +1006,15 @@ TEST_F(DBBasicTestWithTimestamp, ChangeIterationDirection) { const std::vector> kvs = { std::make_tuple("aa", "value1"), std::make_tuple("ab", "value2")}; for (const auto& ts : timestamps) { - WriteBatch wb; + WriteBatch wb(0, 0, 0, kTimestampSize); for (const auto& kv : kvs) { const std::string& key = std::get<0>(kv); const std::string& value = std::get<1>(kv); - std::array key_with_ts_slices{{Slice(key), Slice(ts)}}; - SliceParts key_with_ts(key_with_ts_slices.data(), 2); - std::array value_slices{{Slice(value)}}; - SliceParts values(value_slices.data(), 1); - ASSERT_OK(wb.Put(key_with_ts, values)); + ASSERT_OK(wb.Put(key, value)); } - ASSERT_OK(wb.AssignTimestamp(ts)); + 
ASSERT_OK(wb.UpdateTimestamps( + ts, [kTimestampSize](uint32_t) { return kTimestampSize; })); ASSERT_OK(db_->Write(WriteOptions(), &wb)); } std::string read_ts_str = Timestamp(5, 3); @@ -1047,10 +1096,9 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) { Timestamp(1, 0)}; for (size_t i = 0; i < write_timestamps.size(); ++i) { WriteOptions write_opts; - Slice write_ts = write_timestamps[i]; - write_opts.timestamp = &write_ts; for (uint64_t key = 0; key <= kMaxKey; ++key) { - Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i)); + Status s = db_->Put(write_opts, Key1(key), write_timestamps[i], + "value" + std::to_string(i)); ASSERT_OK(s); } } @@ -1080,10 +1128,8 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) { { std::string write_timestamp = Timestamp(5, 0); WriteOptions write_opts; - Slice write_ts = write_timestamp; - write_opts.timestamp = &write_ts; for (uint64_t key = 0; key < kMaxKey + 1; ++key) { - Status s = db_->Delete(write_opts, Key1(key)); + Status s = db_->Delete(write_opts, Key1(key), write_timestamp); ASSERT_OK(s); } @@ -1099,7 +1145,7 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) { uint64_t key = 0; for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) { CheckIterEntry(it.get(), Key1(key), kTypeDeletionWithTimestamp, Slice(), - write_ts); + write_timestamp); // Skip key@ts=3 and land on tombstone key@ts=5 it->Next(); } @@ -1123,10 +1169,8 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) { WriteOptions write_opts; Status s; for (size_t i = 0; i != kNumKeys; ++i) { - std::string ts_str = Timestamp(static_cast(i + 1), 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - s = db_->Put(write_opts, "foo", "value" + std::to_string(i)); + std::string ts = Timestamp(static_cast(i + 1), 0); + s = db_->Put(write_opts, "foo", ts, "value" + std::to_string(i)); ASSERT_OK(s); } { @@ -1168,29 +1212,17 @@ TEST_F(DBBasicTestWithTimestamp, 
ReseekToNextUserKey) { WriteOptions write_opts; Status s; for (size_t i = 0; i != kNumKeys; ++i) { - std::string ts_str = Timestamp(static_cast(i + 1), 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - s = db_->Put(write_opts, "a", "value" + std::to_string(i)); + std::string ts = Timestamp(static_cast(i + 1), 0); + s = db_->Put(write_opts, "a", ts, "value" + std::to_string(i)); ASSERT_OK(s); } { std::string ts_str = Timestamp(static_cast(kNumKeys + 1), 0); - WriteBatch batch; - const std::string dummy_ts(kTimestampSize, '\0'); - { - std::array key_with_ts_slices{{"a", dummy_ts}}; - SliceParts key_with_ts(key_with_ts_slices.data(), 2); - std::array value_slices{{"new_value"}}; - SliceParts values(value_slices.data(), 1); - ASSERT_OK(batch.Put(key_with_ts, values)); - } - { - std::string key_with_ts("b"); - key_with_ts.append(dummy_ts); - ASSERT_OK(batch.Put(key_with_ts, "new_value")); - } - s = batch.AssignTimestamp(ts_str); + WriteBatch batch(0, 0, 0, kTimestampSize); + { ASSERT_OK(batch.Put("a", "new_value")); } + { ASSERT_OK(batch.Put("b", "new_value")); } + s = batch.UpdateTimestamps( + ts_str, [kTimestampSize](uint32_t) { return kTimestampSize; }); ASSERT_OK(s); s = db_->Write(write_opts, &batch); ASSERT_OK(s); @@ -1222,19 +1254,15 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToUserKeyBeforeSavedKey) { options.comparator = &test_cmp; DestroyAndReopen(options); for (size_t i = 0; i < kNumKeys; ++i) { - std::string ts_str = Timestamp(static_cast(i + 1), 0); - Slice ts = ts_str; + std::string ts = Timestamp(static_cast(i + 1), 0); WriteOptions write_opts; - write_opts.timestamp = &ts; - Status s = db_->Put(write_opts, "b", "value" + std::to_string(i)); + Status s = db_->Put(write_opts, "b", ts, "value" + std::to_string(i)); ASSERT_OK(s); } { - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; + std::string ts = Timestamp(1, 0); WriteOptions write_opts; - write_opts.timestamp = &ts; - ASSERT_OK(db_->Put(write_opts, "a", "value")); + 
ASSERT_OK(db_->Put(write_opts, "a", ts, "value")); } { ReadOptions read_opts; @@ -1267,17 +1295,16 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetWithFastLocalBloom) { // Write any value WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; + std::string ts = Timestamp(1, 0); - ASSERT_OK(db_->Put(write_opts, "foo", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar")); ASSERT_OK(Flush()); // Read with MultiGet ReadOptions read_opts; - read_opts.timestamp = &ts; + Slice read_ts = ts; + read_opts.timestamp = &read_ts; size_t batch_size = 1; std::vector keys(batch_size); std::vector values(batch_size); @@ -1309,17 +1336,16 @@ TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetWithPrefix) { // Write any value WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; + std::string ts = Timestamp(1, 0); - ASSERT_OK(db_->Put(write_opts, "foo", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar")); ASSERT_OK(Flush()); // Read with MultiGet ReadOptions read_opts; - read_opts.timestamp = &ts; + Slice read_ts = ts; + read_opts.timestamp = &read_ts; size_t batch_size = 1; std::vector keys(batch_size); std::vector values(batch_size); @@ -1352,17 +1378,15 @@ TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetWithMemBloomFilter) { // Write any value WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; + std::string ts = Timestamp(1, 0); - ASSERT_OK(db_->Put(write_opts, "foo", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar")); // Read with MultiGet - ts_str = Timestamp(2, 0); - ts = ts_str; + ts = Timestamp(2, 0); + Slice read_ts = ts; ReadOptions read_opts; - read_opts.timestamp = &ts; + read_opts.timestamp = &read_ts; size_t batch_size = 1; std::vector keys(batch_size); std::vector values(batch_size); @@ -1393,9 +1417,7 @@ TEST_F(DBBasicTestWithTimestamp, 
MultiGetRangeFiltering) { // Write any value WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; + std::string ts = Timestamp(1, 0); // random data for (int i = 0; i < 3; i++) { @@ -1403,22 +1425,22 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetRangeFiltering) { auto value = ToString(i * 10); Slice key_slice = key; Slice value_slice = value; - ASSERT_OK(db_->Put(write_opts, key_slice, value_slice)); + ASSERT_OK(db_->Put(write_opts, key_slice, ts, value_slice)); ASSERT_OK(Flush()); } // Make num_levels to 2 to do key range filtering of sst files ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - ASSERT_OK(db_->Put(write_opts, "foo", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar")); ASSERT_OK(Flush()); // Read with MultiGet - ts_str = Timestamp(2, 0); - ts = ts_str; + ts = Timestamp(2, 0); + Slice read_ts = ts; ReadOptions read_opts; - read_opts.timestamp = &ts; + read_opts.timestamp = &read_ts; size_t batch_size = 1; std::vector keys(batch_size); std::vector values(batch_size); @@ -1450,18 +1472,16 @@ TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetPrefixFilter) { DestroyAndReopen(options); WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; + std::string ts = Timestamp(1, 0); - ASSERT_OK(db_->Put(write_opts, "foo", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar")); ASSERT_OK(Flush()); // Read with MultiGet - ts_str = Timestamp(2, 0); - ts = ts_str; + ts = Timestamp(2, 0); + Slice read_ts = ts; ReadOptions read_opts; - read_opts.timestamp = &ts; + read_opts.timestamp = &read_ts; size_t batch_size = 1; std::vector keys(batch_size); std::vector values(batch_size); @@ -1489,16 +1509,12 @@ TEST_F(DBBasicTestWithTimestamp, MaxKeysSkippedDuringNext) { WriteOptions write_opts; Status s; { - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - 
ASSERT_OK(db_->Put(write_opts, "a", "value")); + std::string ts = Timestamp(1, 0); + ASSERT_OK(db_->Put(write_opts, "a", ts, "value")); } for (size_t i = 0; i < kNumKeys; ++i) { - std::string ts_str = Timestamp(static_cast(i + 1), 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - s = db_->Put(write_opts, "b", "value" + std::to_string(i)); + std::string ts = Timestamp(static_cast(i + 1), 0); + s = db_->Put(write_opts, "b", ts, "value" + std::to_string(i)); ASSERT_OK(s); } { @@ -1528,16 +1544,12 @@ TEST_F(DBBasicTestWithTimestamp, MaxKeysSkippedDuringPrev) { WriteOptions write_opts; Status s; { - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - ASSERT_OK(db_->Put(write_opts, "b", "value")); + std::string ts = Timestamp(1, 0); + ASSERT_OK(db_->Put(write_opts, "b", ts, "value")); } for (size_t i = 0; i < kNumKeys; ++i) { - std::string ts_str = Timestamp(static_cast(i + 1), 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - s = db_->Put(write_opts, "a", "value" + std::to_string(i)); + std::string ts = Timestamp(static_cast(i + 1), 0); + s = db_->Put(write_opts, "a", ts, "value" + std::to_string(i)); ASSERT_OK(s); } { @@ -1571,42 +1583,36 @@ TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) { options.level0_file_num_compaction_trigger = 2; DestroyAndReopen(options); WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - ASSERT_OK(db_->Put(write_opts, "a", "value0")); + std::string ts = Timestamp(1, 0); + ASSERT_OK(db_->Put(write_opts, "a", ts, "value0")); ASSERT_OK(Flush()); - ts_str = Timestamp(2, 0); - ts = ts_str; - write_opts.timestamp = &ts; - ASSERT_OK(db_->Put(write_opts, "b", "value0")); - ts_str = Timestamp(3, 0); - ts = ts_str; - write_opts.timestamp = &ts; - ASSERT_OK(db_->Delete(write_opts, "a")); + ts = Timestamp(2, 0); + ASSERT_OK(db_->Put(write_opts, "b", ts, "value0")); + ts = Timestamp(3, 0); + 
ASSERT_OK(db_->Delete(write_opts, "a", ts)); ASSERT_OK(Flush()); ASSERT_OK(dbfull()->TEST_WaitForCompact()); ReadOptions read_opts; - ts_str = Timestamp(1, 0); - ts = ts_str; - read_opts.timestamp = &ts; + ts = Timestamp(1, 0); + Slice read_ts = ts; + read_opts.timestamp = &read_ts; std::string value; Status s = db_->Get(read_opts, "a", &value); ASSERT_OK(s); ASSERT_EQ("value0", value); - ts_str = Timestamp(3, 0); - ts = ts_str; - read_opts.timestamp = &ts; + ts = Timestamp(3, 0); + read_ts = ts; + read_opts.timestamp = &read_ts; s = db_->Get(read_opts, "a", &value); ASSERT_TRUE(s.IsNotFound()); // Time-travel to the past before deletion - ts_str = Timestamp(2, 0); - ts = ts_str; - read_opts.timestamp = &ts; + ts = Timestamp(2, 0); + read_ts = ts; + read_opts.timestamp = &read_ts; s = db_->Get(read_opts, "a", &value); ASSERT_OK(s); ASSERT_EQ("value0", value); @@ -1648,38 +1654,33 @@ TEST_P(DBBasicTestWithTimestampFilterPrefixSettings, GetAndMultiGet) { // Write any value WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; + std::string ts = Timestamp(1, 0); int idx = 0; for (; idx < kMaxKey / 4; idx++) { - ASSERT_OK(db_->Put(write_opts, Key1(idx), "bar")); - ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), "bar")); + ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar")); + ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar")); } ASSERT_OK(Flush()); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); for (; idx < kMaxKey / 2; idx++) { - ASSERT_OK(db_->Put(write_opts, Key1(idx), "bar")); - ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), "bar")); + ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar")); + ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar")); } ASSERT_OK(Flush()); for (; idx < kMaxKey; idx++) { - ASSERT_OK(db_->Put(write_opts, Key1(idx), "bar")); - ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), "bar")); + 
ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar")); + ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar")); } // Read with MultiGet ReadOptions read_opts; - read_opts.timestamp = &ts; - - ReadOptions read_opts_total_order; - read_opts_total_order.timestamp = &ts; - read_opts_total_order.total_order_seek = true; + Slice read_ts = ts; + read_opts.timestamp = &read_ts; for (idx = 0; idx < kMaxKey; idx++) { size_t batch_size = 4; @@ -1775,12 +1776,10 @@ class DataVisibilityTest : public DBBasicTestWithTimestampBase { void PutTestData(int index, ColumnFamilyHandle* cfh = nullptr) { ASSERT_LE(index, kTestDataSize); WriteOptions write_opts; - Slice ts_slice = test_data_[index].timestamp; - write_opts.timestamp = &ts_slice; if (cfh == nullptr) { - ASSERT_OK( - db_->Put(write_opts, test_data_[index].key, test_data_[index].value)); + ASSERT_OK(db_->Put(write_opts, test_data_[index].key, + test_data_[index].timestamp, test_data_[index].value)); const Snapshot* snap = db_->GetSnapshot(); test_data_[index].seq_num = snap->GetSequenceNumber(); if (index > 0) { @@ -1789,7 +1788,7 @@ class DataVisibilityTest : public DBBasicTestWithTimestampBase { db_->ReleaseSnapshot(snap); } else { ASSERT_OK(db_->Put(write_opts, cfh, test_data_[index].key, - test_data_[index].value)); + test_data_[index].timestamp, test_data_[index].value)); } } @@ -1901,13 +1900,11 @@ TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot1) { }); SyncPoint::GetInstance()->EnableProcessing(); port::Thread writer_thread([this]() { - std::string write_ts_str = Timestamp(1, 0); - Slice write_ts = write_ts_str; + std::string write_ts = Timestamp(1, 0); WriteOptions write_opts; - write_opts.timestamp = &write_ts; TEST_SYNC_POINT( "DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut"); - Status s = db_->Put(write_opts, "foo", "value"); + Status s = db_->Put(write_opts, "foo", write_ts, "value"); ASSERT_OK(s); TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut"); }); @@ 
-1949,20 +1946,16 @@ TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot2) { }); SyncPoint::GetInstance()->EnableProcessing(); port::Thread writer_thread([this]() { - std::string write_ts_str = Timestamp(1, 0); - Slice write_ts = write_ts_str; + std::string write_ts = Timestamp(1, 0); WriteOptions write_opts; - write_opts.timestamp = &write_ts; TEST_SYNC_POINT( "DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut"); - Status s = db_->Put(write_opts, "foo", "value"); + Status s = db_->Put(write_opts, "foo", write_ts, "value"); ASSERT_OK(s); ASSERT_OK(Flush()); - write_ts_str = Timestamp(2, 0); - write_ts = write_ts_str; - write_opts.timestamp = &write_ts; - s = db_->Put(write_opts, "bar", "value"); + write_ts = Timestamp(2, 0); + s = db_->Put(write_opts, "bar", write_ts, "value"); ASSERT_OK(s); TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut"); }); @@ -2003,12 +1996,10 @@ TEST_F(DataVisibilityTest, PointLookupWithSnapshot1) { }); SyncPoint::GetInstance()->EnableProcessing(); port::Thread writer_thread([this]() { - std::string write_ts_str = Timestamp(1, 0); - Slice write_ts = write_ts_str; + std::string write_ts = Timestamp(1, 0); WriteOptions write_opts; - write_opts.timestamp = &write_ts; TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:BeforePut"); - Status s = db_->Put(write_opts, "foo", "value"); + Status s = db_->Put(write_opts, "foo", write_ts, "value"); TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:AfterPut"); ASSERT_OK(s); }); @@ -2055,19 +2046,15 @@ TEST_F(DataVisibilityTest, PointLookupWithSnapshot2) { }); SyncPoint::GetInstance()->EnableProcessing(); port::Thread writer_thread([this]() { - std::string write_ts_str = Timestamp(1, 0); - Slice write_ts = write_ts_str; + std::string write_ts = Timestamp(1, 0); WriteOptions write_opts; - write_opts.timestamp = &write_ts; TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot2:BeforePut"); - Status s = db_->Put(write_opts, "foo", "value1"); + 
Status s = db_->Put(write_opts, "foo", write_ts, "value1"); ASSERT_OK(s); ASSERT_OK(Flush()); - write_ts_str = Timestamp(2, 0); - write_ts = write_ts_str; - write_opts.timestamp = &write_ts; - s = db_->Put(write_opts, "bar", "value2"); + write_ts = Timestamp(2, 0); + s = db_->Put(write_opts, "bar", write_ts, "value2"); ASSERT_OK(s); }); const Snapshot* snap = db_->GetSnapshot(); @@ -2112,10 +2099,8 @@ TEST_F(DataVisibilityTest, RangeScanWithoutSnapshot) { WriteOptions write_opts; TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut"); for (int i = 0; i < 3; ++i) { - std::string write_ts_str = Timestamp(i + 1, 0); - Slice write_ts = write_ts_str; - write_opts.timestamp = &write_ts; - Status s = db_->Put(write_opts, "key" + std::to_string(i), + std::string write_ts = Timestamp(i + 1, 0); + Status s = db_->Put(write_opts, "key" + std::to_string(i), write_ts, "value" + std::to_string(i)); ASSERT_OK(s); } @@ -2159,10 +2144,8 @@ TEST_F(DataVisibilityTest, RangeScanWithSnapshot) { WriteOptions write_opts; TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithSnapshot:BeforePut"); for (int i = 0; i < 3; ++i) { - std::string write_ts_str = Timestamp(i + 1, 0); - Slice write_ts = write_ts_str; - write_opts.timestamp = &write_ts; - Status s = db_->Put(write_opts, "key" + std::to_string(i), + std::string write_ts = Timestamp(i + 1, 0); + Status s = db_->Put(write_opts, "key" + std::to_string(i), write_ts, "value" + std::to_string(i)); ASSERT_OK(s); } @@ -2396,12 +2379,11 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) { read_ts_list.push_back(Timestamp(1 + i * 2, 0)); const Slice write_ts = write_ts_list.back(); WriteOptions wopts; - wopts.timestamp = &write_ts; for (int cf = 0; cf != static_cast(num_cfs); ++cf) { for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) { - ASSERT_OK(Put(cf, Key1(j), - "value_" + std::to_string(j) + "_" + std::to_string(i), - wopts)); + ASSERT_OK( + db_->Put(wopts, handles_[cf], Key1(j), write_ts, + 
"value_" + std::to_string(j) + "_" + std::to_string(i))); } } } @@ -2469,50 +2451,44 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) { // Half of the keys will go through Deletion and remaining half with // SingleDeletion. Generate enough L0 files with ts=1 to trigger compaction // to L1 - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; + std::string ts = Timestamp(1, 0); WriteOptions wopts; - wopts.timestamp = &ts; for (size_t i = 0; i < kNumL0Files; ++i) { for (int j = 0; j < kNumKeysPerFile; ++j) { - ASSERT_OK(db_->Put(wopts, Key1(j), "value" + std::to_string(i))); + ASSERT_OK(db_->Put(wopts, Key1(j), ts, "value" + std::to_string(i))); } ASSERT_OK(db_->Flush(FlushOptions())); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); // Generate another L0 at ts=3 - ts_str = Timestamp(3, 0); - ts = ts_str; - wopts.timestamp = &ts; + ts = Timestamp(3, 0); for (int i = 0; i < kNumKeysPerFile; ++i) { std::string key_str = Key1(i); Slice key(key_str); if ((i % 3) == 0) { if (i < kNumKeysPerFile / 2) { - ASSERT_OK(db_->Delete(wopts, key)); + ASSERT_OK(db_->Delete(wopts, key, ts)); } else { - ASSERT_OK(db_->SingleDelete(wopts, key)); + ASSERT_OK(db_->SingleDelete(wopts, key, ts)); } } else { - ASSERT_OK(db_->Put(wopts, key, "new_value")); + ASSERT_OK(db_->Put(wopts, key, ts, "new_value")); } } ASSERT_OK(db_->Flush(FlushOptions())); // Populate memtable at ts=5 - ts_str = Timestamp(5, 0); - ts = ts_str; - wopts.timestamp = &ts; + ts = Timestamp(5, 0); for (int i = 0; i != kNumKeysPerFile; ++i) { std::string key_str = Key1(i); Slice key(key_str); if ((i % 3) == 1) { if (i < kNumKeysPerFile / 2) { - ASSERT_OK(db_->Delete(wopts, key)); + ASSERT_OK(db_->Delete(wopts, key, ts)); } else { - ASSERT_OK(db_->SingleDelete(wopts, key)); + ASSERT_OK(db_->SingleDelete(wopts, key, ts)); } } else if ((i % 3) == 2) { - ASSERT_OK(db_->Put(wopts, key, "new_value_2")); + ASSERT_OK(db_->Put(wopts, key, ts, "new_value_2")); } } } @@ -2637,13 +2613,12 @@ 
TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) { read_ts_list.push_back(Timestamp(1 + i * 2, 0)); const Slice write_ts = write_ts_list.back(); WriteOptions wopts; - wopts.timestamp = &write_ts; for (int cf = 0; cf != static_cast(num_cfs); ++cf) { size_t memtable_get_start = 0; for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { - ASSERT_OK(Put(cf, Key1(j), - "value_" + std::to_string(j) + "_" + std::to_string(i), - wopts)); + ASSERT_OK( + db_->Put(wopts, handles_[cf], Key1(j), write_ts, + "value_" + std::to_string(j) + "_" + std::to_string(i))); if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) { verify_records_func(i, memtable_get_start, j, handles_[cf]); memtable_get_start = j + 1; @@ -2749,18 +2724,15 @@ TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) { const Slice& write_ts = write_ts_list.back(); for (int cf = 0; cf != static_cast(num_cfs); ++cf) { WriteOptions wopts; - WriteBatch batch; + WriteBatch batch(0, 0, 0, ts_sz); for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { const std::string key = Key1(j); const std::string value = "value_" + std::to_string(j) + "_" + std::to_string(i); - std::array key_with_ts_slices{{key, dummy_ts}}; - SliceParts key_with_ts(key_with_ts_slices.data(), 2); - std::array value_slices{{value}}; - SliceParts values(value_slices.data(), 1); - ASSERT_OK(batch.Put(handles_[cf], key_with_ts, values)); + ASSERT_OK(batch.Put(handles_[cf], key, value)); } - ASSERT_OK(batch.AssignTimestamp(write_ts)); + ASSERT_OK(batch.UpdateTimestamps(write_ts, + [ts_sz](uint32_t) { return ts_sz; })); ASSERT_OK(db_->Write(wopts, &batch)); verify_records_func(i, handles_[cf]); @@ -2792,18 +2764,16 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetNoReturnTs) { options.comparator = &test_cmp; DestroyAndReopen(options); WriteOptions write_opts; - std::string ts_str = Timestamp(1, 0); - Slice ts = ts_str; - write_opts.timestamp = &ts; - ASSERT_OK(db_->Put(write_opts, "foo", "value")); - 
ASSERT_OK(db_->Put(write_opts, "bar", "value")); - ASSERT_OK(db_->Put(write_opts, "fooxxxxxxxxxxxxxxxx", "value")); - ASSERT_OK(db_->Put(write_opts, "barxxxxxxxxxxxxxxxx", "value")); + std::string ts = Timestamp(1, 0); + ASSERT_OK(db_->Put(write_opts, "foo", ts, "value")); + ASSERT_OK(db_->Put(write_opts, "bar", ts, "value")); + ASSERT_OK(db_->Put(write_opts, "fooxxxxxxxxxxxxxxxx", ts, "value")); + ASSERT_OK(db_->Put(write_opts, "barxxxxxxxxxxxxxxxx", ts, "value")); ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily(); - ts_str = Timestamp(2, 0); - ts = ts_str; + ts = Timestamp(2, 0); + Slice read_ts = ts; ReadOptions read_opts; - read_opts.timestamp = &ts; + read_opts.timestamp = &read_ts; { ColumnFamilyHandle* column_families[] = {cfh, cfh}; Slice keys[] = {"foo", "bar"}; @@ -2881,10 +2851,9 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, IterateWithPrefix) { WriteOptions write_opts; { for (size_t i = 0; i != write_ts_list.size(); ++i) { - Slice write_ts = write_ts_list[i]; - write_opts.timestamp = &write_ts; for (uint64_t key = kMaxKey; key >= kMinKey; --key) { - Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i)); + Status s = db_->Put(write_opts, Key1(key), write_ts_list[i], + "value" + std::to_string(i)); ASSERT_OK(s); } } @@ -3036,9 +3005,9 @@ TEST_P(DBBasicTestWithTsIterTombstones, IterWithDelete) { uint64_t key = kMinKey; WriteOptions write_opts; Slice ts = write_ts_strs[0]; - write_opts.timestamp = &ts; do { - Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(key)); + Status s = db_->Put(write_opts, Key1(key), write_ts_strs[0], + "value" + std::to_string(key)); ASSERT_OK(s); if (kMaxKey == key) { break; @@ -3046,14 +3015,13 @@ TEST_P(DBBasicTestWithTsIterTombstones, IterWithDelete) { ++key; } while (true); - ts = write_ts_strs[1]; - write_opts.timestamp = &ts; for (key = kMaxKey; key >= kMinKey; --key) { Status s; if (0 != (key % 2)) { - s = db_->Put(write_opts, Key1(key), "value1" + std::to_string(key)); + s = 
db_->Put(write_opts, Key1(key), write_ts_strs[1], + "value1" + std::to_string(key)); } else { - s = db_->Delete(write_opts, Key1(key)); + s = db_->Delete(write_opts, Key1(key), write_ts_strs[1]); } ASSERT_OK(s); } diff --git a/db/db_with_timestamp_compaction_test.cc b/db/db_with_timestamp_compaction_test.cc index 9c8e3057f..fb93b7b6e 100644 --- a/db/db_with_timestamp_compaction_test.cc +++ b/db/db_with_timestamp_compaction_test.cc @@ -78,9 +78,8 @@ TEST_F(TimestampCompatibleCompactionTest, UserKeyCrossFileBoundary) { WriteOptions write_opts; for (; key < kNumKeysPerFile - 1; ++key, ++ts) { std::string ts_str = Timestamp(ts); - Slice ts_slice = ts_str; - write_opts.timestamp = &ts_slice; - ASSERT_OK(db_->Put(write_opts, Key1(key), "foo_" + std::to_string(key))); + ASSERT_OK( + db_->Put(write_opts, Key1(key), ts_str, "foo_" + std::to_string(key))); } // Write another L0 with keys 99 with newer ts. ASSERT_OK(Flush()); @@ -88,18 +87,16 @@ TEST_F(TimestampCompatibleCompactionTest, UserKeyCrossFileBoundary) { key = 99; for (int i = 0; i < 4; ++i, ++ts) { std::string ts_str = Timestamp(ts); - Slice ts_slice = ts_str; - write_opts.timestamp = &ts_slice; - ASSERT_OK(db_->Put(write_opts, Key1(key), "bar_" + std::to_string(key))); + ASSERT_OK( + db_->Put(write_opts, Key1(key), ts_str, "bar_" + std::to_string(key))); } ASSERT_OK(Flush()); uint64_t saved_read_ts2 = ts++; // Write another L0 with keys 99, 100, 101, ..., 150 for (; key <= 150; ++key, ++ts) { std::string ts_str = Timestamp(ts); - Slice ts_slice = ts_str; - write_opts.timestamp = &ts_slice; - ASSERT_OK(db_->Put(write_opts, Key1(key), "foo1_" + std::to_string(key))); + ASSERT_OK( + db_->Put(write_opts, Key1(key), ts_str, "foo1_" + std::to_string(key))); } ASSERT_OK(Flush()); // Wait for compaction to finish diff --git a/db/write_batch.cc b/db/write_batch.cc index 6ee264e18..a8865c8b2 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -160,8 +160,11 @@ WriteBatch::WriteBatch(size_t reserved_bytes, size_t 
max_bytes) } WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, - size_t protection_bytes_per_key) - : content_flags_(0), max_bytes_(max_bytes), rep_() { + size_t protection_bytes_per_key, size_t default_cf_ts_sz) + : content_flags_(0), + max_bytes_(max_bytes), + default_cf_ts_sz_(default_cf_ts_sz), + rep_() { // Currently `protection_bytes_per_key` can only be enabled at 8 bytes per // entry. assert(protection_bytes_per_key == 0 || protection_bytes_per_key == 8); @@ -186,6 +189,7 @@ WriteBatch::WriteBatch(const WriteBatch& src) : wal_term_point_(src.wal_term_point_), content_flags_(src.content_flags_.load(std::memory_order_relaxed)), max_bytes_(src.max_bytes_), + default_cf_ts_sz_(src.default_cf_ts_sz_), rep_(src.rep_) { if (src.save_points_ != nullptr) { save_points_.reset(new SavePoints()); @@ -203,6 +207,7 @@ WriteBatch::WriteBatch(WriteBatch&& src) noexcept content_flags_(src.content_flags_.load(std::memory_order_relaxed)), max_bytes_(src.max_bytes_), prot_info_(std::move(src.prot_info_)), + default_cf_ts_sz_(src.default_cf_ts_sz_), rep_(std::move(src.rep_)) {} WriteBatch& WriteBatch::operator=(const WriteBatch& src) { @@ -250,6 +255,7 @@ void WriteBatch::Clear() { prot_info_->entries_.clear(); } wal_term_point_.clear(); + default_cf_ts_sz_ = 0; } uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); } @@ -700,6 +706,45 @@ size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) { return WriteBatchInternal::kHeader; } +std::tuple +WriteBatchInternal::GetColumnFamilyIdAndTimestampSize( + WriteBatch* b, ColumnFamilyHandle* column_family) { + uint32_t cf_id = GetColumnFamilyID(column_family); + size_t ts_sz = 0; + Status s; + if (column_family) { + const Comparator* const ucmp = column_family->GetComparator(); + if (ucmp) { + ts_sz = ucmp->timestamp_size(); + if (0 == cf_id && b->default_cf_ts_sz_ != ts_sz) { + s = Status::InvalidArgument("Default cf timestamp size mismatch"); + } + } + } else if (b->default_cf_ts_sz_ > 0) 
{ + ts_sz = b->default_cf_ts_sz_; + } + return std::make_tuple(s, cf_id, ts_sz); +} + +namespace { +Status CheckColumnFamilyTimestampSize(ColumnFamilyHandle* column_family, + const Slice& ts) { + if (!column_family) { + return Status::InvalidArgument("column family handle cannot be null"); + } + const Comparator* const ucmp = column_family->GetComparator(); + assert(ucmp); + size_t cf_ts_sz = ucmp->timestamp_size(); + if (0 == cf_ts_sz) { + return Status::InvalidArgument("timestamp disabled"); + } + if (cf_ts_sz != ts.size()) { + return Status::InvalidArgument("timestamp size mismatch"); + } + return Status::OK(); +} +} // namespace + Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, const Slice& key, const Slice& value) { if (key.size() > size_t{port::kMaxUint32}) { @@ -738,8 +783,40 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, - value); + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::Put(this, cf_id, key, value); + } + + needs_in_place_update_ts_ = true; + std::string dummy_ts(ts_sz, '\0'); + std::array key_with_ts{{key, dummy_ts}}; + return WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2), + SliceParts(&value, 1)); +} + +Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts, const Slice& value) { + const Status s = CheckColumnFamilyTimestampSize(column_family, ts); + if (!s.ok()) { + return s; + } + assert(column_family); + uint32_t cf_id = column_family->GetID(); + std::array key_with_ts{{key, ts}}; + return WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 
2), + SliceParts(&value, 1)); } Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key, @@ -794,8 +871,24 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value) { - return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, - value); + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (ts_sz == 0) { + return WriteBatchInternal::Put(this, cf_id, key, value); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); } Status WriteBatchInternal::InsertNoop(WriteBatch* b) { @@ -892,8 +985,40 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, } Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { - return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), - key); + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::Delete(this, cf_id, key); + } + + needs_in_place_update_ts_ = true; + std::string dummy_ts(ts_sz, '\0'); + std::array key_with_ts{{key, dummy_ts}}; + return WriteBatchInternal::Delete(this, cf_id, + SliceParts(key_with_ts.data(), 2)); +} + +Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) { + const Status s = CheckColumnFamilyTimestampSize(column_family, ts); + if (!s.ok()) { + return s; + } + assert(column_family); + uint32_t cf_id = column_family->GetID(); + std::array key_with_ts{{key, ts}}; + return WriteBatchInternal::Delete(this, cf_id, + SliceParts(key_with_ts.data(), 2)); } Status 
WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, @@ -925,8 +1050,24 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const SliceParts& key) { - return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), - key); + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::Delete(this, cf_id, key); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); } Status WriteBatchInternal::SingleDelete(WriteBatch* b, @@ -957,8 +1098,40 @@ Status WriteBatchInternal::SingleDelete(WriteBatch* b, Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, const Slice& key) { - return WriteBatchInternal::SingleDelete( - this, GetColumnFamilyID(column_family), key); + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::SingleDelete(this, cf_id, key); + } + + needs_in_place_update_ts_ = true; + std::string dummy_ts(ts_sz, '\0'); + std::array key_with_ts{{key, dummy_ts}}; + return WriteBatchInternal::SingleDelete(this, cf_id, + SliceParts(key_with_ts.data(), 2)); +} + +Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts) { + const Status s = CheckColumnFamilyTimestampSize(column_family, ts); + if (!s.ok()) { + return s; + } + assert(column_family); + uint32_t cf_id = column_family->GetID(); + std::array key_with_ts{{key, ts}}; + return WriteBatchInternal::SingleDelete(this, cf_id, + SliceParts(key_with_ts.data(), 2)); } Status WriteBatchInternal::SingleDelete(WriteBatch* 
b, @@ -992,8 +1165,24 @@ Status WriteBatchInternal::SingleDelete(WriteBatch* b, Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key) { - return WriteBatchInternal::SingleDelete( - this, GetColumnFamilyID(column_family), key); + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::SingleDelete(this, cf_id, key); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); } Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, @@ -1026,8 +1215,24 @@ Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key, const Slice& end_key) { - return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family), - begin_key, end_key); + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); } Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, @@ -1061,8 +1266,24 @@ Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, const SliceParts& begin_key, const SliceParts& end_key) { - return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family), - begin_key, end_key); + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + 
WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); } Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, @@ -1099,8 +1320,24 @@ Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, - value); + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::Merge(this, cf_id, key, value); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); } Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, @@ -1136,8 +1373,24 @@ Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value) { - return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, - value); + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::Merge(this, cf_id, key, value); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); } Status WriteBatchInternal::PutBlobIndex(WriteBatch* b, @@ -1223,18 +1476,15 @@ Status WriteBatch::PopSavePoint() { return Status::OK(); } -Status 
WriteBatch::AssignTimestamp( - const Slice& ts, std::function checker) { - TimestampAssigner ts_assigner(prot_info_.get(), std::move(checker), ts); - return Iterate(&ts_assigner); -} - -Status WriteBatch::AssignTimestamps( - const std::vector& ts_list, - std::function checker) { - SimpleListTimestampAssigner ts_assigner(prot_info_.get(), std::move(checker), - ts_list); - return Iterate(&ts_assigner); +Status WriteBatch::UpdateTimestamps( + const Slice& ts, std::function ts_sz_func) { + TimestampUpdater ts_updater(prot_info_.get(), + std::move(ts_sz_func), ts); + const Status s = Iterate(&ts_updater); + if (s.ok()) { + needs_in_place_update_ts_ = false; + } + return s; } class MemTableInserter : public WriteBatch::Handler { @@ -2189,24 +2439,20 @@ class MemTableInserter : public WriteBatch::Handler { const auto& batch_info = trx->batches_.begin()->second; // all inserts must reference this trx log number log_number_ref_ = batch_info.log_number_; - const auto checker = [this](uint32_t cf, size_t& ts_sz) { - assert(db_); - VersionSet* const vset = db_->GetVersionSet(); - assert(vset); - ColumnFamilySet* const cf_set = vset->GetColumnFamilySet(); - assert(cf_set); - ColumnFamilyData* cfd = cf_set->GetColumnFamily(cf); - assert(cfd); - const auto* const ucmp = cfd->user_comparator(); - assert(ucmp); - if (ucmp->timestamp_size() == 0) { - ts_sz = 0; - } else if (ucmp->timestamp_size() != ts_sz) { - return Status::InvalidArgument("Timestamp size mismatch"); - } - return Status::OK(); - }; - s = batch_info.batch_->AssignTimestamp(commit_ts, checker); + + s = batch_info.batch_->UpdateTimestamps( + commit_ts, [this](uint32_t cf) { + assert(db_); + VersionSet* const vset = db_->GetVersionSet(); + assert(vset); + ColumnFamilySet* const cf_set = vset->GetColumnFamilySet(); + assert(cf_set); + ColumnFamilyData* cfd = cf_set->GetColumnFamily(cf); + assert(cfd); + const auto* const ucmp = cfd->user_comparator(); + assert(ucmp); + return ucmp->timestamp_size(); + }); if (s.ok()) { 
s = batch_info.batch_->Iterate(this); log_number_ref_ = 0; diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 8b52d93c0..321be6c4c 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -79,7 +79,7 @@ class WriteBatchInternal { public: // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. - static const size_t kHeader = 12; + static constexpr size_t kHeader = 12; // WriteBatch methods with column_family_id instead of ColumnFamilyHandle* static Status Put(WriteBatch* batch, uint32_t column_family_id, @@ -221,6 +221,13 @@ class WriteBatchInternal { // state meant to be used only during recovery. static void SetAsLatestPersistentState(WriteBatch* b); static bool IsLatestPersistentState(const WriteBatch* b); + + static std::tuple GetColumnFamilyIdAndTimestampSize( + WriteBatch* b, ColumnFamilyHandle* column_family); + + static bool TimestampsUpdateNeeded(const WriteBatch& wb) { + return wb.needs_in_place_update_ts_; + } }; // LocalSavePoint is similar to a scope guard @@ -265,39 +272,42 @@ class LocalSavePoint { #endif }; -template -class TimestampAssignerBase : public WriteBatch::Handler { +template +class TimestampUpdater : public WriteBatch::Handler { public: - explicit TimestampAssignerBase( - WriteBatch::ProtectionInfo* prot_info, - std::function&& checker) - : prot_info_(prot_info), checker_(std::move(checker)) {} + explicit TimestampUpdater(WriteBatch::ProtectionInfo* prot_info, + TimestampSizeFuncType&& ts_sz_func, const Slice& ts) + : prot_info_(prot_info), + ts_sz_func_(std::move(ts_sz_func)), + timestamp_(ts) { + assert(!timestamp_.empty()); + } - ~TimestampAssignerBase() override {} + ~TimestampUpdater() override {} Status PutCF(uint32_t cf, const Slice& key, const Slice&) override { - return AssignTimestamp(cf, key); + return UpdateTimestamp(cf, key); } Status DeleteCF(uint32_t cf, const Slice& key) override { - return AssignTimestamp(cf, key); + return UpdateTimestamp(cf, key); } Status 
SingleDeleteCF(uint32_t cf, const Slice& key) override { - return AssignTimestamp(cf, key); + return UpdateTimestamp(cf, key); } Status DeleteRangeCF(uint32_t cf, const Slice& begin_key, const Slice&) override { - return AssignTimestamp(cf, begin_key); + return UpdateTimestamp(cf, begin_key); } Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { - return AssignTimestamp(cf, key); + return UpdateTimestamp(cf, key); } Status PutBlobIndexCF(uint32_t cf, const Slice& key, const Slice&) override { - return AssignTimestamp(cf, key); + return UpdateTimestamp(cf, key); } Status MarkBeginPrepare(bool) override { return Status::OK(); } @@ -314,25 +324,32 @@ class TimestampAssignerBase : public WriteBatch::Handler { Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } - protected: - Status AssignTimestamp(uint32_t cf, const Slice& key) { - Status s = static_cast_with_check(this)->AssignTimestampImpl( - cf, key, idx_); + private: + Status UpdateTimestamp(uint32_t cf, const Slice& key) { + Status s = UpdateTimestampImpl(cf, key, idx_); ++idx_; return s; } - Status CheckTimestampSize(uint32_t cf, size_t& ts_sz) { - return checker_(cf, ts_sz); - } - - Status UpdateTimestampIfNeeded(size_t ts_sz, const Slice& key, - const Slice& ts) { - if (ts_sz > 0) { - assert(ts_sz == ts.size()); - UpdateProtectionInformationIfNeeded(key, ts); - UpdateTimestamp(key, ts); + Status UpdateTimestampImpl(uint32_t cf, const Slice& key, size_t /*idx*/) { + if (timestamp_.empty()) { + return Status::InvalidArgument("Timestamp is empty"); } + size_t cf_ts_sz = ts_sz_func_(cf); + if (0 == cf_ts_sz) { + // Skip this column family. + return Status::OK(); + } else if (std::numeric_limits::max() == cf_ts_sz) { + // Column family timestamp info not found. 
+ return Status::NotFound(); + } else if (cf_ts_sz != timestamp_.size()) { + return Status::InvalidArgument("timestamp size mismatch"); + } + UpdateProtectionInformationIfNeeded(key, timestamp_); + + char* ptr = const_cast(key.data() + key.size() - cf_ts_sz); + assert(ptr); + memcpy(ptr, timestamp_.data(), timestamp_.size()); return Status::OK(); } @@ -347,83 +364,16 @@ class TimestampAssignerBase : public WriteBatch::Handler { } } - void UpdateTimestamp(const Slice& key, const Slice& ts) { - const size_t ts_sz = ts.size(); - char* ptr = const_cast(key.data() + key.size() - ts_sz); - assert(ptr); - memcpy(ptr, ts.data(), ts_sz); - } - // No copy or move. - TimestampAssignerBase(const TimestampAssignerBase&) = delete; - TimestampAssignerBase(TimestampAssignerBase&&) = delete; - TimestampAssignerBase& operator=(const TimestampAssignerBase&) = delete; - TimestampAssignerBase& operator=(TimestampAssignerBase&&) = delete; + TimestampUpdater(const TimestampUpdater&) = delete; + TimestampUpdater(TimestampUpdater&&) = delete; + TimestampUpdater& operator=(const TimestampUpdater&) = delete; + TimestampUpdater& operator=(TimestampUpdater&&) = delete; WriteBatch::ProtectionInfo* const prot_info_ = nullptr; - const std::function checker_{}; + const TimestampSizeFuncType ts_sz_func_{}; + const Slice timestamp_; size_t idx_ = 0; }; -class SimpleListTimestampAssigner - : public TimestampAssignerBase { - public: - explicit SimpleListTimestampAssigner( - WriteBatch::ProtectionInfo* prot_info, - std::function&& checker, - const std::vector& timestamps) - : TimestampAssignerBase(prot_info, - std::move(checker)), - timestamps_(timestamps) {} - - ~SimpleListTimestampAssigner() override {} - - private: - friend class TimestampAssignerBase; - - Status AssignTimestampImpl(uint32_t cf, const Slice& key, size_t idx) { - if (idx >= timestamps_.size()) { - return Status::InvalidArgument("Need more timestamps for the assignment"); - } - const Slice& ts = timestamps_[idx]; - size_t ts_sz = 
ts.size(); - const Status s = this->CheckTimestampSize(cf, ts_sz); - if (!s.ok()) { - return s; - } - return this->UpdateTimestampIfNeeded(ts_sz, key, ts); - } - - const std::vector& timestamps_; -}; - -class TimestampAssigner : public TimestampAssignerBase { - public: - explicit TimestampAssigner(WriteBatch::ProtectionInfo* prot_info, - std::function&& checker, - const Slice& ts) - : TimestampAssignerBase(prot_info, std::move(checker)), - timestamp_(ts) { - assert(!timestamp_.empty()); - } - ~TimestampAssigner() override {} - - private: - friend class TimestampAssignerBase; - - Status AssignTimestampImpl(uint32_t cf, const Slice& key, size_t /*idx*/) { - if (timestamp_.empty()) { - return Status::InvalidArgument("Timestamp is empty"); - } - size_t ts_sz = timestamp_.size(); - const Status s = this->CheckTimestampSize(cf, ts_sz); - if (!s.ok()) { - return s; - } - return this->UpdateTimestampIfNeeded(ts_sz, key, timestamp_); - } - - const Slice timestamp_; -}; - } // namespace ROCKSDB_NAMESPACE diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index a777f0586..0bef85571 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -949,7 +949,48 @@ Status CheckTimestampsInWriteBatch( } } // namespace -TEST_F(WriteBatchTest, AssignTimestamps) { +TEST_F(WriteBatchTest, SanityChecks) { + ColumnFamilyHandleImplDummy cf0(0, test::ComparatorWithU64Ts()); + ColumnFamilyHandleImplDummy cf4(4); + + WriteBatch wb(0, 0, 0, /*default_cf_ts_sz=*/sizeof(uint64_t)); + + // Sanity checks for the new WriteBatch APIs with extra 'ts' arg. 
+ ASSERT_TRUE(wb.Put(nullptr, "key", "ts", "value").IsInvalidArgument()); + ASSERT_TRUE(wb.Delete(nullptr, "key", "ts").IsInvalidArgument()); + ASSERT_TRUE(wb.SingleDelete(nullptr, "key", "ts").IsInvalidArgument()); + ASSERT_TRUE(wb.Merge(nullptr, "key", "ts", "value").IsNotSupported()); + ASSERT_TRUE( + wb.DeleteRange(nullptr, "begin_key", "end_key", "ts").IsNotSupported()); + + ASSERT_TRUE(wb.Put(&cf4, "key", "ts", "value").IsInvalidArgument()); + ASSERT_TRUE(wb.Delete(&cf4, "key", "ts").IsInvalidArgument()); + ASSERT_TRUE(wb.SingleDelete(&cf4, "key", "ts").IsInvalidArgument()); + ASSERT_TRUE(wb.Merge(&cf4, "key", "ts", "value").IsNotSupported()); + ASSERT_TRUE( + wb.DeleteRange(&cf4, "begin_key", "end_key", "ts").IsNotSupported()); + + constexpr size_t wrong_ts_sz = 1 + sizeof(uint64_t); + std::string ts(wrong_ts_sz, '\0'); + + ASSERT_TRUE(wb.Put(&cf0, "key", ts, "value").IsInvalidArgument()); + ASSERT_TRUE(wb.Delete(&cf0, "key", ts).IsInvalidArgument()); + ASSERT_TRUE(wb.SingleDelete(&cf0, "key", ts).IsInvalidArgument()); + ASSERT_TRUE(wb.Merge(&cf0, "key", ts, "value").IsNotSupported()); + ASSERT_TRUE( + wb.DeleteRange(&cf0, "begin_key", "end_key", ts).IsNotSupported()); + + // Sanity checks for the new WriteBatch APIs without extra 'ts' arg. + WriteBatch wb1(0, 0, 0, wrong_ts_sz); + ASSERT_TRUE(wb1.Put(&cf0, "key", "value").IsInvalidArgument()); + ASSERT_TRUE(wb1.Delete(&cf0, "key").IsInvalidArgument()); + ASSERT_TRUE(wb1.SingleDelete(&cf0, "key").IsInvalidArgument()); + ASSERT_TRUE(wb1.Merge(&cf0, "key", "value").IsInvalidArgument()); + ASSERT_TRUE( + wb1.DeleteRange(&cf0, "begin_key", "end_key").IsInvalidArgument()); +} + +TEST_F(WriteBatchTest, UpdateTimestamps) { // We assume the last eight bytes of each key is reserved for timestamps. // Therefore, we must make sure each key is longer than eight bytes. 
constexpr size_t key_size = 16; @@ -974,21 +1015,17 @@ TEST_F(WriteBatchTest, AssignTimestamps) { } static constexpr size_t timestamp_size = sizeof(uint64_t); - const auto checker1 = [](uint32_t cf, size_t& ts_sz) { + const auto checker1 = [](uint32_t cf) { if (cf == 4 || cf == 5) { - if (ts_sz != timestamp_size) { - return Status::InvalidArgument("Timestamp size mismatch"); - } + return timestamp_size; } else if (cf == 0) { - ts_sz = 0; - return Status::OK(); + return static_cast(0); } else { - return Status::Corruption("Invalid cf"); + return std::numeric_limits::max(); } - return Status::OK(); }; ASSERT_OK( - batch.AssignTimestamp(std::string(timestamp_size, '\xfe'), checker1)); + batch.UpdateTimestamps(std::string(timestamp_size, '\xfe'), checker1)); ASSERT_OK(CheckTimestampsInWriteBatch( batch, std::string(timestamp_size, '\xfe'), cf_to_ucmps)); @@ -1001,65 +1038,30 @@ TEST_F(WriteBatchTest, AssignTimestamps) { // mapping from cf to user comparators. If indexing is disabled, a transaction // writes directly to the underlying raw WriteBatch. We will need to track the // comparator information for the column families to which un-indexed writes - // are performed. When calling AssignTimestamp(s) API of WriteBatch, we need + // are performed. When calling UpdateTimestamp API of WriteBatch, we need // indexed_cf_to_ucmps, non_indexed_cfs_with_ts, and timestamp_size to perform // checking. 
std::unordered_map indexed_cf_to_ucmps = { {0, cf0.GetComparator()}, {4, cf4.GetComparator()}}; std::unordered_set non_indexed_cfs_with_ts = {cf5.GetID()}; - const auto checker2 = [&indexed_cf_to_ucmps, &non_indexed_cfs_with_ts]( - uint32_t cf, size_t& ts_sz) { + const auto checker2 = [&indexed_cf_to_ucmps, + &non_indexed_cfs_with_ts](uint32_t cf) { if (non_indexed_cfs_with_ts.count(cf) > 0) { - if (ts_sz != timestamp_size) { - return Status::InvalidArgument("Timestamp size mismatch"); - } - return Status::OK(); + return timestamp_size; } auto cf_iter = indexed_cf_to_ucmps.find(cf); if (cf_iter == indexed_cf_to_ucmps.end()) { - return Status::Corruption("Unknown cf"); + assert(false); + return std::numeric_limits::max(); } const Comparator* const ucmp = cf_iter->second; assert(ucmp); - if (ucmp->timestamp_size() == 0) { - ts_sz = 0; - } else if (ts_sz != ucmp->timestamp_size()) { - return Status::InvalidArgument("Timestamp size mismatch"); - } - return Status::OK(); + return ucmp->timestamp_size(); }; ASSERT_OK( - batch.AssignTimestamp(std::string(timestamp_size, '\xef'), checker2)); + batch.UpdateTimestamps(std::string(timestamp_size, '\xef'), checker2)); ASSERT_OK(CheckTimestampsInWriteBatch( batch, std::string(timestamp_size, '\xef'), cf_to_ucmps)); - - std::vector ts_strs; - for (size_t i = 0; i < 3 * key_strs.size(); ++i) { - if (0 == (i % 3)) { - ts_strs.emplace_back(); - } else { - ts_strs.emplace_back(std::string(timestamp_size, '\xee')); - } - } - std::vector ts_vec(ts_strs.size()); - for (size_t i = 0; i < ts_vec.size(); ++i) { - ts_vec[i] = ts_strs[i]; - } - const auto checker3 = [&cf_to_ucmps](uint32_t cf, size_t& ts_sz) { - auto cf_iter = cf_to_ucmps.find(cf); - if (cf_iter == cf_to_ucmps.end()) { - return Status::Corruption("Invalid cf"); - } - const Comparator* const ucmp = cf_iter->second; - assert(ucmp); - if (ucmp->timestamp_size() != ts_sz) { - return Status::InvalidArgument("Timestamp size mismatch"); - } - return Status::OK(); - }; - 
ASSERT_OK(batch.AssignTimestamps(ts_vec, checker3)); - ASSERT_OK(CheckTimestampsInWriteBatch( - batch, std::string(timestamp_size, '\xee'), cf_to_ucmps)); } TEST_F(WriteBatchTest, CommitWithTimestamp) { diff --git a/db_stress_tool/batched_ops_stress.cc b/db_stress_tool/batched_ops_stress.cc index ab5155f90..1aa1cc264 100644 --- a/db_stress_tool/batched_ops_stress.cc +++ b/db_stress_tool/batched_ops_stress.cc @@ -34,7 +34,8 @@ class BatchedOpsStressTest : public StressTest { std::string values[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"}; Slice value_slices[10]; WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, - FLAGS_batch_protection_bytes_per_key); + FLAGS_batch_protection_bytes_per_key, + FLAGS_user_timestamp_size); Status s; auto cfh = column_families_[rand_column_families[0]]; std::string key_str = Key(rand_keys[0]); @@ -70,7 +71,8 @@ class BatchedOpsStressTest : public StressTest { std::string keys[10] = {"9", "7", "5", "3", "1", "8", "6", "4", "2", "0"}; WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, - FLAGS_batch_protection_bytes_per_key); + FLAGS_batch_protection_bytes_per_key, + FLAGS_user_timestamp_size); Status s; auto cfh = column_families_[rand_column_families[0]]; std::string key_str = Key(rand_keys[0]); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 6f532c888..b1cfee9c2 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -513,9 +513,10 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, if (FLAGS_user_timestamp_size > 0) { ts_str = NowNanosStr(); ts = ts_str; - write_opts.timestamp = &ts; + s = db_->Put(write_opts, cfh, key, ts, v); + } else { + s = db_->Put(write_opts, cfh, key, v); } - s = db_->Put(write_opts, cfh, key, v); } else { #ifndef ROCKSDB_LITE Transaction* txn; @@ -878,7 +879,6 @@ void StressTest::OperateDb(ThreadState* thread) { read_opts.timestamp = &read_ts; write_ts_str = 
NowNanosStr(); write_ts = write_ts_str; - write_opts.timestamp = &write_ts; } int prob_op = thread->rand.Uniform(100); diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index aff6d7452..fdabef965 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -500,9 +500,12 @@ class NonBatchedOpsStressTest : public StressTest { if (FLAGS_user_timestamp_size > 0) { write_ts_str = NowNanosStr(); write_ts = write_ts_str; - write_opts.timestamp = &write_ts; } } + if (write_ts.size() == 0 && FLAGS_user_timestamp_size) { + write_ts_str = NowNanosStr(); + write_ts = write_ts_str; + } std::string key_str = Key(rand_key); Slice key = key_str; @@ -540,7 +543,11 @@ class NonBatchedOpsStressTest : public StressTest { } } else { if (!FLAGS_use_txn) { - s = db_->Put(write_opts, cfh, key, v); + if (FLAGS_user_timestamp_size == 0) { + s = db_->Put(write_opts, cfh, key, v); + } else { + s = db_->Put(write_opts, cfh, key, write_ts, v); + } } else { #ifndef ROCKSDB_LITE Transaction* txn; @@ -599,9 +606,12 @@ class NonBatchedOpsStressTest : public StressTest { if (FLAGS_user_timestamp_size > 0) { write_ts_str = NowNanosStr(); write_ts = write_ts_str; - write_opts.timestamp = &write_ts; } } + if (write_ts.size() == 0 && FLAGS_user_timestamp_size) { + write_ts_str = NowNanosStr(); + write_ts = write_ts_str; + } std::string key_str = Key(rand_key); Slice key = key_str; @@ -613,7 +623,11 @@ class NonBatchedOpsStressTest : public StressTest { if (shared->AllowsOverwrite(rand_key)) { shared->Delete(rand_column_family, rand_key, true /* pending */); if (!FLAGS_use_txn) { - s = db_->Delete(write_opts, cfh, key); + if (FLAGS_user_timestamp_size == 0) { + s = db_->Delete(write_opts, cfh, key); + } else { + s = db_->Delete(write_opts, cfh, key, write_ts); + } } else { #ifndef ROCKSDB_LITE Transaction* txn; @@ -646,7 +660,11 @@ class NonBatchedOpsStressTest : public StressTest { } else { 
shared->SingleDelete(rand_column_family, rand_key, true /* pending */); if (!FLAGS_use_txn) { - s = db_->SingleDelete(write_opts, cfh, key); + if (FLAGS_user_timestamp_size == 0) { + s = db_->SingleDelete(write_opts, cfh, key); + } else { + s = db_->SingleDelete(write_opts, cfh, key, write_ts); + } } else { #ifndef ROCKSDB_LITE Transaction* txn; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 10b274877..e792073ad 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -356,10 +356,17 @@ class DB { virtual Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) = 0; + virtual Status Put(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts, const Slice& value) = 0; virtual Status Put(const WriteOptions& options, const Slice& key, const Slice& value) { return Put(options, DefaultColumnFamily(), key, value); } + virtual Status Put(const WriteOptions& options, const Slice& key, + const Slice& ts, const Slice& value) { + return Put(options, DefaultColumnFamily(), key, ts, value); + } // Remove the database entry (if any) for "key". Returns OK on // success, and a non-OK status on error. It is not an error if "key" @@ -368,9 +375,16 @@ class DB { virtual Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key) = 0; + virtual Status Delete(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) = 0; virtual Status Delete(const WriteOptions& options, const Slice& key) { return Delete(options, DefaultColumnFamily(), key); } + virtual Status Delete(const WriteOptions& options, const Slice& key, + const Slice& ts) { + return Delete(options, DefaultColumnFamily(), key, ts); + } // Remove the database entry for "key". Requires that the key exists // and was not overwritten. 
Returns OK on success, and a non-OK status @@ -391,9 +405,16 @@ class DB { virtual Status SingleDelete(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key) = 0; + virtual Status SingleDelete(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts) = 0; virtual Status SingleDelete(const WriteOptions& options, const Slice& key) { return SingleDelete(options, DefaultColumnFamily(), key); } + virtual Status SingleDelete(const WriteOptions& options, const Slice& key, + const Slice& ts) { + return SingleDelete(options, DefaultColumnFamily(), key, ts); + } // Removes the database entries in the range ["begin_key", "end_key"), i.e., // including "begin_key" and excluding "end_key". Returns OK on success, and @@ -416,6 +437,13 @@ class DB { virtual Status DeleteRange(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& begin_key, const Slice& end_key); + virtual Status DeleteRange(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*begin_key*/, + const Slice& /*end_key*/, const Slice& /*ts*/) { + return Status::NotSupported( + "DeleteRange does not support user-defined timestamp yet"); + } // Merge the database entry for "key" with "value". Returns OK on success, // and a non-OK status on error. The semantics of this operation is @@ -428,6 +456,13 @@ class DB { const Slice& value) { return Merge(options, DefaultColumnFamily(), key, value); } + virtual Status Merge(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*key*/, const Slice& /*ts*/, + const Slice& /*value*/) { + return Status::NotSupported( + "Merge does not support user-defined timestamp yet"); + } // Apply the specified updates to the database. 
// If `updates` contains no update, WAL will still be synced if diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 390347a0d..7a9cbf378 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1661,25 +1661,13 @@ struct WriteOptions { // Default: false bool memtable_insert_hint_per_batch; - // Timestamp of write operation, e.g. Put. All timestamps of the same - // database must share the same length and format. The user is also - // responsible for providing a customized compare function via Comparator to - // order tuples. If the user wants to enable timestamp, then - // all write operations must be associated with timestamp because RocksDB, as - // a single-node storage engine currently has no knowledge of global time, - // thus has to rely on the application. - // The user-specified timestamp feature is still under active development, - // and the API is subject to change. - const Slice* timestamp; - WriteOptions() : sync(false), disableWAL(false), ignore_missing_column_families(false), no_slowdown(false), low_pri(false), - memtable_insert_hint_per_batch(false), - timestamp(nullptr) {} + memtable_insert_hint_per_batch(false) {} }; // Options that control flush operations diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index b8f792a45..5bdd8d4a6 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -80,6 +80,10 @@ class StackableDB : public DB { const Slice& val) override { return db_->Put(options, column_family, key, val); } + Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& val) override { + return db_->Put(options, column_family, key, ts, val); + } using DB::Get; virtual Status Get(const ReadOptions& options, @@ -166,6 +170,10 @@ class StackableDB : public DB { const Slice& key) override { return db_->Delete(wopts, column_family, key); } + Status 
Delete(const WriteOptions& wopts, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts) override { + return db_->Delete(wopts, column_family, key, ts); + } using DB::SingleDelete; virtual Status SingleDelete(const WriteOptions& wopts, @@ -173,6 +181,18 @@ class StackableDB : public DB { const Slice& key) override { return db_->SingleDelete(wopts, column_family, key); } + Status SingleDelete(const WriteOptions& wopts, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) override { + return db_->SingleDelete(wopts, column_family, key, ts); + } + + using DB::DeleteRange; + Status DeleteRange(const WriteOptions& wopts, + ColumnFamilyHandle* column_family, const Slice& start_key, + const Slice& end_key) override { + return db_->DeleteRange(wopts, column_family, start_key, end_key); + } using DB::Merge; virtual Status Merge(const WriteOptions& options, diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 6a54bfe51..627f266c5 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -371,6 +371,7 @@ class TransactionDB : public StackableDB { // used and `skip_concurrency_control` must be set. When using either // WRITE_PREPARED or WRITE_UNPREPARED , `skip_duplicate_key_check` must // additionally be set. 
+ using StackableDB::DeleteRange; virtual Status DeleteRange(const WriteOptions&, ColumnFamilyHandle*, const Slice&, const Slice&) override { return Status::NotSupported(); diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 65feaa7b8..828a5c8d2 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -110,20 +110,32 @@ class WriteBatchWithIndex : public WriteBatchBase { Status Put(const Slice& key, const Slice& value) override; + Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts, const Slice& value) override; + using WriteBatchBase::Merge; Status Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) override; Status Merge(const Slice& key, const Slice& value) override; + Status Merge(ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, + const Slice& /*ts*/, const Slice& /*value*/) override { + return Status::NotSupported( + "Merge does not support user-defined timestamp"); + } using WriteBatchBase::Delete; Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; Status Delete(const Slice& key) override; + Status Delete(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) override; using WriteBatchBase::SingleDelete; Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key) override; Status SingleDelete(const Slice& key) override; + Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) override; using WriteBatchBase::DeleteRange; Status DeleteRange(ColumnFamilyHandle* /* column_family */, @@ -137,6 +149,12 @@ class WriteBatchWithIndex : public WriteBatchBase { return Status::NotSupported( "DeleteRange unsupported in WriteBatchWithIndex"); } + Status DeleteRange(ColumnFamilyHandle* /*column_family*/, + const Slice& /*begin_key*/, const Slice& /*end_key*/, + const Slice& 
/*ts*/) override { + return Status::NotSupported( + "DeleteRange unsupported in WriteBatchWithIndex"); + } using WriteBatchBase::PutLogData; Status PutLogData(const Slice& blob) override; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 94ab7e62c..4c1462478 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -68,7 +68,7 @@ class WriteBatch : public WriteBatchBase { // protection information for each key entry. Currently supported values are // zero (disabled) and eight. explicit WriteBatch(size_t reserved_bytes, size_t max_bytes, - size_t protection_bytes_per_key); + size_t protection_bytes_per_key, size_t default_cf_ts_sz); ~WriteBatch() override; using WriteBatchBase::Put; @@ -82,6 +82,8 @@ class WriteBatch : public WriteBatchBase { Status Put(const Slice& key, const Slice& value) override { return Put(nullptr, key, value); } + Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts, const Slice& value) override; // Variant of Put() that gathers output like writev(2). The key and value // that will be written to the database are concatenations of arrays of @@ -104,6 +106,8 @@ class WriteBatch : public WriteBatchBase { // up the memory buffer pointed to by `key`. 
Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; Status Delete(const Slice& key) override { return Delete(nullptr, key); } + Status Delete(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) override; // variant that takes SliceParts // These two variants of Delete(..., const SliceParts& key) can be used when @@ -121,6 +125,8 @@ class WriteBatch : public WriteBatchBase { Status SingleDelete(const Slice& key) override { return SingleDelete(nullptr, key); } + Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) override; // variant that takes SliceParts Status SingleDelete(ColumnFamilyHandle* column_family, @@ -136,6 +142,12 @@ class WriteBatch : public WriteBatchBase { Status DeleteRange(const Slice& begin_key, const Slice& end_key) override { return DeleteRange(nullptr, begin_key, end_key); } + Status DeleteRange(ColumnFamilyHandle* /*column_family*/, + const Slice& /*begin_key*/, const Slice& /*end_key*/, + const Slice& /*ts*/) override { + return Status::NotSupported( + "DeleteRange does not support user-defined timestamp"); + } // variant that takes SliceParts Status DeleteRange(ColumnFamilyHandle* column_family, @@ -154,6 +166,11 @@ class WriteBatch : public WriteBatchBase { Status Merge(const Slice& key, const Slice& value) override { return Merge(nullptr, key, value); } + Status Merge(ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, + const Slice& /*ts*/, const Slice& /*value*/) override { + return Status::NotSupported( + "Merge does not support user-defined timestamp"); + } // variant that takes SliceParts Status Merge(ColumnFamilyHandle* column_family, const SliceParts& key, @@ -343,55 +360,25 @@ class WriteBatch : public WriteBatchBase { bool HasRollback() const; // Experimental. - // Assign timestamp to write batch. + // + // Update timestamps of existing entries in the write batch if + // applicable. 
If a key is intended for a column family that disables + // timestamp, then this API won't set the timestamp for this key. // This requires that all keys, if enable timestamp, (possibly from multiple // column families) in the write batch have timestamps of the same format. // - // checker: callable object to check the timestamp sizes of column families. + // ts_sz_func: callable object to obtain the timestamp sizes of column + // families. If ts_sz_func() accesses data structures, then the caller of this + // API must guarantee thread-safety. Like other parts of RocksDB, this API is + // not exception-safe. Therefore, ts_sz_func() must not throw. // // in: cf, the column family id. - // in/out: ts_sz. Input as the expected timestamp size of the column - // family, output as the actual timestamp size of the column family. - // ret: OK if assignment succeeds. - // Status checker(uint32_t cf, size_t& ts_sz); - // - // User can call checker(uint32_t cf, size_t& ts_sz) which does the - // following: - // 1. find out the timestamp size of the column family whose id equals `cf`. - // 2. if cf's timestamp size is 0, then set ts_sz to 0 and return OK. - // 3. otherwise, compare ts_sz with cf's timestamp size and return - // Status::InvalidArgument() if different. - Status AssignTimestamp( - const Slice& ts, - std::function checker = - [](uint32_t /*cf*/, size_t& /*ts_sz*/) { return Status::OK(); }); - - // Experimental. - // Assign timestamps to write batch. - // This API allows the write batch to include keys from multiple column - // families whose timestamps' formats can differ. For example, some column - // families can enable timestamp, while others disable the feature. - // If key does not have timestamp, then put an empty Slice in ts_list as - // a placeholder. - // - // checker: callable object specified by caller to check the timestamp sizes - // of column families. - // - // in: cf, the column family id. - // in/out: ts_sz. 
Input as the expected timestamp size of the column - // family, output as the actual timestamp size of the column family. - // ret: OK if assignment succeeds. - // Status checker(uint32_t cf, size_t& ts_sz); - // - // User can call checker(uint32_t cf, size_t& ts_sz) which does the - // following: - // 1. find out the timestamp size of the column family whose id equals `cf`. - // 2. compare ts_sz with cf's timestamp size and return - // Status::InvalidArgument() if different. - Status AssignTimestamps( - const std::vector& ts_list, - std::function checker = - [](uint32_t /*cf*/, size_t& /*ts_sz*/) { return Status::OK(); }); + // ret: timestamp size of the given column family. Return + // std::numeric_limits::max() indicating "dont know or column + // family info not found", this will cause UpdateTimestamps() to fail. + // size_t ts_sz_func(uint32_t cf); + Status UpdateTimestamps(const Slice& ts, + std::function ts_sz_func); using WriteBatchBase::GetWriteBatch; WriteBatch* GetWriteBatch() override { return this; } @@ -446,6 +433,17 @@ class WriteBatch : public WriteBatchBase { std::unique_ptr prot_info_; + size_t default_cf_ts_sz_ = 0; + + // False if all keys are from column families that disable user-defined + // timestamp OR UpdateTimestamps() has been called at least once. + // This flag will be set to true if any of the above Put(), Delete(), + // SingleDelete(), etc. APIs are called at least once. + // Calling Put(ts), Delete(ts), SingleDelete(ts), etc. will not set this flag + // to true because the assumption is that these APIs have already set the + // timestamps to desired values. 
+ bool needs_in_place_update_ts_ = false; + protected: std::string rep_; // See comment in write_batch.cc for the format of rep_ }; diff --git a/include/rocksdb/write_batch_base.h b/include/rocksdb/write_batch_base.h index 19ff877e7..480a922e3 100644 --- a/include/rocksdb/write_batch_base.h +++ b/include/rocksdb/write_batch_base.h @@ -31,6 +31,8 @@ class WriteBatchBase { virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) = 0; virtual Status Put(const Slice& key, const Slice& value) = 0; + virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts, const Slice& value) = 0; // Variant of Put() that gathers output like writev(2). The key and value // that will be written to the database are concatenations of arrays of @@ -44,6 +46,8 @@ class WriteBatchBase { virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) = 0; virtual Status Merge(const Slice& key, const Slice& value) = 0; + virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts, const Slice& value) = 0; // variant that takes SliceParts virtual Status Merge(ColumnFamilyHandle* column_family, const SliceParts& key, @@ -54,6 +58,8 @@ class WriteBatchBase { virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key) = 0; virtual Status Delete(const Slice& key) = 0; + virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) = 0; // variant that takes SliceParts virtual Status Delete(ColumnFamilyHandle* column_family, @@ -65,6 +71,8 @@ class WriteBatchBase { virtual Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key) = 0; virtual Status SingleDelete(const Slice& key) = 0; + virtual Status SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts) = 0; // variant that takes SliceParts virtual Status SingleDelete(ColumnFamilyHandle* column_family, @@ -76,6 +84,9 @@ class 
WriteBatchBase { virtual Status DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key, const Slice& end_key) = 0; virtual Status DeleteRange(const Slice& begin_key, const Slice& end_key) = 0; + virtual Status DeleteRange(ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key, + const Slice& ts) = 0; // variant that takes SliceParts virtual Status DeleteRange(ColumnFamilyHandle* column_family, diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 7cb59e500..704d3814d 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -4794,7 +4794,7 @@ class Benchmark { RandomGenerator gen; WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0, - user_timestamp_size_); + /*protection_bytes_per_key=*/0, user_timestamp_size_); Status s; int64_t bytes = 0; @@ -5122,7 +5122,8 @@ class Benchmark { } if (user_timestamp_size_ > 0) { Slice user_ts = mock_app_clock_->Allocate(ts_guard.get()); - s = batch.AssignTimestamp(user_ts); + s = batch.UpdateTimestamps( + user_ts, [this](uint32_t) { return user_timestamp_size_; }); if (!s.ok()) { fprintf(stderr, "assign timestamp to write batch: %s\n", s.ToString().c_str()); @@ -6512,7 +6513,7 @@ class Benchmark { void DoDelete(ThreadState* thread, bool seq) { WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0, - user_timestamp_size_); + /*protection_bytes_per_key=*/0, user_timestamp_size_); Duration duration(seq ? 
0 : FLAGS_duration, deletes_); int64_t i = 0; std::unique_ptr key_guard; @@ -6534,7 +6535,8 @@ class Benchmark { Status s; if (user_timestamp_size_ > 0) { ts = mock_app_clock_->Allocate(ts_guard.get()); - s = batch.AssignTimestamp(ts); + s = batch.UpdateTimestamps( + ts, [this](uint32_t) { return user_timestamp_size_; }); if (!s.ok()) { fprintf(stderr, "assign timestamp: %s\n", s.ToString().c_str()); ErrorExit(); @@ -6628,17 +6630,17 @@ class Benchmark { Slice ts; if (user_timestamp_size_ > 0) { ts = mock_app_clock_->Allocate(ts_guard.get()); - write_options_.timestamp = &ts; } if (write_merge == kWrite) { - s = db->Put(write_options_, key, val); + if (user_timestamp_size_ == 0) { + s = db->Put(write_options_, key, val); + } else { + s = db->Put(write_options_, key, ts, val); + } } else { s = db->Merge(write_options_, key, val); } // Restore write_options_ - if (user_timestamp_size_ > 0) { - write_options_.timestamp = nullptr; - } written++; if (!s.ok()) { @@ -6711,7 +6713,7 @@ class Benchmark { std::string keys[3]; WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0, - user_timestamp_size_); + /*protection_bytes_per_key=*/0, user_timestamp_size_); Status s; for (int i = 0; i < 3; i++) { keys[i] = key.ToString() + suffixes[i]; @@ -6722,7 +6724,8 @@ class Benchmark { if (user_timestamp_size_ > 0) { ts_guard.reset(new char[user_timestamp_size_]); Slice ts = mock_app_clock_->Allocate(ts_guard.get()); - s = batch.AssignTimestamp(ts); + s = batch.UpdateTimestamps( + ts, [this](uint32_t) { return user_timestamp_size_; }); if (!s.ok()) { fprintf(stderr, "assign timestamp to batch: %s\n", s.ToString().c_str()); @@ -6742,7 +6745,8 @@ class Benchmark { std::string suffixes[3] = {"1", "2", "0"}; std::string keys[3]; - WriteBatch batch(0, 0, user_timestamp_size_); + WriteBatch batch(0, 0, /*protection_bytes_per_key=*/0, + user_timestamp_size_); Status s; for (int i = 0; i < 3; i++) { keys[i] = key.ToString() + suffixes[i]; @@ -6753,7 +6757,8 @@ class Benchmark { if 
(user_timestamp_size_ > 0) { ts_guard.reset(new char[user_timestamp_size_]); Slice ts = mock_app_clock_->Allocate(ts_guard.get()); - s = batch.AssignTimestamp(ts); + s = batch.UpdateTimestamps( + ts, [this](uint32_t) { return user_timestamp_size_; }); if (!s.ok()) { fprintf(stderr, "assign timestamp to batch: %s\n", s.ToString().c_str()); @@ -6940,12 +6945,13 @@ class Benchmark { } else if (put_weight > 0) { // then do all the corresponding number of puts // for all the gets we have done earlier - Slice ts; + Status s; if (user_timestamp_size_ > 0) { - ts = mock_app_clock_->Allocate(ts_guard.get()); - write_options_.timestamp = &ts; + Slice ts = mock_app_clock_->Allocate(ts_guard.get()); + s = db->Put(write_options_, key, ts, gen.Generate()); + } else { + s = db->Put(write_options_, key, gen.Generate()); } - Status s = db->Put(write_options_, key, gen.Generate()); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); ErrorExit(); @@ -7006,11 +7012,13 @@ class Benchmark { } Slice val = gen.Generate(); + Status s; if (user_timestamp_size_ > 0) { ts = mock_app_clock_->Allocate(ts_guard.get()); - write_options_.timestamp = &ts; + s = db->Put(write_options_, key, ts, val); + } else { + s = db->Put(write_options_, key, val); } - Status s = db->Put(write_options_, key, val); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -7073,12 +7081,13 @@ class Benchmark { xor_operator.XOR(nullptr, value, &new_value); } + Status s; if (user_timestamp_size_ > 0) { ts = mock_app_clock_->Allocate(ts_guard.get()); - write_options_.timestamp = &ts; + s = db->Put(write_options_, key, ts, Slice(new_value)); + } else { + s = db->Put(write_options_, key, Slice(new_value)); } - - Status s = db->Put(write_options_, key, Slice(new_value)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); ErrorExit(); @@ -7139,13 +7148,14 @@ class Benchmark { } value.append(operand.data(), operand.size()); + Status s; if 
(user_timestamp_size_ > 0) { ts = mock_app_clock_->Allocate(ts_guard.get()); - write_options_.timestamp = &ts; + s = db->Put(write_options_, key, ts, value); + } else { + // Write back to the database + s = db->Put(write_options_, key, value); } - - // Write back to the database - Status s = db->Put(write_options_, key, value); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); ErrorExit(); @@ -7521,12 +7531,12 @@ class Benchmark { DB* db = SelectDB(thread); for (int64_t i = 0; i < FLAGS_numdistinct; i++) { GenerateKeyFromInt(i * max_counter, FLAGS_num, &key); - Slice ts; if (user_timestamp_size_ > 0) { - ts = mock_app_clock_->Allocate(ts_guard.get()); - write_options_.timestamp = &ts; + Slice ts = mock_app_clock_->Allocate(ts_guard.get()); + s = db->Put(write_options_, key, ts, gen.Generate()); + } else { + s = db->Put(write_options_, key, gen.Generate()); } - s = db->Put(write_options_, key, gen.Generate()); if (!s.ok()) { fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str()); exit(1); @@ -7545,22 +7555,24 @@ class Benchmark { static_cast(0)); GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num, &key); - Slice ts; if (user_timestamp_size_ > 0) { - ts = mock_app_clock_->Allocate(ts_guard.get()); - write_options_.timestamp = &ts; + Slice ts = mock_app_clock_->Allocate(ts_guard.get()); + s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key, ts) + : db->Delete(write_options_, key, ts); + } else { + s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key) + : db->Delete(write_options_, key); } - s = FLAGS_use_single_deletes ? 
db->SingleDelete(write_options_, key) - : db->Delete(write_options_, key); if (s.ok()) { counters[key_id] = (counters[key_id] + 1) % max_counter; GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num, &key); if (user_timestamp_size_ > 0) { - ts = mock_app_clock_->Allocate(ts_guard.get()); - write_options_.timestamp = &ts; + Slice ts = mock_app_clock_->Allocate(ts_guard.get()); + s = db->Put(write_options_, key, ts, Slice()); + } else { + s = db->Put(write_options_, key, Slice()); } - s = db->Put(write_options_, key, Slice()); } if (!s.ok()) { diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 27c3aaf31..ab430f8c0 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -200,6 +200,7 @@ class BlobDB : public StackableDB { virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override = 0; + using ROCKSDB_NAMESPACE::StackableDB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& options) override = 0; virtual Iterator* NewIterator(const ReadOptions& options, diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 3282011f3..07d881136 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -128,6 +128,7 @@ class BlobDBImpl : public BlobDB { const std::vector& keys, std::vector* values) override; + using BlobDB::Write; virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; virtual Status Close() override; diff --git a/utilities/transactions/optimistic_transaction_db_impl.h b/utilities/transactions/optimistic_transaction_db_impl.h index a23d9a06d..49651be01 100644 --- a/utilities/transactions/optimistic_transaction_db_impl.h +++ b/utilities/transactions/optimistic_transaction_db_impl.h @@ -47,6 +47,7 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB { Transaction* old_txn) override; // Transactional `DeleteRange()` is not yet supported. 
+ using StackableDB::DeleteRange; virtual Status DeleteRange(const WriteOptions&, ColumnFamilyHandle*, const Slice&, const Slice&) override { return Status::NotSupported(); diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 7715d8d99..992a99cc1 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -246,6 +246,7 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options, backed_by_snapshot))) { return res; } else { + res.PermitUncheckedError(); WPRecordTick(TXN_GET_TRY_AGAIN); return Status::TryAgain(); } diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 6ad54f219..704549f17 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -322,6 +322,16 @@ Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { return s; } +Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, + const Slice& /*key*/, const Slice& /*ts*/, + const Slice& /*value*/) { + if (!column_family) { + return Status::InvalidArgument("column family handle cannot be nullptr"); + } + // TODO: support WBWI::Put() with timestamp. + return Status::NotSupported(); +} + Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, const Slice& key) { rep->SetLastEntryOffset(); @@ -341,6 +351,15 @@ Status WriteBatchWithIndex::Delete(const Slice& key) { return s; } +Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, + const Slice& /*key*/, const Slice& /*ts*/) { + if (!column_family) { + return Status::InvalidArgument("column family handle cannot be nullptr"); + } + // TODO: support WBWI::Delete() with timestamp. 
+ return Status::NotSupported(); +} + Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, const Slice& key) { rep->SetLastEntryOffset(); @@ -360,6 +379,16 @@ Status WriteBatchWithIndex::SingleDelete(const Slice& key) { return s; } +Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& /*key*/, + const Slice& /*ts*/) { + if (!column_family) { + return Status::InvalidArgument("column family handle cannot be nullptr"); + } + // TODO: support WBWI::SingleDelete() with timestamp. + return Status::NotSupported(); +} + Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { rep->SetLastEntryOffset();