diff --git a/db/db_impl.cc b/db/db_impl.cc index 8cf3037ee..19ce9c311 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1338,6 +1338,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, if (!s.ok()) { break; } + TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1"); + TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2"); } } if (!s.ok()) { @@ -1865,9 +1867,6 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } else if (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions return; - } else if (bg_manual_only_) { - // manual only - return; } while (unscheduled_flushes_ > 0 && @@ -1877,6 +1876,12 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this); } + if (bg_manual_only_) { + // only manual compactions are allowed to run. don't schedule automatic + // compactions + return; + } + if (db_options_.max_background_flushes == 0 && bg_compaction_scheduled_ < db_options_.max_background_compactions && unscheduled_flushes_ > 0) { diff --git a/db/db_test.cc b/db/db_test.cc index 94e4d6a5e..9f0bd43d2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -12881,6 +12881,70 @@ TEST_F(DBTest, LargeBatchWithColumnFamilies) { ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options)); } +// Make sure that Flushes can proceed in parallel with CompactRange() +TEST_F(DBTest, FlushesInParallelWithCompactRange) { + // iter == 0 -- leveled + // iter == 1 -- leveled, but throw in a flush between two levels compacting + // iter == 2 -- universal + for (int iter = 0; iter < 3; ++iter) { + printf("iter %d\n", iter); + Options options = CurrentOptions(); + if (iter < 2) { + options.compaction_style = kCompactionStyleLevel; + } else { + options.compaction_style = kCompactionStyleUniversal; + } + options.write_buffer_size = 110 << 10; + options.level0_file_num_compaction_trigger = 4; + options.num_levels = 4; + options.compression = kNoCompression; + options.max_bytes_for_level_base = 450 << 10; + options.target_file_size_base = 98 << 10; + options.max_write_buffer_number = 2; + + DestroyAndReopen(options); + + Random rnd(301); + for (int num = 0; num < 14; num++) { + GenerateNewRandomFile(&rnd); + } + + if (iter == 1) { + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::RunManualCompaction()::1", + "DBTest::FlushesInParallelWithCompactRange:1"}, + {"DBTest::FlushesInParallelWithCompactRange:2", + "DBImpl::RunManualCompaction()::2"}}); + } else { + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"CompactionJob::Run():Start", + "DBTest::FlushesInParallelWithCompactRange:1"}, + {"DBTest::FlushesInParallelWithCompactRange:2", + "CompactionJob::Run():End"}}); + } + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector threads; + threads.emplace_back([&]() { Compact("a", "z"); }); + + TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1"); + + // this has to start a flush. if flushes are blocked, this will try to + // create + // 3 memtables, and that will fail because max_write_buffer_number is 2 + for (int num = 0; num < 3; num++) { + GenerateNewRandomFile(&rnd, /* nowait */ true); + } + + TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:2"); + + for (auto& t : threads) { + t.join(); + } + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + } +} + } // namespace rocksdb int main(int argc, char** argv) {