diff --git a/HISTORY.md b/HISTORY.md index e02fe361c..84d6e852a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ ### New Features * Add support for key-value integrity protection in live updates from the user buffers provided to `WriteBatch` through the write to RocksDB's in-memory update buffer (memtable). This is intended to detect some cases of in-memory data corruption, due to either software or hardware errors. Users can enable protection by constructing their `WriteBatch` with `protection_bytes_per_key == 8`. +* Add support for updating `full_history_ts_low` option in manual compaction, which is for old timestamp data GC. ### Bug Fixes * Since 6.15.0, `TransactionDB` returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees. There are certain cases where range deletion can still be used on such DBs; see the API doc on `TransactionDB::DeleteRange()` for details. diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 98d95927a..470135612 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1871,6 +1871,8 @@ class DBImpl : public DB { Status DisableFileDeletionsWithLock(); + Status IncreaseFullHistoryTsLow(ColumnFamilyData* cfd, std::string ts_low); + // table_cache_ provides its own synchronization std::shared_ptr table_cache_; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 28f570d54..3d120b0f6 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -806,6 +806,25 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, end_with_ts); } +Status DBImpl::IncreaseFullHistoryTsLow(ColumnFamilyData* cfd, + std::string ts_low) { + VersionEdit edit; + edit.SetColumnFamily(cfd->GetID()); + edit.SetFullHistoryTsLow(ts_low); + + InstrumentedMutexLock l(&mutex_); + std::string current_ts_low = cfd->GetFullHistoryTsLow(); + const Comparator* ucmp = cfd->user_comparator(); + if (!current_ts_low.empty() && + ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) { + return Status::InvalidArgument( + "Cannot decrease full_history_timestamp_low"); + } + + return versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, + &mutex_); +} + Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end) { @@ -817,6 +836,22 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, } bool flush_needed = true; + + // Update full_history_ts_low if it's set + if (options.full_history_ts_low != nullptr && + !options.full_history_ts_low->empty()) { + std::string ts_low = options.full_history_ts_low->ToString(); + if (begin != nullptr || end != nullptr) { + return Status::InvalidArgument( + "Cannot specify compaction range with full_history_ts_low"); + } + Status s = IncreaseFullHistoryTsLow(cfd, ts_low); + if (!s.ok()) { + LogFlush(immutable_db_options_.info_log); + return s; + } + } + Status s; if (begin != nullptr && end != nullptr) { // TODO(ajkr): We could also optimize away the flush in certain cases where diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 5d99814c2..9eca45027 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -223,6 +223,111 @@ TEST_F(DBBasicTestWithTimestamp, CompactRangeWithSpecifiedRange) { Close(); } +TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLow) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + + const std::string kKey = "test kKey"; + + // Test set ts_low first and flush() + int current_ts_low = 5; + std::string ts_low_str = Timestamp(current_ts_low, 0); + Slice ts_low = ts_low_str; + CompactRangeOptions comp_opts; + comp_opts.full_history_ts_low = &ts_low; + comp_opts.bottommost_level_compaction = BottommostLevelCompaction::kForce; + + ASSERT_OK(db_->CompactRange(comp_opts, nullptr, nullptr)); + + auto* cfd = + static_cast_with_check(db_->DefaultColumnFamily()) + ->cfd(); + auto result_ts_low = cfd->GetFullHistoryTsLow(); + + ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0); + + for (int i = 0; i < 10; i++) { + WriteOptions write_opts; + std::string ts_str = Timestamp(i, 0); + Slice ts = ts_str; + write_opts.timestamp = &ts; + ASSERT_OK(db_->Put(write_opts, kKey, Key(i))); + } + ASSERT_OK(Flush()); + + for (int i = 0; i < 10; i++) { + ReadOptions read_opts; + std::string ts_str = Timestamp(i, 0); + Slice ts = ts_str; + read_opts.timestamp = &ts; + std::string value; + Status status = db_->Get(read_opts, kKey, &value); + if (i < current_ts_low) { + ASSERT_TRUE(status.IsNotFound()); + } else { + ASSERT_OK(status); + ASSERT_TRUE(value.compare(Key(i)) == 0); + } + } + + // Test set ts_low and then trigger compaction + for (int i = 10; i < 20; i++) { + WriteOptions write_opts; + std::string ts_str = Timestamp(i, 0); + Slice ts = ts_str; + write_opts.timestamp = &ts; + ASSERT_OK(db_->Put(write_opts, kKey, Key(i))); + } + + ASSERT_OK(Flush()); + + current_ts_low = 15; + ts_low_str = Timestamp(current_ts_low, 0); + ts_low = ts_low_str; + comp_opts.full_history_ts_low = &ts_low; + ASSERT_OK(db_->CompactRange(comp_opts, nullptr, nullptr)); + result_ts_low = cfd->GetFullHistoryTsLow(); + ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0); + + for (int i = 0; i < 20; i++) { + ReadOptions read_opts; + std::string ts_str = Timestamp(i, 0); + Slice ts = ts_str; + read_opts.timestamp = &ts; + std::string value; + Status status = db_->Get(read_opts, kKey, &value); + if (i < current_ts_low) { + ASSERT_TRUE(status.IsNotFound()); + } else { + ASSERT_OK(status); + ASSERT_TRUE(value.compare(Key(i)) == 0); + } + } + + // Test invalid compaction with range + Slice start(kKey), end(kKey); + Status s = db_->CompactRange(comp_opts, &start, &end); + ASSERT_TRUE(s.IsInvalidArgument()); + s = db_->CompactRange(comp_opts, &start, nullptr); + ASSERT_TRUE(s.IsInvalidArgument()); + s = db_->CompactRange(comp_opts, nullptr, &end); + ASSERT_TRUE(s.IsInvalidArgument()); + + // Test invalid compaction with the decreasing ts_low + ts_low_str = Timestamp(current_ts_low - 1, 0); + ts_low = ts_low_str; + comp_opts.full_history_ts_low = &ts_low; + s = db_->CompactRange(comp_opts, nullptr, nullptr); + ASSERT_TRUE(s.IsInvalidArgument()); + + Close(); +} + TEST_F(DBBasicTestWithTimestamp, GetApproximateSizes) { Options options = CurrentOptions(); options.write_buffer_size = 100000000; // Large write buffer diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 4404c4e29..ab6262007 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1580,6 +1580,9 @@ struct CompactRangeOptions { bool allow_write_stall = false; // If > 0, it will replace the option in the DBOptions for this compaction. uint32_t max_subcompactions = 0; + // Set user-defined timestamp low bound, the data with older timestamp than + // low bound maybe GCed by compaction. Default: nullptr + Slice* full_history_ts_low = nullptr; }; // IngestExternalFileOptions is used by IngestExternalFile()