diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index 3a4980f44..82622a69b 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -50,6 +50,7 @@ class FlushedFileCollector : public EventListener { } return result; } + void ClearFlushedFiles() { flushed_files_.clear(); } private: std::vector flushed_files_; @@ -116,7 +117,7 @@ TEST_F(CompactFilesTest, L0ConflictsFiles) { TEST_F(CompactFilesTest, ObsoleteFiles) { Options options; // to trigger compaction more easily - const int kWriteBufferSize = 10000; + const int kWriteBufferSize = 65536; options.create_if_missing = true; // Disable RocksDB background compaction. options.compaction_style = kCompactionStyleNone; @@ -154,6 +155,46 @@ TEST_F(CompactFilesTest, ObsoleteFiles) { delete db; } +TEST_F(CompactFilesTest, NotCutOutputOnLevel0) { + Options options; + options.create_if_missing = true; + // Disable RocksDB background compaction. + options.compaction_style = kCompactionStyleNone; + options.level0_slowdown_writes_trigger = 1000; + options.level0_stop_writes_trigger = 1000; + options.write_buffer_size = 65536; + options.max_write_buffer_number = 2; + options.compression = kNoCompression; + options.max_compaction_bytes = 5000; + + // Add listener + FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + + DB* db = nullptr; + DestroyDB(db_name_, options); + Status s = DB::Open(options, db_name_, &db); + assert(s.ok()); + assert(db); + + // create couple files + for (int i = 0; i < 500; ++i) { + db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26))); + } + reinterpret_cast(db)->TEST_WaitForFlushMemTable(); + auto l0_files_1 = collector->GetFlushedFiles(); + collector->ClearFlushedFiles(); + for (int i = 0; i < 500; ++i) { + db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26))); + } + reinterpret_cast(db)->TEST_WaitForFlushMemTable(); + auto l0_files_2 = collector->GetFlushedFiles(); + ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0)); + ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0)); + // no assertion failure + delete db; +} + TEST_F(CompactFilesTest, CapturingPendingFiles) { Options options; options.create_if_missing = true; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 4d9da9061..3931becb9 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -762,7 +762,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (end != nullptr && cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) { break; - } else if (sub_compact->ShouldStopBefore( + } else if (sub_compact->compaction->output_level() != 0 && + sub_compact->ShouldStopBefore( key, sub_compact->current_output_file_size) && sub_compact->builder != nullptr) { status = FinishCompactionOutputFile(input->status(), sub_compact);