Ensure all MultiGet IO errors are propagated to user (#6403)
Summary: Unrevert the previous fix to propagate error status, and an additional fix to not treat a memtable lookup MergeInProgress status as an error. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6403 Test Plan: Unit tests Tried running stress tests but couldn't repro the stress failure Differential Revision: D19846721 Pulled By: anand1976 fbshipit-source-id: 7db10cccbdc863d9b559497f0a46b608d2488ca4
This commit is contained in:
parent
e412a426d6
commit
3e49249d30
@ -10,6 +10,7 @@
|
||||
* Fixed issue #6316 that can cause a corruption of the MANIFEST file in the middle when writing to it fails due to no disk space.
|
||||
* Add DBOptions::skip_checking_sst_file_sizes_on_db_open. It disables potentially expensive checking of all sst file sizes in DB::Open().
|
||||
* BlobDB now ignores trivially moved files when updating the mapping between blob files and SSTs. This should mitigate issue #6338 where out of order flush/compaction notifications could trigger an assertion with the earlier code.
|
||||
* Batched MultiGet() ignores IO errors while reading data blocks, causing it to potentially continue looking for a key and returning stale results.
|
||||
|
||||
### Performance Improvements
|
||||
* Perfom readahead when reading from option files. Inside DB, options.log_readahead_size will be used as the readahead size. In other cases, a default 512KB is used.
|
||||
|
@ -1405,6 +1405,91 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
|
||||
Options options = CurrentOptions();
|
||||
options.disable_auto_compactions = true;
|
||||
options.merge_operator = MergeOperators::CreateStringAppendOperator();
|
||||
BlockBasedTableOptions bbto;
|
||||
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
||||
Reopen(options);
|
||||
int num_keys = 0;
|
||||
|
||||
for (int i = 0; i < 128; ++i) {
|
||||
ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
|
||||
num_keys++;
|
||||
if (num_keys == 8) {
|
||||
Flush();
|
||||
num_keys = 0;
|
||||
}
|
||||
}
|
||||
if (num_keys > 0) {
|
||||
Flush();
|
||||
num_keys = 0;
|
||||
}
|
||||
MoveFilesToLevel(2);
|
||||
|
||||
for (int i = 0; i < 128; i += 3) {
|
||||
ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
|
||||
num_keys++;
|
||||
if (num_keys == 8) {
|
||||
Flush();
|
||||
num_keys = 0;
|
||||
}
|
||||
}
|
||||
if (num_keys > 0) {
|
||||
Flush();
|
||||
num_keys = 0;
|
||||
}
|
||||
MoveFilesToLevel(1);
|
||||
|
||||
for (int i = 0; i < 128; i += 5) {
|
||||
ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
|
||||
num_keys++;
|
||||
if (num_keys == 8) {
|
||||
Flush();
|
||||
num_keys = 0;
|
||||
}
|
||||
}
|
||||
if (num_keys > 0) {
|
||||
Flush();
|
||||
num_keys = 0;
|
||||
}
|
||||
ASSERT_EQ(0, num_keys);
|
||||
|
||||
for (int i = 0; i < 128; i += 9) {
|
||||
ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
|
||||
}
|
||||
|
||||
std::vector<std::string> keys;
|
||||
std::vector<std::string> values;
|
||||
|
||||
for (int i = 32; i < 80; ++i) {
|
||||
keys.push_back("key_" + std::to_string(i));
|
||||
}
|
||||
|
||||
values = MultiGet(keys, nullptr);
|
||||
ASSERT_EQ(values.size(), keys.size());
|
||||
for (unsigned int j = 0; j < 48; ++j) {
|
||||
int key = j + 32;
|
||||
std::string value;
|
||||
value.append("val_l2_" + std::to_string(key));
|
||||
if (key % 3 == 0) {
|
||||
value.append(",");
|
||||
value.append("val_l1_" + std::to_string(key));
|
||||
}
|
||||
if (key % 5 == 0) {
|
||||
value.append(",");
|
||||
value.append("val_l0_" + std::to_string(key));
|
||||
}
|
||||
if (key % 9 == 0) {
|
||||
value.append(",");
|
||||
value.append("val_mem_" + std::to_string(key));
|
||||
}
|
||||
ASSERT_EQ(values[j], value);
|
||||
}
|
||||
}
|
||||
|
||||
// Test class for batched MultiGet with prefix extractor
|
||||
// Param bool - If true, use partitioned filters
|
||||
// If false, use full filter block
|
||||
@ -2011,6 +2096,90 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
|
||||
std::vector<std::string> key_data(10);
|
||||
std::vector<Slice> keys;
|
||||
// We cannot resize a PinnableSlice vector, so just set initial size to
|
||||
// largest we think we will need
|
||||
std::vector<PinnableSlice> values(10);
|
||||
std::vector<Status> statuses;
|
||||
int read_count = 0;
|
||||
ReadOptions ro;
|
||||
ro.fill_cache = fill_cache();
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"RetrieveMultipleBlocks:VerifyChecksum", [&](void *status) {
|
||||
Status* s = static_cast<Status*>(status);
|
||||
read_count++;
|
||||
if (read_count == 2) {
|
||||
*s = Status::Corruption();
|
||||
}
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// Warm up the cache first
|
||||
key_data.emplace_back(Key(0));
|
||||
keys.emplace_back(Slice(key_data.back()));
|
||||
key_data.emplace_back(Key(50));
|
||||
keys.emplace_back(Slice(key_data.back()));
|
||||
statuses.resize(keys.size());
|
||||
|
||||
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
|
||||
keys.data(), values.data(), statuses.data(), true);
|
||||
ASSERT_TRUE(CheckValue(0, values[0].ToString()));
|
||||
//ASSERT_TRUE(CheckValue(50, values[1].ToString()));
|
||||
ASSERT_EQ(statuses[0], Status::OK());
|
||||
ASSERT_EQ(statuses[1], Status::Corruption());
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
|
||||
std::vector<std::string> key_data(10);
|
||||
std::vector<Slice> keys;
|
||||
// We cannot resize a PinnableSlice vector, so just set initial size to
|
||||
// largest we think we will need
|
||||
std::vector<PinnableSlice> values(10);
|
||||
std::vector<Status> statuses;
|
||||
ReadOptions ro;
|
||||
ro.fill_cache = fill_cache();
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"TableCache::MultiGet:FindTable", [&](void *status) {
|
||||
Status* s = static_cast<Status*>(status);
|
||||
*s = Status::IOError();
|
||||
});
|
||||
// DB open will create table readers unless we reduce the table cache
|
||||
// capacity.
|
||||
// SanitizeOptions will set max_open_files to minimum of 20. Table cache
|
||||
// is allocated with max_open_files - 10 as capacity. So override
|
||||
// max_open_files to 11 so table cache capacity will become 1. This will
|
||||
// prevent file open during DB open and force the file to be opened
|
||||
// during MultiGet
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void *arg) {
|
||||
int* max_open_files = (int*)arg;
|
||||
*max_open_files = 11;
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Reopen(CurrentOptions());
|
||||
|
||||
// Warm up the cache first
|
||||
key_data.emplace_back(Key(0));
|
||||
keys.emplace_back(Slice(key_data.back()));
|
||||
key_data.emplace_back(Key(50));
|
||||
keys.emplace_back(Slice(key_data.back()));
|
||||
statuses.resize(keys.size());
|
||||
|
||||
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
|
||||
keys.data(), values.data(), statuses.data(), true);
|
||||
ASSERT_EQ(statuses[0], Status::IOError());
|
||||
ASSERT_EQ(statuses[1], Status::IOError());
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
ParallelIO, DBBasicTestWithParallelIO,
|
||||
// Params are as follows -
|
||||
|
@ -490,6 +490,7 @@ Status TableCache::MultiGet(const ReadOptions& options,
|
||||
file_options_, internal_comparator, fd, &handle, prefix_extractor,
|
||||
options.read_tier == kBlockCacheTier /* no_io */,
|
||||
true /* record_read_stats */, file_read_hist, skip_filters, level);
|
||||
TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
|
||||
if (s.ok()) {
|
||||
t = GetTableReaderFromHandle(handle);
|
||||
assert(t);
|
||||
|
@ -1923,6 +1923,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
||||
&iter->max_covering_tombstone_seq, this->env_, nullptr,
|
||||
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
|
||||
tracing_mget_id);
|
||||
// MergeInProgress status, if set, has been transferred to the get_context
|
||||
// state, so we set status to ok here. From now on, the iter status will
|
||||
// be used for IO errors, and get_context state will be used for any
|
||||
// key level errors
|
||||
*(iter->s) = Status::OK();
|
||||
}
|
||||
int get_ctx_index = 0;
|
||||
for (auto iter = range->begin(); iter != range->end();
|
||||
@ -1967,6 +1972,15 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
||||
for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
|
||||
GetContext& get_context = *iter->get_context;
|
||||
Status* status = iter->s;
|
||||
// The Status in the KeyContext takes precedence over GetContext state
|
||||
// Status may be an error if there were any IO errors in the table
|
||||
// reader. We never expect Status to be NotFound(), as that is
|
||||
// determined by get_context
|
||||
assert(!status->IsNotFound());
|
||||
if (!status->ok()) {
|
||||
file_range.MarkKeyDone(iter);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (get_context.sample()) {
|
||||
sample_file_read_inc(f->file_metadata);
|
||||
|
@ -2458,6 +2458,7 @@ void BlockBasedTable::RetrieveMultipleBlocks(
|
||||
s = rocksdb::VerifyChecksum(footer.checksum(),
|
||||
req.result.data() + req_offset,
|
||||
handle.size() + 1, expected);
|
||||
TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user