6fbe96baf8
Summary: This diff introduces RangeDelAggregator, which takes ownership of iterators provided to it via AddTombstones(). The tombstones are organized in a two-level map (snapshot stripe -> begin key -> tombstone). Tombstone creation avoids data copy by holding Slices returned by the iterator, which remain valid thanks to pinning. For compaction, we create a hierarchical range tombstone iterator with structure matching the iterator over compaction input data. An aggregator based on that iterator is used by CompactionIterator to determine which keys are covered by range tombstones. In case of merge operand, the same aggregator is used by MergeHelper. Upon finishing each file in the compaction, relevant range tombstones are added to the output file's range tombstone metablock and file boundaries are updated accordingly. To check whether a key is covered by range tombstone, RangeDelAggregator::ShouldDelete() considers tombstones in the key's snapshot stripe. When this function is used outside of compaction, it also checks newer stripes, which can contain covering tombstones. Currently the intra-stripe check involves a linear scan; however, in the future we plan to collapse ranges within a stripe such that binary search can be used. RangeDelAggregator::AddToBuilder() adds all range tombstones in the table's key-range to a new table's range tombstone meta-block. Since range tombstones may fall in the gap between files, we may need to extend some files' key-ranges. The strategy is (1) first file extends as far left as possible and other files do not extend left, (2) all files extend right until either the start of the next file or the end of the last range tombstone in the gap, whichever comes first. One other notable change is adding release/move semantics to ScopedArenaIterator such that it can be used to transfer ownership of an arena-allocated iterator, similar to how unique_ptr is used for malloc'd data. 
Depends on D61473 Test Plan: compaction_iterator_test, mock_table, end-to-end tests in D63927 Reviewers: sdong, IslamAbdelRahman, wanning, yhchiang, lightmark Reviewed By: lightmark Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D62205
165 lines
5.5 KiB
C++
165 lines
5.5 KiB
C++
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
#pragma once
|
|
|
|
#include <atomic>
|
|
#include <deque>
|
|
#include <functional>
|
|
#include <limits>
|
|
#include <set>
|
|
#include <string>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "db/column_family.h"
|
|
#include "db/compaction_iterator.h"
|
|
#include "db/dbformat.h"
|
|
#include "db/flush_scheduler.h"
|
|
#include "db/internal_stats.h"
|
|
#include "db/job_context.h"
|
|
#include "db/log_writer.h"
|
|
#include "db/memtable_list.h"
|
|
#include "db/range_del_aggregator.h"
|
|
#include "db/version_edit.h"
|
|
#include "db/write_controller.h"
|
|
#include "db/write_thread.h"
|
|
#include "port/port.h"
|
|
#include "rocksdb/compaction_filter.h"
|
|
#include "rocksdb/compaction_job_stats.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/memtablerep.h"
|
|
#include "rocksdb/transaction_log.h"
|
|
#include "table/scoped_arena_iterator.h"
|
|
#include "util/autovector.h"
|
|
#include "util/db_options.h"
|
|
#include "util/event_logger.h"
|
|
#include "util/stop_watch.h"
|
|
#include "util/thread_local.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
// Forward declarations. An incomplete type is sufficient wherever a class is
// only referred to through a pointer or reference.
class MemTable;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class Arena;
|
|
|
|
class CompactionJob {
|
|
public:
|
|
CompactionJob(int job_id, Compaction* compaction,
|
|
const ImmutableDBOptions& db_options,
|
|
const EnvOptions& env_options, VersionSet* versions,
|
|
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
|
|
Directory* db_directory, Directory* output_directory,
|
|
Statistics* stats, InstrumentedMutex* db_mutex,
|
|
Status* db_bg_error,
|
|
std::vector<SequenceNumber> existing_snapshots,
|
|
SequenceNumber earliest_write_conflict_snapshot,
|
|
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
|
|
bool paranoid_file_checks, bool measure_io_stats,
|
|
const std::string& dbname,
|
|
CompactionJobStats* compaction_job_stats);
|
|
|
|
~CompactionJob();
|
|
|
|
// no copy/move
|
|
CompactionJob(CompactionJob&& job) = delete;
|
|
CompactionJob(const CompactionJob& job) = delete;
|
|
CompactionJob& operator=(const CompactionJob& job) = delete;
|
|
|
|
// REQUIRED: mutex held
|
|
void Prepare();
|
|
// REQUIRED mutex not held
|
|
Status Run();
|
|
|
|
// REQUIRED: mutex held
|
|
Status Install(const MutableCFOptions& mutable_cf_options);
|
|
|
|
private:
|
|
struct SubcompactionState;
|
|
|
|
void AggregateStatistics();
|
|
void GenSubcompactionBoundaries();
|
|
|
|
// update the thread status for starting a compaction.
|
|
void ReportStartedCompaction(Compaction* compaction);
|
|
void AllocateCompactionOutputFileNumbers();
|
|
// Call compaction filter. Then iterate through input and compact the
|
|
// kv-pairs
|
|
void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
|
|
|
|
Status FinishCompactionOutputFile(const Status& input_status,
|
|
SubcompactionState* sub_compact,
|
|
RangeDelAggregator* range_del_agg = nullptr,
|
|
const Slice* next_table_min_key = nullptr);
|
|
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
|
|
void RecordCompactionIOStats();
|
|
Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
|
|
void CleanupCompaction();
|
|
void UpdateCompactionJobStats(
|
|
const InternalStats::CompactionStats& stats) const;
|
|
void RecordDroppedKeys(const CompactionIteratorStats& c_iter_stats,
|
|
CompactionJobStats* compaction_job_stats = nullptr);
|
|
|
|
void UpdateCompactionStats();
|
|
void UpdateCompactionInputStatsHelper(
|
|
int* num_files, uint64_t* bytes_read, int input_level);
|
|
|
|
void LogCompaction();
|
|
|
|
int job_id_;
|
|
|
|
// CompactionJob state
|
|
struct CompactionState;
|
|
CompactionState* compact_;
|
|
CompactionJobStats* compaction_job_stats_;
|
|
InternalStats::CompactionStats compaction_stats_;
|
|
|
|
// DBImpl state
|
|
const std::string& dbname_;
|
|
const ImmutableDBOptions& db_options_;
|
|
const EnvOptions& env_options_;
|
|
|
|
Env* env_;
|
|
VersionSet* versions_;
|
|
std::atomic<bool>* shutting_down_;
|
|
LogBuffer* log_buffer_;
|
|
Directory* db_directory_;
|
|
Directory* output_directory_;
|
|
Statistics* stats_;
|
|
InstrumentedMutex* db_mutex_;
|
|
Status* db_bg_error_;
|
|
// If there were two snapshots with seq numbers s1 and
|
|
// s2 and s1 < s2, and if we find two instances of a key k1 then lies
|
|
// entirely within s1 and s2, then the earlier version of k1 can be safely
|
|
// deleted because that version is not visible in any snapshot.
|
|
std::vector<SequenceNumber> existing_snapshots_;
|
|
|
|
// This is the earliest snapshot that could be used for write-conflict
|
|
// checking by a transaction. For any user-key newer than this snapshot, we
|
|
// should make sure not to remove evidence that a write occurred.
|
|
SequenceNumber earliest_write_conflict_snapshot_;
|
|
|
|
std::shared_ptr<Cache> table_cache_;
|
|
|
|
EventLogger* event_logger_;
|
|
|
|
bool bottommost_level_;
|
|
bool paranoid_file_checks_;
|
|
bool measure_io_stats_;
|
|
// Stores the Slices that designate the boundaries for each subcompaction
|
|
std::vector<Slice> boundaries_;
|
|
// Stores the approx size of keys covered in the range of each subcompaction
|
|
std::vector<uint64_t> sizes_;
|
|
};
|
|
|
|
} // namespace rocksdb
|