diff --git a/HISTORY.md b/HISTORY.md
index b76e5fb89..08ef0a3e3 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,5 +1,8 @@
 # Rocksdb Change Log
-## Unreleased
+## 6.7.1 (02/13/2020)
+### Bug Fixes
+* Fixed issue #6316 that can cause a corruption of the MANIFEST file in the middle when writing to it fails due to no disk space.
+* Fixed a bug where batched MultiGet() ignored IO errors while reading data blocks, causing it to potentially continue looking for a key and return stale results.
 
 ## 6.7.0 (01/21/2020)
 ### Public API Change
@@ -17,7 +20,6 @@
 * Fixed an issue where the thread pools were not resized upon setting `max_background_jobs` dynamically through the `SetDBOptions` interface.
 * Fix a bug that can cause write threads to hang when a slowdown/stall happens and there is a mix of writers with WriteOptions::no_slowdown set/unset.
 * Fixed an issue where an incorrect "number of input records" value was used to compute the "records dropped" statistics for compactions.
-* Fixed issue #6316 that can cause a corruption of the MANIFEST file in the middle when writing to it fails due to no disk space.
 
 ### New Features
 * It is now possible to enable periodic compactions for the base DB when using BlobDB.
diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index 86f6f810d..5294c1a60 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -1405,6 +1405,93 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
   }
 }
 
+// Verifies that batched MultiGet() returns the full merge result for keys whose
+// base value and merge operands were written in separate stages (see below).
+TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.merge_operator = MergeOperators::CreateStringAppendOperator();
+  BlockBasedTableOptions bbto;
+  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  Reopen(options);
+  int num_keys = 0;
+
+  for (int i = 0; i < 128; ++i) {
+    ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+  MoveFilesToLevel(2);
+
+  for (int i = 0; i < 128; i += 3) {
+    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+  MoveFilesToLevel(1);
+
+  for (int i = 0; i < 128; i += 5) {
+    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+  ASSERT_EQ(0, num_keys);
+
+  for (int i = 0; i < 128; i += 9) {
+    ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
+  }
+
+  std::vector<std::string> keys;
+  std::vector<std::string> values;
+
+  for (int i = 32; i < 80; ++i) {
+    keys.push_back("key_" + std::to_string(i));
+  }
+
+  values = MultiGet(keys, nullptr);
+  ASSERT_EQ(values.size(), keys.size());
+  for (unsigned int j = 0; j < 48; ++j) {
+    int key = j + 32;
+    std::string value;
+    value.append("val_l2_" + std::to_string(key));
+    if (key % 3 == 0) {
+      value.append(",");
+      value.append("val_l1_" + std::to_string(key));
+    }
+    if (key % 5 == 0) {
+      value.append(",");
+      value.append("val_l0_" + std::to_string(key));
+    }
+    if (key % 9 == 0) {
+      value.append(",");
+      value.append("val_mem_" + std::to_string(key));
+    }
+    ASSERT_EQ(values[j], value);
+  }
+}
+
 // Test class for batched MultiGet with prefix extractor
 // Param bool - If true, use partitioned filters
 //            If false, use full filter block
@@ -2011,6 +2098,94 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
   }
 }
 
+// Verifies that when the second block checksum verification fails, batched
+// MultiGet() reports Corruption for the second key while the first succeeds.
+TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
+  std::vector<std::string> key_data(10);
+  std::vector<Slice> keys;
+  // We cannot resize a PinnableSlice vector, so just set initial size to
+  // largest we think we will need
+  std::vector<PinnableSlice> values(10);
+  std::vector<Status> statuses;
+  int read_count = 0;
+  ReadOptions ro;
+  ro.fill_cache = fill_cache();
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "RetrieveMultipleBlocks:VerifyChecksum", [&](void *status) {
+        Status* s = static_cast<Status*>(status);
+        read_count++;
+        if (read_count == 2) {
+          *s = Status::Corruption();
+        }
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // Look up two keys; the callback above fails the second checksum check
+  key_data.emplace_back(Key(0));
+  keys.emplace_back(Slice(key_data.back()));
+  key_data.emplace_back(Key(50));
+  keys.emplace_back(Slice(key_data.back()));
+  statuses.resize(keys.size());
+
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data(), true);
+  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
+  //ASSERT_TRUE(CheckValue(50, values[1].ToString()));
+  ASSERT_EQ(statuses[0], Status::OK());
+  ASSERT_EQ(statuses[1], Status::Corruption());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// Verifies that when the table lookup in TableCache::MultiGet fails, every key
+// in the batch is reported as IOError.
+TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
+  std::vector<std::string> key_data(10);
+  std::vector<Slice> keys;
+  // We cannot resize a PinnableSlice vector, so just set initial size to
+  // largest we think we will need
+  std::vector<PinnableSlice> values(10);
+  std::vector<Status> statuses;
+  ReadOptions ro;
+  ro.fill_cache = fill_cache();
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "TableCache::MultiGet:FindTable", [&](void *status) {
+        Status* s = static_cast<Status*>(status);
+        *s = Status::IOError();
+      });
+  // DB open will create table readers unless we reduce the table cache
+  // capacity.
+  // SanitizeOptions will set max_open_files to minimum of 20. Table cache
+  // is allocated with max_open_files - 10 as capacity. So override
+  // max_open_files to 11 so table cache capacity will become 1. This will
+  // prevent file open during DB open and force the file to be opened
+  // during MultiGet
+  SyncPoint::GetInstance()->SetCallBack(
+      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void *arg) {
+        int* max_open_files = static_cast<int*>(arg);
+        *max_open_files = 11;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Reopen(CurrentOptions());
+
+  // Look up two keys; the callback above fails every table lookup
+  key_data.emplace_back(Key(0));
+  keys.emplace_back(Slice(key_data.back()));
+  key_data.emplace_back(Key(50));
+  keys.emplace_back(Slice(key_data.back()));
+  statuses.resize(keys.size());
+
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data(), true);
+  ASSERT_EQ(statuses[0], Status::IOError());
+  ASSERT_EQ(statuses[1], Status::IOError());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
 INSTANTIATE_TEST_CASE_P(
     ParallelIO, DBBasicTestWithParallelIO,
     // Params are as follows -
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 8a8a80ba3..fb9ae9bfa 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -485,6 +485,7 @@ Status TableCache::MultiGet(const ReadOptions& options,
         file_options_, internal_comparator, fd, &handle, prefix_extractor,
         options.read_tier == kBlockCacheTier /* no_io */,
         true /* record_read_stats */, file_read_hist, skip_filters, level);
+    TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
     if (s.ok()) {
       t = GetTableReaderFromHandle(handle);
       assert(t);
diff --git a/db/version_set.cc b/db/version_set.cc
index 8420797df..292cfd5fc 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1918,6 +1918,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
         &iter->max_covering_tombstone_seq, this->env_, nullptr,
         merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
         tracing_mget_id);
+    // MergeInProgress status, if set, has been transferred to the get_context
+    // state, so we set status to ok here. From now on, the iter status will
+    // be used for IO errors, and get_context state will be used for any
+    // key level errors.
+    *(iter->s) = Status::OK();
   }
   int get_ctx_index = 0;
   for (auto iter = range->begin(); iter != range->end();
@@ -1962,6 +1967,15 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
     for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
       GetContext& get_context = *iter->get_context;
       Status* status = iter->s;
+      // The Status in the KeyContext takes precedence over GetContext state.
+      // Status may be an error if there were any IO errors in the table
+      // reader. We never expect Status to be NotFound(), as that is
+      // determined by get_context.
+      assert(!status->IsNotFound());
+      if (!status->ok()) {
+        file_range.MarkKeyDone(iter);
+        continue;
+      }
 
       if (get_context.sample()) {
         sample_file_read_inc(f->file_metadata);
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index dbed36b91..755ef312d 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -2455,6 +2455,7 @@ void BlockBasedTable::RetrieveMultipleBlocks(
           s = rocksdb::VerifyChecksum(footer.checksum(),
                                       req.result.data() + req_offset,
                                       handle.size() + 1, expected);
+          TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
         }
       }
 