Merge branch 'master' into fix-range-deletion-bug
commit d21bd42116
@@ -1070,6 +1070,7 @@ if(WITH_TESTS)
  add_custom_target(check COMMAND ${CMAKE_CTEST_COMMAND})
  set(TESTUTILLIB testutillib${ARTIFACT_SUFFIX})
  add_library(${TESTUTILLIB} STATIC ${TESTUTIL_SOURCE})
  target_link_libraries(${TESTUTILLIB} ${LIBS})
  if(MSVC)
    set_target_properties(${TESTUTILLIB} PROPERTIES COMPILE_FLAGS "/Fd${CMAKE_CFG_INTDIR}/testutillib${ARTIFACT_SUFFIX}.pdb")
  endif()
@@ -17,6 +17,8 @@
* A batched MultiGet API (DB::MultiGet()) that supports retrieving keys from multiple column families.
* Full and partitioned filters in the block-based table use an improved Bloom filter implementation, enabled with format_version 5 (or above) because previous releases cannot read this filter. This replacement is faster and more accurate, especially for high bits per key or millions of keys in a single (full) filter. For example, the new Bloom filter has a lower false positive rate at 16 bits per key than the old one at 100 bits per key.
* Added AVX2 instructions to USE_SSE builds to accelerate the new Bloom filter and XXH3-based hash function on compatible x86_64 platforms (Haswell and later, ~2014).
* Support options.ttl with options.max_open_files != -1. The file's oldest ancester time will be written to the manifest. If it is available, this information will be used instead of creation_time in table properties.
* Setting options.ttl for universal compaction now has the same meaning as setting periodic_compaction_seconds.

### Performance Improvements
* For 64-bit hashing, RocksDB is standardizing on a slightly modified preview version of XXH3. This function is now used for many non-persisted hashes, along with fastrange64() in place of the modulus operator, and some benchmarks show a slight improvement.
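For reference, a minimal usage sketch (not part of this commit) of the combination the new manifest field enables: TTL-based compaction together with a bounded number of open files. The option names are real RocksDB options; the path and values are illustrative only.

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.compaction_style = rocksdb::kCompactionStyleLevel;
  options.ttl = 24 * 60 * 60;    // files older than 24 hours become compaction candidates
  options.max_open_files = 100;  // no longer required to be -1 for ttl to work

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/ttl_example_db", &db);
  assert(s.ok());
  // ... normal reads and writes; TTL compactions are scheduled in the background ...
  delete db;
  return 0;
}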
@@ -1,7 +1,7 @@
# Read rocksdb version from version.h header file.

function(get_rocksdb_version version_var)
  file(READ "${CMAKE_SOURCE_DIR}/include/rocksdb/version.h" version_header_file)
  file(READ "${CMAKE_CURRENT_SOURCE_DIR}/include/rocksdb/version.h" version_header_file)
  foreach(component MAJOR MINOR PATCH)
    string(REGEX MATCH "#define ROCKSDB_${component} ([0-9]+)" _ ${version_header_file})
    set(ROCKSDB_VERSION_${component} ${CMAKE_MATCH_1})
@@ -367,6 +367,18 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
    }
  }

  // TTL compactions would work similar to Periodic Compactions in Universal in
  // most of the cases. So, if ttl is set, execute the periodic compaction
  // codepath.
  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
    if (result.periodic_compaction_seconds != 0) {
      result.periodic_compaction_seconds =
          std::min(result.ttl, result.periodic_compaction_seconds);
    } else {
      result.periodic_compaction_seconds = result.ttl;
    }
  }

  return result;
}
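Read together, the rule above means that for universal compaction ttl simply caps the periodic-compaction interval. A standalone sketch of the effective value (hypothetical helper, not part of the patch); the expected pairs match the SanitizeUniversalTTLCompaction test added later in this commit.

#include <algorithm>
#include <cstdint>

// Effective periodic_compaction_seconds for universal compaction when ttl is
// set: ttl acts as an upper bound on the period.
uint64_t EffectivePeriodicCompactionSeconds(uint64_t ttl,
                                            uint64_t periodic_compaction_seconds) {
  if (ttl == 0) return periodic_compaction_seconds;   // ttl disabled, keep as-is
  if (periodic_compaction_seconds == 0) return ttl;   // inherit ttl
  return std::min(ttl, periodic_compaction_seconds);  // smaller interval wins
}

// e.g. (ttl=100, periodic=500) -> 100; (ttl=100, periodic=0) -> 100;
// (ttl=0, periodic=100) -> 100.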
@@ -1198,11 +1210,6 @@ Status ColumnFamilyData::ValidateOptions(
  }

  if (cf_options.ttl > 0) {
    if (db_options.max_open_files != -1) {
      return Status::NotSupported(
          "TTL is only supported when files are always "
          "kept open (set max_open_files = -1). ");
    }
    if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
      return Status::NotSupported(
          "TTL is only supported in Block-Based Table format. ");
@@ -545,17 +545,16 @@ bool Compaction::ShouldFormSubcompactions() const {
  }
}

uint64_t Compaction::MinInputFileCreationTime() const {
  uint64_t min_creation_time = port::kMaxUint64;
uint64_t Compaction::MinInputFileOldestAncesterTime() const {
  uint64_t min_oldest_ancester_time = port::kMaxUint64;
  for (const auto& file : inputs_[0].files) {
    if (file->fd.table_reader != nullptr &&
        file->fd.table_reader->GetTableProperties() != nullptr) {
      uint64_t creation_time =
          file->fd.table_reader->GetTableProperties()->creation_time;
      min_creation_time = std::min(min_creation_time, creation_time);
    uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
    if (oldest_ancester_time != 0) {
      min_oldest_ancester_time =
          std::min(min_oldest_ancester_time, oldest_ancester_time);
    }
  }
  return min_creation_time;
  return min_oldest_ancester_time;
}

int Compaction::GetInputBaseLevel() const {
@@ -291,7 +291,7 @@ class Compaction {

  uint32_t max_subcompactions() const { return max_subcompactions_; }

  uint64_t MinInputFileCreationTime() const;
  uint64_t MinInputFileOldestAncesterTime() const;

 private:
  // mark (or clear) all files that are being compacted
@@ -1479,12 +1479,32 @@ Status CompactionJob::OpenCompactionOutputFile(
    return s;
  }

  SubcompactionState::Output out;
  out.meta.fd =
      FileDescriptor(file_number, sub_compact->compaction->output_path_id(), 0);
  out.finished = false;
  // Try to figure out the output file's oldest ancester time.
  int64_t temp_current_time = 0;
  auto get_time_status = env_->GetCurrentTime(&temp_current_time);
  // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
  if (!get_time_status.ok()) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Failed to get current time. Status: %s",
                   get_time_status.ToString().c_str());
  }
  uint64_t current_time = static_cast<uint64_t>(temp_current_time);
  uint64_t oldest_ancester_time =
      sub_compact->compaction->MinInputFileOldestAncesterTime();
  if (oldest_ancester_time == port::kMaxUint64) {
    oldest_ancester_time = current_time;
  }

  // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
  {
    SubcompactionState::Output out;
    out.meta.fd = FileDescriptor(file_number,
                                 sub_compact->compaction->output_path_id(), 0);
    out.meta.oldest_ancester_time = oldest_ancester_time;
    out.finished = false;
    sub_compact->outputs.push_back(out);
  }

  sub_compact->outputs.push_back(out);
  writable_file->SetIOPriority(Env::IO_LOW);
  writable_file->SetWriteLifeTimeHint(write_hint_);
  writable_file->SetPreallocationBlockSize(static_cast<size_t>(
@@ -1501,21 +1521,6 @@ Status CompactionJob::OpenCompactionOutputFile(
  bool skip_filters =
      cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;

  int64_t temp_current_time = 0;
  auto get_time_status = env_->GetCurrentTime(&temp_current_time);
  // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
  if (!get_time_status.ok()) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Failed to get current time. Status: %s",
                   get_time_status.ToString().c_str());
  }
  uint64_t current_time = static_cast<uint64_t>(temp_current_time);

  uint64_t creation_time = sub_compact->compaction->MinInputFileCreationTime();
  if (creation_time == port::kMaxUint64) {
    creation_time = current_time;
  }

  sub_compact->builder.reset(NewTableBuilder(
      *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
      cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
@@ -1523,9 +1528,9 @@ Status CompactionJob::OpenCompactionOutputFile(
      sub_compact->compaction->output_compression(),
      0 /*sample_for_compression */,
      sub_compact->compaction->output_compression_opts(),
      sub_compact->compaction->output_level(), skip_filters, creation_time,
      0 /* oldest_key_time */, sub_compact->compaction->max_output_file_size(),
      current_time));
      sub_compact->compaction->output_level(), skip_filters,
      oldest_ancester_time, 0 /* oldest_key_time */,
      sub_compact->compaction->max_output_file_size(), current_time));
  LogFlush(db_options_.info_log);
  return s;
}
@@ -183,7 +183,8 @@ class CompactionJobTest : public testing::Test {

    VersionEdit edit;
    edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key,
                 smallest_seqno, largest_seqno, false, oldest_blob_file_number);
                 smallest_seqno, largest_seqno, false, oldest_blob_file_number,
                 kUnknownOldestAncesterTime);

    mutex_.Lock();
    versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
@@ -43,6 +43,7 @@ bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
                           SequenceNumber earliest_mem_seqno) {
  // Do not pick an ingested file when there is at least one unflushed memtable
  // whose seqno overlaps with the sst.
  TEST_SYNC_POINT("FindIntraL0Compaction");
  size_t start = 0;
  for (; start < level_files.size(); start++) {
    if (level_files[start]->being_compacted) {
@@ -92,7 +92,8 @@ class CompactionPickerTest : public testing::Test {
        file_number, path_id, file_size,
        InternalKey(smallest, smallest_seq, kTypeValue),
        InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
        largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber);
        largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
        kUnknownOldestAncesterTime);
    f->compensated_file_size =
        (compensated_file_size != 0) ? compensated_file_size : file_size;
    vstorage_->AddFile(level, f);
@ -3525,88 +3525,131 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
|
||||
TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
|
||||
const int kValueSize = 100;
|
||||
|
||||
Options options = CurrentOptions();
|
||||
options.compression = kNoCompression;
|
||||
options.ttl = 24 * 60 * 60; // 24 hours
|
||||
options.max_open_files = -1;
|
||||
env_->time_elapse_only_sleep_ = false;
|
||||
options.env = env_;
|
||||
for (bool if_restart : {false, true}) {
|
||||
for (bool if_open_all_files : {false, true}) {
|
||||
Options options = CurrentOptions();
|
||||
options.compression = kNoCompression;
|
||||
options.ttl = 24 * 60 * 60; // 24 hours
|
||||
if (if_open_all_files) {
|
||||
options.max_open_files = -1;
|
||||
} else {
|
||||
options.max_open_files = 20;
|
||||
}
|
||||
// RocksDB sanitize max open files to at least 20. Modify it back.
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
|
||||
int* max_open_files = static_cast<int*>(arg);
|
||||
*max_open_files = 2;
|
||||
});
|
||||
// In the case where all files are opened and doing DB restart
|
||||
// forcing the oldest ancester time in manifest file to be 0 to
|
||||
// simulate the case of reading from an old version.
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionEdit::EncodeTo:VarintOldestAncesterTime", [&](void* arg) {
|
||||
if (if_restart && if_open_all_files) {
|
||||
std::string* encoded_fieled = static_cast<std::string*>(arg);
|
||||
*encoded_fieled = "";
|
||||
PutVarint64(encoded_fieled, 0);
|
||||
}
|
||||
});
|
||||
|
||||
env_->addon_time_.store(0);
|
||||
DestroyAndReopen(options);
|
||||
env_->time_elapse_only_sleep_ = false;
|
||||
options.env = env_;
|
||||
|
||||
int ttl_compactions = 0;
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
|
||||
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
|
||||
auto compaction_reason = compaction->compaction_reason();
|
||||
if (compaction_reason == CompactionReason::kTtl) {
|
||||
ttl_compactions++;
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
env_->addon_time_.store(0);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
// Add two L6 files with key ranges: [1 .. 100], [101 .. 200].
|
||||
Random rnd(301);
|
||||
for (int i = 1; i <= 100; ++i) {
|
||||
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
|
||||
int ttl_compactions = 0;
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
|
||||
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
|
||||
auto compaction_reason = compaction->compaction_reason();
|
||||
if (compaction_reason == CompactionReason::kTtl) {
|
||||
ttl_compactions++;
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// Add two L6 files with key ranges: [1 .. 100], [101 .. 200].
|
||||
Random rnd(301);
|
||||
for (int i = 1; i <= 100; ++i) {
|
||||
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
|
||||
}
|
||||
Flush();
|
||||
for (int i = 101; i <= 200; ++i) {
|
||||
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
|
||||
}
|
||||
Flush();
|
||||
MoveFilesToLevel(6);
|
||||
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
|
||||
|
||||
// Add two L4 files with key ranges: [1 .. 50], [51 .. 150].
|
||||
for (int i = 1; i <= 50; ++i) {
|
||||
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
|
||||
}
|
||||
Flush();
|
||||
for (int i = 51; i <= 150; ++i) {
|
||||
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
|
||||
}
|
||||
Flush();
|
||||
MoveFilesToLevel(4);
|
||||
ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());
|
||||
|
||||
// Add one L1 file with key range: [26, 75].
|
||||
for (int i = 26; i <= 75; ++i) {
|
||||
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
|
||||
}
|
||||
Flush();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
MoveFilesToLevel(1);
|
||||
ASSERT_EQ("0,1,0,0,2,0,2", FilesPerLevel());
|
||||
|
||||
// LSM tree:
|
||||
// L1: [26 .. 75]
|
||||
// L4: [1 .. 50][51 ..... 150]
|
||||
// L6: [1 ........ 100][101 .... 200]
|
||||
//
|
||||
// On TTL expiry, TTL compaction should be initiated on L1 file, and the
|
||||
// compactions should keep going on until the key range hits bottom level.
|
||||
// In other words: the compaction on this data range "cascasdes" until
|
||||
// reaching the bottom level.
|
||||
//
|
||||
// Order of events on TTL expiry:
|
||||
// 1. L1 file falls to L3 via 2 trivial moves which are initiated by the
|
||||
// ttl
|
||||
// compaction.
|
||||
// 2. A TTL compaction happens between L3 and L4 files. Output file in L4.
|
||||
// 3. The new output file from L4 falls to L5 via 1 trival move initiated
|
||||
// by the ttl compaction.
|
||||
// 4. A TTL compaction happens between L5 and L6 files. Ouptut in L6.
|
||||
|
||||
// Add 25 hours and do a write
|
||||
env_->addon_time_.fetch_add(25 * 60 * 60);
|
||||
|
||||
ASSERT_OK(Put(Key(1), "1"));
|
||||
if (if_restart) {
|
||||
Reopen(options);
|
||||
} else {
|
||||
Flush();
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
|
||||
ASSERT_EQ(5, ttl_compactions);
|
||||
|
||||
env_->addon_time_.fetch_add(25 * 60 * 60);
|
||||
ASSERT_OK(Put(Key(2), "1"));
|
||||
if (if_restart) {
|
||||
Reopen(options);
|
||||
} else {
|
||||
Flush();
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
|
||||
ASSERT_GE(ttl_compactions, 6);
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
}
|
||||
Flush();
|
||||
for (int i = 101; i <= 200; ++i) {
|
||||
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
|
||||
}
|
||||
Flush();
|
||||
MoveFilesToLevel(6);
|
||||
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
|
||||
|
||||
// Add two L4 files with key ranges: [1 .. 50], [51 .. 150].
|
||||
for (int i = 1; i <= 50; ++i) {
|
||||
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
|
||||
}
|
||||
Flush();
|
||||
for (int i = 51; i <= 150; ++i) {
|
||||
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
|
||||
}
|
||||
Flush();
|
||||
MoveFilesToLevel(4);
|
||||
ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());
|
||||
|
||||
// Add one L1 file with key range: [26, 75].
|
||||
for (int i = 26; i <= 75; ++i) {
|
||||
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
|
||||
}
|
||||
Flush();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
MoveFilesToLevel(1);
|
||||
ASSERT_EQ("0,1,0,0,2,0,2", FilesPerLevel());
|
||||
|
||||
// LSM tree:
|
||||
// L1: [26 .. 75]
|
||||
// L4: [1 .. 50][51 ..... 150]
|
||||
// L6: [1 ........ 100][101 .... 200]
|
||||
//
|
||||
// On TTL expiry, TTL compaction should be initiated on L1 file, and the
|
||||
// compactions should keep going on until the key range hits bottom level.
|
||||
// In other words: the compaction on this data range "cascasdes" until
|
||||
// reaching the bottom level.
|
||||
//
|
||||
// Order of events on TTL expiry:
|
||||
// 1. L1 file falls to L3 via 2 trivial moves which are initiated by the ttl
|
||||
// compaction.
|
||||
// 2. A TTL compaction happens between L3 and L4 files. Output file in L4.
|
||||
// 3. The new output file from L4 falls to L5 via 1 trival move initiated
|
||||
// by the ttl compaction.
|
||||
// 4. A TTL compaction happens between L5 and L6 files. Ouptut in L6.
|
||||
|
||||
// Add 25 hours and do a write
|
||||
env_->addon_time_.fetch_add(25 * 60 * 60);
|
||||
ASSERT_OK(Put("a", "1"));
|
||||
Flush();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
|
||||
ASSERT_EQ(5, ttl_compactions);
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
|
||||
@ -4876,7 +4919,7 @@ void IngestOneKeyValue(DBImpl* db, const std::string& key,
|
||||
}
|
||||
|
||||
TEST_P(DBCompactionTestWithParam,
|
||||
FlushAfterL0IntraCompactionCheckConsistencyFail) {
|
||||
FlushAfterIntraL0CompactionCheckConsistencyFail) {
|
||||
Options options = CurrentOptions();
|
||||
options.force_consistency_checks = true;
|
||||
options.compression = kNoCompression;
|
||||
@ -4887,11 +4930,16 @@ TEST_P(DBCompactionTestWithParam,
|
||||
|
||||
const size_t kValueSize = 1 << 20;
|
||||
Random rnd(301);
|
||||
std::atomic<int> pick_intra_l0_count(0);
|
||||
std::string value(RandomString(&rnd, kValueSize));
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"LevelCompactionPicker::PickCompactionBySize:0",
|
||||
{{"DBCompactionTestWithParam::FlushAfterIntraL0:1",
|
||||
"CompactionJob::Run():Start"}});
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"FindIntraL0Compaction",
|
||||
[&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// prevents trivial move
|
||||
@ -4921,6 +4969,7 @@ TEST_P(DBCompactionTestWithParam,
|
||||
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
|
||||
}
|
||||
|
||||
TEST_SYNC_POINT("DBCompactionTestWithParam::FlushAfterIntraL0:1");
|
||||
// Put one key, to make biggest log sequence number in this memtable is bigger
|
||||
// than sst which would be ingested in next step.
|
||||
ASSERT_OK(Put(Key(2), "b"));
|
||||
@ -4931,6 +4980,7 @@ TEST_P(DBCompactionTestWithParam,
|
||||
dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
|
||||
&level_to_files);
|
||||
ASSERT_GT(level_to_files[0].size(), 0);
|
||||
ASSERT_GT(pick_intra_l0_count.load(), 0);
|
||||
|
||||
ASSERT_OK(Flush());
|
||||
}
|
||||
@ -4960,9 +5010,14 @@ TEST_P(DBCompactionTestWithParam,
|
||||
ASSERT_OK(Flush());
|
||||
Compact("", Key(99));
|
||||
ASSERT_EQ(0, NumTableFilesAtLevel(0));
|
||||
|
||||
std::atomic<int> pick_intra_l0_count(0);
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"LevelCompactionPicker::PickCompactionBySize:0",
|
||||
{{"DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1",
|
||||
"CompactionJob::Run():Start"}});
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"FindIntraL0Compaction",
|
||||
[&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
// Make 6 L0 sst.
|
||||
for (int i = 0; i < 6; ++i) {
|
||||
@ -4999,12 +5054,14 @@ TEST_P(DBCompactionTestWithParam,
|
||||
// Wake up flush job
|
||||
sleeping_tasks.WakeUp();
|
||||
sleeping_tasks.WaitUntilDone();
|
||||
TEST_SYNC_POINT("DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1");
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
uint64_t error_count = 0;
|
||||
db_->GetIntProperty("rocksdb.background-errors", &error_count);
|
||||
ASSERT_EQ(error_count, 0);
|
||||
ASSERT_GT(pick_intra_l0_count.load(), 0);
|
||||
for (int i = 0; i < 6; ++i) {
|
||||
ASSERT_EQ(bigvalue, Get(Key(i)));
|
||||
}
|
||||
|
@ -1256,7 +1256,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
|
||||
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
|
||||
f->fd.GetFileSize(), f->smallest, f->largest,
|
||||
f->fd.smallest_seqno, f->fd.largest_seqno,
|
||||
f->marked_for_compaction, f->oldest_blob_file_number);
|
||||
f->marked_for_compaction, f->oldest_blob_file_number,
|
||||
f->oldest_ancester_time);
|
||||
}
|
||||
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
|
||||
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
|
||||
@ -2671,7 +2672,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
|
||||
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
|
||||
f->largest, f->fd.smallest_seqno,
|
||||
f->fd.largest_seqno, f->marked_for_compaction,
|
||||
f->oldest_blob_file_number);
|
||||
f->oldest_blob_file_number, f->oldest_ancester_time);
|
||||
|
||||
ROCKS_LOG_BUFFER(
|
||||
log_buffer,
|
||||
|
@ -128,7 +128,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
|
||||
edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
|
||||
f->fd.GetFileSize(), f->smallest, f->largest,
|
||||
f->fd.smallest_seqno, f->fd.largest_seqno,
|
||||
f->marked_for_compaction, f->oldest_blob_file_number);
|
||||
f->marked_for_compaction, f->oldest_blob_file_number,
|
||||
f->oldest_ancester_time);
|
||||
}
|
||||
|
||||
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
|
||||
|
@ -1175,6 +1175,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||
int64_t _current_time = 0;
|
||||
env_->GetCurrentTime(&_current_time); // ignore error
|
||||
const uint64_t current_time = static_cast<uint64_t>(_current_time);
|
||||
meta.oldest_ancester_time = current_time;
|
||||
|
||||
{
|
||||
auto write_hint = cfd->CalculateSSTWriteHint(0);
|
||||
@ -1224,7 +1225,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
|
||||
meta.fd.GetFileSize(), meta.smallest, meta.largest,
|
||||
meta.fd.smallest_seqno, meta.fd.largest_seqno,
|
||||
meta.marked_for_compaction, meta.oldest_blob_file_number);
|
||||
meta.marked_for_compaction, meta.oldest_blob_file_number,
|
||||
meta.oldest_ancester_time);
|
||||
}
|
||||
|
||||
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
|
||||
|
@ -606,6 +606,35 @@ TEST_F(DBOptionsTest, SanitizeDelayedWriteRate) {
|
||||
ASSERT_EQ(31 * 1024 * 1024, dbfull()->GetDBOptions().delayed_write_rate);
|
||||
}
|
||||
|
||||
TEST_F(DBOptionsTest, SanitizeUniversalTTLCompaction) {
|
||||
Options options;
|
||||
options.compaction_style = kCompactionStyleUniversal;
|
||||
|
||||
options.ttl = 0;
|
||||
options.periodic_compaction_seconds = 0;
|
||||
Reopen(options);
|
||||
ASSERT_EQ(0, dbfull()->GetOptions().ttl);
|
||||
ASSERT_EQ(0, dbfull()->GetOptions().periodic_compaction_seconds);
|
||||
|
||||
options.ttl = 0;
|
||||
options.periodic_compaction_seconds = 100;
|
||||
Reopen(options);
|
||||
ASSERT_EQ(0, dbfull()->GetOptions().ttl);
|
||||
ASSERT_EQ(100, dbfull()->GetOptions().periodic_compaction_seconds);
|
||||
|
||||
options.ttl = 100;
|
||||
options.periodic_compaction_seconds = 0;
|
||||
Reopen(options);
|
||||
ASSERT_EQ(100, dbfull()->GetOptions().ttl);
|
||||
ASSERT_EQ(100, dbfull()->GetOptions().periodic_compaction_seconds);
|
||||
|
||||
options.ttl = 100;
|
||||
options.periodic_compaction_seconds = 500;
|
||||
Reopen(options);
|
||||
ASSERT_EQ(100, dbfull()->GetOptions().ttl);
|
||||
ASSERT_EQ(100, dbfull()->GetOptions().periodic_compaction_seconds);
|
||||
}
|
||||
|
||||
TEST_F(DBOptionsTest, SanitizeFIFOPeriodicCompaction) {
|
||||
Options options;
|
||||
options.compaction_style = kCompactionStyleFIFO;
|
||||
|
@ -3311,22 +3311,6 @@ TEST_F(DBTest, FIFOCompactionStyleWithCompactionAndDelete) {
|
||||
}
|
||||
}
|
||||
|
||||
// Check that FIFO-with-TTL is not supported with max_open_files != -1.
|
||||
TEST_F(DBTest, FIFOCompactionWithTTLAndMaxOpenFilesTest) {
|
||||
Options options;
|
||||
options.compaction_style = kCompactionStyleFIFO;
|
||||
options.create_if_missing = true;
|
||||
options.ttl = 600; // seconds
|
||||
|
||||
// Check that it is not supported with max_open_files != -1.
|
||||
options.max_open_files = 100;
|
||||
options = CurrentOptions(options);
|
||||
ASSERT_TRUE(TryReopen(options).IsNotSupported());
|
||||
|
||||
options.max_open_files = -1;
|
||||
ASSERT_OK(TryReopen(options));
|
||||
}
|
||||
|
||||
// Check that FIFO-with-TTL is supported only with BlockBasedTableFactory.
|
||||
TEST_F(DBTest, FIFOCompactionWithTTLAndVariousTableFormatsTest) {
|
||||
Options options;
|
||||
@ -6181,19 +6165,6 @@ TEST_F(DBTest, FailWhenCompressionNotSupportedTest) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBTest, CreateColumnFamilyShouldFailOnIncompatibleOptions) {
|
||||
Options options = CurrentOptions();
|
||||
options.max_open_files = 100;
|
||||
Reopen(options);
|
||||
|
||||
ColumnFamilyOptions cf_options(options);
|
||||
// ttl is only supported when max_open_files is -1.
|
||||
cf_options.ttl = 3600;
|
||||
ColumnFamilyHandle* handle;
|
||||
ASSERT_NOK(db_->CreateColumnFamily(cf_options, "pikachu", &handle));
|
||||
delete handle;
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
TEST_F(DBTest, RowCache) {
|
||||
Options options = CurrentOptions();
|
||||
|
@@ -244,10 +244,18 @@ Status ExternalSstFileIngestionJob::Run() {
      return status;
    }

    // We use the import time as the ancester time. This is the time the data
    // is written to the database.
    uint64_t oldest_ancester_time = 0;
    int64_t temp_current_time = 0;
    if (env_->GetCurrentTime(&temp_current_time).ok()) {
      oldest_ancester_time = static_cast<uint64_t>(temp_current_time);
    }

    edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
                  f.fd.GetFileSize(), f.smallest_internal_key,
                  f.largest_internal_key, f.assigned_seqno, f.assigned_seqno,
                  false, kInvalidBlobFileNumber);
                  false, kInvalidBlobFileNumber, oldest_ancester_time);
  }
  return status;
}
@@ -365,6 +365,10 @@ Status FlushJob::WriteLevel0Table() {
    uint64_t oldest_key_time =
        mems_.front()->ApproximateOldestKeyTime();

    // It's not clear whether oldest_key_time is always available. In case
    // it is not available, use current_time.
    meta_.oldest_ancester_time = std::min(current_time, oldest_key_time);

    s = BuildTable(
        dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
        env_options_, cfd_->table_cache(), iter.get(),
@@ -408,7 +412,8 @@ Status FlushJob::WriteLevel0Table() {
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
                   meta_.marked_for_compaction, meta_.oldest_blob_file_number);
                   meta_.marked_for_compaction, meta_.oldest_blob_file_number,
                   meta_.oldest_ancester_time);
  }
#ifndef ROCKSDB_LITE
  // Piggyback FlushJobInfo on the first flushed memtable.
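The flush-side assignment above picks the older of the current wall-clock time and the memtable's approximate oldest key time. A tiny illustrative sketch (hypothetical helper name; it assumes, as the comment above implies, that an unknown oldest key time is reported as a very large value so the minimum falls back to current_time):

#include <algorithm>
#include <cstdint>

// Ancester time recorded for a freshly flushed L0 file.
uint64_t FlushOutputOldestAncesterTime(uint64_t current_time,
                                       uint64_t approximate_oldest_key_time) {
  // When the memtable cannot report an oldest key time, it returns a huge
  // sentinel, so std::min() effectively selects current_time.
  return std::min(current_time, approximate_oldest_key_time);
}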
@@ -133,6 +133,14 @@ Status ImportColumnFamilyJob::Run() {
  Status status;
  edit_.SetColumnFamily(cfd_->GetID());

  // We use the import time as the ancester time. This is the time the data
  // is written to the database.
  uint64_t oldest_ancester_time = 0;
  int64_t temp_current_time = 0;
  if (env_->GetCurrentTime(&temp_current_time).ok()) {
    oldest_ancester_time = static_cast<uint64_t>(temp_current_time);
  }

  for (size_t i = 0; i < files_to_import_.size(); ++i) {
    const auto& f = files_to_import_[i];
    const auto& file_metadata = metadata_[i];
@@ -140,7 +148,8 @@ Status ImportColumnFamilyJob::Run() {
    edit_.AddFile(file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(),
                  f.fd.GetFileSize(), f.smallest_internal_key,
                  f.largest_internal_key, file_metadata.smallest_seqno,
                  file_metadata.largest_seqno, false, kInvalidBlobFileNumber);
                  file_metadata.largest_seqno, false, kInvalidBlobFileNumber,
                  oldest_ancester_time);

    // If incoming sequence number is higher, update local sequence number.
    if (file_metadata.largest_seqno > versions_->LastSequence()) {
@ -499,6 +499,7 @@ class Repairer {
|
||||
status =
|
||||
AddColumnFamily(props->column_family_name, t->column_family_id);
|
||||
}
|
||||
t->meta.oldest_ancester_time = props->creation_time;
|
||||
}
|
||||
ColumnFamilyData* cfd = nullptr;
|
||||
if (status.ok()) {
|
||||
@ -581,7 +582,8 @@ class Repairer {
|
||||
table->meta.largest, table->meta.fd.smallest_seqno,
|
||||
table->meta.fd.largest_seqno,
|
||||
table->meta.marked_for_compaction,
|
||||
table->meta.oldest_blob_file_number);
|
||||
table->meta.oldest_blob_file_number,
|
||||
table->meta.oldest_ancester_time);
|
||||
}
|
||||
assert(next_file_number_ > 0);
|
||||
vset_.MarkFileNumberUsed(next_file_number_ - 1);
|
||||
|
@ -62,7 +62,8 @@ class VersionBuilderTest : public testing::Test {
|
||||
FileMetaData* f = new FileMetaData(
|
||||
file_number, path_id, file_size, GetInternalKey(smallest, smallest_seq),
|
||||
GetInternalKey(largest, largest_seq), smallest_seqno, largest_seqno,
|
||||
/* marked_for_compact */ false, kInvalidBlobFileNumber);
|
||||
/* marked_for_compact */ false, kInvalidBlobFileNumber,
|
||||
kUnknownOldestAncesterTime);
|
||||
f->compensated_file_size = file_size;
|
||||
f->num_entries = num_entries;
|
||||
f->num_deletions = num_deletions;
|
||||
@ -113,7 +114,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) {
|
||||
VersionEdit version_edit;
|
||||
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
|
||||
GetInternalKey("350"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit.DeleteFile(3, 27U);
|
||||
|
||||
EnvOptions env_options;
|
||||
@ -148,7 +149,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) {
|
||||
VersionEdit version_edit;
|
||||
version_edit.AddFile(3, 666, 0, 100U, GetInternalKey("301"),
|
||||
GetInternalKey("350"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit.DeleteFile(0, 1U);
|
||||
version_edit.DeleteFile(0, 88U);
|
||||
|
||||
@ -186,7 +187,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) {
|
||||
VersionEdit version_edit;
|
||||
version_edit.AddFile(4, 666, 0, 100U, GetInternalKey("301"),
|
||||
GetInternalKey("350"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit.DeleteFile(0, 1U);
|
||||
version_edit.DeleteFile(0, 88U);
|
||||
version_edit.DeleteFile(4, 6U);
|
||||
@ -215,19 +216,19 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) {
|
||||
VersionEdit version_edit;
|
||||
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
|
||||
GetInternalKey("350"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit.AddFile(2, 676, 0, 100U, GetInternalKey("401"),
|
||||
GetInternalKey("450"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit.AddFile(2, 636, 0, 100U, GetInternalKey("601"),
|
||||
GetInternalKey("650"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit.AddFile(2, 616, 0, 100U, GetInternalKey("501"),
|
||||
GetInternalKey("550"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit.AddFile(2, 606, 0, 100U, GetInternalKey("701"),
|
||||
GetInternalKey("750"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
|
||||
EnvOptions env_options;
|
||||
|
||||
@ -254,30 +255,30 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) {
|
||||
VersionEdit version_edit;
|
||||
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
|
||||
GetInternalKey("350"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit.AddFile(2, 676, 0, 100U, GetInternalKey("401"),
|
||||
GetInternalKey("450"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit.AddFile(2, 636, 0, 100U, GetInternalKey("601"),
|
||||
GetInternalKey("650"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit.AddFile(2, 616, 0, 100U, GetInternalKey("501"),
|
||||
GetInternalKey("550"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit.AddFile(2, 606, 0, 100U, GetInternalKey("701"),
|
||||
GetInternalKey("750"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_builder.Apply(&version_edit);
|
||||
|
||||
VersionEdit version_edit2;
|
||||
version_edit.AddFile(2, 808, 0, 100U, GetInternalKey("901"),
|
||||
GetInternalKey("950"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_edit2.DeleteFile(2, 616);
|
||||
version_edit2.DeleteFile(2, 636);
|
||||
version_edit.AddFile(2, 806, 0, 100U, GetInternalKey("801"),
|
||||
GetInternalKey("850"), 200, 200, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
version_builder.Apply(&version_edit2);
|
||||
|
||||
version_builder.SaveTo(&new_vstorage);
|
||||
|
@@ -61,6 +61,7 @@ enum CustomTag : uint32_t {
  // removed when manifest becomes forward-compatible.
  kMinLogNumberToKeepHack = 3,
  kOldestBlobFileNumber = 4,
  kOldestAncesterTime = 5,
  kPathId = 65,
};
// If this bit for the custom tag is set, opening DB should fail if
@ -178,82 +179,71 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
|
||||
if (!f.smallest.Valid() || !f.largest.Valid()) {
|
||||
return false;
|
||||
}
|
||||
bool has_customized_fields = false;
|
||||
if (f.marked_for_compaction || has_min_log_number_to_keep_ ||
|
||||
f.oldest_blob_file_number != kInvalidBlobFileNumber) {
|
||||
PutVarint32(dst, kNewFile4);
|
||||
has_customized_fields = true;
|
||||
} else if (f.fd.GetPathId() == 0) {
|
||||
// Use older format to make sure user can roll back the build if they
|
||||
// don't config multiple DB paths.
|
||||
PutVarint32(dst, kNewFile2);
|
||||
} else {
|
||||
PutVarint32(dst, kNewFile3);
|
||||
}
|
||||
PutVarint32(dst, kNewFile4);
|
||||
PutVarint32Varint64(dst, new_files_[i].first /* level */, f.fd.GetNumber());
|
||||
if (f.fd.GetPathId() != 0 && !has_customized_fields) {
|
||||
// kNewFile3
|
||||
PutVarint32(dst, f.fd.GetPathId());
|
||||
}
|
||||
PutVarint64(dst, f.fd.GetFileSize());
|
||||
PutLengthPrefixedSlice(dst, f.smallest.Encode());
|
||||
PutLengthPrefixedSlice(dst, f.largest.Encode());
|
||||
PutVarint64Varint64(dst, f.fd.smallest_seqno, f.fd.largest_seqno);
|
||||
if (has_customized_fields) {
|
||||
// Customized fields' format:
|
||||
// +-----------------------------+
|
||||
// | 1st field's tag (varint32) |
|
||||
// +-----------------------------+
|
||||
// | 1st field's size (varint32) |
|
||||
// +-----------------------------+
|
||||
// | bytes for 1st field |
|
||||
// | (based on size decoded) |
|
||||
// +-----------------------------+
|
||||
// | |
|
||||
// | ...... |
|
||||
// | |
|
||||
// +-----------------------------+
|
||||
// | last field's size (varint32)|
|
||||
// +-----------------------------+
|
||||
// | bytes for last field |
|
||||
// | (based on size decoded) |
|
||||
// +-----------------------------+
|
||||
// | terminating tag (varint32) |
|
||||
// +-----------------------------+
|
||||
//
|
||||
// Customized encoding for fields:
|
||||
// tag kPathId: 1 byte as path_id
|
||||
// tag kNeedCompaction:
|
||||
// now only can take one char value 1 indicating need-compaction
|
||||
//
|
||||
if (f.fd.GetPathId() != 0) {
|
||||
PutVarint32(dst, CustomTag::kPathId);
|
||||
char p = static_cast<char>(f.fd.GetPathId());
|
||||
PutLengthPrefixedSlice(dst, Slice(&p, 1));
|
||||
}
|
||||
if (f.marked_for_compaction) {
|
||||
PutVarint32(dst, CustomTag::kNeedCompaction);
|
||||
char p = static_cast<char>(1);
|
||||
PutLengthPrefixedSlice(dst, Slice(&p, 1));
|
||||
}
|
||||
if (has_min_log_number_to_keep_ && !min_log_num_written) {
|
||||
PutVarint32(dst, CustomTag::kMinLogNumberToKeepHack);
|
||||
std::string varint_log_number;
|
||||
PutFixed64(&varint_log_number, min_log_number_to_keep_);
|
||||
PutLengthPrefixedSlice(dst, Slice(varint_log_number));
|
||||
min_log_num_written = true;
|
||||
}
|
||||
if (f.oldest_blob_file_number != kInvalidBlobFileNumber) {
|
||||
PutVarint32(dst, CustomTag::kOldestBlobFileNumber);
|
||||
std::string oldest_blob_file_number;
|
||||
PutVarint64(&oldest_blob_file_number, f.oldest_blob_file_number);
|
||||
PutLengthPrefixedSlice(dst, Slice(oldest_blob_file_number));
|
||||
}
|
||||
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
|
||||
dst);
|
||||
|
||||
PutVarint32(dst, CustomTag::kTerminate);
|
||||
// Customized fields' format:
|
||||
// +-----------------------------+
|
||||
// | 1st field's tag (varint32) |
|
||||
// +-----------------------------+
|
||||
// | 1st field's size (varint32) |
|
||||
// +-----------------------------+
|
||||
// | bytes for 1st field |
|
||||
// | (based on size decoded) |
|
||||
// +-----------------------------+
|
||||
// | |
|
||||
// | ...... |
|
||||
// | |
|
||||
// +-----------------------------+
|
||||
// | last field's size (varint32)|
|
||||
// +-----------------------------+
|
||||
// | bytes for last field |
|
||||
// | (based on size decoded) |
|
||||
// +-----------------------------+
|
||||
// | terminating tag (varint32) |
|
||||
// +-----------------------------+
|
||||
//
|
||||
// Customized encoding for fields:
|
||||
// tag kPathId: 1 byte as path_id
|
||||
// tag kNeedCompaction:
|
||||
// now only can take one char value 1 indicating need-compaction
|
||||
//
|
||||
PutVarint32(dst, CustomTag::kOldestAncesterTime);
|
||||
std::string varint_oldest_ancester_time;
|
||||
PutVarint64(&varint_oldest_ancester_time, f.oldest_ancester_time);
|
||||
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:VarintOldestAncesterTime",
|
||||
&varint_oldest_ancester_time);
|
||||
PutLengthPrefixedSlice(dst, Slice(varint_oldest_ancester_time));
|
||||
if (f.fd.GetPathId() != 0) {
|
||||
PutVarint32(dst, CustomTag::kPathId);
|
||||
char p = static_cast<char>(f.fd.GetPathId());
|
||||
PutLengthPrefixedSlice(dst, Slice(&p, 1));
|
||||
}
|
||||
if (f.marked_for_compaction) {
|
||||
PutVarint32(dst, CustomTag::kNeedCompaction);
|
||||
char p = static_cast<char>(1);
|
||||
PutLengthPrefixedSlice(dst, Slice(&p, 1));
|
||||
}
|
||||
if (has_min_log_number_to_keep_ && !min_log_num_written) {
|
||||
PutVarint32(dst, CustomTag::kMinLogNumberToKeepHack);
|
||||
std::string varint_log_number;
|
||||
PutFixed64(&varint_log_number, min_log_number_to_keep_);
|
||||
PutLengthPrefixedSlice(dst, Slice(varint_log_number));
|
||||
min_log_num_written = true;
|
||||
}
|
||||
if (f.oldest_blob_file_number != kInvalidBlobFileNumber) {
|
||||
PutVarint32(dst, CustomTag::kOldestBlobFileNumber);
|
||||
std::string oldest_blob_file_number;
|
||||
PutVarint64(&oldest_blob_file_number, f.oldest_blob_file_number);
|
||||
PutLengthPrefixedSlice(dst, Slice(oldest_blob_file_number));
|
||||
}
|
||||
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
|
||||
dst);
|
||||
|
||||
PutVarint32(dst, CustomTag::kTerminate);
|
||||
}
|
||||
|
||||
// 0 is default and does not need to be explicitly written
|
||||
@@ -340,6 +330,11 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
          return "path_id wrong vaue";
        }
        break;
      case kOldestAncesterTime:
        if (!GetVarint64(&field, &f.oldest_ancester_time)) {
          return "invalid oldest ancester time";
        }
        break;
      case kNeedCompaction:
        if (field.size() != 1) {
          return "need_compaction field wrong size";
@@ -663,6 +658,8 @@ std::string VersionEdit::DebugString(bool hex_key) const {
      r.append(" blob_file:");
      AppendNumberTo(&r, f.oldest_blob_file_number);
    }
    r.append(" oldest_ancester_time:");
    AppendNumberTo(&r, f.oldest_ancester_time);
  }
  r.append("\n ColumnFamily: ");
  AppendNumberTo(&r, column_family_);
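The new kOldestAncesterTime entry (tag 5 in the CustomTag enum earlier in this file) follows the same tag / length-prefixed-payload layout as the other NewFile4 custom fields, and because the tag value has the "must not ignore" bit clear, older releases can skip it. A minimal self-contained sketch of that layout, using hand-rolled varint helpers rather than RocksDB's coding utilities (names and values are illustrative, not from the patch):

#include <cassert>
#include <cstdint>
#include <string>

// LEB128-style varint, in the spirit of RocksDB's PutVarint64/GetVarint64.
static void PutVarint64(std::string* dst, uint64_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7F) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

static bool GetVarint64(const std::string& src, size_t* pos, uint64_t* v) {
  uint64_t result = 0;
  for (int shift = 0; shift <= 63 && *pos < src.size(); shift += 7) {
    uint64_t byte = static_cast<unsigned char>(src[(*pos)++]);
    result |= (byte & 0x7F) << shift;
    if (!(byte & 0x80)) { *v = result; return true; }
  }
  return false;
}

int main() {
  const uint64_t kOldestAncesterTime = 5;       // CustomTag value from the enum
  uint64_t oldest_ancester_time = 1572567600;   // example timestamp, in seconds

  // Encode: field tag, payload length, then the payload (a varint timestamp).
  std::string payload;
  PutVarint64(&payload, oldest_ancester_time);
  std::string record;
  PutVarint64(&record, kOldestAncesterTime);
  PutVarint64(&record, payload.size());
  record += payload;

  // Decode: read tag and length, then the varint inside the payload.
  size_t pos = 0;
  uint64_t tag = 0, len = 0, decoded = 0;
  assert(GetVarint64(record, &pos, &tag) && tag == kOldestAncesterTime);
  assert(GetVarint64(record, &pos, &len) && len == payload.size());
  size_t field_pos = pos;
  assert(GetVarint64(record, &field_pos, &decoded));
  assert(decoded == oldest_ancester_time);
  return 0;
}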
@@ -16,6 +16,7 @@
#include "db/dbformat.h"
#include "memory/arena.h"
#include "rocksdb/cache.h"
#include "table/table_reader.h"
#include "util/autovector.h"

namespace rocksdb {
@@ -24,6 +25,7 @@ class VersionSet;

constexpr uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF;
constexpr uint64_t kInvalidBlobFileNumber = 0;
constexpr uint64_t kUnknownOldestAncesterTime = 0;

extern uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id);
@@ -122,18 +124,25 @@ struct FileMetaData {
  // refers to. 0 is an invalid value; BlobDB numbers the files starting from 1.
  uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;

  // The file could be the compaction output from other SST files, which could
  // in turn be outputs for compacting older SST files. We track the memtable
  // flush timestamp for the oldest SST file that eventually contributes data
  // to this file. 0 means the information is not available.
  uint64_t oldest_ancester_time = 0;

  FileMetaData() = default;

  FileMetaData(uint64_t file, uint32_t file_path_id, uint64_t file_size,
               const InternalKey& smallest_key, const InternalKey& largest_key,
               const SequenceNumber& smallest_seq,
               const SequenceNumber& largest_seq, bool marked_for_compact,
               uint64_t oldest_blob_file)
               uint64_t oldest_blob_file, uint64_t _oldest_ancester_time)
      : fd(file, file_path_id, file_size, smallest_seq, largest_seq),
        smallest(smallest_key),
        largest(largest_key),
        marked_for_compaction(marked_for_compact),
        oldest_blob_file_number(oldest_blob_file) {}
        oldest_blob_file_number(oldest_blob_file),
        oldest_ancester_time(_oldest_ancester_time) {}

  // REQUIRED: Keys must be given to the function in sorted order (it expects
  // the last key to be the largest).
@@ -154,6 +163,19 @@ struct FileMetaData {
    fd.smallest_seqno = std::min(fd.smallest_seqno, seqno);
    fd.largest_seqno = std::max(fd.largest_seqno, seqno);
  }

  // Try to get oldest ancester time from the class itself or table properties
  // if table reader is already pinned.
  // 0 means the information is not available.
  uint64_t TryGetOldestAncesterTime() {
    if (oldest_ancester_time != 0) {
      return oldest_ancester_time;
    } else if (fd.table_reader != nullptr &&
               fd.table_reader->GetTableProperties() != nullptr) {
      return fd.table_reader->GetTableProperties()->creation_time;
    }
    return 0;
  }
};

// A compressed copy of file meta data that just contain minimum data needed
@@ -255,12 +277,14 @@ class VersionEdit {
               uint64_t file_size, const InternalKey& smallest,
               const InternalKey& largest, const SequenceNumber& smallest_seqno,
               const SequenceNumber& largest_seqno, bool marked_for_compaction,
               uint64_t oldest_blob_file_number) {
               uint64_t oldest_blob_file_number,
               uint64_t oldest_ancester_time) {
    assert(smallest_seqno <= largest_seqno);
    new_files_.emplace_back(
        level, FileMetaData(file, file_path_id, file_size, smallest, largest,
                            smallest_seqno, largest_seqno,
                            marked_for_compaction, oldest_blob_file_number));
        level,
        FileMetaData(file, file_path_id, file_size, smallest, largest,
                     smallest_seqno, largest_seqno, marked_for_compaction,
                     oldest_blob_file_number, oldest_ancester_time));
  }

  void AddFile(int level, const FileMetaData& f) {
@ -36,7 +36,8 @@ TEST_F(VersionEditTest, EncodeDecode) {
|
||||
edit.AddFile(3, kBig + 300 + i, kBig32Bit + 400 + i, 0,
|
||||
InternalKey("foo", kBig + 500 + i, kTypeValue),
|
||||
InternalKey("zoo", kBig + 600 + i, kTypeDeletion),
|
||||
kBig + 500 + i, kBig + 600 + i, false, kInvalidBlobFileNumber);
|
||||
kBig + 500 + i, kBig + 600 + i, false, kInvalidBlobFileNumber,
|
||||
888);
|
||||
edit.DeleteFile(4, kBig + 700 + i);
|
||||
}
|
||||
|
||||
@ -53,16 +54,18 @@ TEST_F(VersionEditTest, EncodeDecodeNewFile4) {
|
||||
VersionEdit edit;
|
||||
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
|
||||
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
|
||||
kBig + 600, true, kInvalidBlobFileNumber);
|
||||
kBig + 600, true, kInvalidBlobFileNumber,
|
||||
kUnknownOldestAncesterTime);
|
||||
edit.AddFile(4, 301, 3, 100, InternalKey("foo", kBig + 501, kTypeValue),
|
||||
InternalKey("zoo", kBig + 601, kTypeDeletion), kBig + 501,
|
||||
kBig + 601, false, kInvalidBlobFileNumber);
|
||||
kBig + 601, false, kInvalidBlobFileNumber,
|
||||
kUnknownOldestAncesterTime);
|
||||
edit.AddFile(5, 302, 0, 100, InternalKey("foo", kBig + 502, kTypeValue),
|
||||
InternalKey("zoo", kBig + 602, kTypeDeletion), kBig + 502,
|
||||
kBig + 602, true, kInvalidBlobFileNumber);
|
||||
kBig + 602, true, kInvalidBlobFileNumber, 666);
|
||||
edit.AddFile(5, 303, 0, 100, InternalKey("foo", kBig + 503, kTypeBlobIndex),
|
||||
InternalKey("zoo", kBig + 603, kTypeBlobIndex), kBig + 503,
|
||||
kBig + 603, true, 1001);
|
||||
kBig + 603, true, 1001, kUnknownOldestAncesterTime);
|
||||
|
||||
edit.DeleteFile(4, 700);
|
||||
|
||||
@ -100,10 +103,11 @@ TEST_F(VersionEditTest, ForwardCompatibleNewFile4) {
|
||||
VersionEdit edit;
|
||||
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
|
||||
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
|
||||
kBig + 600, true, kInvalidBlobFileNumber);
|
||||
kBig + 600, true, kInvalidBlobFileNumber,
|
||||
kUnknownOldestAncesterTime);
|
||||
edit.AddFile(4, 301, 3, 100, InternalKey("foo", kBig + 501, kTypeValue),
|
||||
InternalKey("zoo", kBig + 601, kTypeDeletion), kBig + 501,
|
||||
kBig + 601, false, kInvalidBlobFileNumber);
|
||||
kBig + 601, false, kInvalidBlobFileNumber, 686);
|
||||
edit.DeleteFile(4, 700);
|
||||
|
||||
edit.SetComparatorName("foo");
|
||||
@ -149,7 +153,8 @@ TEST_F(VersionEditTest, NewFile4NotSupportedField) {
|
||||
VersionEdit edit;
|
||||
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
|
||||
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
|
||||
kBig + 600, true, kInvalidBlobFileNumber);
|
||||
kBig + 600, true, kInvalidBlobFileNumber,
|
||||
kUnknownOldestAncesterTime);
|
||||
|
||||
edit.SetComparatorName("foo");
|
||||
edit.SetLogNumber(kBig + 100);
|
||||
@ -177,7 +182,7 @@ TEST_F(VersionEditTest, NewFile4NotSupportedField) {
|
||||
TEST_F(VersionEditTest, EncodeEmptyFile) {
|
||||
VersionEdit edit;
|
||||
edit.AddFile(0, 0, 0, 0, InternalKey(), InternalKey(), 0, 0, false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
std::string buffer;
|
||||
ASSERT_TRUE(!edit.EncodeTo(&buffer));
|
||||
}
|
||||
|
@@ -2330,13 +2330,11 @@ uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
  auto status = ioptions.env->GetCurrentTime(&_current_time);
  if (status.ok()) {
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
    for (auto f : files) {
      if (!f->being_compacted && f->fd.table_reader != nullptr &&
          f->fd.table_reader->GetTableProperties() != nullptr) {
        auto creation_time =
            f->fd.table_reader->GetTableProperties()->creation_time;
        if (creation_time > 0 &&
            creation_time < (current_time - mutable_cf_options.ttl)) {
    for (FileMetaData* f : files) {
      if (!f->being_compacted) {
        uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
        if (oldest_ancester_time != 0 &&
            oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
          ttl_expired_files_count++;
        }
      }
@@ -2489,12 +2487,11 @@ void VersionStorageInfo::ComputeExpiredTtlFiles(
  const uint64_t current_time = static_cast<uint64_t>(_current_time);

  for (int level = 0; level < num_levels() - 1; level++) {
    for (auto f : files_[level]) {
      if (!f->being_compacted && f->fd.table_reader != nullptr &&
          f->fd.table_reader->GetTableProperties() != nullptr) {
        auto creation_time =
            f->fd.table_reader->GetTableProperties()->creation_time;
        if (creation_time > 0 && creation_time < (current_time - ttl)) {
    for (FileMetaData* f : files_[level]) {
      if (!f->being_compacted) {
        uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
        if (oldest_ancester_time > 0 &&
            oldest_ancester_time < (current_time - ttl)) {
          expired_ttl_files_.emplace_back(level, f);
        }
      }
@@ -2539,8 +2536,7 @@ void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
      uint64_t file_modification_time =
          f->fd.table_reader->GetTableProperties()->file_creation_time;
      if (file_modification_time == 0) {
        file_modification_time =
            f->fd.table_reader->GetTableProperties()->creation_time;
        file_modification_time = f->TryGetOldestAncesterTime();
      }
      if (file_modification_time == 0) {
        auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
@@ -4984,7 +4980,8 @@ Status VersionSet::WriteCurrentStateToManifest(log::Writer* log) {
        edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
                     f->fd.GetFileSize(), f->smallest, f->largest,
                     f->fd.smallest_seqno, f->fd.largest_seqno,
                     f->marked_for_compaction, f->oldest_blob_file_number);
                     f->marked_for_compaction, f->oldest_blob_file_number,
                     f->oldest_ancester_time);
      }
    }
    edit.SetLogNumber(cfd->GetLogNumber());
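Both TTL checks above now share the same test on the file's ancester time instead of requiring a pinned table reader. Expressed as a standalone predicate (hypothetical helper, mirroring the conditions in the code above):

#include <cstdint>

// A file is TTL-expired when its oldest ancester time is known (non-zero)
// and lies further in the past than the configured ttl, relative to now.
bool IsTtlExpired(uint64_t oldest_ancester_time, uint64_t current_time,
                  uint64_t ttl) {
  return oldest_ancester_time > 0 &&
         oldest_ancester_time < (current_time - ttl);
}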
@ -39,7 +39,8 @@ class GenerateLevelFilesBriefTest : public testing::Test {
|
||||
files_.size() + 1, 0, 0,
|
||||
InternalKey(smallest, smallest_seq, kTypeValue),
|
||||
InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
|
||||
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber);
|
||||
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
|
||||
kUnknownOldestAncesterTime);
|
||||
files_.push_back(f);
|
||||
}
|
||||
|
||||
@ -133,7 +134,8 @@ class VersionStorageInfoTest : public testing::Test {
|
||||
FileMetaData* f = new FileMetaData(
|
||||
file_number, 0, file_size, GetInternalKey(smallest, 0),
|
||||
GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0,
|
||||
/* marked_for_compact */ false, kInvalidBlobFileNumber);
|
||||
/* marked_for_compact */ false, kInvalidBlobFileNumber,
|
||||
kUnknownOldestAncesterTime);
|
||||
f->compensated_file_size = file_size;
|
||||
vstorage_.AddFile(level, f);
|
||||
}
|
||||
@ -144,7 +146,7 @@ class VersionStorageInfoTest : public testing::Test {
|
||||
FileMetaData* f = new FileMetaData(
|
||||
file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,
|
||||
/* largest_seq */ 0, /* marked_for_compact */ false,
|
||||
kInvalidBlobFileNumber);
|
||||
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
||||
f->compensated_file_size = file_size;
|
||||
vstorage_.AddFile(level, f);
|
||||
}
|
||||
|
@@ -647,7 +647,6 @@ struct AdvancedColumnFamilyOptions {
  bool report_bg_io_stats = false;

  // Files older than TTL will go through the compaction process.
  // Supported in Level and FIFO compaction.
  // Pre-req: This needs max_open_files to be set to -1.
  // In Level: Non-bottom-level files older than TTL will go through the
  // compaction process.
@ -120,35 +120,20 @@ class autovector {
|
||||
}
|
||||
|
||||
// -- Reference
|
||||
reference operator*() {
|
||||
reference operator*() const {
|
||||
assert(vect_->size() >= index_);
|
||||
return (*vect_)[index_];
|
||||
}
|
||||
|
||||
const_reference operator*() const {
|
||||
assert(vect_->size() >= index_);
|
||||
return (*vect_)[index_];
|
||||
}
|
||||
|
||||
pointer operator->() {
|
||||
pointer operator->() const {
|
||||
assert(vect_->size() >= index_);
|
||||
return &(*vect_)[index_];
|
||||
}
|
||||
|
||||
const_pointer operator->() const {
|
||||
assert(vect_->size() >= index_);
|
||||
return &(*vect_)[index_];
|
||||
}
|
||||
|
||||
reference operator[](difference_type len) {
|
||||
reference operator[](difference_type len) const {
|
||||
return *(*this + len);
|
||||
}
|
||||
|
||||
const_reference operator[](difference_type len) const {
|
||||
return *(*this + len);
|
||||
}
|
||||
|
||||
|
||||
// -- Logical Operators
|
||||
bool operator==(const self_type& other) const {
|
||||
assert(vect_ == other.vect_);
|
||||
|