Ensure all MultiGet IO errors are propagated to user (#6403)
Summary: Unrevert the previous fix to propagate error status, and an additional fix to not treat a memtable lookup MergeInProgress status as an error. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6403 Test Plan: Unit tests Tried running stress tests but couldn't repro the stress failure Differential Revision: D19846721 Pulled By: anand1976 fbshipit-source-id: 7db10cccbdc863d9b559497f0a46b608d2488ca4
This commit is contained in:
parent
2c62c227ae
commit
10c141a3b7
@ -1,5 +1,8 @@
|
|||||||
# Rocksdb Change Log
|
# Rocksdb Change Log
|
||||||
## Unreleased
|
## 6.7.1 (02/13/2020)
|
||||||
|
### Bug Fixes
|
||||||
|
* Fixed issue #6316 that can cause a corruption of the MANIFEST file in the middle when writing to it fails due to no disk space.
|
||||||
|
* Batched MultiGet() ignores IO errors while reading data blocks, causing it to potentially continue looking for a key and returning stale results.
|
||||||
|
|
||||||
## 6.7.0 (01/21/2020)
|
## 6.7.0 (01/21/2020)
|
||||||
### Public API Change
|
### Public API Change
|
||||||
@ -17,7 +20,6 @@
|
|||||||
* Fixed an issue where the thread pools were not resized upon setting `max_background_jobs` dynamically through the `SetDBOptions` interface.
|
* Fixed an issue where the thread pools were not resized upon setting `max_background_jobs` dynamically through the `SetDBOptions` interface.
|
||||||
* Fix a bug that can cause write threads to hang when a slowdown/stall happens and there is a mix of writers with WriteOptions::no_slowdown set/unset.
|
* Fix a bug that can cause write threads to hang when a slowdown/stall happens and there is a mix of writers with WriteOptions::no_slowdown set/unset.
|
||||||
* Fixed an issue where an incorrect "number of input records" value was used to compute the "records dropped" statistics for compactions.
|
* Fixed an issue where an incorrect "number of input records" value was used to compute the "records dropped" statistics for compactions.
|
||||||
* Fixed issue #6316 that can cause a corruption of the MANIFEST file in the middle when writing to it fails due to no disk space.
|
|
||||||
|
|
||||||
### New Features
|
### New Features
|
||||||
* It is now possible to enable periodic compactions for the base DB when using BlobDB.
|
* It is now possible to enable periodic compactions for the base DB when using BlobDB.
|
||||||
|
@ -1405,6 +1405,91 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.disable_auto_compactions = true;
|
||||||
|
options.merge_operator = MergeOperators::CreateStringAppendOperator();
|
||||||
|
BlockBasedTableOptions bbto;
|
||||||
|
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
|
||||||
|
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||||
|
Reopen(options);
|
||||||
|
int num_keys = 0;
|
||||||
|
|
||||||
|
for (int i = 0; i < 128; ++i) {
|
||||||
|
ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
|
||||||
|
num_keys++;
|
||||||
|
if (num_keys == 8) {
|
||||||
|
Flush();
|
||||||
|
num_keys = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (num_keys > 0) {
|
||||||
|
Flush();
|
||||||
|
num_keys = 0;
|
||||||
|
}
|
||||||
|
MoveFilesToLevel(2);
|
||||||
|
|
||||||
|
for (int i = 0; i < 128; i += 3) {
|
||||||
|
ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
|
||||||
|
num_keys++;
|
||||||
|
if (num_keys == 8) {
|
||||||
|
Flush();
|
||||||
|
num_keys = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (num_keys > 0) {
|
||||||
|
Flush();
|
||||||
|
num_keys = 0;
|
||||||
|
}
|
||||||
|
MoveFilesToLevel(1);
|
||||||
|
|
||||||
|
for (int i = 0; i < 128; i += 5) {
|
||||||
|
ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
|
||||||
|
num_keys++;
|
||||||
|
if (num_keys == 8) {
|
||||||
|
Flush();
|
||||||
|
num_keys = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (num_keys > 0) {
|
||||||
|
Flush();
|
||||||
|
num_keys = 0;
|
||||||
|
}
|
||||||
|
ASSERT_EQ(0, num_keys);
|
||||||
|
|
||||||
|
for (int i = 0; i < 128; i += 9) {
|
||||||
|
ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<std::string> keys;
|
||||||
|
std::vector<std::string> values;
|
||||||
|
|
||||||
|
for (int i = 32; i < 80; ++i) {
|
||||||
|
keys.push_back("key_" + std::to_string(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
values = MultiGet(keys, nullptr);
|
||||||
|
ASSERT_EQ(values.size(), keys.size());
|
||||||
|
for (unsigned int j = 0; j < 48; ++j) {
|
||||||
|
int key = j + 32;
|
||||||
|
std::string value;
|
||||||
|
value.append("val_l2_" + std::to_string(key));
|
||||||
|
if (key % 3 == 0) {
|
||||||
|
value.append(",");
|
||||||
|
value.append("val_l1_" + std::to_string(key));
|
||||||
|
}
|
||||||
|
if (key % 5 == 0) {
|
||||||
|
value.append(",");
|
||||||
|
value.append("val_l0_" + std::to_string(key));
|
||||||
|
}
|
||||||
|
if (key % 9 == 0) {
|
||||||
|
value.append(",");
|
||||||
|
value.append("val_mem_" + std::to_string(key));
|
||||||
|
}
|
||||||
|
ASSERT_EQ(values[j], value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Test class for batched MultiGet with prefix extractor
|
// Test class for batched MultiGet with prefix extractor
|
||||||
// Param bool - If true, use partitioned filters
|
// Param bool - If true, use partitioned filters
|
||||||
// If false, use full filter block
|
// If false, use full filter block
|
||||||
@ -2011,6 +2096,90 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
|
||||||
|
std::vector<std::string> key_data(10);
|
||||||
|
std::vector<Slice> keys;
|
||||||
|
// We cannot resize a PinnableSlice vector, so just set initial size to
|
||||||
|
// largest we think we will need
|
||||||
|
std::vector<PinnableSlice> values(10);
|
||||||
|
std::vector<Status> statuses;
|
||||||
|
int read_count = 0;
|
||||||
|
ReadOptions ro;
|
||||||
|
ro.fill_cache = fill_cache();
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"RetrieveMultipleBlocks:VerifyChecksum", [&](void *status) {
|
||||||
|
Status* s = static_cast<Status*>(status);
|
||||||
|
read_count++;
|
||||||
|
if (read_count == 2) {
|
||||||
|
*s = Status::Corruption();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
// Warm up the cache first
|
||||||
|
key_data.emplace_back(Key(0));
|
||||||
|
keys.emplace_back(Slice(key_data.back()));
|
||||||
|
key_data.emplace_back(Key(50));
|
||||||
|
keys.emplace_back(Slice(key_data.back()));
|
||||||
|
statuses.resize(keys.size());
|
||||||
|
|
||||||
|
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
|
||||||
|
keys.data(), values.data(), statuses.data(), true);
|
||||||
|
ASSERT_TRUE(CheckValue(0, values[0].ToString()));
|
||||||
|
//ASSERT_TRUE(CheckValue(50, values[1].ToString()));
|
||||||
|
ASSERT_EQ(statuses[0], Status::OK());
|
||||||
|
ASSERT_EQ(statuses[1], Status::Corruption());
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
|
||||||
|
std::vector<std::string> key_data(10);
|
||||||
|
std::vector<Slice> keys;
|
||||||
|
// We cannot resize a PinnableSlice vector, so just set initial size to
|
||||||
|
// largest we think we will need
|
||||||
|
std::vector<PinnableSlice> values(10);
|
||||||
|
std::vector<Status> statuses;
|
||||||
|
ReadOptions ro;
|
||||||
|
ro.fill_cache = fill_cache();
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"TableCache::MultiGet:FindTable", [&](void *status) {
|
||||||
|
Status* s = static_cast<Status*>(status);
|
||||||
|
*s = Status::IOError();
|
||||||
|
});
|
||||||
|
// DB open will create table readers unless we reduce the table cache
|
||||||
|
// capacity.
|
||||||
|
// SanitizeOptions will set max_open_files to minimum of 20. Table cache
|
||||||
|
// is allocated with max_open_files - 10 as capacity. So override
|
||||||
|
// max_open_files to 11 so table cache capacity will become 1. This will
|
||||||
|
// prevent file open during DB open and force the file to be opened
|
||||||
|
// during MultiGet
|
||||||
|
SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void *arg) {
|
||||||
|
int* max_open_files = (int*)arg;
|
||||||
|
*max_open_files = 11;
|
||||||
|
});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
Reopen(CurrentOptions());
|
||||||
|
|
||||||
|
// Warm up the cache first
|
||||||
|
key_data.emplace_back(Key(0));
|
||||||
|
keys.emplace_back(Slice(key_data.back()));
|
||||||
|
key_data.emplace_back(Key(50));
|
||||||
|
keys.emplace_back(Slice(key_data.back()));
|
||||||
|
statuses.resize(keys.size());
|
||||||
|
|
||||||
|
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
|
||||||
|
keys.data(), values.data(), statuses.data(), true);
|
||||||
|
ASSERT_EQ(statuses[0], Status::IOError());
|
||||||
|
ASSERT_EQ(statuses[1], Status::IOError());
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
}
|
||||||
|
|
||||||
INSTANTIATE_TEST_CASE_P(
|
INSTANTIATE_TEST_CASE_P(
|
||||||
ParallelIO, DBBasicTestWithParallelIO,
|
ParallelIO, DBBasicTestWithParallelIO,
|
||||||
// Params are as follows -
|
// Params are as follows -
|
||||||
|
@ -485,6 +485,7 @@ Status TableCache::MultiGet(const ReadOptions& options,
|
|||||||
file_options_, internal_comparator, fd, &handle, prefix_extractor,
|
file_options_, internal_comparator, fd, &handle, prefix_extractor,
|
||||||
options.read_tier == kBlockCacheTier /* no_io */,
|
options.read_tier == kBlockCacheTier /* no_io */,
|
||||||
true /* record_read_stats */, file_read_hist, skip_filters, level);
|
true /* record_read_stats */, file_read_hist, skip_filters, level);
|
||||||
|
TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
t = GetTableReaderFromHandle(handle);
|
t = GetTableReaderFromHandle(handle);
|
||||||
assert(t);
|
assert(t);
|
||||||
|
@ -1918,6 +1918,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
|||||||
&iter->max_covering_tombstone_seq, this->env_, nullptr,
|
&iter->max_covering_tombstone_seq, this->env_, nullptr,
|
||||||
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
|
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
|
||||||
tracing_mget_id);
|
tracing_mget_id);
|
||||||
|
// MergeInProgress status, if set, has been transferred to the get_context
|
||||||
|
// state, so we set status to ok here. From now on, the iter status will
|
||||||
|
// be used for IO errors, and get_context state will be used for any
|
||||||
|
// key level errors
|
||||||
|
*(iter->s) = Status::OK();
|
||||||
}
|
}
|
||||||
int get_ctx_index = 0;
|
int get_ctx_index = 0;
|
||||||
for (auto iter = range->begin(); iter != range->end();
|
for (auto iter = range->begin(); iter != range->end();
|
||||||
@ -1962,6 +1967,15 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
|||||||
for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
|
for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
|
||||||
GetContext& get_context = *iter->get_context;
|
GetContext& get_context = *iter->get_context;
|
||||||
Status* status = iter->s;
|
Status* status = iter->s;
|
||||||
|
// The Status in the KeyContext takes precedence over GetContext state
|
||||||
|
// Status may be an error if there were any IO errors in the table
|
||||||
|
// reader. We never expect Status to be NotFound(), as that is
|
||||||
|
// determined by get_context
|
||||||
|
assert(!status->IsNotFound());
|
||||||
|
if (!status->ok()) {
|
||||||
|
file_range.MarkKeyDone(iter);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (get_context.sample()) {
|
if (get_context.sample()) {
|
||||||
sample_file_read_inc(f->file_metadata);
|
sample_file_read_inc(f->file_metadata);
|
||||||
|
@ -2455,6 +2455,7 @@ void BlockBasedTable::RetrieveMultipleBlocks(
|
|||||||
s = rocksdb::VerifyChecksum(footer.checksum(),
|
s = rocksdb::VerifyChecksum(footer.checksum(),
|
||||||
req.result.data() + req_offset,
|
req.result.data() + req_offset,
|
||||||
handle.size() + 1, expected);
|
handle.size() + 1, expected);
|
||||||
|
TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user