Write file temperature information to manifest (#8284)
Summary: As a part of tiered storage, writing tempeature information to manifest is needed so that after DB recovery, RocksDB still has the tiering information, to implement some further necessary functionalities. Also fix some issues in simulated hybrid FS. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8284 Test Plan: Add a new unit test to validate that the information is indeed written and read back. Reviewed By: zhichao-cao Differential Revision: D28335801 fbshipit-source-id: 56aeb2e6ea090be0200181dd968c8a7278037def
This commit is contained in:
parent
feb06e83b2
commit
0ed8cb666d
@ -1689,8 +1689,9 @@ Status CompactionJob::OpenCompactionOutputFile(
|
||||
|
||||
// Pass temperature of botommost files to FileSystem.
|
||||
FileOptions fo_copy = file_options_;
|
||||
Temperature temperature = Temperature::kUnknown;
|
||||
if (bottommost_level_) {
|
||||
fo_copy.temperature =
|
||||
fo_copy.temperature = temperature =
|
||||
sub_compact->compaction->mutable_cf_options()->bottommost_temperature;
|
||||
}
|
||||
|
||||
@ -1742,6 +1743,7 @@ Status CompactionJob::OpenCompactionOutputFile(
|
||||
sub_compact->compaction->output_path_id(), 0);
|
||||
meta.oldest_ancester_time = oldest_ancester_time;
|
||||
meta.file_creation_time = current_time;
|
||||
meta.temperature = temperature;
|
||||
sub_compact->outputs.emplace_back(
|
||||
std::move(meta), cfd->internal_comparator(),
|
||||
/*enable_order_check=*/
|
||||
|
@ -5531,6 +5531,28 @@ TEST_P(RenameCurrentTest, Compaction) {
|
||||
ASSERT_EQ("NOT_FOUND", Get("foo"));
|
||||
ASSERT_EQ("d_value", Get("d"));
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, BottommostTemperature) {
|
||||
Options options = CurrentOptions();
|
||||
options.bottommost_temperature = Temperature::kWarm;
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
Reopen(options);
|
||||
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Put("bar", "bar"));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Put("bar", "bar"));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(dbfull()->TEST_WaitForCompact());
|
||||
|
||||
Reopen(options);
|
||||
|
||||
ColumnFamilyMetaData metadata;
|
||||
db_->GetColumnFamilyMetaData(&metadata);
|
||||
ASSERT_EQ(1, metadata.file_count);
|
||||
ASSERT_EQ(Temperature::kWarm, metadata.levels[1].files[0].temperature);
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
// WAL recovery mode is WALRecoveryMode::kPointInTimeRecovery.
|
||||
@ -5586,7 +5608,6 @@ TEST_F(DBTest2, PointInTimeRecoveryWithSyncFailureInCFCreation) {
|
||||
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
|
||||
ReopenWithColumnFamilies({"default", "test1", "test2"}, options);
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||
|
@ -191,6 +191,11 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
|
||||
char p = static_cast<char>(f.fd.GetPathId());
|
||||
PutLengthPrefixedSlice(dst, Slice(&p, 1));
|
||||
}
|
||||
if (f.temperature != Temperature::kUnknown) {
|
||||
PutVarint32(dst, NewFileCustomTag::kTemperature);
|
||||
char p = static_cast<char>(f.temperature);
|
||||
PutLengthPrefixedSlice(dst, Slice(&p, 1));
|
||||
}
|
||||
if (f.marked_for_compaction) {
|
||||
PutVarint32(dst, NewFileCustomTag::kNeedCompaction);
|
||||
char p = static_cast<char>(1);
|
||||
@ -360,6 +365,16 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
|
||||
return "invalid oldest blob file number";
|
||||
}
|
||||
break;
|
||||
case kTemperature:
|
||||
if (field.size() != 1) {
|
||||
return "temperature field wrong size";
|
||||
} else {
|
||||
Temperature casted_field = static_cast<Temperature>(field[0]);
|
||||
if (casted_field <= Temperature::kCold) {
|
||||
f.temperature = casted_field;
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) {
|
||||
// Should not proceed if cannot understand it
|
||||
@ -774,6 +789,12 @@ std::string VersionEdit::DebugString(bool hex_key) const {
|
||||
r.append(f.file_checksum);
|
||||
r.append(" file_checksum_func_name: ");
|
||||
r.append(f.file_checksum_func_name);
|
||||
if (f.temperature != Temperature::kUnknown) {
|
||||
r.append(" temperature: ");
|
||||
// Maybe change to human readable format whenthe feature becomes
|
||||
// permanent
|
||||
r.append(ToString(static_cast<int>(f.temperature)));
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& blob_file_addition : blob_file_additions_) {
|
||||
@ -876,6 +897,11 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
|
||||
if (f.oldest_blob_file_number != kInvalidBlobFileNumber) {
|
||||
jw << "OldestBlobFile" << f.oldest_blob_file_number;
|
||||
}
|
||||
if (f.temperature != Temperature::kUnknown) {
|
||||
// Maybe change to human readable format whenthe feature becomes
|
||||
// permanent
|
||||
jw << "Temperature" << static_cast<int>(f.temperature);
|
||||
}
|
||||
jw.EndArrayedObject();
|
||||
}
|
||||
|
||||
|
@ -81,6 +81,7 @@ enum NewFileCustomTag : uint32_t {
|
||||
kFileCreationTime = 6,
|
||||
kFileChecksum = 7,
|
||||
kFileChecksumFuncName = 8,
|
||||
kTemperature = 9,
|
||||
|
||||
// If this bit for the custom tag is set, opening DB should fail if
|
||||
// we don't know this field.
|
||||
@ -188,6 +189,7 @@ struct FileMetaData {
|
||||
|
||||
bool marked_for_compaction = false; // True if client asked us nicely to
|
||||
// compact this file.
|
||||
Temperature temperature = Temperature::kUnknown;
|
||||
|
||||
// Used only in BlobDB. The file number of the oldest blob file this SST file
|
||||
// refers to. 0 is an invalid value; BlobDB numbers the files starting from 1.
|
||||
|
@ -1485,15 +1485,16 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
|
||||
file_path = ioptions->cf_paths.back().path;
|
||||
}
|
||||
const uint64_t file_number = file->fd.GetNumber();
|
||||
files.emplace_back(SstFileMetaData{
|
||||
files.emplace_back(
|
||||
MakeTableFileName("", file_number), file_number, file_path,
|
||||
static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
|
||||
file->fd.largest_seqno, file->smallest.user_key().ToString(),
|
||||
file->largest.user_key().ToString(),
|
||||
file->stats.num_reads_sampled.load(std::memory_order_relaxed),
|
||||
file->being_compacted, file->oldest_blob_file_number,
|
||||
file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime(),
|
||||
file->file_checksum, file->file_checksum_func_name});
|
||||
file->being_compacted, file->temperature,
|
||||
file->oldest_blob_file_number, file->TryGetOldestAncesterTime(),
|
||||
file->TryGetFileCreationTime(), file->file_checksum,
|
||||
file->file_checksum_func_name);
|
||||
files.back().num_entries = file->num_entries;
|
||||
files.back().num_deletions = file->num_deletions;
|
||||
level_size += file->fd.GetFileSize();
|
||||
|
@ -188,12 +188,13 @@ struct CompressionOptions {
|
||||
|
||||
// Temperature of a file. Used to pass to FileSystem for a different
|
||||
// placement and/or coding.
|
||||
// Reserve some numbers in the middle, in case we need to insert new tier
|
||||
// there.
|
||||
enum class Temperature : uint8_t {
|
||||
kHot,
|
||||
kWarm,
|
||||
kCold,
|
||||
kTotal,
|
||||
kUnknown = kTotal,
|
||||
kUnknown = 0,
|
||||
kHot = 0x04,
|
||||
kWarm = 0x08,
|
||||
kCold = 0x0C,
|
||||
};
|
||||
|
||||
enum UpdateStatus { // Return status For inplace update callback
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "rocksdb/options.h"
|
||||
#include "rocksdb/types.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
@ -62,6 +63,7 @@ struct SstFileMetaData {
|
||||
being_compacted(false),
|
||||
num_entries(0),
|
||||
num_deletions(0),
|
||||
temperature(Temperature::kUnknown),
|
||||
oldest_blob_file_number(0),
|
||||
oldest_ancester_time(0),
|
||||
file_creation_time(0) {}
|
||||
@ -71,7 +73,8 @@ struct SstFileMetaData {
|
||||
SequenceNumber _smallest_seqno, SequenceNumber _largest_seqno,
|
||||
const std::string& _smallestkey,
|
||||
const std::string& _largestkey, uint64_t _num_reads_sampled,
|
||||
bool _being_compacted, uint64_t _oldest_blob_file_number,
|
||||
bool _being_compacted, Temperature _temperature,
|
||||
uint64_t _oldest_blob_file_number,
|
||||
uint64_t _oldest_ancester_time, uint64_t _file_creation_time,
|
||||
std::string& _file_checksum,
|
||||
std::string& _file_checksum_func_name)
|
||||
@ -87,6 +90,7 @@ struct SstFileMetaData {
|
||||
being_compacted(_being_compacted),
|
||||
num_entries(0),
|
||||
num_deletions(0),
|
||||
temperature(_temperature),
|
||||
oldest_blob_file_number(_oldest_blob_file_number),
|
||||
oldest_ancester_time(_oldest_ancester_time),
|
||||
file_creation_time(_file_creation_time),
|
||||
@ -112,6 +116,9 @@ struct SstFileMetaData {
|
||||
uint64_t num_entries;
|
||||
uint64_t num_deletions;
|
||||
|
||||
// This feature is experimental and subject to change.
|
||||
Temperature temperature;
|
||||
|
||||
uint64_t oldest_blob_file_number; // The id of the oldest blob file
|
||||
// referenced by the file.
|
||||
// An SST file may be generated by compactions whose input files may
|
||||
|
@ -7,6 +7,7 @@
|
||||
|
||||
#include "tools/simulated_hybrid_file_system.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
@ -110,8 +111,7 @@ IOStatus SimulatedHybridRaf::Read(uint64_t offset, size_t n,
|
||||
char* scratch, IODebugContext* dbg) const {
|
||||
if (temperature_ == Temperature::kWarm) {
|
||||
Env::Default()->SleepForMicroseconds(kLatencyAddedPerRequestUs);
|
||||
rate_limiter_->Request(kDummyBytesPerRequest, Env::IOPriority::IO_LOW,
|
||||
nullptr);
|
||||
RequestRateLimit(1);
|
||||
}
|
||||
return target()->Read(offset, n, options, result, scratch, dbg);
|
||||
}
|
||||
@ -120,11 +120,9 @@ IOStatus SimulatedHybridRaf::MultiRead(FSReadRequest* reqs, size_t num_reqs,
|
||||
const IOOptions& options,
|
||||
IODebugContext* dbg) {
|
||||
if (temperature_ == Temperature::kWarm) {
|
||||
RequestRateLimit(static_cast<int64_t>(num_reqs));
|
||||
Env::Default()->SleepForMicroseconds(kLatencyAddedPerRequestUs *
|
||||
static_cast<int>(num_reqs));
|
||||
rate_limiter_->Request(
|
||||
static_cast<int64_t>(num_reqs) * kDummyBytesPerRequest,
|
||||
Env::IOPriority::IO_LOW, nullptr);
|
||||
}
|
||||
return target()->MultiRead(reqs, num_reqs, options, dbg);
|
||||
}
|
||||
@ -133,13 +131,22 @@ IOStatus SimulatedHybridRaf::Prefetch(uint64_t offset, size_t n,
|
||||
const IOOptions& options,
|
||||
IODebugContext* dbg) {
|
||||
if (temperature_ == Temperature::kWarm) {
|
||||
rate_limiter_->Request(kDummyBytesPerRequest, Env::IOPriority::IO_LOW,
|
||||
nullptr);
|
||||
RequestRateLimit(1);
|
||||
Env::Default()->SleepForMicroseconds(kLatencyAddedPerRequestUs);
|
||||
}
|
||||
return target()->Prefetch(offset, n, options, dbg);
|
||||
}
|
||||
|
||||
void SimulatedHybridRaf::RequestRateLimit(int64_t num_requests) const {
|
||||
int64_t left = num_requests * kDummyBytesPerRequest;
|
||||
const int64_t kMaxToRequest = kDummyBytesPerRequest / 100;
|
||||
while (left > 0) {
|
||||
int64_t to_request = std::min(kMaxToRequest, left);
|
||||
rate_limiter_->Request(to_request, Env::IOPriority::IO_LOW, nullptr);
|
||||
left -= to_request;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
@ -83,6 +83,8 @@ class SimulatedHybridRaf : public FSRandomAccessFileWrapper {
|
||||
private:
|
||||
std::shared_ptr<RateLimiter> rate_limiter_;
|
||||
Temperature temperature_;
|
||||
|
||||
void RequestRateLimit(int64_t num_requests) const;
|
||||
};
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user