Merge branch 'performance' of github.com:facebook/rocksdb into performance

This commit is contained in:
Dhruba Borthakur 2013-08-12 10:31:21 -07:00
commit 03bd4461ad
5 changed files with 1200 additions and 900 deletions

View File

@ -1238,13 +1238,17 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
manual.level = level;
manual.done = false;
manual.in_progress = false;
if (begin == nullptr) {
// For universal compaction, we enforce every manual compaction to compact
// all files.
if (begin == nullptr ||
options_.compaction_style == kCompactionStyleUniversal) {
manual.begin = nullptr;
} else {
begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
manual.begin = &begin_storage;
}
if (end == nullptr) {
if (end == nullptr ||
options_.compaction_style == kCompactionStyleUniversal) {
manual.end = nullptr;
} else {
end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
@ -1498,6 +1502,18 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (!status.ok()) {
m->done = true;
}
// For universal compaction:
// Because universal compaction always happens at level 0, so one
// compaction will pick up all overlapped files. No files will be
// filtered out due to size limit and left for a successive compaction.
// So we can safely conclude the current compaction.
//
// Also note that, if we don't stop here, then the current compaction
// writes a new file back to level 0, which will be used in successive
// compaction. Hence the manual compaction will never finish.
if (options_.compaction_style == kCompactionStyleUniversal) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
@ -1745,14 +1761,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
// Is this compaction producing files at the bottommost level?
bool bottommost_level = true;
for (int i = compact->compaction->level() + 2;
i < versions_->NumberLevels(); i++) {
if (versions_->NumLevelFiles(i) > 0) {
bottommost_level = false;
break;
}
}
bool bottommost_level = compact->compaction->BottomMostLevel();
// Allocate the output file numbers before we release the lock
AllocateCompactionOutputFileNumbers(compact);
@ -2088,7 +2097,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
versions_->LevelSummary(&tmp),
(stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) /
(double) stats.micros,
compact->compaction->level() + 1,
compact->compaction->output_level(),
stats.files_in_leveln, stats.files_in_levelnp1, stats.files_out_levelnp1,
stats.bytes_readn / 1048576.0,
stats.bytes_readnp1 / 1048576.0,

View File

@ -219,6 +219,7 @@ class DBTest {
kCompactOnFlush,
kPerfOptions,
kDeletesFilterFirst,
kUniversalCompaction,
kEnd
};
int option_config_;
@ -232,6 +233,14 @@ class DBTest {
Options last_options_;
// Skip some options, as they may not be applicable to a specific test.
// To add more skip constants, use values 4, 8, 16, etc.
enum OptionSkip {
kNoSkip = 0,
kSkipDeletesFilterFirst = 1,
kSkipUniversalCompaction = 2
};
DBTest() : option_config_(kDefault),
merge_operator_(MergeOperators::CreatePutOperator()),
env_(new SpecialEnv(Env::Default())) {
@ -251,8 +260,19 @@ class DBTest {
// Switch to a fresh database with the next option configuration to
// test. Return false if there are no more configurations to test.
bool ChangeOptions() {
bool ChangeOptions(int skip_mask = kNoSkip) {
option_config_++;
// skip some options
if (skip_mask & kSkipDeletesFilterFirst &&
option_config_ == kDeletesFilterFirst) {
option_config_++;
}
if (skip_mask & kSkipUniversalCompaction &&
option_config_ == kUniversalCompaction) {
option_config_++;
}
if (option_config_ >= kEnd) {
return false;
} else {
@ -261,6 +281,17 @@ class DBTest {
}
}
// Switch between different compaction styles (we have only 2 now).
bool ChangeCompactOptions() {
if (option_config_ == kDefault) {
option_config_ = kUniversalCompaction;
DestroyAndReopen();
return true;
} else {
return false;
}
}
// Return the current option configuration.
Options CurrentOptions() {
Options options;
@ -293,6 +324,9 @@ class DBTest {
case kDeletesFilterFirst:
options.filter_deletes = true;
break;
case kUniversalCompaction:
options.compaction_style = kCompactionStyleUniversal;
break;
default:
break;
}
@ -769,7 +803,7 @@ TEST(DBTest, GetEncountersEmptyLevel) {
env_->SleepForMicroseconds(1000000);
ASSERT_EQ(NumTableFilesAtLevel(0), 1); // XXX
} while (ChangeOptions());
} while (ChangeOptions(kSkipUniversalCompaction));
}
// KeyMayExist can lead to a few false positives, but not false negatives.
@ -815,6 +849,7 @@ TEST(DBTest, KeyMayExist) {
// A delete is skipped for key if KeyMayExist(key) returns False
// Tests Writebatch consistency and proper delete behaviour
TEST(DBTest, FilterDeletes) {
do {
Options options = CurrentOptions();
options.filter_policy = NewBloomFilterPolicy(20);
options.filter_deletes = true;
@ -848,9 +883,11 @@ TEST(DBTest, FilterDeletes) {
batch.Clear();
delete options.filter_policy;
} while (ChangeCompactOptions());
}
TEST(DBTest, IterEmpty) {
do {
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
@ -863,9 +900,11 @@ TEST(DBTest, IterEmpty) {
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
} while (ChangeCompactOptions());
}
TEST(DBTest, IterSingle) {
do {
ASSERT_OK(Put("a", "va"));
Iterator* iter = db_->NewIterator(ReadOptions());
@ -901,9 +940,11 @@ TEST(DBTest, IterSingle) {
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
} while (ChangeCompactOptions());
}
TEST(DBTest, IterMulti) {
do {
ASSERT_OK(Put("a", "va"));
ASSERT_OK(Put("b", "vb"));
ASSERT_OK(Put("c", "vc"));
@ -984,9 +1025,11 @@ TEST(DBTest, IterMulti) {
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
} while (ChangeCompactOptions());
}
TEST(DBTest, IterSmallAndLargeMix) {
do {
ASSERT_OK(Put("a", "va"));
ASSERT_OK(Put("b", std::string(100000, 'b')));
ASSERT_OK(Put("c", "vc"));
@ -1022,6 +1065,7 @@ TEST(DBTest, IterSmallAndLargeMix) {
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
} while (ChangeCompactOptions());
}
TEST(DBTest, IterMultiWithDelete) {
@ -1083,6 +1127,7 @@ TEST(DBTest, RollLog) {
}
TEST(DBTest, WAL) {
do {
Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true;
@ -1112,16 +1157,22 @@ TEST(DBTest, WAL) {
// again both values should be present.
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v3", Get("bar"));
} while (ChangeCompactOptions());
}
TEST(DBTest, CheckLock) {
do {
DB* localdb;
Options options = CurrentOptions();
ASSERT_TRUE(TryReopen(&options).ok());
ASSERT_TRUE(!(PureReopen(&options, &localdb).ok())); // second open should fail
// second open should fail
ASSERT_TRUE(!(PureReopen(&options, &localdb).ok()));
} while (ChangeCompactOptions());
}
TEST(DBTest, FLUSH) {
do {
Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true;
@ -1153,6 +1204,7 @@ TEST(DBTest, FLUSH) {
// has WAL enabled.
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v3", Get("bar"));
} while (ChangeCompactOptions());
}
TEST(DBTest, RecoveryWithEmptyLog) {
@ -1191,6 +1243,7 @@ TEST(DBTest, RecoverDuringMemtableCompaction) {
}
TEST(DBTest, MinorCompactionsHappen) {
do {
Options options = CurrentOptions();
options.write_buffer_size = 10000;
Reopen(&options);
@ -1213,9 +1266,11 @@ TEST(DBTest, MinorCompactionsHappen) {
for (int i = 0; i < N; i++) {
ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(Key(i)));
}
} while (ChangeCompactOptions());
}
TEST(DBTest, ManifestRollOver) {
do {
Options options = CurrentOptions();
options.max_manifest_file_size = 10 ; // 10 bytes
Reopen(&options);
@ -1237,10 +1292,11 @@ TEST(DBTest, ManifestRollOver) {
ASSERT_EQ(std::string(1000, '2'), Get("manifest_key2"));
ASSERT_EQ(std::string(1000, '3'), Get("manifest_key3"));
}
} while (ChangeCompactOptions());
}
TEST(DBTest, RecoverWithLargeLog) {
do {
{
Options options = CurrentOptions();
Reopen(&options);
@ -1262,6 +1318,7 @@ TEST(DBTest, RecoverWithLargeLog) {
ASSERT_EQ(std::string(10, '3'), Get("small3"));
ASSERT_EQ(std::string(10, '4'), Get("small4"));
ASSERT_GT(NumTableFilesAtLevel(0), 1);
} while (ChangeCompactOptions());
}
TEST(DBTest, CompactionsGenerateMultipleFiles) {
@ -1325,6 +1382,139 @@ TEST(DBTest, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
}
TEST(DBTest, UniversalCompactionTrigger) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100<<10; //100KB
// trigger compaction if there are > 3 files
options.level0_file_num_compaction_trigger = 3;
Reopen(&options);
Random rnd(301);
int key_idx = 0;
// Stage 1:
// Generate a set of files at level 0, but don't trigger level-0
// compaction.
for (int num = 0;
num < options.level0_file_num_compaction_trigger;
num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForCompactMemTable();
ASSERT_EQ(NumTableFilesAtLevel(0), num + 1);
}
// Generate one more file at level-0, which should trigger level-0
// compaction.
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForCompact();
// Suppose each file flushed from mem table has size 1. Now we compact
// (level0_file_num_compaction_trigger+1)=4 files and should have a big
// file of size 4.
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i), 0);
}
// Stage 2:
// Now we have one file at level 0, with size 4. We also have some data in
// mem table. Let's continue generating new files at level 0, but don't
// trigger level-0 compaction.
// First, clean up memtable before inserting new data. This will generate
// a level-0 file, with size around 0.4 (according to previously written
// data amount).
dbfull()->Flush(FlushOptions());
for (int num = 0;
num < options.level0_file_num_compaction_trigger-2;
num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForCompactMemTable();
ASSERT_EQ(NumTableFilesAtLevel(0), num + 3);
}
// Generate one more file at level-0, which should trigger level-0
// compaction.
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForCompact();
// Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
// After comapction, we should have 2 files, with size 4, 2.4.
ASSERT_EQ(NumTableFilesAtLevel(0), 2);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i), 0);
}
// Stage 3:
// Now we have 2 files at level 0, with size 4 and 2.4. Continue
// generating new files at level 0.
for (int num = 0;
num < options.level0_file_num_compaction_trigger-2;
num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForCompactMemTable();
ASSERT_EQ(NumTableFilesAtLevel(0), num + 3);
}
// Generate one more file at level-0, which should trigger level-0
// compaction.
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForCompact();
// Before compaction, we have 4 files at level 0, with size 4, 2.4, 1, 1.
// After comapction, we should have 3 files, with size 4, 2.4, 2.
ASSERT_EQ(NumTableFilesAtLevel(0), 3);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i), 0);
}
// Stage 4:
// Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a
// new file of size 1.
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForCompact();
// Level-0 compaction is triggered, but no file will be picked up.
ASSERT_EQ(NumTableFilesAtLevel(0), 4);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i), 0);
}
// Stage 5:
// Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate
// a new file of size 1.
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForCompact();
// All files at level 0 will be compacted into a single one.
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i), 0);
}
}
void MinLevelHelper(DBTest* self, Options& options) {
Random rnd(301);
@ -1390,6 +1580,7 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
}
return true;
}
TEST(DBTest, MinLevelToCompress1) {
Options options = CurrentOptions();
CompressionType type;
@ -1431,6 +1622,7 @@ TEST(DBTest, MinLevelToCompress2) {
}
TEST(DBTest, RepeatedWritesToSameKey) {
do {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 100000; // Small write buffer
@ -1447,6 +1639,7 @@ TEST(DBTest, RepeatedWritesToSameKey) {
Put("key", value);
ASSERT_LE(TotalTableFiles(), kMaxFiles);
}
} while (ChangeCompactOptions());
}
// This is a static filter used for filtering
@ -1646,6 +1839,7 @@ TEST(DBTest, CompactionFilter) {
}
TEST(DBTest, CompactionFilterWithValueChange) {
do {
Options options = CurrentOptions();
options.num_levels = 3;
options.max_mem_compaction_level = 0;
@ -1691,9 +1885,11 @@ TEST(DBTest, CompactionFilterWithValueChange) {
std::string newvalue = Get(key);
ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
}
} while (ChangeCompactOptions());
}
TEST(DBTest, SparseMerge) {
do {
Options options = CurrentOptions();
options.compression = kNoCompression;
Reopen(&options);
@ -1731,6 +1927,7 @@ TEST(DBTest, SparseMerge) {
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
} while (ChangeCompactOptions());
}
static bool Between(uint64_t val, uint64_t low, uint64_t high) {
@ -1791,7 +1988,7 @@ TEST(DBTest, ApproximateSizes) {
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(1), 0);
}
} while (ChangeOptions());
} while (ChangeOptions(kSkipUniversalCompaction));
}
TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
@ -1833,6 +2030,7 @@ TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
}
TEST(DBTest, IteratorPinsRef) {
do {
Put("foo", "hello");
// Get iterator that will yield the current contents of the DB.
@ -1852,6 +2050,7 @@ TEST(DBTest, IteratorPinsRef) {
iter->Next();
ASSERT_TRUE(!iter->Valid());
delete iter;
} while (ChangeCompactOptions());
}
TEST(DBTest, Snapshot) {
@ -1911,7 +2110,7 @@ TEST(DBTest, HiddenValuesAreRemoved) {
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000));
} while (ChangeOptions());
} while (ChangeOptions(kSkipUniversalCompaction));
}
TEST(DBTest, CompactBetweenSnapshots) {
@ -2065,10 +2264,11 @@ TEST(DBTest, OverlapInLevel0) {
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("3", FilesPerLevel());
ASSERT_EQ("NOT_FOUND", Get("600"));
} while (ChangeOptions());
} while (ChangeOptions(kSkipUniversalCompaction));
}
TEST(DBTest, L0_CompactionBug_Issue44_a) {
do {
Reopen();
ASSERT_OK(Put("b", "v"));
Reopen();
@ -2083,9 +2283,11 @@ TEST(DBTest, L0_CompactionBug_Issue44_a) {
ASSERT_EQ("(a->v)", Contents());
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
ASSERT_EQ("(a->v)", Contents());
} while (ChangeCompactOptions());
}
TEST(DBTest, L0_CompactionBug_Issue44_b) {
do {
Reopen();
Put("","");
Reopen();
@ -2109,6 +2311,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_b) {
ASSERT_EQ("(->)(c->cv)", Contents());
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
ASSERT_EQ("(->)(c->cv)", Contents());
} while (ChangeCompactOptions());
}
TEST(DBTest, ComparatorCheck) {
@ -2125,6 +2328,8 @@ TEST(DBTest, ComparatorCheck) {
BytewiseComparator()->FindShortSuccessor(key);
}
};
do {
NewComparator cmp;
Options new_options = CurrentOptions();
new_options.comparator = &cmp;
@ -2132,6 +2337,7 @@ TEST(DBTest, ComparatorCheck) {
ASSERT_TRUE(!s.ok());
ASSERT_TRUE(s.ToString().find("comparator") != std::string::npos)
<< s.ToString();
} while (ChangeCompactOptions());
}
TEST(DBTest, CustomComparator) {
@ -2160,6 +2366,8 @@ TEST(DBTest, CustomComparator) {
return val;
}
};
do {
NumberComparator cmp;
Options new_options = CurrentOptions();
new_options.create_if_missing = true;
@ -2187,6 +2395,7 @@ TEST(DBTest, CustomComparator) {
}
Compact("[0]", "[1000000]");
}
} while (ChangeCompactOptions());
}
TEST(DBTest, ManualCompaction) {
@ -2319,9 +2528,9 @@ TEST(DBTest, DestroyDBMetaDatabase) {
ASSERT_TRUE(!DB::Open(opts, metametadbname, &db).ok());
}
// Check that number of files does not grow when we are out of space
TEST(DBTest, NoSpace) {
do {
Options options = CurrentOptions();
options.env = env_;
Reopen(&options);
@ -2342,10 +2551,11 @@ TEST(DBTest, NoSpace) {
// Check that compaction attempts slept after errors
ASSERT_GE(env_->sleep_counter_.Read(), 5);
} while (ChangeCompactOptions());
}
TEST(DBTest, NonWritableFileSystem)
{
TEST(DBTest, NonWritableFileSystem) {
do {
Options options = CurrentOptions();
options.write_buffer_size = 1000;
options.env = env_;
@ -2362,6 +2572,7 @@ TEST(DBTest, NonWritableFileSystem)
}
ASSERT_GT(errors, 0);
env_->non_writable_.Release_Store(nullptr);
} while (ChangeCompactOptions());
}
TEST(DBTest, ManifestWriteError) {
@ -2406,6 +2617,7 @@ TEST(DBTest, ManifestWriteError) {
}
TEST(DBTest, FilesDeletedAfterCompaction) {
do {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");
const int num_files = CountLiveFiles();
@ -2414,9 +2626,11 @@ TEST(DBTest, FilesDeletedAfterCompaction) {
Compact("a", "z");
}
ASSERT_EQ(CountLiveFiles(), num_files);
} while (ChangeCompactOptions());
}
TEST(DBTest, BloomFilter) {
do {
env_->count_random_reads_ = true;
Options options = CurrentOptions();
options.env = env_;
@ -2460,9 +2674,11 @@ TEST(DBTest, BloomFilter) {
env_->delay_sstable_sync_.Release_Store(nullptr);
Close();
delete options.filter_policy;
} while (ChangeCompactOptions());
}
TEST(DBTest, SnapshotFiles) {
do {
Options options = CurrentOptions();
const EnvOptions soptions;
options.write_buffer_size = 100000000; // Large write buffer
@ -2590,9 +2806,11 @@ TEST(DBTest, SnapshotFiles) {
// release file snapshot
dbfull()->DisableFileDeletions();
} while (ChangeCompactOptions());
}
TEST(DBTest, CompactOnFlush) {
do {
Options options = CurrentOptions();
options.purge_redundant_kvs_while_flush = true;
options.disable_auto_compactions = true;
@ -2674,6 +2892,7 @@ TEST(DBTest, CompactOnFlush) {
ASSERT_OK(dbfull()->TEST_CompactMemTable());
ASSERT_EQ(AllEntriesFor("foo"), "[ v9 ]");
db_->ReleaseSnapshot(snapshot1);
} while (ChangeCompactOptions());
}
std::vector<std::uint64_t> ListLogFiles(Env* env, const std::string& path) {
@ -2693,6 +2912,7 @@ std::vector<std::uint64_t> ListLogFiles(Env* env, const std::string& path) {
}
TEST(DBTest, WALArchival) {
do {
std::string value(1024, '1');
Options options = CurrentOptions();
options.create_if_missing = true;
@ -2733,10 +2953,11 @@ TEST(DBTest, WALArchival) {
logFiles = ListLogFiles(env_, archiveDir);
ASSERT_TRUE(logFiles.size() == 0);
} while (ChangeCompactOptions());
}
TEST(DBTest, WALClear) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.WAL_ttl_seconds = 1;
@ -2752,6 +2973,7 @@ TEST(DBTest, WALClear) {
dbfull()->TEST_PurgeObsoleteteWAL();
log_files = ListLogFiles(env_, archive_dir);
ASSERT_TRUE(log_files.empty());
} while (ChangeCompactOptions());
}
void ExpectRecords(
@ -2771,6 +2993,7 @@ void ExpectRecords(
}
TEST(DBTest, TransactionLogIterator) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
Put("key1", DummyString(1024));
@ -2791,9 +3014,11 @@ TEST(DBTest, TransactionLogIterator) {
auto iter = OpenTransactionLogIter(0);
ExpectRecords(6, iter);
}
} while (ChangeCompactOptions());
}
TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
// Do a plain Reopen.
@ -2806,9 +3031,11 @@ TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) {
auto iter = OpenTransactionLogIter(0);
ExpectRecords(2, iter);
} while (ChangeCompactOptions());
}
TEST(DBTest, TransactionLogIteratorStallAtLastRecord) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
Put("key1", DummyString(1024));
@ -2822,17 +3049,21 @@ TEST(DBTest, TransactionLogIteratorStallAtLastRecord) {
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
} while (ChangeCompactOptions());
}
TEST(DBTest, TransactionLogIteratorJustEmptyFile) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(0, &iter);
ASSERT_TRUE(!status.ok());
} while (ChangeCompactOptions());
}
TEST(DBTest, TransactionLogIteratorCheckAfterRestart) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
Put("key1", DummyString(1024));
@ -2841,9 +3072,11 @@ TEST(DBTest, TransactionLogIteratorCheckAfterRestart) {
Reopen(&options);
auto iter = OpenTransactionLogIter(0);
ExpectRecords(2, iter);
} while (ChangeCompactOptions());
}
TEST(DBTest, TransactionLogIteratorBatchOperations) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
WriteBatch batch;
@ -2857,6 +3090,7 @@ TEST(DBTest, TransactionLogIteratorBatchOperations) {
Put("key4", DummyString(1024));
auto iter = OpenTransactionLogIter(3);
ExpectRecords(1, iter);
} while (ChangeCompactOptions());
}
TEST(DBTest, ReadCompaction) {
@ -3243,9 +3477,6 @@ static bool CompareIterators(int step,
TEST(DBTest, Randomized) {
Random rnd(test::RandomSeed());
do {
if (CurrentOptions().filter_deletes) {
ChangeOptions(); // DBTest.Randomized not suited for filter_deletes
}
ModelDB model(CurrentOptions());
const int N = 10000;
const Snapshot* model_snap = nullptr;
@ -3308,10 +3539,11 @@ TEST(DBTest, Randomized) {
}
if (model_snap != nullptr) model.ReleaseSnapshot(model_snap);
if (db_snap != nullptr) db_->ReleaseSnapshot(db_snap);
} while (ChangeOptions());
} while (ChangeOptions(kSkipDeletesFilterFirst));
}
TEST(DBTest, MultiGetSimple) {
do {
ASSERT_OK(db_->Put(WriteOptions(),"k1","v1"));
ASSERT_OK(db_->Put(WriteOptions(),"k2","v2"));
ASSERT_OK(db_->Put(WriteOptions(),"k3","v3"));
@ -3343,9 +3575,11 @@ TEST(DBTest, MultiGetSimple) {
ASSERT_TRUE(s[3].IsNotFound());
ASSERT_OK(s[4]);
ASSERT_TRUE(s[5].IsNotFound());
} while (ChangeCompactOptions());
}
TEST(DBTest, MultiGetEmpty) {
do {
// Empty Key Set
std::vector<Slice> keys;
std::vector<std::string> values;
@ -3364,6 +3598,7 @@ TEST(DBTest, MultiGetEmpty) {
s = db_->MultiGet(ReadOptions(),keys,&values);
ASSERT_EQ((int)s.size(), 2);
ASSERT_TRUE(s[0].IsNotFound() && s[1].IsNotFound());
} while (ChangeCompactOptions());
}
std::string MakeKey(unsigned int num) {

View File

@ -2266,6 +2266,13 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) {
newerfile = f;
}
// Is the earliest file part of this compaction?
int last_index = file_by_time[file_by_time.size()-1];
FileMetaData* last_file = current_->files_[level][last_index];
if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) {
c->bottommost_level_ = true;
}
// update statistics
if (options_->statistics != nullptr) {
options_->statistics->measureTime(NUM_FILES_IN_SINGLE_COMPACTION,
@ -2403,7 +2410,7 @@ Compaction* VersionSet::PickCompaction() {
if (level != 0 || compactions_in_progress_[0].empty()) {
if(!ParentRangeInCompaction(&f->smallest, &f->largest, level,
&parent_index)) {
c = new Compaction(level, level, MaxFileSizeForLevel(level+1),
c = new Compaction(level, level+1, MaxFileSizeForLevel(level+1),
MaxGrandParentOverlapBytes(level), NumberLevels(), true);
c->inputs_[0].push_back(f);
c->parent_index_ = parent_index;
@ -2444,13 +2451,15 @@ Compaction* VersionSet::PickCompaction() {
assert(!c->inputs_[0].empty());
}
// Setup "level+1" files (inputs_[1])
SetupOtherInputs(c);
// mark all the files that are being compacted
c->MarkFilesBeingCompacted(true);
// Is this compaction creating a file at the bottommost level
c->SetupBottomMostLevel(false);
// remember this currently undergoing compaction
compactions_in_progress_[level].insert(c);
@ -2624,6 +2633,13 @@ Compaction* VersionSet::CompactRange(
const InternalKey* begin,
const InternalKey* end) {
std::vector<FileMetaData*> inputs;
// All files are 'overlapping' in universal style compaction.
// We have to compact the entire range in one shot.
if (options_->compaction_style == kCompactionStyleUniversal) {
begin = nullptr;
end = nullptr;
}
current_->GetOverlappingInputs(level, begin, end, &inputs);
if (inputs.empty()) {
return nullptr;
@ -2667,6 +2683,9 @@ Compaction* VersionSet::CompactRange(
// upon other files because manual compactions are processed when
// the system has a max of 1 background compaction thread.
c->MarkFilesBeingCompacted(true);
// Is this compaction creating a file at the bottommost level
c->SetupBottomMostLevel(true);
return c;
}
@ -2686,6 +2705,7 @@ Compaction::Compaction(int level, int out_level, uint64_t target_file_size,
base_index_(-1),
parent_index_(-1),
score_(0),
bottommost_level_(false),
level_ptrs_(std::vector<size_t>(number_levels)) {
edit_ = new VersionEdit(number_levels_);
for (int i = 0; i < number_levels_; i++) {
@ -2718,6 +2738,10 @@ void Compaction::AddInputDeletions(VersionEdit* edit) {
}
bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
if (input_version_->vset_->options_->compaction_style ==
kCompactionStyleUniversal) {
return bottommost_level_;
}
// Maybe use binary search to find right entry instead of linear search?
const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
for (int lvl = level_ + 2; lvl < number_levels_; lvl++) {
@ -2776,6 +2800,31 @@ void Compaction::MarkFilesBeingCompacted(bool value) {
}
}
// Is this compaction producing files at the bottommost level?
void Compaction::SetupBottomMostLevel(bool isManual) {
if (input_version_->vset_->options_->compaction_style ==
kCompactionStyleUniversal) {
// If universal compaction style is used and manual
// compaction is occuring, then we are guaranteed that
// all files will be picked in a single compaction
// run. We can safely set bottommost_level_ = true.
// If it is not manual compaction, then bottommost_level_
// is already set when the Compaction was created.
if (isManual) {
bottommost_level_ = true;
}
return;
}
bottommost_level_ = true;
int num_levels = input_version_->vset_->NumberLevels();
for (int i = level() + 2; i < num_levels; i++) {
if (input_version_->vset_->NumLevelFiles(i) > 0) {
bottommost_level_ = false;
break;
}
}
}
void Compaction::ReleaseInputs() {
if (input_version_ != nullptr) {
input_version_->Unref();

View File

@ -557,6 +557,9 @@ class Compaction {
// Return the score that was used to pick this compaction run.
double score() const { return score_; }
// Is this compaction creating a file in the bottom most level?
bool BottomMostLevel() { return bottommost_level_; }
private:
friend class Version;
friend class VersionSet;
@ -589,7 +592,8 @@ class Compaction {
int parent_index_; // index of some file with same range in files_[level_+1]
double score_; // score that was used to pick this compaction.
// State for implementing IsBaseLevelForKey
// Is this compaction creating a file in the bottom most level?
bool bottommost_level_;
// level_ptrs_ holds indices into input_version_->levels_: our state
// is that we are positioned at one of the file ranges for each
@ -600,6 +604,9 @@ class Compaction {
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool);
// Initialize whether compaction producing files at the bottommost level
void SetupBottomMostLevel(bool isManual);
// In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_
void ResetNextCompactionIndex();

View File

@ -185,7 +185,7 @@ Options::Dump(Logger* log) const
max_background_compactions);
Log(log," Options.hard_rate_limit: %.2f",
hard_rate_limit);
Log(log," Options.rate_limit_delay_max_milliseconds: %d",
Log(log," Options.rate_limit_delay_max_milliseconds: %u",
rate_limit_delay_max_milliseconds);
Log(log," Options.disable_auto_compactions: %d",
disable_auto_compactions);
@ -205,7 +205,7 @@ Options::Dump(Logger* log) const
is_fd_close_on_exec);
Log(log," Options.skip_log_error_on_recovery: %d",
skip_log_error_on_recovery);
Log(log," Options.stats_dump_period_sec: %d",
Log(log," Options.stats_dump_period_sec: %u",
stats_dump_period_sec);
Log(log," Options.block_size_deviation: %d",
block_size_deviation);
@ -221,11 +221,11 @@ Options::Dump(Logger* log) const
filter_deletes);
Log(log," Options.compaction_style: %d",
compaction_style);
Log(log," Options.compaction_options_universal.size_ratio: %d",
Log(log," Options.compaction_options_universal.size_ratio: %u",
compaction_options_universal.size_ratio);
Log(log," Options.compaction_options_universal.min_merge_width: %d",
Log(log," Options.compaction_options_universal.min_merge_width: %u",
compaction_options_universal.min_merge_width);
Log(log," Options.compaction_options_universal.max_merge_width: %d",
Log(log," Options.compaction_options_universal.max_merge_width: %u",
compaction_options_universal.max_merge_width);
} // Options::Dump