6fbe96baf8
Summary: This diff introduces RangeDelAggregator, which takes ownership of iterators provided to it via AddTombstones(). The tombstones are organized in a two-level map (snapshot stripe -> begin key -> tombstone). Tombstone creation avoids data copy by holding Slices returned by the iterator, which remain valid thanks to pinning. For compaction, we create a hierarchical range tombstone iterator with structure matching the iterator over compaction input data. An aggregator based on that iterator is used by CompactionIterator to determine which keys are covered by range tombstones. In case of merge operand, the same aggregator is used by MergeHelper. Upon finishing each file in the compaction, relevant range tombstones are added to the output file's range tombstone metablock and file boundaries are updated accordingly. To check whether a key is covered by range tombstone, RangeDelAggregator::ShouldDelete() considers tombstones in the key's snapshot stripe. When this function is used outside of compaction, it also checks newer stripes, which can contain covering tombstones. Currently the intra-stripe check involves a linear scan; however, in the future we plan to collapse ranges within a stripe such that binary search can be used. RangeDelAggregator::AddToBuilder() adds all range tombstones in the table's key-range to a new table's range tombstone meta-block. Since range tombstones may fall in the gap between files, we may need to extend some files' key-ranges. The strategy is (1) first file extends as far left as possible and other files do not extend left, (2) all files extend right until either the start of the next file or the end of the last range tombstone in the gap, whichever comes first. One other notable change is adding release/move semantics to ScopedArenaIterator such that it can be used to transfer ownership of an arena-allocated iterator, similar to how unique_ptr is used for malloc'd data. Depends on D61473 Test Plan: compaction_iterator_test, mock_table, end-to-end tests in D63927 Reviewers: sdong, IslamAbdelRahman, wanning, yhchiang, lightmark Reviewed By: lightmark Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D62205
207 lines
7.6 KiB
C++
207 lines
7.6 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "db/builder.h"
|
|
|
|
#include <algorithm>
|
|
#include <deque>
|
|
#include <vector>
|
|
|
|
#include "db/compaction_iterator.h"
|
|
#include "db/dbformat.h"
|
|
#include "db/event_helpers.h"
|
|
#include "db/filename.h"
|
|
#include "db/internal_stats.h"
|
|
#include "db/merge_helper.h"
|
|
#include "db/table_cache.h"
|
|
#include "db/version_edit.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/iterator.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/table.h"
|
|
#include "table/block_based_table_builder.h"
|
|
#include "table/internal_iterator.h"
|
|
#include "util/file_reader_writer.h"
|
|
#include "util/iostats_context_imp.h"
|
|
#include "util/stop_watch.h"
|
|
#include "util/thread_status_util.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
class TableFactory;
|
|
|
|
TableBuilder* NewTableBuilder(
|
|
const ImmutableCFOptions& ioptions,
|
|
const InternalKeyComparator& internal_comparator,
|
|
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
|
|
int_tbl_prop_collector_factories,
|
|
uint32_t column_family_id, const std::string& column_family_name,
|
|
WritableFileWriter* file, const CompressionType compression_type,
|
|
const CompressionOptions& compression_opts,
|
|
int level,
|
|
const std::string* compression_dict, const bool skip_filters) {
|
|
assert((column_family_id ==
|
|
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
|
|
column_family_name.empty());
|
|
return ioptions.table_factory->NewTableBuilder(
|
|
TableBuilderOptions(ioptions, internal_comparator,
|
|
int_tbl_prop_collector_factories, compression_type,
|
|
compression_opts, compression_dict, skip_filters,
|
|
column_family_name, level),
|
|
column_family_id, file);
|
|
}
|
|
|
|
Status BuildTable(
|
|
const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
|
|
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
|
|
TableCache* table_cache, InternalIterator* iter, FileMetaData* meta,
|
|
const InternalKeyComparator& internal_comparator,
|
|
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
|
|
int_tbl_prop_collector_factories,
|
|
uint32_t column_family_id, const std::string& column_family_name,
|
|
std::vector<SequenceNumber> snapshots,
|
|
SequenceNumber earliest_write_conflict_snapshot,
|
|
const CompressionType compression,
|
|
const CompressionOptions& compression_opts, bool paranoid_file_checks,
|
|
InternalStats* internal_stats, TableFileCreationReason reason,
|
|
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
|
|
TableProperties* table_properties, int level) {
|
|
assert((column_family_id ==
|
|
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
|
|
column_family_name.empty());
|
|
// Reports the IOStats for flush for every following bytes.
|
|
const size_t kReportFlushIOStatsEvery = 1048576;
|
|
Status s;
|
|
meta->fd.file_size = 0;
|
|
iter->SeekToFirst();
|
|
|
|
std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
|
|
meta->fd.GetPathId());
|
|
#ifndef ROCKSDB_LITE
|
|
EventHelpers::NotifyTableFileCreationStarted(
|
|
ioptions.listeners, dbname, column_family_name, fname, job_id, reason);
|
|
#endif // !ROCKSDB_LITE
|
|
TableProperties tp;
|
|
|
|
if (iter->Valid()) {
|
|
TableBuilder* builder;
|
|
unique_ptr<WritableFileWriter> file_writer;
|
|
{
|
|
unique_ptr<WritableFile> file;
|
|
s = NewWritableFile(env, fname, &file, env_options);
|
|
if (!s.ok()) {
|
|
EventHelpers::LogAndNotifyTableFileCreationFinished(
|
|
event_logger, ioptions.listeners, dbname, column_family_name, fname,
|
|
job_id, meta->fd, tp, reason, s);
|
|
return s;
|
|
}
|
|
file->SetIOPriority(io_priority);
|
|
|
|
file_writer.reset(new WritableFileWriter(std::move(file), env_options));
|
|
|
|
builder = NewTableBuilder(
|
|
ioptions, internal_comparator, int_tbl_prop_collector_factories,
|
|
column_family_id, column_family_name, file_writer.get(), compression,
|
|
compression_opts, level);
|
|
}
|
|
|
|
std::unique_ptr<RangeDelAggregator> range_del_agg;
|
|
range_del_agg.reset(new RangeDelAggregator(internal_comparator, snapshots));
|
|
MergeHelper merge(env, internal_comparator.user_comparator(),
|
|
ioptions.merge_operator, nullptr, ioptions.info_log,
|
|
mutable_cf_options.min_partial_merge_operands,
|
|
true /* internal key corruption is not ok */,
|
|
snapshots.empty() ? 0 : snapshots.back());
|
|
|
|
CompactionIterator c_iter(
|
|
iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber,
|
|
&snapshots, earliest_write_conflict_snapshot, env,
|
|
true /* internal key corruption is not ok */, range_del_agg.get());
|
|
c_iter.SeekToFirst();
|
|
for (; c_iter.Valid(); c_iter.Next()) {
|
|
const Slice& key = c_iter.key();
|
|
const Slice& value = c_iter.value();
|
|
builder->Add(key, value);
|
|
meta->UpdateBoundaries(key, c_iter.ikey().sequence);
|
|
|
|
// TODO(noetzli): Update stats after flush, too.
|
|
if (io_priority == Env::IO_HIGH &&
|
|
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
|
|
ThreadStatusUtil::SetThreadOperationProperty(
|
|
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
|
|
}
|
|
}
|
|
|
|
// Finish and check for builder errors
|
|
bool empty = builder->NumEntries() == 0;
|
|
s = c_iter.status();
|
|
if (!s.ok() || empty) {
|
|
builder->Abandon();
|
|
} else {
|
|
s = builder->Finish();
|
|
}
|
|
|
|
if (s.ok() && !empty) {
|
|
uint64_t file_size = builder->FileSize();
|
|
meta->fd.file_size = file_size;
|
|
meta->marked_for_compaction = builder->NeedCompact();
|
|
assert(meta->fd.GetFileSize() > 0);
|
|
tp = builder->GetTableProperties();
|
|
if (table_properties) {
|
|
*table_properties = tp;
|
|
}
|
|
}
|
|
delete builder;
|
|
|
|
// Finish and check for file errors
|
|
if (s.ok() && !empty && !ioptions.disable_data_sync) {
|
|
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
|
|
file_writer->Sync(ioptions.use_fsync);
|
|
}
|
|
if (s.ok() && !empty) {
|
|
s = file_writer->Close();
|
|
}
|
|
|
|
if (s.ok() && !empty) {
|
|
// Verify that the table is usable
|
|
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
|
|
ReadOptions(), env_options, internal_comparator, meta->fd, nullptr,
|
|
(internal_stats == nullptr) ? nullptr
|
|
: internal_stats->GetFileReadHist(0),
|
|
false /* for_compaction */, nullptr /* arena */,
|
|
false /* skip_filter */, level));
|
|
s = it->status();
|
|
if (s.ok() && paranoid_file_checks) {
|
|
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
|
}
|
|
s = it->status();
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check for input iterator errors
|
|
if (!iter->status().ok()) {
|
|
s = iter->status();
|
|
}
|
|
|
|
if (!s.ok() || meta->fd.GetFileSize() == 0) {
|
|
env->DeleteFile(fname);
|
|
}
|
|
|
|
// Output to event logger and fire events.
|
|
EventHelpers::LogAndNotifyTableFileCreationFinished(
|
|
event_logger, ioptions.listeners, dbname, column_family_name, fname,
|
|
job_id, meta->fd, tp, reason, s);
|
|
|
|
return s;
|
|
}
|
|
|
|
} // namespace rocksdb
|