Refactored common code of Builder/CompactionJob out into a CompactionIterator

Summary:
Builder and CompactionJob share a lot of fairly complex code. This patch
refactors this code into a separate class, the CompactionIterator. Because the
shared code is fairly complex, this patch hopefully improves maintainability.
While there are is a lot of potential for further improvements, the patch is
intentionally pretty close to the original structure because the change is
already complex enough.

Test Plan: make clean all check && ./db_stress

Reviewers: rven, anthony, yhchiang, sdong, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D46197
This commit is contained in:
Andres Noetzli 2015-09-10 14:35:25 -07:00
parent 41bce05869
commit 8aa1f15197
11 changed files with 589 additions and 512 deletions

View File

@ -74,6 +74,7 @@ set(SOURCES
db/column_family.cc db/column_family.cc
db/compacted_db_impl.cc db/compacted_db_impl.cc
db/compaction.cc db/compaction.cc
db/compaction_iterator.cc
db/compaction_job.cc db/compaction_job.cc
db/compaction_picker.cc db/compaction_picker.cc
db/convenience.cc db/convenience.cc

View File

@ -13,6 +13,7 @@
#include <deque> #include <deque>
#include <vector> #include <vector>
#include "db/compaction_iterator.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/filename.h" #include "db/filename.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
@ -32,28 +33,6 @@
namespace rocksdb { namespace rocksdb {
namespace {
inline SequenceNumber EarliestVisibleSnapshot(
SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot) {
if (snapshots.empty()) {
*prev_snapshot = 0; // 0 means no previous snapshot
return kMaxSequenceNumber;
}
SequenceNumber prev = 0;
for (const auto cur : snapshots) {
assert(prev <= cur);
if (cur >= in) {
*prev_snapshot = prev;
return cur;
}
prev = cur; // assignment
}
*prev_snapshot = prev;
return kMaxSequenceNumber;
}
} // namespace
class TableFactory; class TableFactory;
TableBuilder* NewTableBuilder( TableBuilder* NewTableBuilder(
@ -84,7 +63,6 @@ Status BuildTable(
const size_t kReportFlushIOStatsEvery = 1048576; const size_t kReportFlushIOStatsEvery = 1048576;
Status s; Status s;
meta->fd.file_size = 0; meta->fd.file_size = 0;
meta->smallest_seqno = meta->largest_seqno = 0;
iter->SeekToFirst(); iter->SeekToFirst();
std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(), std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
@ -107,120 +85,22 @@ Status BuildTable(
file_writer.get(), compression, compression_opts); file_writer.get(), compression, compression_opts);
} }
{
// the first key is the smallest key
Slice key = iter->key();
meta->smallest.DecodeFrom(key);
meta->smallest_seqno = GetInternalKeySeqno(key);
meta->largest_seqno = meta->smallest_seqno;
}
MergeHelper merge(internal_comparator.user_comparator(), MergeHelper merge(internal_comparator.user_comparator(),
ioptions.merge_operator, ioptions.info_log, ioptions.merge_operator, ioptions.info_log,
ioptions.min_partial_merge_operands, ioptions.min_partial_merge_operands,
true /* internal key corruption is not ok */); true /* internal key corruption is not ok */);
IterKey current_user_key; CompactionIterator c_iter(iter, internal_comparator.user_comparator(),
bool has_current_user_key = false; &merge, kMaxSequenceNumber, &snapshots, env,
// If has_current_user_key == true, this variable remembers the earliest true /* internal key corruption is not ok */);
// snapshot in which this current key already exists. If two internal keys c_iter.SeekToFirst();
// have the same user key AND the earlier one should be visible in the for (; c_iter.Valid(); c_iter.Next()) {
// snapshot in which we already have a user key, we can drop the earlier const Slice& key = c_iter.key();
// user key const Slice& value = c_iter.value();
SequenceNumber current_user_key_exists_in_snapshot = kMaxSequenceNumber; builder->Add(key, value);
meta->UpdateBoundaries(key, c_iter.ikey().sequence);
while (iter->Valid()) {
// Get current key
ParsedInternalKey ikey;
Slice key = iter->key();
Slice value = iter->value();
// In-memory key corruption is not ok;
// TODO: find a clean way to treat in memory key corruption
// Ugly workaround to avoid compiler error for release build
bool ok __attribute__((unused)) = true;
ok = ParseInternalKey(key, &ikey);
assert(ok);
meta->smallest_seqno = std::min(meta->smallest_seqno, ikey.sequence);
meta->largest_seqno = std::max(meta->largest_seqno, ikey.sequence);
// If the key is the same as the previous key (and it is not the
// first key), then we skip it, since it is an older version.
// Otherwise we output the key and mark it as the "new" previous key.
if (!has_current_user_key ||
!internal_comparator.user_comparator()->Equal(
ikey.user_key, current_user_key.GetKey())) {
// First occurrence of this user key
current_user_key.SetKey(ikey.user_key);
has_current_user_key = true;
current_user_key_exists_in_snapshot = 0;
}
// If there are no snapshots, then this kv affect visibility at tip.
// Otherwise, search though all existing snapshots to find
// the earliest snapshot that is affected by this kv.
SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
SequenceNumber key_needs_to_exist_in_snapshot =
EarliestVisibleSnapshot(ikey.sequence, snapshots, &prev_snapshot);
if (current_user_key_exists_in_snapshot ==
key_needs_to_exist_in_snapshot) {
// If this user key already exists in snapshot in which it needs to
// exist, we can drop it.
// In other words, if the earliest snapshot is which this key is visible
// in is the same as the visibily of a previous instance of the
// same key, then this kv is not visible in any snapshot.
// Hidden by an newer entry for same user key
iter->Next();
} else if (ikey.type == kTypeMerge) {
meta->largest.DecodeFrom(key);
// TODO(tbd): Add a check here to prevent RocksDB from crash when
// reopening a DB w/o properly specifying the merge operator. But
// currently we observed a memory leak on failing in RocksDB
// recovery, so we decide to let it crash instead of causing
// memory leak for now before we have identified the real cause
// of the memory leak.
// Handle merge-type keys using the MergeHelper
// TODO: pass statistics to MergeUntil
merge.MergeUntil(iter, prev_snapshot, false, nullptr, env);
// IMPORTANT: Slice key doesn't point to a valid value anymore!!
const auto& keys = merge.keys();
const auto& values = merge.values();
assert(!keys.empty());
assert(keys.size() == values.size());
// largest possible sequence number in a merge queue is already stored
// in ikey.sequence.
// we additionally have to consider the front of the merge queue, which
// might have the smallest sequence number (out of all the merges with
// the same key)
meta->smallest_seqno =
std::min(meta->smallest_seqno, GetInternalKeySeqno(keys.front()));
// We have a list of keys to write, write all keys in the list.
for (auto key_iter = keys.rbegin(), value_iter = values.rbegin();
key_iter != keys.rend(); key_iter++, value_iter++) {
key = Slice(*key_iter);
value = Slice(*value_iter);
bool valid_key __attribute__((__unused__)) =
ParseInternalKey(key, &ikey);
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to valid.
assert(valid_key);
builder->Add(key, value);
}
} else { // just write out the key-value
builder->Add(key, value);
meta->largest.DecodeFrom(key);
iter->Next();
}
current_user_key_exists_in_snapshot = key_needs_to_exist_in_snapshot;
// TODO(noetzli): Update stats after flush, too.
if (io_priority == Env::IO_HIGH && if (io_priority == Env::IO_HIGH &&
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) { IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
ThreadStatusUtil::IncreaseThreadOperationProperty( ThreadStatusUtil::IncreaseThreadOperationProperty(
@ -230,6 +110,7 @@ Status BuildTable(
} }
// Finish and check for builder errors // Finish and check for builder errors
s = c_iter.status();
if (s.ok()) { if (s.ok()) {
s = builder->Finish(); s = builder->Finish();
} else { } else {

266
db/compaction_iterator.cc Normal file
View File

@ -0,0 +1,266 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/compaction_iterator.h"
namespace rocksdb {
CompactionIterator::CompactionIterator(
Iterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
Env* env, bool expect_valid_internal_key, Statistics* stats,
Compaction* compaction, const CompactionFilter* compaction_filter,
LogBuffer* log_buffer)
: input_(input),
cmp_(cmp),
merge_helper_(merge_helper),
snapshots_(snapshots),
env_(env),
expect_valid_internal_key_(expect_valid_internal_key),
stats_(stats),
compaction_(compaction),
compaction_filter_(compaction_filter),
log_buffer_(log_buffer),
merge_out_iter_(merge_helper_) {
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
bottommost_level_ =
compaction_ == nullptr ? false : compaction_->bottommost_level();
if (compaction_ != nullptr) {
level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
}
if (snapshots_->size() == 0) {
// optimize for fast path if there are no snapshots
visible_at_tip_ = last_sequence;
earliest_snapshot_ = visible_at_tip_;
latest_snapshot_ = 0;
} else {
visible_at_tip_ = 0;
earliest_snapshot_ = snapshots_->at(0);
latest_snapshot_ = snapshots_->back();
}
}
void CompactionIterator::ResetRecordCounts() {
iter_stats_.num_record_drop_user = 0;
iter_stats_.num_record_drop_hidden = 0;
iter_stats_.num_record_drop_obsolete = 0;
}
void CompactionIterator::SeekToFirst() {
NextFromInput();
PrepareOutput();
}
void CompactionIterator::Next() {
// If there is a merge output, return it before continuing to process the
// input.
if (merge_out_iter_.Valid()) {
merge_out_iter_.Next();
// Check if we returned all records of the merge output.
if (merge_out_iter_.Valid()) {
key_ = merge_out_iter_.key();
value_ = merge_out_iter_.value();
bool valid_key __attribute__((__unused__)) =
ParseInternalKey(key_, &ikey_);
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to be valid.
assert(valid_key);
valid_ = true;
} else {
// MergeHelper moves the iterator to the first record after the merged
// records, so even though we reached the end of the merge output, we do
// not want to advance the iterator.
NextFromInput();
}
} else {
// Only advance the input iterator if there is no merge output.
input_->Next();
NextFromInput();
}
PrepareOutput();
}
void CompactionIterator::NextFromInput() {
valid_ = false;
while (input_->Valid()) {
key_ = input_->key();
value_ = input_->value();
iter_stats_.num_input_records++;
if (!ParseInternalKey(key_, &ikey_)) {
// If `expect_valid_internal_key_` is false, return the corrupted key
// and let the caller decide what to do with it.
// TODO(noetzli): Maybe we should have a more elegant solution for this.
assert(!expect_valid_internal_key_);
current_user_key_.Clear();
has_current_user_key_ = false;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
iter_stats_.num_input_corrupt_records++;
valid_ = true;
break;
}
// Update input statistics
if (ikey_.type == kTypeDeletion) {
iter_stats_.num_input_deletion_records++;
}
iter_stats_.total_input_raw_key_bytes += key_.size();
iter_stats_.total_input_raw_value_bytes += value_.size();
if (!has_current_user_key_ ||
cmp_->Compare(ikey_.user_key, current_user_key_.GetKey()) != 0) {
// First occurrence of this user key
current_user_key_.SetKey(ikey_.user_key);
has_current_user_key_ = true;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
// apply the compaction filter to the first occurrence of the user key
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
(visible_at_tip_ || latest_snapshot_)) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter. If the return value of the compaction filter is true,
// replace the entry with a deletion marker.
bool value_changed = false;
bool to_delete = false;
compaction_filter_value_.clear();
{
StopWatchNano timer(env_, true);
to_delete = compaction_filter_->Filter(
compaction_->level(), ikey_.user_key, value_,
&compaction_filter_value_, &value_changed);
iter_stats_.total_filter_time +=
env_ != nullptr ? timer.ElapsedNanos() : 0;
}
if (to_delete) {
// make a copy of the original key and convert it to a delete
delete_key_.SetInternalKey(ExtractUserKey(key_), ikey_.sequence,
kTypeDeletion);
// anchor the key again
key_ = delete_key_.GetKey();
// needed because ikey_ is backed by key
ParseInternalKey(key_, &ikey_);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (value_changed) {
value_ = compaction_filter_value_;
}
}
}
// If there are no snapshots, then this kv affect visibility at tip.
// Otherwise, search though all existing snapshots to find the earliest
// snapshot that is affected by this kv.
SequenceNumber last_sequence __attribute__((__unused__)) =
current_user_key_sequence_;
current_user_key_sequence_ = ikey_.sequence;
SequenceNumber last_snapshot = current_user_key_snapshot_;
SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
current_user_key_snapshot_ =
visible_at_tip_ ? visible_at_tip_ : findEarliestVisibleSnapshot(
ikey_.sequence, &prev_snapshot);
if (last_snapshot == current_user_key_snapshot_) {
// If the earliest snapshot is which this key is visible in
// is the same as the visibility of a previous instance of the
// same key, then this kv is not visible in any snapshot.
// Hidden by an newer entry for same user key
// TODO: why not > ?
assert(last_sequence >= current_user_key_sequence_);
++iter_stats_.num_record_drop_hidden; // (A)
} else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
ikey_.sequence <= earliest_snapshot_ &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)) {
// TODO(noetzli): This is the only place where we use compaction_
// (besides the constructor). We should probably get rid of this
// dependency and find a way to do similar filtering during flushes.
//
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
++iter_stats_.num_record_drop_obsolete;
} else if (ikey_.type == kTypeMerge) {
if (!merge_helper_->HasOperator()) {
LogToBuffer(log_buffer_, "Options::merge_operator is null.");
status_ = Status::InvalidArgument(
"merge_operator is not properly initialized.");
return;
}
// We know the merge type entry is not hidden, otherwise we would
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow.
merge_helper_->MergeUntil(input_, prev_snapshot, bottommost_level_,
stats_, env_);
merge_out_iter_.SeekToFirst();
// NOTE: key, value, and ikey_ refer to old entries.
// These will be correctly set below.
key_ = merge_out_iter_.key();
value_ = merge_out_iter_.value();
bool valid_key __attribute__((__unused__)) =
ParseInternalKey(key_, &ikey_);
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to valid.
assert(valid_key);
valid_ = true;
break;
} else {
valid_ = true;
break;
}
input_->Next();
}
}
void CompactionIterator::PrepareOutput() {
// Zeroing out the sequence number leads to better compression.
// If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno
// then we can squash the seqno to zero.
if (bottommost_level_ && valid_ && ikey_.sequence < earliest_snapshot_ &&
ikey_.type != kTypeMerge) {
assert(ikey_.type != kTypeDeletion);
// make a copy because updating in place would cause problems
// with the priority queue that is managing the input key iterator
updated_key_.assign(key_.data(), key_.size());
UpdateInternalKey(&updated_key_, (uint64_t)0, ikey_.type);
key_ = Slice(updated_key_);
}
}
inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
SequenceNumber in, SequenceNumber* prev_snapshot) {
assert(snapshots_->size());
SequenceNumber prev __attribute__((unused)) = 0;
for (const auto cur : *snapshots_) {
assert(prev <= cur);
if (cur >= in) {
*prev_snapshot = prev;
return cur;
}
prev = cur;
assert(prev);
}
*prev_snapshot = prev;
return kMaxSequenceNumber;
}
} // namespace rocksdb

126
db/compaction_iterator.h Normal file
View File

@ -0,0 +1,126 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <algorithm>
#include <deque>
#include <string>
#include <vector>
#include "db/compaction.h"
#include "db/merge_helper.h"
#include "rocksdb/compaction_filter.h"
#include "util/log_buffer.h"
namespace rocksdb {
struct CompactionIteratorStats {
// Compaction statistics
int64_t num_record_drop_user = 0;
int64_t num_record_drop_hidden = 0;
int64_t num_record_drop_obsolete = 0;
uint64_t total_filter_time = 0;
// Input statistics
// TODO(noetzli): The stats are incomplete. They are lacking everything
// consumed by MergeHelper.
uint64_t num_input_records = 0;
uint64_t num_input_deletion_records = 0;
uint64_t num_input_corrupt_records = 0;
uint64_t total_input_raw_key_bytes = 0;
uint64_t total_input_raw_value_bytes = 0;
};
class CompactionIterator {
public:
CompactionIterator(Iterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots, Env* env,
bool expect_valid_internal_key,
Statistics* stats = nullptr,
Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
LogBuffer* log_buffer = nullptr);
void ResetRecordCounts();
// Seek to the beginning of the compaction iterator output.
//
// REQUIRED: Call only once.
void SeekToFirst();
// Produces the next record in the compaction.
//
// REQUIRED: SeekToFirst() has been called.
void Next();
// Getters
const Slice& key() const { return key_; }
const Slice& value() const { return value_; }
const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; }
bool Valid() const { return valid_; }
Slice user_key() const { return current_user_key_.GetKey(); }
const CompactionIteratorStats& iter_stats() const { return iter_stats_; }
private:
// Processes the input stream to find the next output
void NextFromInput();
// Do last preparations before presenting the output to the callee. At this
// point this only zeroes out the sequence number if possible for better
// compression.
void PrepareOutput();
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
// sequence numbers.
// Employ a sequential search because the total number of
// snapshots are typically small.
inline SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in, SequenceNumber* prev_snapshot);
Iterator* input_;
const Comparator* cmp_;
MergeHelper* merge_helper_;
const std::vector<SequenceNumber>* snapshots_;
Env* env_;
bool expect_valid_internal_key_ __attribute__((__unused__));
Statistics* stats_;
Compaction* compaction_;
const CompactionFilter* compaction_filter_;
LogBuffer* log_buffer_;
bool bottommost_level_;
bool valid_ = false;
SequenceNumber visible_at_tip_;
SequenceNumber earliest_snapshot_;
SequenceNumber latest_snapshot_;
// State
Slice key_;
Slice value_;
Status status_;
ParsedInternalKey ikey_;
bool has_current_user_key_ = false;
IterKey current_user_key_;
SequenceNumber current_user_key_sequence_;
SequenceNumber current_user_key_snapshot_;
MergeOutputIterator merge_out_iter_;
std::string updated_key_;
std::string compaction_filter_value_;
IterKey delete_key_;
// "level_ptrs" holds indices that remember which file of an associated
// level we were last checking during the last call to compaction->
// KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
// to pick off where it left off since each subcompaction's key range is
// increasing so a later call to the function must be looking for a key that
// is in or beyond the last file checked during the previous call
std::vector<size_t> level_ptrs_;
CompactionIteratorStats iter_stats_;
};
} // namespace rocksdb

View File

@ -31,12 +31,12 @@
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/merge_helper.h"
#include "db/memtable_list.h" #include "db/memtable_list.h"
#include "db/merge_context.h" #include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "port/port.h"
#include "port/likely.h" #include "port/likely.h"
#include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
@ -46,14 +46,13 @@
#include "table/block_based_table_factory.h" #include "table/block_based_table_factory.h"
#include "table/merger.h" #include "table/merger.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
#include "util/logging.h" #include "util/iostats_context_imp.h"
#include "util/log_buffer.h" #include "util/log_buffer.h"
#include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/perf_context_imp.h" #include "util/perf_context_imp.h"
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/sync_point.h" #include "util/sync_point.h"
@ -64,6 +63,7 @@ namespace rocksdb {
// Maintains state for each sub-compaction // Maintains state for each sub-compaction
struct CompactionJob::SubcompactionState { struct CompactionJob::SubcompactionState {
Compaction* compaction; Compaction* compaction;
std::unique_ptr<CompactionIterator> c_iter;
// The boundaries of the key-range this compaction is interested in. No two // The boundaries of the key-range this compaction is interested in. No two
// subcompactions may have overlapping key-ranges. // subcompactions may have overlapping key-ranges.
@ -75,12 +75,8 @@ struct CompactionJob::SubcompactionState {
// Files produced by this subcompaction // Files produced by this subcompaction
struct Output { struct Output {
uint64_t number; FileMetaData meta;
uint32_t path_id; bool finished;
uint64_t file_size;
InternalKey smallest, largest;
SequenceNumber smallest_seqno, largest_seqno;
bool need_compaction;
}; };
// State kept for output being generated // State kept for output being generated
@ -89,7 +85,7 @@ struct CompactionJob::SubcompactionState {
std::unique_ptr<TableBuilder> builder; std::unique_ptr<TableBuilder> builder;
Output* current_output() { Output* current_output() {
if (outputs.empty()) { if (outputs.empty()) {
// This subcompaction's ouptut could be empty if compaction was aborted // This subcompaction's outptut could be empty if compaction was aborted
// before this subcompaction had a chance to generate any output files. // before this subcompaction had a chance to generate any output files.
// When subcompactions are executed sequentially this is more likely and // When subcompactions are executed sequentially this is more likely and
// will be particulalry likely for the later subcompactions to be empty. // will be particulalry likely for the later subcompactions to be empty.
@ -107,14 +103,6 @@ struct CompactionJob::SubcompactionState {
CompactionJobStats compaction_job_stats; CompactionJobStats compaction_job_stats;
uint64_t approx_size; uint64_t approx_size;
// "level_ptrs" holds indices that remember which file of an associated
// level we were last checking during the last call to compaction->
// KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
// to pick off where it left off since each subcompaction's key range is
// increasing so a later call to the function must be looking for a key that
// is in or beyond the last file checked during the previous call
std::vector<size_t> level_ptrs;
SubcompactionState(Compaction* c, Slice* _start, Slice* _end, SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
uint64_t size = 0) uint64_t size = 0)
: compaction(c), : compaction(c),
@ -127,7 +115,6 @@ struct CompactionJob::SubcompactionState {
num_output_records(0), num_output_records(0),
approx_size(size) { approx_size(size) {
assert(compaction != nullptr); assert(compaction != nullptr);
level_ptrs = std::vector<size_t>(compaction->number_levels(), 0);
} }
SubcompactionState(SubcompactionState&& o) { *this = std::move(o); } SubcompactionState(SubcompactionState&& o) { *this = std::move(o); }
@ -145,7 +132,6 @@ struct CompactionJob::SubcompactionState {
num_output_records = std::move(o.num_output_records); num_output_records = std::move(o.num_output_records);
compaction_job_stats = std::move(o.compaction_job_stats); compaction_job_stats = std::move(o.compaction_job_stats);
approx_size = std::move(o.approx_size); approx_size = std::move(o.approx_size);
level_ptrs = std::move(o.level_ptrs);
return *this; return *this;
} }
@ -183,21 +169,25 @@ struct CompactionJob::CompactionState {
} }
Slice SmallestUserKey() { Slice SmallestUserKey() {
for (auto& s : sub_compact_states) { for (const auto& sub_compact_state : sub_compact_states) {
if (!s.outputs.empty()) { if (!sub_compact_state.outputs.empty() &&
return s.outputs[0].smallest.user_key(); sub_compact_state.outputs[0].finished) {
return sub_compact_state.outputs[0].meta.smallest.user_key();
} }
} }
// If there is no finished output, return an empty slice.
return Slice(nullptr, 0); return Slice(nullptr, 0);
} }
Slice LargestUserKey() { Slice LargestUserKey() {
for (int i = static_cast<int>(sub_compact_states.size() - 1); i >= 0; i--) { for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
if (!sub_compact_states[i].outputs.empty()) { ++it) {
assert(sub_compact_states[i].current_output() != nullptr); if (!it->outputs.empty() && it->current_output()->finished) {
return sub_compact_states[i].current_output()->largest.user_key(); assert(it->current_output() != nullptr);
return it->current_output()->meta.largest.user_key();
} }
} }
// If there is no finished output, return an empty slice.
return Slice(nullptr, 0); return Slice(nullptr, 0);
} }
}; };
@ -313,22 +303,6 @@ void CompactionJob::Prepare() {
// Is this compaction producing files at the bottommost level? // Is this compaction producing files at the bottommost level?
bottommost_level_ = c->bottommost_level(); bottommost_level_ = c->bottommost_level();
// Initialize subcompaction states
latest_snapshot_ = 0;
visible_at_tip_ = 0;
if (existing_snapshots_.size() == 0) {
// optimize for fast path if there are no snapshots
visible_at_tip_ = versions_->LastSequence();
earliest_snapshot_ = visible_at_tip_;
} else {
latest_snapshot_ = existing_snapshots_.back();
// Add the current seqno as the 'latest' virtual
// snapshot to the end of this list.
existing_snapshots_.push_back(versions_->LastSequence());
earliest_snapshot_ = existing_snapshots_[0];
}
if (c->ShouldFormSubcompactions()) { if (c->ShouldFormSubcompactions()) {
const uint64_t start_micros = env_->NowMicros(); const uint64_t start_micros = env_->NowMicros();
GenSubcompactionBoundaries(); GenSubcompactionBoundaries();
@ -597,15 +571,15 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact != nullptr); assert(sub_compact != nullptr);
std::unique_ptr<Iterator> input_ptr( std::unique_ptr<Iterator> input(
versions_->MakeInputIterator(sub_compact->compaction)); versions_->MakeInputIterator(sub_compact->compaction));
Iterator* input = input_ptr.get();
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV); ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
// I/O measurement variables // I/O measurement variables
PerfLevel prev_perf_level = PerfLevel::kEnableTime; PerfLevel prev_perf_level = PerfLevel::kEnableTime;
const uint64_t kRecordStatsEvery = 1000;
uint64_t prev_write_nanos = 0; uint64_t prev_write_nanos = 0;
uint64_t prev_fsync_nanos = 0; uint64_t prev_fsync_nanos = 0;
uint64_t prev_range_sync_nanos = 0; uint64_t prev_range_sync_nanos = 0;
@ -619,17 +593,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
prev_prepare_write_nanos = iostats_context.prepare_write_nanos; prev_prepare_write_nanos = iostats_context.prepare_write_nanos;
} }
// Variables used inside the loop
Status status;
std::string compaction_filter_value;
ParsedInternalKey ikey;
IterKey current_user_key;
bool has_current_user_key = false;
IterKey delete_key;
SequenceNumber last_sequence_for_key __attribute__((unused)) =
kMaxSequenceNumber;
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
MergeHelper merge(cfd->user_comparator(), cfd->ioptions()->merge_operator, MergeHelper merge(cfd->user_comparator(), cfd->ioptions()->merge_operator,
db_options_.info_log.get(), db_options_.info_log.get(),
@ -645,200 +608,93 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
int64_t key_drop_user = 0;
int64_t key_drop_newer_entry = 0;
int64_t key_drop_obsolete = 0;
int64_t loop_cnt = 0;
StopWatchNano timer(env_, stats_ != nullptr);
uint64_t total_filter_time = 0;
Slice* start = sub_compact->start; Slice* start = sub_compact->start;
Slice* end = sub_compact->end; Slice* end = sub_compact->end;
if (start != nullptr) { if (start != nullptr) {
IterKey start_iter; IterKey start_iter;
start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek); start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
Slice start_key = start_iter.GetKey(); input->Seek(start_iter.GetKey());
input->Seek(start_key);
} else { } else {
input->SeekToFirst(); input->SeekToFirst();
} }
Status status;
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, env_, false, db_options_.statistics.get(),
sub_compact->compaction, compaction_filter));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
const auto& c_iter_stats = c_iter->iter_stats();
// TODO(noetzli): check whether we could check !shutting_down_->... only // TODO(noetzli): check whether we could check !shutting_down_->... only
// only occasionally (see diff D42687) // only occasionally (see diff D42687)
while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) && while (status.ok() && !shutting_down_->load(std::memory_order_acquire) &&
!cfd->IsDropped() && status.ok()) { !cfd->IsDropped() && c_iter->Valid()) {
Slice key = input->key(); // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
Slice value = input->value(); // returns true.
const Slice& key = c_iter->key();
// First check that the key is parseable before performing the comparison const Slice& value = c_iter->value();
// to determine if it's within the range we want. Parsing may fail if the
// key being passed in is a user key without any internal key component
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
// TODO: error key stays in db forever? Figure out the rationale
// v10 error v8 : we cannot hide v8 even though it's pretty obvious.
current_user_key.Clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber;
sub_compact->compaction_job_stats.num_corrupt_keys++;
status = WriteKeyValue(key, value, ikey, input->status(), sub_compact);
input->Next();
continue;
}
// If an end key (exclusive) is specified, check if the current key is // If an end key (exclusive) is specified, check if the current key is
// >= than it and exit if it is because the iterator is out of its range // >= than it and exit if it is because the iterator is out of its range
if (end != nullptr && if (end != nullptr &&
cfd->user_comparator()->Compare(ikey.user_key, *end) >= 0) { cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
break; break;
} } else if (sub_compact->compaction->ShouldStopBefore(key) &&
sub_compact->builder != nullptr) {
sub_compact->num_input_records++;
if (++loop_cnt > 1000) {
RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry,
&key_drop_obsolete,
&sub_compact->compaction_job_stats);
RecordCompactionIOStats();
loop_cnt = 0;
}
sub_compact->compaction_job_stats.total_input_raw_key_bytes += key.size();
sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
value.size();
if (sub_compact->compaction->ShouldStopBefore(key) &&
sub_compact->builder != nullptr) {
status = FinishCompactionOutputFile(input->status(), sub_compact); status = FinishCompactionOutputFile(input->status(), sub_compact);
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
} }
if (ikey.type == kTypeDeletion) { if (c_iter_stats.num_input_records % kRecordStatsEvery ==
sub_compact->compaction_job_stats.num_input_deletion_records++; kRecordStatsEvery - 1) {
RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
c_iter->ResetRecordCounts();
RecordCompactionIOStats();
} }
if (!has_current_user_key || // Open output file if necessary
!cfd->user_comparator()->Equal(ikey.user_key, if (sub_compact->builder == nullptr) {
current_user_key.GetKey())) { status = OpenCompactionOutputFile(sub_compact);
// First occurrence of this user key if (!status.ok()) {
current_user_key.SetKey(ikey.user_key);
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber;
// apply the compaction filter to the first occurrence of the user key
if (compaction_filter && ikey.type == kTypeValue &&
(visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter. If the return value of the compaction filter is true,
// replace the entry with a deletion marker.
bool value_changed = false;
compaction_filter_value.clear();
if (stats_ != nullptr) {
timer.Start();
}
bool to_delete = compaction_filter->Filter(
sub_compact->compaction->level(), ikey.user_key, value,
&compaction_filter_value, &value_changed);
total_filter_time += timer.ElapsedNanos();
if (to_delete) {
// make a copy of the original key and convert it to a delete
delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
kTypeDeletion);
// anchor the key again
key = delete_key.GetKey();
// needed because ikey is backed by key
ParseInternalKey(key, &ikey);
// no value associated with delete
value.clear();
++key_drop_user;
} else if (value_changed) {
value = compaction_filter_value;
}
}
}
// If there are no snapshots, then this kv affect visibility at tip.
// Otherwise, search though all existing snapshots to find
// the earlist snapshot that is affected by this kv.
SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
SequenceNumber visible =
visible_at_tip_ ? visible_at_tip_ : findEarliestVisibleSnapshot(
ikey.sequence, &prev_snapshot);
if (visible_in_snapshot == visible) {
// If the earliest snapshot is which this key is visible in
// is the same as the visibily of a previous instance of the
// same key, then this kv is not visible in any snapshot.
// Hidden by an newer entry for same user key
// TODO: why not > ?
assert(last_sequence_for_key >= ikey.sequence);
++key_drop_newer_entry;
input->Next(); // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= earliest_snapshot_ &&
sub_compact->compaction->KeyNotExistsBeyondOutputLevel(
ikey.user_key, &sub_compact->level_ptrs)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
++key_drop_obsolete;
input->Next();
} else if (ikey.type == kTypeMerge) {
if (!merge.HasOperator()) {
LogToBuffer(log_buffer_, "Options::merge_operator is null.");
status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
break; break;
} }
// We know the merge type entry is not hidden, otherwise we would }
// have hit (A) assert(sub_compact->builder != nullptr);
// We encapsulate the merge related state machine in a different assert(sub_compact->current_output() != nullptr);
// object to minimize change to the existing flow. Turn out this sub_compact->builder->Add(key, value);
// logic could also be nicely re-used for memtable flush purge sub_compact->current_output()->meta.UpdateBoundaries(
// optimization in BuildTable. key, c_iter->ikey().sequence);
merge.MergeUntil(input, prev_snapshot, bottommost_level_, sub_compact->num_output_records++;
db_options_.statistics.get(), env_);
// NOTE: key, value, and ikey refer to old entries. // Close output file if it is big enough
// These will be correctly set below. // TODO(aekmekji): determine if file should be closed earlier than this
const auto& keys = merge.keys(); // during subcompactions (i.e. if output size, estimated by input size, is
const auto& values = merge.values(); // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
assert(!keys.empty()); // and 0.6MB instead of 1MB and 0.2MB)
assert(keys.size() == values.size()); if (sub_compact->builder->FileSize() >=
sub_compact->compaction->max_output_file_size()) {
// We have a list of keys to write, write all keys in the list. status = FinishCompactionOutputFile(input->status(), sub_compact);
for (auto key_iter = keys.rbegin(), value_iter = values.rbegin();
!status.ok() || key_iter != keys.rend(); key_iter++, value_iter++) {
key = Slice(*key_iter);
value = Slice(*value_iter);
bool valid_key __attribute__((__unused__)) =
ParseInternalKey(key, &ikey);
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to valid.
assert(valid_key);
status = WriteKeyValue(key, value, ikey, input->status(), sub_compact);
}
} else {
status = WriteKeyValue(key, value, ikey, input->status(), sub_compact);
input->Next();
} }
last_sequence_for_key = ikey.sequence; c_iter->Next();
visible_in_snapshot = visible;
} }
RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time); sub_compact->num_input_records = c_iter_stats.num_input_records;
RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete, sub_compact->compaction_job_stats.num_input_deletion_records =
&sub_compact->compaction_job_stats); c_iter_stats.num_input_deletion_records;
sub_compact->compaction_job_stats.num_corrupt_keys =
c_iter_stats.num_input_corrupt_records;
sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
c_iter_stats.total_input_raw_key_bytes;
sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
c_iter_stats.total_input_raw_value_bytes;
RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
c_iter_stats.total_filter_time);
RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
RecordCompactionIOStats(); RecordCompactionIOStats();
if (status.ok() && if (status.ok() &&
@ -867,91 +723,33 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
} }
} }
input_ptr.reset(); sub_compact->c_iter.reset();
input.reset();
sub_compact->status = status; sub_compact->status = status;
} }
Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value,
const ParsedInternalKey& ikey,
const Status& input_status,
SubcompactionState* sub_compact) {
Slice newkey(key.data(), key.size());
std::string kstr;
// Zeroing out the sequence number leads to better compression.
// If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno
// then we can squash the seqno to zero.
if (bottommost_level_ && ikey.sequence < earliest_snapshot_ &&
ikey.type != kTypeMerge) {
assert(ikey.type != kTypeDeletion);
// make a copy because updating in place would cause problems
// with the priority queue that is managing the input key iterator
kstr.assign(key.data(), key.size());
UpdateInternalKey(&kstr, (uint64_t)0, ikey.type);
newkey = Slice(kstr);
}
// Open output file if necessary
if (sub_compact->builder == nullptr) {
Status status = OpenCompactionOutputFile(sub_compact);
if (!status.ok()) {
return status;
}
}
assert(sub_compact->builder != nullptr);
assert(sub_compact->current_output() != nullptr);
SequenceNumber seqno = GetInternalKeySeqno(newkey);
if (sub_compact->builder->NumEntries() == 0) {
sub_compact->current_output()->smallest.DecodeFrom(newkey);
sub_compact->current_output()->smallest_seqno = seqno;
} else {
sub_compact->current_output()->smallest_seqno =
std::min(sub_compact->current_output()->smallest_seqno, seqno);
}
sub_compact->current_output()->largest.DecodeFrom(newkey);
sub_compact->builder->Add(newkey, value);
sub_compact->num_output_records++;
sub_compact->current_output()->largest_seqno =
std::max(sub_compact->current_output()->largest_seqno, seqno);
// Close output file if it is big enough
// TODO(aekmekji): determine if file should be closed earlier than this
// during subcompactions (i.e. if output size, estimated by input size, is
// going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
// and 0.6MB instead of 1MB and 0.2MB)
Status status;
if (sub_compact->builder->FileSize() >=
sub_compact->compaction->max_output_file_size()) {
status = FinishCompactionOutputFile(input_status, sub_compact);
}
return status;
}
void CompactionJob::RecordDroppedKeys( void CompactionJob::RecordDroppedKeys(
int64_t* key_drop_user, const CompactionIteratorStats& c_iter_stats,
int64_t* key_drop_newer_entry,
int64_t* key_drop_obsolete,
CompactionJobStats* compaction_job_stats) { CompactionJobStats* compaction_job_stats) {
if (*key_drop_user > 0) { if (c_iter_stats.num_record_drop_user > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_USER, *key_drop_user); RecordTick(stats_, COMPACTION_KEY_DROP_USER,
*key_drop_user = 0; c_iter_stats.num_record_drop_user);
} }
if (*key_drop_newer_entry > 0) { if (c_iter_stats.num_record_drop_hidden > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, *key_drop_newer_entry); RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
c_iter_stats.num_record_drop_hidden);
if (compaction_job_stats) { if (compaction_job_stats) {
compaction_job_stats->num_records_replaced += *key_drop_newer_entry; compaction_job_stats->num_records_replaced +=
c_iter_stats.num_record_drop_hidden;
} }
*key_drop_newer_entry = 0;
} }
if (*key_drop_obsolete > 0) { if (c_iter_stats.num_record_drop_obsolete > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, *key_drop_obsolete); RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
c_iter_stats.num_record_drop_obsolete);
if (compaction_job_stats) { if (compaction_job_stats) {
compaction_job_stats->num_expired_deletion_records += *key_drop_obsolete; compaction_job_stats->num_expired_deletion_records +=
c_iter_stats.num_record_drop_obsolete;
} }
*key_drop_obsolete = 0;
} }
} }
@ -964,23 +762,23 @@ Status CompactionJob::FinishCompactionOutputFile(
assert(sub_compact->builder != nullptr); assert(sub_compact->builder != nullptr);
assert(sub_compact->current_output() != nullptr); assert(sub_compact->current_output() != nullptr);
const uint64_t output_number = sub_compact->current_output()->number; uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
const uint32_t output_path_id = sub_compact->current_output()->path_id;
assert(output_number != 0); assert(output_number != 0);
TableProperties table_properties; TableProperties table_properties;
// Check for iterator errors // Check for iterator errors
Status s = input_status; Status s = input_status;
auto meta = &sub_compact->current_output()->meta;
const uint64_t current_entries = sub_compact->builder->NumEntries(); const uint64_t current_entries = sub_compact->builder->NumEntries();
sub_compact->current_output()->need_compaction = meta->marked_for_compaction = sub_compact->builder->NeedCompact();
sub_compact->builder->NeedCompact();
if (s.ok()) { if (s.ok()) {
s = sub_compact->builder->Finish(); s = sub_compact->builder->Finish();
} else { } else {
sub_compact->builder->Abandon(); sub_compact->builder->Abandon();
} }
const uint64_t current_bytes = sub_compact->builder->FileSize(); const uint64_t current_bytes = sub_compact->builder->FileSize();
sub_compact->current_output()->file_size = current_bytes; meta->fd.file_size = current_bytes;
sub_compact->current_output()->finished = true;
sub_compact->total_bytes += current_bytes; sub_compact->total_bytes += current_bytes;
// Finish and check for file errors // Finish and check for file errors
@ -996,11 +794,10 @@ Status CompactionJob::FinishCompactionOutputFile(
if (s.ok() && current_entries > 0) { if (s.ok() && current_entries > 0) {
// Verify that the table is usable // Verify that the table is usable
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
FileDescriptor fd(output_number, output_path_id, current_bytes);
Iterator* iter = cfd->table_cache()->NewIterator( Iterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), env_options_, cfd->internal_comparator(), fd, nullptr, ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
cfd->internal_stats()->GetFileReadHist( nullptr, cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()), compact_->compaction->output_level()),
false); false);
s = iter->status(); s = iter->status();
@ -1014,19 +811,19 @@ Status CompactionJob::FinishCompactionOutputFile(
TableFileCreationInfo info(sub_compact->builder->GetTableProperties()); TableFileCreationInfo info(sub_compact->builder->GetTableProperties());
info.db_name = dbname_; info.db_name = dbname_;
info.cf_name = cfd->GetName(); info.cf_name = cfd->GetName();
info.file_path = TableFileName(cfd->ioptions()->db_paths, info.file_path =
fd.GetNumber(), fd.GetPathId()); TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
info.file_size = fd.GetFileSize(); meta->fd.GetPathId());
info.file_size = meta->fd.GetFileSize();
info.job_id = job_id_; info.job_id = job_id_;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
" keys, %" PRIu64 " bytes%s", " keys, %" PRIu64 " bytes%s",
cfd->GetName().c_str(), job_id_, output_number, current_entries, cfd->GetName().c_str(), job_id_, output_number, current_entries,
current_bytes, current_bytes,
sub_compact->current_output()->need_compaction ? " (need compaction)" meta->marked_for_compaction ? " (need compaction)" : "");
: "");
EventHelpers::LogAndNotifyTableFileCreation( EventHelpers::LogAndNotifyTableFileCreation(
event_logger_, cfd->ioptions()->listeners, fd, info); event_logger_, cfd->ioptions()->listeners, meta->fd, info);
} }
} }
sub_compact->builder.reset(); sub_compact->builder.reset();
@ -1063,13 +860,9 @@ Status CompactionJob::InstallCompactionResults(
// Add compaction outputs // Add compaction outputs
compaction->AddInputDeletions(compact_->compaction->edit()); compaction->AddInputDeletions(compact_->compaction->edit());
for (SubcompactionState& sub_compact : compact_->sub_compact_states) { for (const auto& sub_compact : compact_->sub_compact_states) {
for (size_t i = 0; i < sub_compact.outputs.size(); i++) { for (const auto& out : sub_compact.outputs) {
const SubcompactionState::Output& out = sub_compact.outputs[i]; compaction->edit()->AddFile(compaction->output_level(), out.meta);
compaction->edit()->AddFile(compaction->output_level(), out.number,
out.path_id, out.file_size, out.smallest,
out.largest, out.smallest_seqno,
out.largest_seqno, out.need_compaction);
} }
} }
return versions_->LogAndApply(compaction->column_family_data(), return versions_->LogAndApply(compaction->column_family_data(),
@ -1077,34 +870,6 @@ Status CompactionJob::InstallCompactionResults(
db_mutex, db_directory_); db_mutex, db_directory_);
} }
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
// sequence numbers.
// Employ a sequential search because the total number of
// snapshots are typically small.
inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot(
SequenceNumber in, SequenceNumber* prev_snapshot) {
assert(existing_snapshots_.size());
SequenceNumber prev __attribute__((unused)) = 0;
for (const auto cur : existing_snapshots_) {
assert(prev <= cur);
if (cur >= in) {
*prev_snapshot = prev;
return cur;
}
prev = cur; // assignment
assert(prev);
}
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"CompactionJob is not able to find snapshot"
" with SeqId later than %" PRIu64
": current MaxSeqId is %" PRIu64 "",
in, existing_snapshots_[existing_snapshots_.size() - 1]);
assert(0);
return 0;
}
void CompactionJob::RecordCompactionIOStats() { void CompactionJob::RecordCompactionIOStats() {
RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read)); RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
ThreadStatusUtil::IncreaseThreadOperationProperty( ThreadStatusUtil::IncreaseThreadOperationProperty(
@ -1137,11 +902,9 @@ Status CompactionJob::OpenCompactionOutputFile(
return s; return s;
} }
SubcompactionState::Output out; SubcompactionState::Output out;
out.number = file_number; out.meta.fd =
out.path_id = sub_compact->compaction->output_path_id(); FileDescriptor(file_number, sub_compact->compaction->output_path_id(), 0);
out.smallest.Clear(); out.finished = false;
out.largest.Clear();
out.smallest_seqno = out.largest_seqno = 0;
sub_compact->outputs.push_back(out); sub_compact->outputs.push_back(out);
writable_file->SetIOPriority(Env::IO_LOW); writable_file->SetIOPriority(Env::IO_LOW);
@ -1151,16 +914,11 @@ Status CompactionJob::OpenCompactionOutputFile(
new WritableFileWriter(std::move(writable_file), env_options_)); new WritableFileWriter(std::move(writable_file), env_options_));
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
bool skip_filters = false;
// If the Column family flag is to only optimize filters for hits, // If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where // we can skip creating filters if this is the bottommost_level where
// data is going to be found // data is going to be found
// bool skip_filters =
if (cfd->ioptions()->optimize_filters_for_hits && bottommost_level_) { cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
skip_filters = true;
}
sub_compact->builder.reset(NewTableBuilder( sub_compact->builder.reset(NewTableBuilder(
*cfd->ioptions(), cfd->internal_comparator(), *cfd->ioptions(), cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), sub_compact->outfile.get(), cfd->int_tbl_prop_collector_factories(), sub_compact->outfile.get(),
@ -1181,13 +939,11 @@ void CompactionJob::CleanupCompaction() {
} else { } else {
assert(!sub_status.ok() || sub_compact.outfile == nullptr); assert(!sub_status.ok() || sub_compact.outfile == nullptr);
} }
for (size_t i = 0; i < sub_compact.outputs.size(); i++) { for (const auto& out : sub_compact.outputs) {
const SubcompactionState::Output& out = sub_compact.outputs[i];
// If this file was inserted into the table cache then remove // If this file was inserted into the table cache then remove
// them here because this compaction was not committed. // them here because this compaction was not committed.
if (!sub_status.ok()) { if (!sub_status.ok()) {
TableCache::Evict(table_cache_.get(), out.number); TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
} }
} }
} }
@ -1237,8 +993,8 @@ void CompactionJob::UpdateCompactionStats() {
} }
compaction_stats_.num_output_files += static_cast<int>(num_output_files); compaction_stats_.num_output_files += static_cast<int>(num_output_files);
for (size_t i = 0; i < num_output_files; i++) { for (const auto& out : sub_compact.outputs) {
compaction_stats_.bytes_written += sub_compact.outputs[i].file_size; compaction_stats_.bytes_written += out.meta.fd.file_size;
} }
if (sub_compact.num_input_records > sub_compact.num_output_records) { if (sub_compact.num_input_records > sub_compact.num_output_records) {
compaction_stats_.num_dropped_records += compaction_stats_.num_dropped_records +=

View File

@ -18,6 +18,7 @@
#include <vector> #include <vector>
#include "db/column_family.h" #include "db/column_family.h"
#include "db/compaction_iterator.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/flush_scheduler.h" #include "db/flush_scheduler.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
@ -91,25 +92,16 @@ class CompactionJob {
// kv-pairs // kv-pairs
void ProcessKeyValueCompaction(SubcompactionState* sub_compact); void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
Status WriteKeyValue(const Slice& key, const Slice& value,
const ParsedInternalKey& ikey,
const Status& input_status,
SubcompactionState* sub_compact);
Status FinishCompactionOutputFile(const Status& input_status, Status FinishCompactionOutputFile(const Status& input_status,
SubcompactionState* sub_compact); SubcompactionState* sub_compact);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options, Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex); InstrumentedMutex* db_mutex);
SequenceNumber findEarliestVisibleSnapshot(SequenceNumber in,
SequenceNumber* prev_snapshot);
void RecordCompactionIOStats(); void RecordCompactionIOStats();
Status OpenCompactionOutputFile(SubcompactionState* sub_compact); Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
void CleanupCompaction(); void CleanupCompaction();
void UpdateCompactionJobStats( void UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const; const InternalStats::CompactionStats& stats) const;
void RecordDroppedKeys(int64_t* key_drop_user, void RecordDroppedKeys(const CompactionIteratorStats& c_iter_stats,
int64_t* key_drop_newer_entry,
int64_t* key_drop_obsolete,
CompactionJobStats* compaction_job_stats = nullptr); CompactionJobStats* compaction_job_stats = nullptr);
void UpdateCompactionStats(); void UpdateCompactionStats();
@ -124,15 +116,8 @@ class CompactionJob {
struct CompactionState; struct CompactionState;
CompactionState* compact_; CompactionState* compact_;
CompactionJobStats* compaction_job_stats_; CompactionJobStats* compaction_job_stats_;
bool bottommost_level_;
InternalStats::CompactionStats compaction_stats_; InternalStats::CompactionStats compaction_stats_;
SequenceNumber earliest_snapshot_;
SequenceNumber latest_snapshot_;
SequenceNumber visible_at_tip_;
// DBImpl state // DBImpl state
const std::string& dbname_; const std::string& dbname_;
const DBOptions& db_options_; const DBOptions& db_options_;
@ -153,6 +138,7 @@ class CompactionJob {
EventLogger* event_logger_; EventLogger* event_logger_;
bool bottommost_level_;
bool paranoid_file_checks_; bool paranoid_file_checks_;
bool measure_io_stats_; bool measure_io_stats_;
// Stores the Slices that designate the boundaries for each subcompaction // Stores the Slices that designate the boundaries for each subcompaction

View File

@ -166,6 +166,7 @@ class InternalKey {
} }
Slice user_key() const { return ExtractUserKey(rep_); } Slice user_key() const { return ExtractUserKey(rep_); }
size_t size() { return rep_.size(); }
void SetFrom(const ParsedInternalKey& p) { void SetFrom(const ParsedInternalKey& p) {
rep_.clear(); rep_.clear();

View File

@ -222,4 +222,24 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before,
return s; return s;
} }
MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper)
: merge_helper_(merge_helper) {
it_keys_ = merge_helper_->keys().rend();
it_values_ = merge_helper_->values().rend();
}
void MergeOutputIterator::SeekToFirst() {
const auto& keys = merge_helper_->keys();
const auto& values = merge_helper_->values();
assert(keys.size() > 0);
assert(keys.size() == values.size());
it_keys_ = keys.rbegin();
it_values_ = values.rbegin();
}
void MergeOutputIterator::Next() {
++it_keys_;
++it_values_;
}
} // namespace rocksdb } // namespace rocksdb

View File

@ -114,6 +114,27 @@ class MergeHelper {
std::deque<std::string> operands_; // Parallel with keys_; stores the values std::deque<std::string> operands_; // Parallel with keys_; stores the values
}; };
// MergeOutputIterator can be used to iterate over the result of a merge.
class MergeOutputIterator {
public:
// The MergeOutputIterator is bound to a MergeHelper instance.
explicit MergeOutputIterator(const MergeHelper* merge_helper);
// Seeks to the first record in the output.
void SeekToFirst();
// Advances to the next record in the output.
void Next();
Slice key() { return Slice(*it_keys_); }
Slice value() { return Slice(*it_values_); }
bool Valid() { return it_keys_ != merge_helper_->keys().rend(); }
private:
const MergeHelper* merge_helper_;
std::deque<std::string>::const_reverse_iterator it_keys_;
std::deque<std::string>::const_reverse_iterator it_values_;
};
} // namespace rocksdb } // namespace rocksdb
#endif #endif

View File

@ -93,6 +93,8 @@ struct FileMetaData {
FileMetaData() FileMetaData()
: refs(0), : refs(0),
being_compacted(false), being_compacted(false),
smallest_seqno(kMaxSequenceNumber),
largest_seqno(0),
table_reader_handle(nullptr), table_reader_handle(nullptr),
compensated_file_size(0), compensated_file_size(0),
num_entries(0), num_entries(0),
@ -101,6 +103,17 @@ struct FileMetaData {
raw_value_size(0), raw_value_size(0),
init_stats_from_file(false), init_stats_from_file(false),
marked_for_compaction(false) {} marked_for_compaction(false) {}
// REQUIRED: Keys must be given to the function in sorted order (it expects
// the last key to be the largest).
void UpdateBoundaries(const Slice& key, SequenceNumber seqno) {
if (smallest.size() == 0) {
smallest.DecodeFrom(key);
}
largest.DecodeFrom(key);
smallest_seqno = std::min(smallest_seqno, seqno);
largest_seqno = std::max(largest_seqno, seqno);
}
}; };
// A compressed copy of file meta data that just contain // A compressed copy of file meta data that just contain
@ -179,7 +192,12 @@ class VersionEdit {
f.smallest_seqno = smallest_seqno; f.smallest_seqno = smallest_seqno;
f.largest_seqno = largest_seqno; f.largest_seqno = largest_seqno;
f.marked_for_compaction = marked_for_compaction; f.marked_for_compaction = marked_for_compaction;
new_files_.push_back(std::make_pair(level, f)); new_files_.emplace_back(level, f);
}
void AddFile(int level, const FileMetaData& f) {
assert(f.smallest_seqno <= f.largest_seqno);
new_files_.emplace_back(level, f);
} }
// Delete the specified "file" from the specified "level". // Delete the specified "file" from the specified "level".

1
src.mk
View File

@ -5,6 +5,7 @@ LIB_SOURCES = \
db/column_family.cc \ db/column_family.cc \
db/compacted_db_impl.cc \ db/compacted_db_impl.cc \
db/compaction.cc \ db/compaction.cc \
db/compaction_iterator.cc \
db/compaction_job.cc \ db/compaction_job.cc \
db/compaction_picker.cc \ db/compaction_picker.cc \
db/convenience.cc \ db/convenience.cc \