243ca14116
Summary: Closes https://github.com/facebook/rocksdb/pull/3264 Differential Revision: D6552768 Pulled By: siying fbshipit-source-id: 6303110aff22f341d5cff41f8d2d4f138a53652d
375 lines
14 KiB
C++
375 lines
14 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "db/flush_job.h"
|
|
|
|
#ifndef __STDC_FORMAT_MACROS
|
|
#define __STDC_FORMAT_MACROS
|
|
#endif
|
|
|
|
#include <inttypes.h>
|
|
|
|
#include <algorithm>
|
|
#include <vector>
|
|
|
|
#include "db/builder.h"
|
|
#include "db/db_iter.h"
|
|
#include "db/dbformat.h"
|
|
#include "db/event_helpers.h"
|
|
#include "db/log_reader.h"
|
|
#include "db/log_writer.h"
|
|
#include "db/memtable_list.h"
|
|
#include "db/merge_context.h"
|
|
#include "db/version_set.h"
|
|
#include "monitoring/iostats_context_imp.h"
|
|
#include "monitoring/perf_context_imp.h"
|
|
#include "monitoring/thread_status_util.h"
|
|
#include "port/port.h"
|
|
#include "db/memtable.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/statistics.h"
|
|
#include "rocksdb/status.h"
|
|
#include "rocksdb/table.h"
|
|
#include "table/block.h"
|
|
#include "table/block_based_table_factory.h"
|
|
#include "table/merging_iterator.h"
|
|
#include "table/table_builder.h"
|
|
#include "table/two_level_iterator.h"
|
|
#include "util/coding.h"
|
|
#include "util/event_logger.h"
|
|
#include "util/file_util.h"
|
|
#include "util/filename.h"
|
|
#include "util/log_buffer.h"
|
|
#include "util/logging.h"
|
|
#include "util/mutexlock.h"
|
|
#include "util/stop_watch.h"
|
|
#include "util/sync_point.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
|
|
const ImmutableDBOptions& db_options,
|
|
const MutableCFOptions& mutable_cf_options,
|
|
const EnvOptions env_options, VersionSet* versions,
|
|
InstrumentedMutex* db_mutex,
|
|
std::atomic<bool>* shutting_down,
|
|
std::vector<SequenceNumber> existing_snapshots,
|
|
SequenceNumber earliest_write_conflict_snapshot,
|
|
SnapshotChecker* snapshot_checker, JobContext* job_context,
|
|
LogBuffer* log_buffer, Directory* db_directory,
|
|
Directory* output_file_directory,
|
|
CompressionType output_compression, Statistics* stats,
|
|
EventLogger* event_logger, bool measure_io_stats)
|
|
: dbname_(dbname),
|
|
cfd_(cfd),
|
|
db_options_(db_options),
|
|
mutable_cf_options_(mutable_cf_options),
|
|
env_options_(env_options),
|
|
versions_(versions),
|
|
db_mutex_(db_mutex),
|
|
shutting_down_(shutting_down),
|
|
existing_snapshots_(std::move(existing_snapshots)),
|
|
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
|
|
snapshot_checker_(snapshot_checker),
|
|
job_context_(job_context),
|
|
log_buffer_(log_buffer),
|
|
db_directory_(db_directory),
|
|
output_file_directory_(output_file_directory),
|
|
output_compression_(output_compression),
|
|
stats_(stats),
|
|
event_logger_(event_logger),
|
|
measure_io_stats_(measure_io_stats),
|
|
edit_(nullptr),
|
|
base_(nullptr),
|
|
pick_memtable_called(false) {
|
|
// Update the thread status to indicate flush.
|
|
ReportStartedFlush();
|
|
TEST_SYNC_POINT("FlushJob::FlushJob()");
|
|
}
|
|
|
|
FlushJob::~FlushJob() {
|
|
ThreadStatusUtil::ResetThreadStatus();
|
|
}
|
|
|
|
void FlushJob::ReportStartedFlush() {
|
|
ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
|
|
db_options_.enable_thread_tracking);
|
|
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
|
|
ThreadStatusUtil::SetThreadOperationProperty(
|
|
ThreadStatus::COMPACTION_JOB_ID,
|
|
job_context_->job_id);
|
|
IOSTATS_RESET(bytes_written);
|
|
}
|
|
|
|
void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
|
|
uint64_t input_size = 0;
|
|
for (auto* mem : mems) {
|
|
input_size += mem->ApproximateMemoryUsage();
|
|
}
|
|
ThreadStatusUtil::IncreaseThreadOperationProperty(
|
|
ThreadStatus::FLUSH_BYTES_MEMTABLES,
|
|
input_size);
|
|
}
|
|
|
|
void FlushJob::RecordFlushIOStats() {
|
|
RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
|
|
ThreadStatusUtil::IncreaseThreadOperationProperty(
|
|
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
|
|
IOSTATS_RESET(bytes_written);
|
|
}
|
|
|
|
void FlushJob::PickMemTable() {
|
|
db_mutex_->AssertHeld();
|
|
assert(!pick_memtable_called);
|
|
pick_memtable_called = true;
|
|
// Save the contents of the earliest memtable as a new Table
|
|
cfd_->imm()->PickMemtablesToFlush(&mems_);
|
|
if (mems_.empty()) {
|
|
return;
|
|
}
|
|
|
|
ReportFlushInputSize(mems_);
|
|
|
|
// entries mems are (implicitly) sorted in ascending order by their created
|
|
// time. We will use the first memtable's `edit` to keep the meta info for
|
|
// this flush.
|
|
MemTable* m = mems_[0];
|
|
edit_ = m->GetEdits();
|
|
edit_->SetPrevLogNumber(0);
|
|
// SetLogNumber(log_num) indicates logs with number smaller than log_num
|
|
// will no longer be picked up for recovery.
|
|
edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
|
|
edit_->SetColumnFamily(cfd_->GetID());
|
|
|
|
// path 0 for level 0 file.
|
|
meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
|
|
|
|
base_ = cfd_->current();
|
|
base_->Ref(); // it is likely that we do not need this reference
|
|
}
|
|
|
|
Status FlushJob::Run(FileMetaData* file_meta) {
|
|
db_mutex_->AssertHeld();
|
|
assert(pick_memtable_called);
|
|
AutoThreadOperationStageUpdater stage_run(
|
|
ThreadStatus::STAGE_FLUSH_RUN);
|
|
if (mems_.empty()) {
|
|
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
|
|
cfd_->GetName().c_str());
|
|
return Status::OK();
|
|
}
|
|
|
|
// I/O measurement variables
|
|
PerfLevel prev_perf_level = PerfLevel::kEnableTime;
|
|
uint64_t prev_write_nanos = 0;
|
|
uint64_t prev_fsync_nanos = 0;
|
|
uint64_t prev_range_sync_nanos = 0;
|
|
uint64_t prev_prepare_write_nanos = 0;
|
|
if (measure_io_stats_) {
|
|
prev_perf_level = GetPerfLevel();
|
|
SetPerfLevel(PerfLevel::kEnableTime);
|
|
prev_write_nanos = IOSTATS(write_nanos);
|
|
prev_fsync_nanos = IOSTATS(fsync_nanos);
|
|
prev_range_sync_nanos = IOSTATS(range_sync_nanos);
|
|
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
|
|
}
|
|
|
|
// This will release and re-acquire the mutex.
|
|
Status s = WriteLevel0Table();
|
|
|
|
if (s.ok() &&
|
|
(shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
|
|
s = Status::ShutdownInProgress(
|
|
"Database shutdown or Column family drop during flush");
|
|
}
|
|
|
|
if (!s.ok()) {
|
|
cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
|
|
} else {
|
|
TEST_SYNC_POINT("FlushJob::InstallResults");
|
|
// Replace immutable memtable with the generated Table
|
|
s = cfd_->imm()->InstallMemtableFlushResults(
|
|
cfd_, mutable_cf_options_, mems_, versions_, db_mutex_,
|
|
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
|
|
log_buffer_);
|
|
}
|
|
|
|
if (s.ok() && file_meta != nullptr) {
|
|
*file_meta = meta_;
|
|
}
|
|
RecordFlushIOStats();
|
|
|
|
auto stream = event_logger_->LogToBuffer(log_buffer_);
|
|
stream << "job" << job_context_->job_id << "event"
|
|
<< "flush_finished";
|
|
stream << "output_compression"
|
|
<< CompressionTypeToString(output_compression_);
|
|
stream << "lsm_state";
|
|
stream.StartArray();
|
|
auto vstorage = cfd_->current()->storage_info();
|
|
for (int level = 0; level < vstorage->num_levels(); ++level) {
|
|
stream << vstorage->NumLevelFiles(level);
|
|
}
|
|
stream.EndArray();
|
|
stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
|
|
|
|
if (measure_io_stats_) {
|
|
if (prev_perf_level != PerfLevel::kEnableTime) {
|
|
SetPerfLevel(prev_perf_level);
|
|
}
|
|
stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
|
|
stream << "file_range_sync_nanos"
|
|
<< (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
|
|
stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
|
|
stream << "file_prepare_write_nanos"
|
|
<< (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
|
|
}
|
|
|
|
return s;
|
|
}
|
|
|
|
void FlushJob::Cancel() {
|
|
db_mutex_->AssertHeld();
|
|
assert(base_ != nullptr);
|
|
base_->Unref();
|
|
}
|
|
|
|
Status FlushJob::WriteLevel0Table() {
|
|
AutoThreadOperationStageUpdater stage_updater(
|
|
ThreadStatus::STAGE_FLUSH_WRITE_L0);
|
|
db_mutex_->AssertHeld();
|
|
const uint64_t start_micros = db_options_.env->NowMicros();
|
|
Status s;
|
|
{
|
|
auto write_hint = cfd_->CalculateSSTWriteHint(0);
|
|
db_mutex_->Unlock();
|
|
if (log_buffer_) {
|
|
log_buffer_->FlushBufferToLog();
|
|
}
|
|
// memtables and range_del_iters store internal iterators over each data
|
|
// memtable and its associated range deletion memtable, respectively, at
|
|
// corresponding indexes.
|
|
std::vector<InternalIterator*> memtables;
|
|
std::vector<InternalIterator*> range_del_iters;
|
|
ReadOptions ro;
|
|
ro.total_order_seek = true;
|
|
Arena arena;
|
|
uint64_t total_num_entries = 0, total_num_deletes = 0;
|
|
size_t total_memory_usage = 0;
|
|
for (MemTable* m : mems_) {
|
|
ROCKS_LOG_INFO(
|
|
db_options_.info_log,
|
|
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
|
|
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
|
|
memtables.push_back(m->NewIterator(ro, &arena));
|
|
auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
|
|
if (range_del_iter != nullptr) {
|
|
range_del_iters.push_back(range_del_iter);
|
|
}
|
|
total_num_entries += m->num_entries();
|
|
total_num_deletes += m->num_deletes();
|
|
total_memory_usage += m->ApproximateMemoryUsage();
|
|
}
|
|
|
|
event_logger_->Log() << "job" << job_context_->job_id << "event"
|
|
<< "flush_started"
|
|
<< "num_memtables" << mems_.size() << "num_entries"
|
|
<< total_num_entries << "num_deletes"
|
|
<< total_num_deletes << "memory_usage"
|
|
<< total_memory_usage;
|
|
|
|
{
|
|
ScopedArenaIterator iter(
|
|
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
|
|
static_cast<int>(memtables.size()), &arena));
|
|
std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
|
|
&cfd_->internal_comparator(),
|
|
range_del_iters.empty() ? nullptr : &range_del_iters[0],
|
|
static_cast<int>(range_del_iters.size())));
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
|
|
cfd_->GetName().c_str(), job_context_->job_id,
|
|
meta_.fd.GetNumber());
|
|
|
|
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
|
|
&output_compression_);
|
|
int64_t _current_time = 0;
|
|
auto status = db_options_.env->GetCurrentTime(&_current_time);
|
|
// Safe to proceed even if GetCurrentTime fails. So, log and proceed.
|
|
if (!status.ok()) {
|
|
ROCKS_LOG_WARN(
|
|
db_options_.info_log,
|
|
"Failed to get current time to populate creation_time property. "
|
|
"Status: %s",
|
|
status.ToString().c_str());
|
|
}
|
|
const uint64_t current_time = static_cast<uint64_t>(_current_time);
|
|
|
|
uint64_t oldest_key_time =
|
|
mems_.front()->ApproximateOldestKeyTime();
|
|
|
|
s = BuildTable(
|
|
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
|
|
env_options_, cfd_->table_cache(), iter.get(),
|
|
std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
|
|
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
|
|
cfd_->GetName(), existing_snapshots_,
|
|
earliest_write_conflict_snapshot_, snapshot_checker_,
|
|
output_compression_, cfd_->ioptions()->compression_opts,
|
|
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
|
|
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
|
|
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
|
|
oldest_key_time, write_hint);
|
|
LogFlush(db_options_.info_log);
|
|
}
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
|
|
" bytes %s"
|
|
"%s",
|
|
cfd_->GetName().c_str(), job_context_->job_id,
|
|
meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
|
|
s.ToString().c_str(),
|
|
meta_.marked_for_compaction ? " (needs compaction)" : "");
|
|
|
|
if (output_file_directory_ != nullptr) {
|
|
output_file_directory_->Fsync();
|
|
}
|
|
TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
|
|
db_mutex_->Lock();
|
|
}
|
|
base_->Unref();
|
|
|
|
// Note that if file_size is zero, the file has been deleted and
|
|
// should not be added to the manifest.
|
|
if (s.ok() && meta_.fd.GetFileSize() > 0) {
|
|
// if we have more than 1 background thread, then we cannot
|
|
// insert files directly into higher levels because some other
|
|
// threads could be concurrently producing compacted files for
|
|
// that key range.
|
|
// Add file to L0
|
|
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
|
|
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
|
|
meta_.smallest_seqno, meta_.largest_seqno,
|
|
meta_.marked_for_compaction);
|
|
}
|
|
|
|
// Note that here we treat flush as level 0 compaction in internal stats
|
|
InternalStats::CompactionStats stats(1);
|
|
stats.micros = db_options_.env->NowMicros() - start_micros;
|
|
stats.bytes_written = meta_.fd.GetFileSize();
|
|
cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
|
|
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
|
|
meta_.fd.GetFileSize());
|
|
RecordFlushIOStats();
|
|
return s;
|
|
}
|
|
|
|
} // namespace rocksdb
|