rocksdb/db/compaction/compaction_iterator.h
Yanqin Jin 9f1c84ca47 Fix a bug in compaction iterator with timestamp (#7645)
Summary:
https://github.com/facebook/rocksdb/issues/7556 introduced support for compaction iterator to perform timestamp-aware garbage collection.
However, there was a bug. The comparison between `ikey_.user_key` and `current_user_key_` should happen
before `key_ = current_key_.SetInternalKey(key_, &ikey_);` (line 336 of compaction_iterator.cc).
Otherwise, after this line, `current_key_` is always the same as `ikey_.user_key`.

This PR also re-arranged the order of some data members because some of them are state variables of `CompactionIterator` while others are inputs from callers.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7645

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D24845028

Pulled By: riversand963

fbshipit-source-id: c7e79914832701462b86867e8463cd463b6c0c25
2020-11-09 18:23:31 -08:00

284 lines
11 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <algorithm>
#include <deque>
#include <string>
#include <unordered_set>
#include <vector>
#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iteration_stats.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"
namespace ROCKSDB_NAMESPACE {
class BlobFileBuilder;
class CompactionIterator {
public:
// A wrapper around Compaction. Has a much smaller interface, only what
// CompactionIterator uses. Tests can override it.
class CompactionProxy {
public:
explicit CompactionProxy(const Compaction* compaction)
: compaction_(compaction) {}
virtual ~CompactionProxy() = default;
virtual int level(size_t /*compaction_input_level*/ = 0) const {
return compaction_->level();
}
virtual bool KeyNotExistsBeyondOutputLevel(
const Slice& user_key, std::vector<size_t>* level_ptrs) const {
return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
}
virtual bool bottommost_level() const {
return compaction_->bottommost_level();
}
virtual int number_levels() const { return compaction_->number_levels(); }
virtual Slice GetLargestUserKey() const {
return compaction_->GetLargestUserKey();
}
virtual bool allow_ingest_behind() const {
return compaction_->immutable_cf_options()->allow_ingest_behind;
}
virtual bool preserve_deletes() const {
return compaction_->immutable_cf_options()->preserve_deletes;
}
protected:
CompactionProxy() = default;
private:
const Compaction* compaction_;
};
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
bool allow_data_in_errors,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr);
// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
bool allow_data_in_errors,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr);
~CompactionIterator();
void ResetRecordCounts();
// Seek to the beginning of the compaction iterator output.
//
// REQUIRED: Call only once.
void SeekToFirst();
// Produces the next record in the compaction.
//
// REQUIRED: SeekToFirst() has been called.
void Next();
// Getters
const Slice& key() const { return key_; }
const Slice& value() const { return value_; }
const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; }
bool Valid() const { return valid_; }
const Slice& user_key() const { return current_user_key_; }
const CompactionIterationStats& iter_stats() const { return iter_stats_; }
private:
// Processes the input stream to find the next output
void NextFromInput();
// Do last preparations before presenting the output to the callee. At this
// point this only zeroes out the sequence number if possible for better
// compression.
void PrepareOutput();
// Invoke compaction filter if needed.
// Return true on success, false on failures (e.g.: kIOError).
bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
// sequence numbers.
// Employ a sequential search because the total number of
// snapshots are typically small.
inline SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in, SequenceNumber* prev_snapshot);
// Checks whether the currently seen ikey_ is needed for
// incremental (differential) snapshot and hence can't be dropped
// or seqnum be zero-ed out even if all other conditions for it are met.
inline bool ikeyNotNeededForIncrementalSnapshot();
inline bool KeyCommitted(SequenceNumber sequence) {
return snapshot_checker_ == nullptr ||
snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) ==
SnapshotCheckerResult::kInSnapshot;
}
bool IsInEarliestSnapshot(SequenceNumber sequence);
// Extract user-defined timestamp from user key if possible and compare it
// with *full_history_ts_low_ if applicable.
inline void UpdateTimestampAndCompareWithFullHistoryLow() {
if (!timestamp_size_) {
return;
}
Slice ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
curr_ts_.assign(ts.data(), ts.size());
if (full_history_ts_low_) {
cmp_with_history_ts_low_ =
cmp_->CompareTimestamp(ts, *full_history_ts_low_);
}
}
InternalIterator* input_;
const Comparator* cmp_;
MergeHelper* merge_helper_;
const std::vector<SequenceNumber>* snapshots_;
// List of snapshots released during compaction.
// findEarliestVisibleSnapshot() find them out from return of
// snapshot_checker, and make sure they will not be returned as
// earliest visible snapshot of an older value.
// See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
std::unordered_set<SequenceNumber> released_snapshots_;
std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_;
const SequenceNumber earliest_write_conflict_snapshot_;
const SnapshotChecker* const snapshot_checker_;
Env* env_;
bool report_detailed_time_;
bool expect_valid_internal_key_;
CompactionRangeDelAggregator* range_del_agg_;
BlobFileBuilder* blob_file_builder_;
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
const std::atomic<bool>* shutting_down_;
const std::atomic<int>* manual_compaction_paused_;
const SequenceNumber preserve_deletes_seqnum_;
bool bottommost_level_;
bool valid_ = false;
bool visible_at_tip_;
SequenceNumber earliest_snapshot_;
SequenceNumber latest_snapshot_;
std::shared_ptr<Logger> info_log_;
bool allow_data_in_errors_;
// Comes from comparator.
const size_t timestamp_size_;
// Lower bound timestamp to retain full history in terms of user-defined
// timestamp. If a key's timestamp is older than full_history_ts_low_, then
// the key *may* be eligible for garbage collection (GC). The skipping logic
// is in `NextFromInput()` and `PrepareOutput()`.
// If nullptr, NO GC will be performed and all history will be preserved.
const std::string* const full_history_ts_low_;
// State
//
// Points to a copy of the current compaction iterator output (current_key_)
// if valid_.
Slice key_;
// Points to the value in the underlying iterator that corresponds to the
// current output.
Slice value_;
// The status is OK unless compaction iterator encounters a merge operand
// while not having a merge operator defined.
Status status_;
// Stores the user key, sequence number and type of the current compaction
// iterator output (or current key in the underlying iterator during
// NextFromInput()).
ParsedInternalKey ikey_;
// Stores whether ikey_.user_key is valid. If set to false, the user key is
// not compared against the current key in the underlying iterator.
bool has_current_user_key_ = false;
// If false, the iterator holds a copy of the current compaction iterator
// output (or current key in the underlying iterator during NextFromInput()).
bool at_next_ = false;
IterKey current_key_;
Slice current_user_key_;
std::string curr_ts_;
SequenceNumber current_user_key_sequence_;
SequenceNumber current_user_key_snapshot_;
// True if the iterator has already returned a record for the current key.
bool has_outputted_key_ = false;
// truncated the value of the next key and output it without applying any
// compaction rules. This is used for outputting a put after a single delete.
bool clear_and_output_next_key_ = false;
MergeOutputIterator merge_out_iter_;
// PinnedIteratorsManager used to pin input_ Iterator blocks while reading
// merge operands and then releasing them after consuming them.
PinnedIteratorsManager pinned_iters_mgr_;
std::string blob_index_;
std::string compaction_filter_value_;
InternalKey compaction_filter_skip_until_;
// "level_ptrs" holds indices that remember which file of an associated
// level we were last checking during the last call to compaction->
// KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
// to pick off where it left off since each subcompaction's key range is
// increasing so a later call to the function must be looking for a key that
// is in or beyond the last file checked during the previous call
std::vector<size_t> level_ptrs_;
CompactionIterationStats iter_stats_;
// Used to avoid purging uncommitted values. The application can specify
// uncommitted values by providing a SnapshotChecker object.
bool current_key_committed_;
// Saved result of ucmp->CompareTimestamp(current_ts_, *full_history_ts_low_)
int cmp_with_history_ts_low_;
bool IsShuttingDown() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
}
bool IsPausingManualCompaction() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed) > 0;
}
};
} // namespace ROCKSDB_NAMESPACE