Update DB::AddFile() to ingest the file to the lowest possible level
Summary: DB::AddFile() currently always adds the ingested file to L0. Update the logic so the file is added to the lowest possible level instead.

Test Plan: unit tests

Reviewers: jkedgar, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, yoshinorim

Differential Revision: https://reviews.facebook.net/D59637
parent d6b79e2fd0
commit fa813f7478
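For context, a minimal sketch (not part of this change) of how an external SST file is typically built and ingested through this API. It mirrors the GenerateAndAddExternalFile() test helper added in this diff; the function name, file path, and keys below are illustrative only. After this change, the ingested file lands in the lowest level whose key range it does not overlap, instead of always going to L0.

#include "rocksdb/db.h"
#include "rocksdb/sst_file_writer.h"

using namespace rocksdb;

// Build an external SST file with SstFileWriter, then ingest it with
// DB::AddFile().
Status IngestExample(DB* db, const Options& options) {
  const std::string sst_path = "/tmp/example.sst";  // illustrative path
  SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);

  Status s = sst_file_writer.Open(sst_path);
  if (!s.ok()) {
    return s;
  }
  // Keys must be added in sorted order and must not overlap keys already in the DB.
  s = sst_file_writer.Add("key1", "value1");
  if (s.ok()) {
    s = sst_file_writer.Add("key2", "value2");
  }
  if (s.ok()) {
    s = sst_file_writer.Finish();
  }
  if (s.ok()) {
    // move_file = false: the file is copied into the DB directory.
    s = db->AddFile(sst_path, false /* move_file */);
  }
  return s;
}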
@@ -187,8 +187,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
    }
  }

  Slice smallest_user_key;
  GetBoundaryKeys(vstorage, inputs_, &smallest_user_key, &largest_user_key_);
  GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_);
}

Compaction::~Compaction() {

@@ -219,6 +219,8 @@ class Compaction {
    output_table_properties_ = std::move(tp);
  }

  Slice GetSmallestUserKey() const { return smallest_user_key_; }

  Slice GetLargestUserKey() const { return largest_user_key_; }

  CompactionReason compaction_reason() { return compaction_reason_; }
@@ -294,6 +296,9 @@ class Compaction {
  // table properties of output files
  TablePropertiesCollection output_table_properties_;

  // smallest user keys in compaction
  Slice smallest_user_key_;

  // largest user keys in compaction
  Slice largest_user_key_;
db/db_impl.cc
@@ -68,7 +68,6 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
@@ -2090,6 +2089,7 @@ Status DBImpl::CompactFilesImpl(
  c->SetInputVersion(version);
  // deletion compaction currently not allowed in CompactFiles.
  assert(!c->deletion_compaction());
  running_compactions_.insert(c.get());

  SequenceNumber earliest_write_conflict_snapshot;
  std::vector<SequenceNumber> snapshot_seqs =
@@ -2145,6 +2145,8 @@ Status DBImpl::CompactFilesImpl(

  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  running_compactions_.erase(c.get());

  if (status.ok()) {
    // Done
  } else if (status.IsShutdownInProgress()) {
@@ -3085,6 +3087,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
      reinterpret_cast<ManualCompaction*>(arg);
  *made_progress = false;
  mutex_.AssertHeld();
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");

  bool is_manual = (manual_compaction != nullptr);

@@ -3199,6 +3202,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
    }
  }

  if (c != nullptr) {
    running_compactions_.insert(c.get());
  }

  if (!c) {
    // Nothing to do
    LogToBuffer(log_buffer, "Compaction nothing to do");
@@ -3325,6 +3332,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
    NotifyOnCompactionCompleted(
        c->column_family_data(), c.get(), status,
        compaction_job_stats, job_context->job_id);
    running_compactions_.erase(c.get());
  }
  // this will unref its input_version and column_family_data
  c.reset();
@@ -3766,247 +3774,6 @@ std::vector<Status> DBImpl::MultiGet(
  return stat_list;
}

#ifndef ROCKSDB_LITE
Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
                       const std::string& file_path, bool move_file) {
  Status status;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();

  ExternalSstFileInfo file_info;
  file_info.file_path = file_path;
  status = env_->GetFileSize(file_path, &file_info.file_size);
  if (!status.ok()) {
    return status;
  }

  // Access the file using TableReader to extract
  // version, number of entries, smallest user key, largest user key
  std::unique_ptr<RandomAccessFile> sst_file;
  status = env_->NewRandomAccessFile(file_path, &sst_file, env_options_);
  if (!status.ok()) {
    return status;
  }
  std::unique_ptr<RandomAccessFileReader> sst_file_reader;
  sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file)));

  std::unique_ptr<TableReader> table_reader;
  status = cfd->ioptions()->table_factory->NewTableReader(
      TableReaderOptions(*cfd->ioptions(), env_options_,
                         cfd->internal_comparator()),
      std::move(sst_file_reader), file_info.file_size, &table_reader);
  if (!status.ok()) {
    return status;
  }

  // Get the external sst file version from table properties
  const UserCollectedProperties& user_collected_properties =
      table_reader->GetTableProperties()->user_collected_properties;
  UserCollectedProperties::const_iterator external_sst_file_version_iter =
      user_collected_properties.find(ExternalSstFilePropertyNames::kVersion);
  if (external_sst_file_version_iter == user_collected_properties.end()) {
    return Status::InvalidArgument("Generated table version not found");
  }

  file_info.version =
      DecodeFixed32(external_sst_file_version_iter->second.c_str());
  if (file_info.version == 1) {
    // version 1 imply that all sequence numbers in table equal 0
    file_info.sequence_number = 0;
  } else {
    return Status::InvalidArgument("Generated table version is not supported");
  }

  // Get number of entries in table
  file_info.num_entries = table_reader->GetTableProperties()->num_entries;

  ParsedInternalKey key;
  std::unique_ptr<InternalIterator> iter(
      table_reader->NewIterator(ReadOptions()));

  // Get first (smallest) key from file
  iter->SeekToFirst();
  if (!ParseInternalKey(iter->key(), &key)) {
    return Status::Corruption("Generated table have corrupted keys");
  }
  if (key.sequence != 0) {
    return Status::Corruption("Generated table have non zero sequence number");
  }
  file_info.smallest_key = key.user_key.ToString();

  // Get last (largest) key from file
  iter->SeekToLast();
  if (!ParseInternalKey(iter->key(), &key)) {
    return Status::Corruption("Generated table have corrupted keys");
  }
  if (key.sequence != 0) {
    return Status::Corruption("Generated table have non zero sequence number");
  }
  file_info.largest_key = key.user_key.ToString();

  return AddFile(column_family, &file_info, move_file);
}

Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
                       const ExternalSstFileInfo* file_info, bool move_file) {
  Status status;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  const uint64_t start_micros = env_->NowMicros();

  if (file_info->num_entries == 0) {
    return Status::InvalidArgument("File contain no entries");
  }
  if (file_info->version != 1) {
    return Status::InvalidArgument("Generated table version is not supported");
  }
  // version 1 imply that file have only Put Operations with Sequence Number = 0

  FileMetaData meta;
  meta.smallest =
      InternalKey(file_info->smallest_key, file_info->sequence_number,
                  ValueType::kTypeValue);
  meta.largest = InternalKey(file_info->largest_key, file_info->sequence_number,
                             ValueType::kTypeValue);
  if (!meta.smallest.Valid() || !meta.largest.Valid()) {
    return Status::Corruption("Generated table have corrupted keys");
  }
  meta.smallest_seqno = file_info->sequence_number;
  meta.largest_seqno = file_info->sequence_number;
  if (meta.smallest_seqno != 0 || meta.largest_seqno != 0) {
    return Status::InvalidArgument(
        "Non zero sequence numbers are not supported");
  }

  // Generate a location for the new table
  std::list<uint64_t>::iterator pending_outputs_inserted_elem;
  {
    InstrumentedMutexLock l(&mutex_);
    pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs();
    meta.fd =
        FileDescriptor(versions_->NewFileNumber(), 0, file_info->file_size);
  }

  std::string db_fname = TableFileName(
      db_options_.db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());

  if (move_file) {
    status = env_->LinkFile(file_info->file_path, db_fname);
    if (status.IsNotSupported()) {
      // Original file is on a different FS, use copy instead of hard linking
      status = CopyFile(env_, file_info->file_path, db_fname, 0);
    }
  } else {
    status = CopyFile(env_, file_info->file_path, db_fname, 0);
  }
  TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
  if (!status.ok()) {
    return status;
  }

  {
    InstrumentedMutexLock l(&mutex_);
    const MutableCFOptions mutable_cf_options =
        *cfd->GetLatestMutableCFOptions();

    WriteThread::Writer w;
    write_thread_.EnterUnbatched(&w, &mutex_);

    if (!snapshots_.empty()) {
      // Check that no snapshots are being held
      status =
          Status::NotSupported("Cannot add a file while holding snapshots");
    }

    if (status.ok()) {
      // Verify that added file key range dont overlap with any keys in DB
      SuperVersion* sv = cfd->GetSuperVersion()->Ref();
      Arena arena;
      ReadOptions ro;
      ro.total_order_seek = true;
      ScopedArenaIterator iter(NewInternalIterator(ro, cfd, sv, &arena));

      InternalKey range_start(file_info->smallest_key, kMaxSequenceNumber,
                              kTypeValue);
      iter->Seek(range_start.Encode());
      status = iter->status();

      if (status.ok() && iter->Valid()) {
        ParsedInternalKey seek_result;
        if (ParseInternalKey(iter->key(), &seek_result)) {
          auto* vstorage = cfd->current()->storage_info();
          if (vstorage->InternalComparator()->user_comparator()->Compare(
                  seek_result.user_key, file_info->largest_key) <= 0) {
            status = Status::NotSupported("Cannot add overlapping range");
          }
        } else {
          status = Status::Corruption("DB have corrupted keys");
        }
      }
    }

    if (status.ok()) {
      // Add file to L0
      VersionEdit edit;
      edit.SetColumnFamily(cfd->GetID());
      edit.AddFile(0, meta.fd.GetNumber(), meta.fd.GetPathId(),
                   meta.fd.GetFileSize(), meta.smallest, meta.largest,
                   meta.smallest_seqno, meta.largest_seqno,
                   meta.marked_for_compaction);

      status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
                                      directories_.GetDbDir());
    }
    write_thread_.ExitUnbatched(&w);

    if (status.ok()) {
      delete InstallSuperVersionAndScheduleWork(cfd, nullptr,
                                                mutable_cf_options);

      // Update internal stats
      InternalStats::CompactionStats stats(1);
      stats.micros = env_->NowMicros() - start_micros;
      stats.bytes_written = meta.fd.GetFileSize();
      stats.num_output_files = 1;
      cfd->internal_stats()->AddCompactionStats(0 /* L0 */, stats);
      cfd->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,
                                        meta.fd.GetFileSize());
    }
    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
  }

  if (!status.ok()) {
    // We failed to add the file to the database
    Status s = env_->DeleteFile(db_fname);
    if (!s.ok()) {
      Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
          "AddFile() clean up for file %s failed : %s", db_fname.c_str(),
          s.ToString().c_str());
    }
  } else {
    // File was ingested successfully
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "New file %" PRIu64 " was added to L0 (Size: %.2f MB, "
        "entries: %" PRIu64 ")",
        meta.fd.GetNumber(),
        static_cast<double>(meta.fd.GetFileSize()) / 1048576.0,
        file_info->num_entries);

    if (move_file) {
      // The file was moved and added successfully, remove original file link
      Status s = env_->DeleteFile(file_info->file_path);
      if (!s.ok()) {
        Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
            "%s was added to DB successfully but failed to remove original "
            "file link : %s",
            file_info->file_path.c_str(), s.ToString().c_str());
      }
    }
  }
  return status;
}
#endif // ROCKSDB_LITE

Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                  const std::string& column_family_name,
                                  ColumnFamilyHandle** handle) {

@@ -623,6 +623,11 @@ class DBImpl : public DB {
  Status WaitForFlushMemTable(ColumnFamilyData* cfd);

#ifndef ROCKSDB_LITE
  // Finds the lowest level in the DB that the ingested file can be added to
  // REQUIRES: mutex_ held
  int PickLevelForIngestedFile(ColumnFamilyData* cfd,
                               const ExternalSstFileInfo* file_info);

  Status CompactFilesImpl(
      const CompactionOptions& compact_options, ColumnFamilyData* cfd,
      Version* version, const std::vector<std::string>& input_file_names,
@@ -912,6 +917,10 @@ class DBImpl : public DB {
  // The options to access storage files
  const EnvOptions env_options_;

  // A set of compactions that are running right now
  // REQUIRES: mutex held
  std::unordered_set<Compaction*> running_compactions_;

#ifndef ROCKSDB_LITE
  WalManager wal_manager_;
#endif // ROCKSDB_LITE
db/db_impl_add_file.cc (new file)
@@ -0,0 +1,321 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "db/db_impl.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>

#include "db/builder.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/sst_file_writer.h"
#include "table/table_builder.h"
#include "util/file_reader_writer.h"
#include "util/file_util.h"
#include "util/sync_point.h"

namespace rocksdb {
#ifndef ROCKSDB_LITE
Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
                       const std::string& file_path, bool move_file) {
  Status status;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();

  ExternalSstFileInfo file_info;
  file_info.file_path = file_path;
  status = env_->GetFileSize(file_path, &file_info.file_size);
  if (!status.ok()) {
    return status;
  }

  // Access the file using TableReader to extract
  // version, number of entries, smallest user key, largest user key
  std::unique_ptr<RandomAccessFile> sst_file;
  status = env_->NewRandomAccessFile(file_path, &sst_file, env_options_);
  if (!status.ok()) {
    return status;
  }
  std::unique_ptr<RandomAccessFileReader> sst_file_reader;
  sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file)));

  std::unique_ptr<TableReader> table_reader;
  status = cfd->ioptions()->table_factory->NewTableReader(
      TableReaderOptions(*cfd->ioptions(), env_options_,
                         cfd->internal_comparator()),
      std::move(sst_file_reader), file_info.file_size, &table_reader);
  if (!status.ok()) {
    return status;
  }

  // Get the external sst file version from table properties
  const UserCollectedProperties& user_collected_properties =
      table_reader->GetTableProperties()->user_collected_properties;
  UserCollectedProperties::const_iterator external_sst_file_version_iter =
      user_collected_properties.find(ExternalSstFilePropertyNames::kVersion);
  if (external_sst_file_version_iter == user_collected_properties.end()) {
    return Status::InvalidArgument("Generated table version not found");
  }

  file_info.version =
      DecodeFixed32(external_sst_file_version_iter->second.c_str());
  if (file_info.version == 1) {
    // version 1 imply that all sequence numbers in table equal 0
    file_info.sequence_number = 0;
  } else {
    return Status::InvalidArgument("Generated table version is not supported");
  }

  // Get number of entries in table
  file_info.num_entries = table_reader->GetTableProperties()->num_entries;

  ParsedInternalKey key;
  std::unique_ptr<InternalIterator> iter(
      table_reader->NewIterator(ReadOptions()));

  // Get first (smallest) key from file
  iter->SeekToFirst();
  if (!ParseInternalKey(iter->key(), &key)) {
    return Status::Corruption("Generated table have corrupted keys");
  }
  if (key.sequence != 0) {
    return Status::Corruption("Generated table have non zero sequence number");
  }
  file_info.smallest_key = key.user_key.ToString();

  // Get last (largest) key from file
  iter->SeekToLast();
  if (!ParseInternalKey(iter->key(), &key)) {
    return Status::Corruption("Generated table have corrupted keys");
  }
  if (key.sequence != 0) {
    return Status::Corruption("Generated table have non zero sequence number");
  }
  file_info.largest_key = key.user_key.ToString();

  return AddFile(column_family, &file_info, move_file);
}

Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
                       const ExternalSstFileInfo* file_info, bool move_file) {
  Status status;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  const uint64_t start_micros = env_->NowMicros();

  if (file_info->num_entries == 0) {
    return Status::InvalidArgument("File contain no entries");
  }
  if (file_info->version != 1) {
    return Status::InvalidArgument("Generated table version is not supported");
  }
  // version 1 imply that file have only Put Operations with Sequence Number = 0

  FileMetaData meta;
  meta.smallest =
      InternalKey(file_info->smallest_key, file_info->sequence_number,
                  ValueType::kTypeValue);
  meta.largest = InternalKey(file_info->largest_key, file_info->sequence_number,
                             ValueType::kTypeValue);
  if (!meta.smallest.Valid() || !meta.largest.Valid()) {
    return Status::Corruption("Generated table have corrupted keys");
  }
  meta.smallest_seqno = file_info->sequence_number;
  meta.largest_seqno = file_info->sequence_number;
  if (meta.smallest_seqno != 0 || meta.largest_seqno != 0) {
    return Status::InvalidArgument(
        "Non zero sequence numbers are not supported");
  }

  // Generate a location for the new table
  std::list<uint64_t>::iterator pending_outputs_inserted_elem;
  {
    InstrumentedMutexLock l(&mutex_);
    pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs();
    meta.fd =
        FileDescriptor(versions_->NewFileNumber(), 0, file_info->file_size);
  }

  std::string db_fname = TableFileName(
      db_options_.db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());

  if (move_file) {
    status = env_->LinkFile(file_info->file_path, db_fname);
    if (status.IsNotSupported()) {
      // Original file is on a different FS, use copy instead of hard linking
      status = CopyFile(env_, file_info->file_path, db_fname, 0);
    }
  } else {
    status = CopyFile(env_, file_info->file_path, db_fname, 0);
  }
  TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
  if (!status.ok()) {
    return status;
  }

  // The level the file will be ingested into
  int target_level = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    const MutableCFOptions mutable_cf_options =
        *cfd->GetLatestMutableCFOptions();

    WriteThread::Writer w;
    write_thread_.EnterUnbatched(&w, &mutex_);

    if (!snapshots_.empty()) {
      // Check that no snapshots are being held
      status =
          Status::NotSupported("Cannot add a file while holding snapshots");
    }

    if (status.ok()) {
      // Verify that added file key range dont overlap with any keys in DB
      SuperVersion* sv = cfd->GetSuperVersion()->Ref();
      Arena arena;
      ReadOptions ro;
      ro.total_order_seek = true;
      ScopedArenaIterator iter(NewInternalIterator(ro, cfd, sv, &arena));

      InternalKey range_start(file_info->smallest_key, kMaxSequenceNumber,
                              kTypeValue);
      iter->Seek(range_start.Encode());
      status = iter->status();

      if (status.ok() && iter->Valid()) {
        ParsedInternalKey seek_result;
        if (ParseInternalKey(iter->key(), &seek_result)) {
          auto* vstorage = cfd->current()->storage_info();
          if (vstorage->InternalComparator()->user_comparator()->Compare(
                  seek_result.user_key, file_info->largest_key) <= 0) {
            status = Status::NotSupported("Cannot add overlapping range");
          }
        } else {
          status = Status::Corruption("DB have corrupted keys");
        }
      }
    }

    if (status.ok()) {
      // Add file to the lowest possible level
      target_level = PickLevelForIngestedFile(cfd, file_info);
      VersionEdit edit;
      edit.SetColumnFamily(cfd->GetID());
      edit.AddFile(target_level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                   meta.fd.GetFileSize(), meta.smallest, meta.largest,
                   meta.smallest_seqno, meta.largest_seqno,
                   meta.marked_for_compaction);

      status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
                                      directories_.GetDbDir());
    }
    write_thread_.ExitUnbatched(&w);

    if (status.ok()) {
      delete InstallSuperVersionAndScheduleWork(cfd, nullptr,
                                                mutable_cf_options);

      // Update internal stats
      InternalStats::CompactionStats stats(1);
      stats.micros = env_->NowMicros() - start_micros;
      stats.bytes_written = meta.fd.GetFileSize();
      stats.num_output_files = 1;
      cfd->internal_stats()->AddCompactionStats(target_level, stats);
      cfd->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,
                                        meta.fd.GetFileSize());
    }
    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
  }

  if (!status.ok()) {
    // We failed to add the file to the database
    Status s = env_->DeleteFile(db_fname);
    if (!s.ok()) {
      Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
          "AddFile() clean up for file %s failed : %s", db_fname.c_str(),
          s.ToString().c_str());
    }
  } else {
    // File was ingested successfully
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "New file %" PRIu64
        " was added to L%d (Size: %.2f MB, "
        "entries: %" PRIu64 ")",
        meta.fd.GetNumber(), target_level,
        static_cast<double>(meta.fd.GetFileSize()) / 1048576.0,
        file_info->num_entries);

    if (move_file) {
      // The file was moved and added successfully, remove original file link
      Status s = env_->DeleteFile(file_info->file_path);
      if (!s.ok()) {
        Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
            "%s was added to DB successfully but failed to remove original "
            "file link : %s",
            file_info->file_path.c_str(), s.ToString().c_str());
      }
    }
  }
  return status;
}

// Finds the lowest level in the DB that the ingested file can be added to
int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd,
                                     const ExternalSstFileInfo* file_info) {
  mutex_.AssertHeld();

  int target_level = 0;
  auto* vstorage = cfd->current()->storage_info();
  auto* ucmp = vstorage->InternalComparator()->user_comparator();

  Slice file_smallest_user_key(file_info->smallest_key);
  Slice file_largest_user_key(file_info->largest_key);

  for (int lvl = cfd->NumberLevels() - 1; lvl >= vstorage->base_level();
       lvl--) {
    if (vstorage->OverlapInLevel(lvl, &file_smallest_user_key,
                                 &file_largest_user_key) == false) {
      // Make sure that the file dont overlap with the output of any
      // compaction running right now
      Slice compaction_smallest_user_key;
      Slice compaction_largest_user_key;
      bool overlap_with_compaction_output = false;
      for (Compaction* c : running_compactions_) {
        if (c->column_family_data()->GetID() != cfd->GetID() ||
            c->output_level() != lvl) {
          continue;
        }

        compaction_smallest_user_key = c->GetSmallestUserKey();
        compaction_largest_user_key = c->GetLargestUserKey();

        if (ucmp->Compare(file_smallest_user_key,
                          compaction_largest_user_key) <= 0 &&
            ucmp->Compare(file_largest_user_key,
                          compaction_smallest_user_key) >= 0) {
          overlap_with_compaction_output = true;
          break;
        }
      }

      if (overlap_with_compaction_output == false) {
        // Level lvl is the lowest level that dont have any files with key
        // range overlapping with our file key range and no compactions
        // planning to add overlapping files in it.
        target_level = lvl;
        break;
      }
    }
  }

  return target_level;
}
#endif // ROCKSDB_LITE

} // namespace rocksdb
@@ -1243,6 +1243,215 @@ TEST_F(DBSSTTest, AddExternalSstFileOverlappingRanges) {
      kSkipFIFOCompaction));
}

TEST_F(DBSSTTest, AddExternalSstFilePickedLevel) {
  std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
  env_->CreateDir(sst_files_folder);
  Options options = CurrentOptions();
  options.disable_auto_compactions = false;
  options.level0_file_num_compaction_trigger = 4;
  options.num_levels = 4;
  options.env = env_;
  DestroyAndReopen(options);

  std::vector<std::vector<int>> file_to_keys;

  // File 0 will go to last level (L3)
  file_to_keys.push_back({1, 10});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  EXPECT_EQ(FilesPerLevel(), "0,0,0,1");

  // File 1 will go to level L2 (since it overlap with file 0 in L3)
  file_to_keys.push_back({2, 9});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  EXPECT_EQ(FilesPerLevel(), "0,0,1,1");

  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"DBSSTTest::AddExternalSstFilePickedLevel:0",
       "BackgroundCallCompaction:0"},
      {"DBImpl::BackgroundCompaction:Start",
       "DBSSTTest::AddExternalSstFilePickedLevel:1"},
      {"DBSSTTest::AddExternalSstFilePickedLevel:2",
       "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
  });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Flush 4 files containing the same keys
  for (int i = 0; i < 4; i++) {
    ASSERT_OK(Put(Key(3), Key(3) + "put"));
    ASSERT_OK(Put(Key(8), Key(8) + "put"));
    ASSERT_OK(Flush());
  }

  // Wait for BackgroundCompaction() to be called
  TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevel:0");
  TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevel:1");

  EXPECT_EQ(FilesPerLevel(), "4,0,1,1");

  // This file overlaps with file 0 (L3), file 1 (L2) and the
  // output of compaction going to L1
  file_to_keys.push_back({4, 7});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  EXPECT_EQ(FilesPerLevel(), "5,0,1,1");

  // This file does not overlap with any file or with the running compaction
  file_to_keys.push_back({9000, 9001});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  EXPECT_EQ(FilesPerLevel(), "5,0,1,2");

  // Hold compaction from finishing
  TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevel:2");

  dbfull()->TEST_WaitForCompact();
  EXPECT_EQ(FilesPerLevel(), "1,1,1,2");

  for (size_t file_id = 0; file_id < file_to_keys.size(); file_id++) {
    for (auto& key_id : file_to_keys[file_id]) {
      std::string k = Key(key_id);
      std::string v = k + ToString(file_id);
      if (key_id == 3 || key_id == 8) {
        v = k + "put";
      }

      ASSERT_EQ(Get(k), v);
    }
  }

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBSSTTest, AddExternalSstFilePickedLevelDynamic) {
  std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
  env_->CreateDir(sst_files_folder);
  Options options = CurrentOptions();
  options.disable_auto_compactions = false;
  options.level0_file_num_compaction_trigger = 4;
  options.level_compaction_dynamic_level_bytes = true;
  options.num_levels = 4;
  options.env = env_;
  DestroyAndReopen(options);

  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"DBSSTTest::AddExternalSstFilePickedLevelDynamic:0",
       "BackgroundCallCompaction:0"},
      {"DBImpl::BackgroundCompaction:Start",
       "DBSSTTest::AddExternalSstFilePickedLevelDynamic:1"},
      {"DBSSTTest::AddExternalSstFilePickedLevelDynamic:2",
       "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
  });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Flush 4 files containing the same keys
  for (int i = 0; i < 4; i++) {
    for (int k = 20; k <= 30; k++) {
      ASSERT_OK(Put(Key(k), Key(k) + "put"));
    }
    for (int k = 50; k <= 60; k++) {
      ASSERT_OK(Put(Key(k), Key(k) + "put"));
    }
    ASSERT_OK(Flush());
  }

  // Wait for BackgroundCompaction() to be called
  TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevelDynamic:0");
  TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevelDynamic:1");
  std::vector<std::vector<int>> file_to_keys;

  // This file overlaps with the output of the compaction (going to L3)
  // so the file will be added to L0 since L3 is the base level
  file_to_keys.push_back({31, 32, 33, 34});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  EXPECT_EQ(FilesPerLevel(), "5");

  // This file does not overlap with the current running compactiong
  file_to_keys.push_back({9000, 9001});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  EXPECT_EQ(FilesPerLevel(), "5,0,0,1");

  // Hold compaction from finishing
  TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevelDynamic:2");

  // Output of the compaction will go to L3
  dbfull()->TEST_WaitForCompact();
  EXPECT_EQ(FilesPerLevel(), "1,0,0,2");

  Close();
  options.disable_auto_compactions = true;
  Reopen(options);

  file_to_keys.push_back({1, 15, 19});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  ASSERT_EQ(FilesPerLevel(), "1,0,0,3");

  file_to_keys.push_back({1000, 1001, 1002});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  ASSERT_EQ(FilesPerLevel(), "1,0,0,4");

  file_to_keys.push_back({500, 600, 700});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  ASSERT_EQ(FilesPerLevel(), "1,0,0,5");

  // File 5 overlaps with file 2 (L3 / base level)
  file_to_keys.push_back({2, 10});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  ASSERT_EQ(FilesPerLevel(), "2,0,0,5");

  // File 6 overlaps with file 2 (L3 / base level) and file 5 (L0)
  file_to_keys.push_back({3, 9});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  ASSERT_EQ(FilesPerLevel(), "3,0,0,5");

  // Verify data in files
  for (size_t file_id = 0; file_id < file_to_keys.size(); file_id++) {
    for (auto& key_id : file_to_keys[file_id]) {
      std::string k = Key(key_id);
      std::string v = k + ToString(file_id);

      ASSERT_EQ(Get(k), v);
    }
  }

  // Write range [5 => 10] to L0
  for (int i = 5; i <= 10; i++) {
    std::string k = Key(i);
    std::string v = k + "put";
    ASSERT_OK(Put(k, v));
  }
  ASSERT_OK(Flush());
  ASSERT_EQ(FilesPerLevel(), "4,0,0,5");

  // File 7 overlaps with file 4 (L3)
  file_to_keys.push_back({650, 651, 652});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  ASSERT_EQ(FilesPerLevel(), "5,0,0,5");

  for (size_t file_id = 0; file_id < file_to_keys.size(); file_id++) {
    for (auto& key_id : file_to_keys[file_id]) {
      std::string k = Key(key_id);
      std::string v = k + ToString(file_id);
      if (key_id >= 5 && key_id <= 10) {
        v = k + "put";
      }

      ASSERT_EQ(Get(k), v);
    }
  }

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

#endif // ROCKSDB_LITE

// 1 Create some SST files by inserting K-V pairs into DB
@@ -1075,6 +1075,35 @@ std::vector<std::uint64_t> DBTestBase::ListTableFiles(Env* env,
}

#ifndef ROCKSDB_LITE

Status DBTestBase::GenerateAndAddExternalFile(const Options options,
                                              std::vector<int> keys,
                                              size_t file_id) {
  std::string file_path =
      test::TmpDir(env_) + "/sst_files/" + ToString(file_id);
  SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);

  Status s = sst_file_writer.Open(file_path);
  if (!s.ok()) {
    return s;
  }
  for (auto& entry : keys) {
    std::string k = Key(entry);
    std::string v = k + ToString(file_id);
    s = sst_file_writer.Add(k, v);
    if (!s.ok()) {
      return s;
    }
  }
  s = sst_file_writer.Finish();

  if (s.ok()) {
    s = db_->AddFile(file_path);
  }

  return s;
}

uint64_t DBTestBase::GetNumberOfSstFilesForColumnFamily(
    DB* db, std::string column_family_name) {
  std::vector<LiveFileMetaData> metadata;
@@ -39,6 +39,7 @@
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include "rocksdb/utilities/checkpoint.h"
@@ -797,6 +798,9 @@ class DBTestBase : public testing::Test {
  std::vector<std::uint64_t> ListTableFiles(Env* env, const std::string& path);

#ifndef ROCKSDB_LITE
  Status GenerateAndAddExternalFile(const Options options,
                                    std::vector<int> keys, size_t file_id);

  uint64_t GetNumberOfSstFilesForColumnFamily(DB* db,
                                              std::string column_family_name);
#endif // ROCKSDB_LITE
@@ -800,9 +800,10 @@ class DB {
  // Current Requirements:
  // (1) Key range in loaded table file don't overlap with
  //     existing keys or tombstones in DB.
  // (2) No other writes happen during AddFile call, otherwise
  //     DB may get corrupted.
  // (3) No snapshots are held.
  // (2) No snapshots are held.
  //
  // Notes: We will try to ingest the file to the lowest possible level
  //        even if the file compression dont match the level compression
  virtual Status AddFile(ColumnFamilyHandle* column_family,
                         const std::string& file_path,
                         bool move_file = false) = 0;
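To make requirement (1) above concrete, here is a small hypothetical sketch, written in the style of the tests in this diff (gtest macros, illustrative path and keys): ingesting a file whose key range overlaps a key already in the DB is rejected by the overlap check in DBImpl::AddFile.

// Assumes 'db' is an open DB and 'options' are its Options.
ASSERT_OK(db->Put(WriteOptions(), "k2", "v"));

SstFileWriter writer(EnvOptions(), options, options.comparator);
ASSERT_OK(writer.Open("/tmp/overlap.sst"));  // illustrative path
ASSERT_OK(writer.Add("k1", "v1"));
ASSERT_OK(writer.Add("k3", "v3"));
ASSERT_OK(writer.Finish());

// The range [k1, k3] overlaps the existing key k2, so AddFile() is expected
// to fail with Status::NotSupported("Cannot add overlapping range").
Status s = db->AddFile("/tmp/overlap.sst");
ASSERT_TRUE(s.IsNotSupported());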