diff --git a/db/db_impl.cc b/db/db_impl.cc
index e7465e0f9..c599f1ef2 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -527,6 +527,24 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   versions_->GetObsoleteFiles(&job_context->sst_delete_files,
                               job_context->min_pending_output);
 
+  uint64_t min_log_number = versions_->MinLogNumber();
+  if (!alive_log_files_.empty()) {
+    // find newly obsoleted log files
+    while (alive_log_files_.begin()->number < min_log_number) {
+      auto& earliest = *alive_log_files_.begin();
+      job_context->log_delete_files.push_back(earliest.number);
+      total_log_size_ -= earliest.size;
+      alive_log_files_.pop_front();
+      // Current log should always stay alive since it can't have
+      // number < MinLogNumber().
+      assert(alive_log_files_.size());
+    }
+  }
+
+  // We're just cleaning up for DB::Write().
+  job_context->logs_to_free = logs_to_free_;
+  logs_to_free_.clear();
+
   // store the current filenum, lognum, etc
   job_context->manifest_file_number = versions_->manifest_file_number();
   job_context->pending_manifest_file_number =
@@ -1309,17 +1327,6 @@ Status DBImpl::FlushMemTableToOutputFile(
     VersionStorageInfo::LevelSummaryStorage tmp;
     LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
                 cfd->current()->storage_info()->LevelSummary(&tmp));
-
-    if (disable_delete_obsolete_files_ == 0) {
-      // add to deletion state
-      while (alive_log_files_.size() &&
-             alive_log_files_.begin()->number < versions_->MinLogNumber()) {
-        const auto& earliest = *alive_log_files_.begin();
-        job_context->log_delete_files.push_back(earliest.number);
-        total_log_size_ -= earliest.size;
-        alive_log_files_.pop_front();
-      }
-    }
   }
 
   if (!s.ok() && !s.IsShutdownInProgress() && db_options_.paranoid_checks &&
@@ -2145,7 +2152,9 @@ void DBImpl::RecordFlushIOStats() {
 
 void DBImpl::BGWorkFlush(void* db) {
   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
+  TEST_SYNC_POINT("DBImpl::BGWorkFlush");
   reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
+  TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
 }
 
 void DBImpl::BGWorkCompaction(void* db) {
@@ -2238,10 +2247,6 @@ void DBImpl::BackgroundCallFlush() {
 
     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
 
-    // We're just cleaning up for DB::Write()
-    job_context.logs_to_free = logs_to_free_;
-    logs_to_free_.clear();
-
     // If flush failed, we want to delete all temporary files that we might have
     // created. Thus, we force full scan in FindObsoleteFiles()
     FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
@@ -2308,10 +2313,6 @@ void DBImpl::BackgroundCallCompaction() {
 
     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
 
-    // We're just cleaning up for DB::Write()
-    job_context.logs_to_free = logs_to_free_;
-    logs_to_free_.clear();
-
     // If compaction failed, we want to delete all temporary files that we might
     // have created (they might not be all recorded in job_context in case of a
     // failure). Thus, we force full scan in FindObsoleteFiles()
diff --git a/db/db_impl.h b/db/db_impl.h
index a649b2baa..dd37dd031 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -290,6 +290,8 @@ class DBImpl : public DB {
 
   size_t TEST_LogsToFreeSize();
 
+  uint64_t TEST_LogfileNumber();
+
 #endif  // ROCKSDB_LITE
 
   // Returns the list of live files in 'live' and the list
diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc
index 35703cf1a..66177ed7a 100644
--- a/db/db_impl_debug.cc
+++ b/db/db_impl_debug.cc
@@ -148,5 +148,10 @@ size_t DBImpl::TEST_LogsToFreeSize() {
   return logs_to_free_.size();
 }
 
+uint64_t DBImpl::TEST_LogfileNumber() {
+  InstrumentedMutexLock l(&mutex_);
+  return logfile_number_;
+}
+
 }  // namespace rocksdb
 #endif  // ROCKSDB_LITE
diff --git a/db/db_test.cc b/db/db_test.cc
index c5d2a9b64..ddf7de9a4 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -8540,7 +8540,6 @@ TEST_F(DBTest, TransactionLogIterator) {
   } while (ChangeCompactOptions());
 }
 
-#ifndef NDEBUG // sync point is not included with DNDEBUG build
 TEST_F(DBTest, TransactionLogIteratorRace) {
   static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
   static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
@@ -8595,7 +8594,6 @@ TEST_F(DBTest, TransactionLogIteratorRace) {
     } while (ChangeCompactOptions());
   }
 }
-#endif
 
 TEST_F(DBTest, TransactionLogIteratorStallAtLastRecord) {
   do {
@@ -14136,6 +14134,40 @@ TEST_F(DBTest, PrevAfterMerge) {
   ASSERT_EQ("1", it->key().ToString());
 }
 
+TEST_F(DBTest, DeletingOldWalAfterDrop) {
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      { { "Test:AllowFlushes", "DBImpl::BGWorkFlush" },
+        { "DBImpl::BGWorkFlush:done", "Test:WaitForFlush"} });
+  rocksdb::SyncPoint::GetInstance()->ClearTrace();
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  Options options = CurrentOptions();
+  options.max_total_wal_size = 8192;
+  options.compression = kNoCompression;
+  options.write_buffer_size = 1 << 20;
+  options.level0_file_num_compaction_trigger = (1<<30);
+  options.level0_slowdown_writes_trigger = (1<<30);
+  options.level0_stop_writes_trigger = (1<<30);
+  options.disable_auto_compactions = true;
+  DestroyAndReopen(options);
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  CreateColumnFamilies({"cf1", "cf2"}, options);
+  ASSERT_OK(Put(0, "key1", DummyString(8192)));
+  ASSERT_OK(Put(0, "key2", DummyString(8192)));
+  // the oldest wal should now be getting_flushed
+  ASSERT_OK(db_->DropColumnFamily(handles_[0]));
+  // all flushes should now do nothing because their CF is dropped
+  TEST_SYNC_POINT("Test:AllowFlushes");
+  TEST_SYNC_POINT("Test:WaitForFlush");
+  uint64_t lognum1 = dbfull()->TEST_LogfileNumber();
+  ASSERT_OK(Put(1, "key3", DummyString(8192)));
+  ASSERT_OK(Put(1, "key4", DummyString(8192)));
+  // new wal should have been created
+  uint64_t lognum2 = dbfull()->TEST_LogfileNumber();
+  EXPECT_GT(lognum2, lognum1);
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/version_set.h b/db/version_set.h
index 9ee6aeaa9..778e537f5 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -612,7 +612,9 @@ class VersionSet {
   uint64_t MinLogNumber() const {
     uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
     for (auto cfd : *column_family_set_) {
-      if (min_log_num > cfd->GetLogNumber()) {
+      // It's safe to ignore dropped column families here:
+      // cfd->IsDropped() becomes true after the drop is persisted in MANIFEST.
+      if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
        min_log_num = cfd->GetLogNumber();
      }
    }
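
Reviewer note (not part of the patch): the sketch below is a minimal, self-contained illustration of the behavior this change targets. Obsolete-WAL trimming moves from the flush path into FindObsoleteFiles(), and VersionSet::MinLogNumber() stops counting dropped column families, so a WAL referenced only by dropped CFs can be deleted. The ColumnFamily and WalFile structs and the free functions here are hypothetical stand-ins, not RocksDB types.

// Standalone sketch, not RocksDB code: compute the smallest WAL number still
// needed by any live (non-dropped) column family, then trim every alive WAL
// older than that.
#include <cassert>
#include <cstdint>
#include <deque>
#include <iostream>
#include <limits>
#include <vector>

struct ColumnFamily {
  uint64_t log_number;  // oldest WAL this column family still needs
  bool dropped;
};

struct WalFile {
  uint64_t number;
  uint64_t size;
};

// Mirrors the idea in VersionSet::MinLogNumber() after the patch: dropped
// column families no longer pin old WALs.
uint64_t MinLogNumber(const std::vector<ColumnFamily>& cfds) {
  uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
  for (const auto& cfd : cfds) {
    if (!cfd.dropped && cfd.log_number < min_log_num) {
      min_log_num = cfd.log_number;
    }
  }
  return min_log_num;
}

// Mirrors the trimming loop moved into FindObsoleteFiles(): every WAL older
// than min_log_number becomes deletable; the current WAL always stays alive.
std::vector<uint64_t> TrimObsoleteWals(std::deque<WalFile>& alive_logs,
                                       uint64_t min_log_number,
                                       uint64_t& total_log_size) {
  std::vector<uint64_t> deletable;
  while (!alive_logs.empty() && alive_logs.front().number < min_log_number) {
    deletable.push_back(alive_logs.front().number);
    total_log_size -= alive_logs.front().size;
    alive_logs.pop_front();
    // The current log can never have number < MinLogNumber().
    assert(!alive_logs.empty());
  }
  return deletable;
}

int main() {
  // cf0 was dropped while still pointing at WAL 5; cf1 has moved on to WAL 7.
  std::vector<ColumnFamily> cfds = {{5, /*dropped=*/true}, {7, /*dropped=*/false}};
  std::deque<WalFile> alive = {{5, 8192}, {6, 8192}, {7, 4096}};
  uint64_t total = 8192 + 8192 + 4096;

  // With the dropped CF ignored, MinLogNumber() is 7, so WALs 5 and 6 go.
  auto deletable = TrimObsoleteWals(alive, MinLogNumber(cfds), total);
  for (uint64_t n : deletable) {
    std::cout << "WAL " << n << " is obsolete\n";
  }
}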