5f025ea832
Summary: This is groundwork for adding garbage collection support to BlobDB. The patch adds logic that keeps track of the oldest blob file referred to by each SST file. The oldest blob file is identified during flush/ compaction (similarly to how the range of keys covered by the SST is identified), and persisted in the manifest as a custom field of the new file edit record. Blob indexes with TTL are ignored for the purposes of identifying the oldest blob file (since such blob files are cleaned up by the TTL logic in BlobDB). Pull Request resolved: https://github.com/facebook/rocksdb/pull/5903 Test Plan: Added new unit tests; also ran db_bench in BlobDB mode, inspected the manifest using ldb, and confirmed (by scanning the SST files using sst_dump) that the value of the oldest blob file number field matches the contents of the file for each SST. Differential Revision: D17859997 Pulled By: ltamasi fbshipit-source-id: 21662c137c6259a6af70446faaf3a9912c550e90
428 lines
16 KiB
C++
428 lines
16 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "db/flush_job.h"
|
|
|
|
#include <cinttypes>
|
|
|
|
#include <algorithm>
|
|
#include <vector>
|
|
|
|
#include "db/builder.h"
|
|
#include "db/db_iter.h"
|
|
#include "db/dbformat.h"
|
|
#include "db/event_helpers.h"
|
|
#include "db/log_reader.h"
|
|
#include "db/log_writer.h"
|
|
#include "db/memtable.h"
|
|
#include "db/memtable_list.h"
|
|
#include "db/merge_context.h"
|
|
#include "db/range_tombstone_fragmenter.h"
|
|
#include "db/version_set.h"
|
|
#include "file/file_util.h"
|
|
#include "file/filename.h"
|
|
#include "logging/event_logger.h"
|
|
#include "logging/log_buffer.h"
|
|
#include "logging/logging.h"
|
|
#include "monitoring/iostats_context_imp.h"
|
|
#include "monitoring/perf_context_imp.h"
|
|
#include "monitoring/thread_status_util.h"
|
|
#include "port/port.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/statistics.h"
|
|
#include "rocksdb/status.h"
|
|
#include "rocksdb/table.h"
|
|
#include "table/block_based/block.h"
|
|
#include "table/block_based/block_based_table_factory.h"
|
|
#include "table/merging_iterator.h"
|
|
#include "table/table_builder.h"
|
|
#include "table/two_level_iterator.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "util/coding.h"
|
|
#include "util/mutexlock.h"
|
|
#include "util/stop_watch.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
const char* GetFlushReasonString (FlushReason flush_reason) {
|
|
switch (flush_reason) {
|
|
case FlushReason::kOthers:
|
|
return "Other Reasons";
|
|
case FlushReason::kGetLiveFiles:
|
|
return "Get Live Files";
|
|
case FlushReason::kShutDown:
|
|
return "Shut down";
|
|
case FlushReason::kExternalFileIngestion:
|
|
return "External File Ingestion";
|
|
case FlushReason::kManualCompaction:
|
|
return "Manual Compaction";
|
|
case FlushReason::kWriteBufferManager:
|
|
return "Write Buffer Manager";
|
|
case FlushReason::kWriteBufferFull:
|
|
return "Write Buffer Full";
|
|
case FlushReason::kTest:
|
|
return "Test";
|
|
case FlushReason::kDeleteFiles:
|
|
return "Delete Files";
|
|
case FlushReason::kAutoCompaction:
|
|
return "Auto Compaction";
|
|
case FlushReason::kManualFlush:
|
|
return "Manual Flush";
|
|
case FlushReason::kErrorRecovery:
|
|
return "Error Recovery";
|
|
default:
|
|
return "Invalid";
|
|
}
|
|
}
|
|
|
|
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
|
|
const ImmutableDBOptions& db_options,
|
|
const MutableCFOptions& mutable_cf_options,
|
|
const uint64_t* max_memtable_id,
|
|
const EnvOptions& env_options, VersionSet* versions,
|
|
InstrumentedMutex* db_mutex,
|
|
std::atomic<bool>* shutting_down,
|
|
std::vector<SequenceNumber> existing_snapshots,
|
|
SequenceNumber earliest_write_conflict_snapshot,
|
|
SnapshotChecker* snapshot_checker, JobContext* job_context,
|
|
LogBuffer* log_buffer, Directory* db_directory,
|
|
Directory* output_file_directory,
|
|
CompressionType output_compression, Statistics* stats,
|
|
EventLogger* event_logger, bool measure_io_stats,
|
|
const bool sync_output_directory, const bool write_manifest,
|
|
Env::Priority thread_pri)
|
|
: dbname_(dbname),
|
|
cfd_(cfd),
|
|
db_options_(db_options),
|
|
mutable_cf_options_(mutable_cf_options),
|
|
max_memtable_id_(max_memtable_id),
|
|
env_options_(env_options),
|
|
versions_(versions),
|
|
db_mutex_(db_mutex),
|
|
shutting_down_(shutting_down),
|
|
existing_snapshots_(std::move(existing_snapshots)),
|
|
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
|
|
snapshot_checker_(snapshot_checker),
|
|
job_context_(job_context),
|
|
log_buffer_(log_buffer),
|
|
db_directory_(db_directory),
|
|
output_file_directory_(output_file_directory),
|
|
output_compression_(output_compression),
|
|
stats_(stats),
|
|
event_logger_(event_logger),
|
|
measure_io_stats_(measure_io_stats),
|
|
sync_output_directory_(sync_output_directory),
|
|
write_manifest_(write_manifest),
|
|
edit_(nullptr),
|
|
base_(nullptr),
|
|
pick_memtable_called(false),
|
|
thread_pri_(thread_pri) {
|
|
// Update the thread status to indicate flush.
|
|
ReportStartedFlush();
|
|
TEST_SYNC_POINT("FlushJob::FlushJob()");
|
|
}
|
|
|
|
FlushJob::~FlushJob() {
|
|
ThreadStatusUtil::ResetThreadStatus();
|
|
}
|
|
|
|
void FlushJob::ReportStartedFlush() {
|
|
ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
|
|
db_options_.enable_thread_tracking);
|
|
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
|
|
ThreadStatusUtil::SetThreadOperationProperty(
|
|
ThreadStatus::COMPACTION_JOB_ID,
|
|
job_context_->job_id);
|
|
IOSTATS_RESET(bytes_written);
|
|
}
|
|
|
|
void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
|
|
uint64_t input_size = 0;
|
|
for (auto* mem : mems) {
|
|
input_size += mem->ApproximateMemoryUsage();
|
|
}
|
|
ThreadStatusUtil::IncreaseThreadOperationProperty(
|
|
ThreadStatus::FLUSH_BYTES_MEMTABLES,
|
|
input_size);
|
|
}
|
|
|
|
void FlushJob::RecordFlushIOStats() {
|
|
RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
|
|
ThreadStatusUtil::IncreaseThreadOperationProperty(
|
|
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
|
|
IOSTATS_RESET(bytes_written);
|
|
}
|
|
|
|
void FlushJob::PickMemTable() {
|
|
db_mutex_->AssertHeld();
|
|
assert(!pick_memtable_called);
|
|
pick_memtable_called = true;
|
|
// Save the contents of the earliest memtable as a new Table
|
|
cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
|
|
if (mems_.empty()) {
|
|
return;
|
|
}
|
|
|
|
ReportFlushInputSize(mems_);
|
|
|
|
// entries mems are (implicitly) sorted in ascending order by their created
|
|
// time. We will use the first memtable's `edit` to keep the meta info for
|
|
// this flush.
|
|
MemTable* m = mems_[0];
|
|
edit_ = m->GetEdits();
|
|
edit_->SetPrevLogNumber(0);
|
|
// SetLogNumber(log_num) indicates logs with number smaller than log_num
|
|
// will no longer be picked up for recovery.
|
|
edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
|
|
edit_->SetColumnFamily(cfd_->GetID());
|
|
|
|
// path 0 for level 0 file.
|
|
meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
|
|
|
|
base_ = cfd_->current();
|
|
base_->Ref(); // it is likely that we do not need this reference
|
|
}
|
|
|
|
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
|
|
FileMetaData* file_meta) {
|
|
TEST_SYNC_POINT("FlushJob::Start");
|
|
db_mutex_->AssertHeld();
|
|
assert(pick_memtable_called);
|
|
AutoThreadOperationStageUpdater stage_run(
|
|
ThreadStatus::STAGE_FLUSH_RUN);
|
|
if (mems_.empty()) {
|
|
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
|
|
cfd_->GetName().c_str());
|
|
return Status::OK();
|
|
}
|
|
|
|
// I/O measurement variables
|
|
PerfLevel prev_perf_level = PerfLevel::kEnableTime;
|
|
uint64_t prev_write_nanos = 0;
|
|
uint64_t prev_fsync_nanos = 0;
|
|
uint64_t prev_range_sync_nanos = 0;
|
|
uint64_t prev_prepare_write_nanos = 0;
|
|
uint64_t prev_cpu_write_nanos = 0;
|
|
uint64_t prev_cpu_read_nanos = 0;
|
|
if (measure_io_stats_) {
|
|
prev_perf_level = GetPerfLevel();
|
|
SetPerfLevel(PerfLevel::kEnableTime);
|
|
prev_write_nanos = IOSTATS(write_nanos);
|
|
prev_fsync_nanos = IOSTATS(fsync_nanos);
|
|
prev_range_sync_nanos = IOSTATS(range_sync_nanos);
|
|
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
|
|
prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
|
|
prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
|
|
}
|
|
|
|
// This will release and re-acquire the mutex.
|
|
Status s = WriteLevel0Table();
|
|
|
|
if (s.ok() && cfd_->IsDropped()) {
|
|
s = Status::ColumnFamilyDropped("Column family dropped during compaction");
|
|
}
|
|
if ((s.ok() || s.IsColumnFamilyDropped()) &&
|
|
shutting_down_->load(std::memory_order_acquire)) {
|
|
s = Status::ShutdownInProgress("Database shutdown");
|
|
}
|
|
|
|
if (!s.ok()) {
|
|
cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
|
|
} else if (write_manifest_) {
|
|
TEST_SYNC_POINT("FlushJob::InstallResults");
|
|
// Replace immutable memtable with the generated Table
|
|
s = cfd_->imm()->TryInstallMemtableFlushResults(
|
|
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
|
|
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
|
|
log_buffer_);
|
|
}
|
|
|
|
if (s.ok() && file_meta != nullptr) {
|
|
*file_meta = meta_;
|
|
}
|
|
RecordFlushIOStats();
|
|
|
|
auto stream = event_logger_->LogToBuffer(log_buffer_);
|
|
stream << "job" << job_context_->job_id << "event"
|
|
<< "flush_finished";
|
|
stream << "output_compression"
|
|
<< CompressionTypeToString(output_compression_);
|
|
stream << "lsm_state";
|
|
stream.StartArray();
|
|
auto vstorage = cfd_->current()->storage_info();
|
|
for (int level = 0; level < vstorage->num_levels(); ++level) {
|
|
stream << vstorage->NumLevelFiles(level);
|
|
}
|
|
stream.EndArray();
|
|
stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
|
|
|
|
if (measure_io_stats_) {
|
|
if (prev_perf_level != PerfLevel::kEnableTime) {
|
|
SetPerfLevel(prev_perf_level);
|
|
}
|
|
stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
|
|
stream << "file_range_sync_nanos"
|
|
<< (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
|
|
stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
|
|
stream << "file_prepare_write_nanos"
|
|
<< (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
|
|
stream << "file_cpu_write_nanos"
|
|
<< (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
|
|
stream << "file_cpu_read_nanos"
|
|
<< (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
|
|
}
|
|
|
|
return s;
|
|
}
|
|
|
|
void FlushJob::Cancel() {
|
|
db_mutex_->AssertHeld();
|
|
assert(base_ != nullptr);
|
|
base_->Unref();
|
|
}
|
|
|
|
Status FlushJob::WriteLevel0Table() {
|
|
AutoThreadOperationStageUpdater stage_updater(
|
|
ThreadStatus::STAGE_FLUSH_WRITE_L0);
|
|
db_mutex_->AssertHeld();
|
|
const uint64_t start_micros = db_options_.env->NowMicros();
|
|
const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000;
|
|
Status s;
|
|
{
|
|
auto write_hint = cfd_->CalculateSSTWriteHint(0);
|
|
db_mutex_->Unlock();
|
|
if (log_buffer_) {
|
|
log_buffer_->FlushBufferToLog();
|
|
}
|
|
// memtables and range_del_iters store internal iterators over each data
|
|
// memtable and its associated range deletion memtable, respectively, at
|
|
// corresponding indexes.
|
|
std::vector<InternalIterator*> memtables;
|
|
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
|
range_del_iters;
|
|
ReadOptions ro;
|
|
ro.total_order_seek = true;
|
|
Arena arena;
|
|
uint64_t total_num_entries = 0, total_num_deletes = 0;
|
|
uint64_t total_data_size = 0;
|
|
size_t total_memory_usage = 0;
|
|
for (MemTable* m : mems_) {
|
|
ROCKS_LOG_INFO(
|
|
db_options_.info_log,
|
|
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
|
|
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
|
|
memtables.push_back(m->NewIterator(ro, &arena));
|
|
auto* range_del_iter =
|
|
m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
|
|
if (range_del_iter != nullptr) {
|
|
range_del_iters.emplace_back(range_del_iter);
|
|
}
|
|
total_num_entries += m->num_entries();
|
|
total_num_deletes += m->num_deletes();
|
|
total_data_size += m->get_data_size();
|
|
total_memory_usage += m->ApproximateMemoryUsage();
|
|
}
|
|
|
|
event_logger_->Log() << "job" << job_context_->job_id << "event"
|
|
<< "flush_started"
|
|
<< "num_memtables" << mems_.size() << "num_entries"
|
|
<< total_num_entries << "num_deletes"
|
|
<< total_num_deletes << "total_data_size"
|
|
<< total_data_size << "memory_usage"
|
|
<< total_memory_usage << "flush_reason"
|
|
<< GetFlushReasonString(cfd_->GetFlushReason());
|
|
|
|
{
|
|
ScopedArenaIterator iter(
|
|
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
|
|
static_cast<int>(memtables.size()), &arena));
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
|
|
cfd_->GetName().c_str(), job_context_->job_id,
|
|
meta_.fd.GetNumber());
|
|
|
|
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
|
|
&output_compression_);
|
|
int64_t _current_time = 0;
|
|
auto status = db_options_.env->GetCurrentTime(&_current_time);
|
|
// Safe to proceed even if GetCurrentTime fails. So, log and proceed.
|
|
if (!status.ok()) {
|
|
ROCKS_LOG_WARN(
|
|
db_options_.info_log,
|
|
"Failed to get current time to populate creation_time property. "
|
|
"Status: %s",
|
|
status.ToString().c_str());
|
|
}
|
|
const uint64_t current_time = static_cast<uint64_t>(_current_time);
|
|
|
|
uint64_t oldest_key_time =
|
|
mems_.front()->ApproximateOldestKeyTime();
|
|
|
|
s = BuildTable(
|
|
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
|
|
env_options_, cfd_->table_cache(), iter.get(),
|
|
std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
|
|
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
|
|
cfd_->GetName(), existing_snapshots_,
|
|
earliest_write_conflict_snapshot_, snapshot_checker_,
|
|
output_compression_, mutable_cf_options_.sample_for_compression,
|
|
cfd_->ioptions()->compression_opts,
|
|
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
|
|
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
|
|
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
|
|
oldest_key_time, write_hint, current_time);
|
|
LogFlush(db_options_.info_log);
|
|
}
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
|
|
" bytes %s"
|
|
"%s",
|
|
cfd_->GetName().c_str(), job_context_->job_id,
|
|
meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
|
|
s.ToString().c_str(),
|
|
meta_.marked_for_compaction ? " (needs compaction)" : "");
|
|
|
|
if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
|
|
s = output_file_directory_->Fsync();
|
|
}
|
|
TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
|
|
db_mutex_->Lock();
|
|
}
|
|
base_->Unref();
|
|
|
|
// Note that if file_size is zero, the file has been deleted and
|
|
// should not be added to the manifest.
|
|
if (s.ok() && meta_.fd.GetFileSize() > 0) {
|
|
// if we have more than 1 background thread, then we cannot
|
|
// insert files directly into higher levels because some other
|
|
// threads could be concurrently producing compacted files for
|
|
// that key range.
|
|
// Add file to L0
|
|
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
|
|
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
|
|
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
|
|
meta_.marked_for_compaction, meta_.oldest_blob_file_number);
|
|
}
|
|
|
|
// Note that here we treat flush as level 0 compaction in internal stats
|
|
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
|
|
stats.micros = db_options_.env->NowMicros() - start_micros;
|
|
stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
|
|
stats.bytes_written = meta_.fd.GetFileSize();
|
|
RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
|
|
cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
|
|
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
|
|
meta_.fd.GetFileSize());
|
|
RecordFlushIOStats();
|
|
return s;
|
|
}
|
|
|
|
} // namespace rocksdb
|