Batch blob read IO for MultiGet (#8699)
Summary: In batched `MultiGet()`, RocksDB batches blob read IO and uses `RandomAccessFileReader::MultiRead()` to read the blobs instead of issuing multiple `Read()`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8699 Test Plan: ``` make check ``` Reviewed By: ltamasi Differential Revision: D31030861 Pulled By: riversand963 fbshipit-source-id: a0df6060cbfd54cff9515a4eee08807b1dbcb0c8
This commit is contained in:
parent
ba48ff8303
commit
b512f4bc76
@ -23,6 +23,7 @@
|
||||
* Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp.
|
||||
* Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first.
|
||||
* Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file.
|
||||
* Batch blob read requests for `DB::MultiGet` using `MultiRead`.
|
||||
|
||||
### Public API change
|
||||
* Remove obsolete implementation details FullKey and ParseFullKey from public API
|
||||
|
@ -351,6 +351,117 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void BlobFileReader::MultiGetBlob(
|
||||
const ReadOptions& read_options,
|
||||
const autovector<std::reference_wrapper<const Slice>>& user_keys,
|
||||
const autovector<uint64_t>& offsets,
|
||||
const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
|
||||
autovector<PinnableSlice*>& values, uint64_t* bytes_read) const {
|
||||
const size_t num_blobs = user_keys.size();
|
||||
assert(num_blobs == offsets.size());
|
||||
assert(num_blobs == value_sizes.size());
|
||||
assert(num_blobs == statuses.size());
|
||||
assert(num_blobs == values.size());
|
||||
|
||||
std::vector<FSReadRequest> read_reqs(num_blobs);
|
||||
autovector<uint64_t> adjustments;
|
||||
uint64_t total_len = 0;
|
||||
for (size_t i = 0; i < num_blobs; ++i) {
|
||||
const size_t key_size = user_keys[i].get().size();
|
||||
assert(IsValidBlobOffset(offsets[i], key_size, value_sizes[i], file_size_));
|
||||
const uint64_t adjustment =
|
||||
read_options.verify_checksums
|
||||
? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
|
||||
: 0;
|
||||
assert(offsets[i] >= adjustment);
|
||||
adjustments.push_back(adjustment);
|
||||
read_reqs[i].offset = offsets[i] - adjustment;
|
||||
read_reqs[i].len = value_sizes[i] + adjustment;
|
||||
total_len += read_reqs[i].len;
|
||||
}
|
||||
|
||||
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, total_len);
|
||||
|
||||
Buffer buf;
|
||||
AlignedBuf aligned_buf;
|
||||
|
||||
Status s;
|
||||
bool direct_io = file_reader_->use_direct_io();
|
||||
if (direct_io) {
|
||||
for (size_t i = 0; i < read_reqs.size(); ++i) {
|
||||
read_reqs[i].scratch = nullptr;
|
||||
}
|
||||
} else {
|
||||
buf.reset(new char[total_len]);
|
||||
std::ptrdiff_t pos = 0;
|
||||
for (size_t i = 0; i < read_reqs.size(); ++i) {
|
||||
read_reqs[i].scratch = buf.get() + pos;
|
||||
pos += read_reqs[i].len;
|
||||
}
|
||||
}
|
||||
TEST_SYNC_POINT("BlobFileReader::MultiGetBlob:ReadFromFile");
|
||||
s = file_reader_->MultiRead(IOOptions(), read_reqs.data(), read_reqs.size(),
|
||||
direct_io ? &aligned_buf : nullptr);
|
||||
if (!s.ok()) {
|
||||
for (auto& req : read_reqs) {
|
||||
req.status.PermitUncheckedError();
|
||||
}
|
||||
for (size_t i = 0; i < num_blobs; ++i) {
|
||||
assert(statuses[i]);
|
||||
*statuses[i] = s;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
assert(s.ok());
|
||||
for (size_t i = 0; i < num_blobs; ++i) {
|
||||
auto& req = read_reqs[i];
|
||||
assert(statuses[i]);
|
||||
if (req.status.ok() && req.result.size() != req.len) {
|
||||
req.status = IOStatus::Corruption("Failed to read data from blob file");
|
||||
}
|
||||
*statuses[i] = req.status;
|
||||
}
|
||||
|
||||
if (read_options.verify_checksums) {
|
||||
for (size_t i = 0; i < num_blobs; ++i) {
|
||||
assert(statuses[i]);
|
||||
if (!statuses[i]->ok()) {
|
||||
continue;
|
||||
}
|
||||
const Slice& record_slice = read_reqs[i].result;
|
||||
s = VerifyBlob(record_slice, user_keys[i], value_sizes[i]);
|
||||
if (!s.ok()) {
|
||||
assert(statuses[i]);
|
||||
*statuses[i] = s;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < num_blobs; ++i) {
|
||||
assert(statuses[i]);
|
||||
if (!statuses[i]->ok()) {
|
||||
continue;
|
||||
}
|
||||
const Slice& record_slice = read_reqs[i].result;
|
||||
const Slice value_slice(record_slice.data() + adjustments[i],
|
||||
value_sizes[i]);
|
||||
s = UncompressBlobIfNeeded(value_slice, compression_type_, clock_,
|
||||
statistics_, values[i]);
|
||||
if (!s.ok()) {
|
||||
*statuses[i] = s;
|
||||
}
|
||||
}
|
||||
|
||||
if (bytes_read) {
|
||||
uint64_t total_bytes = 0;
|
||||
for (const auto& req : read_reqs) {
|
||||
total_bytes += req.result.size();
|
||||
}
|
||||
*bytes_read = total_bytes;
|
||||
}
|
||||
}
|
||||
|
||||
Status BlobFileReader::VerifyBlob(const Slice& record_slice,
|
||||
const Slice& user_key, uint64_t value_size) {
|
||||
BlobLogRecord record;
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "file/random_access_file_reader.h"
|
||||
#include "rocksdb/compression_type.h"
|
||||
#include "rocksdb/rocksdb_namespace.h"
|
||||
#include "util/autovector.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
@ -43,6 +44,17 @@ class BlobFileReader {
|
||||
CompressionType compression_type, PinnableSlice* value,
|
||||
uint64_t* bytes_read) const;
|
||||
|
||||
void MultiGetBlob(
|
||||
const ReadOptions& read_options,
|
||||
const autovector<std::reference_wrapper<const Slice>>& user_keys,
|
||||
const autovector<uint64_t>& offsets,
|
||||
const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
|
||||
autovector<PinnableSlice*>& values, uint64_t* bytes_read) const;
|
||||
|
||||
CompressionType GetCompressionType() const { return compression_type_; }
|
||||
|
||||
uint64_t GetFileSize() const { return file_size_; }
|
||||
|
||||
private:
|
||||
BlobFileReader(std::unique_ptr<RandomAccessFileReader>&& file_reader,
|
||||
uint64_t file_size, CompressionType compression_type,
|
||||
|
@ -27,23 +27,23 @@ namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
namespace {
|
||||
|
||||
// Creates a test blob file with a single blob in it. Note: this method
|
||||
// makes it possible to test various corner cases by allowing the caller
|
||||
// to specify the contents of various blob file header/footer fields.
|
||||
// Creates a test blob file with `num` blobs in it.
|
||||
void WriteBlobFile(const ImmutableOptions& immutable_options,
|
||||
uint32_t column_family_id, bool has_ttl,
|
||||
const ExpirationRange& expiration_range_header,
|
||||
const ExpirationRange& expiration_range_footer,
|
||||
uint64_t blob_file_number, const Slice& key,
|
||||
const Slice& blob, CompressionType compression_type,
|
||||
uint64_t* blob_offset, uint64_t* blob_size) {
|
||||
uint64_t blob_file_number, const std::vector<Slice>& keys,
|
||||
const std::vector<Slice>& blobs, CompressionType compression,
|
||||
std::vector<uint64_t>& blob_offsets,
|
||||
std::vector<uint64_t>& blob_sizes) {
|
||||
assert(!immutable_options.cf_paths.empty());
|
||||
assert(blob_offset);
|
||||
assert(blob_size);
|
||||
size_t num = keys.size();
|
||||
assert(num == blobs.size());
|
||||
assert(num == blob_offsets.size());
|
||||
assert(num == blob_sizes.size());
|
||||
|
||||
const std::string blob_file_path =
|
||||
BlobFileName(immutable_options.cf_paths.front().path, blob_file_number);
|
||||
|
||||
std::unique_ptr<FSWritableFile> file;
|
||||
ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file,
|
||||
FileOptions()));
|
||||
@ -59,50 +59,77 @@ void WriteBlobFile(const ImmutableOptions& immutable_options,
|
||||
statistics, blob_file_number, use_fsync,
|
||||
do_flush);
|
||||
|
||||
BlobLogHeader header(column_family_id, compression_type, has_ttl,
|
||||
BlobLogHeader header(column_family_id, compression, has_ttl,
|
||||
expiration_range_header);
|
||||
|
||||
ASSERT_OK(blob_log_writer.WriteHeader(header));
|
||||
|
||||
std::string compressed_blob;
|
||||
Slice blob_to_write;
|
||||
|
||||
if (compression_type == kNoCompression) {
|
||||
blob_to_write = blob;
|
||||
*blob_size = blob.size();
|
||||
std::vector<std::string> compressed_blobs(num);
|
||||
std::vector<Slice> blobs_to_write(num);
|
||||
if (kNoCompression == compression) {
|
||||
for (size_t i = 0; i < num; ++i) {
|
||||
blobs_to_write[i] = blobs[i];
|
||||
blob_sizes[i] = blobs[i].size();
|
||||
}
|
||||
} else {
|
||||
CompressionOptions opts;
|
||||
CompressionContext context(compression_type);
|
||||
CompressionContext context(compression);
|
||||
constexpr uint64_t sample_for_compression = 0;
|
||||
|
||||
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
|
||||
compression_type, sample_for_compression);
|
||||
compression, sample_for_compression);
|
||||
|
||||
constexpr uint32_t compression_format_version = 2;
|
||||
|
||||
ASSERT_TRUE(
|
||||
CompressData(blob, info, compression_format_version, &compressed_blob));
|
||||
|
||||
blob_to_write = compressed_blob;
|
||||
*blob_size = compressed_blob.size();
|
||||
for (size_t i = 0; i < num; ++i) {
|
||||
ASSERT_TRUE(CompressData(blobs[i], info, compression_format_version,
|
||||
&compressed_blobs[i]));
|
||||
blobs_to_write[i] = compressed_blobs[i];
|
||||
blob_sizes[i] = compressed_blobs[i].size();
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t key_offset = 0;
|
||||
|
||||
ASSERT_OK(
|
||||
blob_log_writer.AddRecord(key, blob_to_write, &key_offset, blob_offset));
|
||||
for (size_t i = 0; i < num; ++i) {
|
||||
uint64_t key_offset = 0;
|
||||
ASSERT_OK(blob_log_writer.AddRecord(keys[i], blobs_to_write[i], &key_offset,
|
||||
&blob_offsets[i]));
|
||||
}
|
||||
|
||||
BlobLogFooter footer;
|
||||
footer.blob_count = 1;
|
||||
footer.blob_count = num;
|
||||
footer.expiration_range = expiration_range_footer;
|
||||
|
||||
std::string checksum_method;
|
||||
std::string checksum_value;
|
||||
|
||||
ASSERT_OK(
|
||||
blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value));
|
||||
}
|
||||
|
||||
// Creates a test blob file with a single blob in it. Note: this method
|
||||
// makes it possible to test various corner cases by allowing the caller
|
||||
// to specify the contents of various blob file header/footer fields.
|
||||
void WriteBlobFile(const ImmutableOptions& immutable_options,
|
||||
uint32_t column_family_id, bool has_ttl,
|
||||
const ExpirationRange& expiration_range_header,
|
||||
const ExpirationRange& expiration_range_footer,
|
||||
uint64_t blob_file_number, const Slice& key,
|
||||
const Slice& blob, CompressionType compression,
|
||||
uint64_t* blob_offset, uint64_t* blob_size) {
|
||||
std::vector<Slice> keys{key};
|
||||
std::vector<Slice> blobs{blob};
|
||||
std::vector<uint64_t> blob_offsets{0};
|
||||
std::vector<uint64_t> blob_sizes{0};
|
||||
WriteBlobFile(immutable_options, column_family_id, has_ttl,
|
||||
expiration_range_header, expiration_range_footer,
|
||||
blob_file_number, keys, blobs, compression, blob_offsets,
|
||||
blob_sizes);
|
||||
if (blob_offset) {
|
||||
*blob_offset = blob_offsets[0];
|
||||
}
|
||||
if (blob_size) {
|
||||
*blob_size = blob_sizes[0];
|
||||
}
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
class BlobFileReaderTest : public testing::Test {
|
||||
@ -127,15 +154,19 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
|
||||
constexpr bool has_ttl = false;
|
||||
constexpr ExpirationRange expiration_range;
|
||||
constexpr uint64_t blob_file_number = 1;
|
||||
constexpr char key[] = "key";
|
||||
constexpr char blob[] = "blob";
|
||||
constexpr size_t num_blobs = 3;
|
||||
const std::vector<std::string> key_strs = {"key1", "key2", "key3"};
|
||||
const std::vector<std::string> blob_strs = {"blob1", "blob2", "blob3"};
|
||||
|
||||
uint64_t blob_offset = 0;
|
||||
uint64_t blob_size = 0;
|
||||
const std::vector<Slice> keys = {key_strs[0], key_strs[1], key_strs[2]};
|
||||
const std::vector<Slice> blobs = {blob_strs[0], blob_strs[1], blob_strs[2]};
|
||||
|
||||
std::vector<uint64_t> blob_offsets(keys.size());
|
||||
std::vector<uint64_t> blob_sizes(keys.size());
|
||||
|
||||
WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
|
||||
expiration_range, blob_file_number, key, blob, kNoCompression,
|
||||
&blob_offset, &blob_size);
|
||||
expiration_range, blob_file_number, keys, blobs, kNoCompression,
|
||||
blob_offsets, blob_sizes);
|
||||
|
||||
constexpr HistogramImpl* blob_file_read_hist = nullptr;
|
||||
|
||||
@ -153,10 +184,36 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
|
||||
PinnableSlice value;
|
||||
uint64_t bytes_read = 0;
|
||||
|
||||
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
|
||||
kNoCompression, &value, &bytes_read));
|
||||
ASSERT_EQ(value, blob);
|
||||
ASSERT_EQ(bytes_read, blob_size);
|
||||
ASSERT_OK(reader->GetBlob(read_options, keys[0], blob_offsets[0],
|
||||
blob_sizes[0], kNoCompression, &value,
|
||||
&bytes_read));
|
||||
ASSERT_EQ(value, blobs[0]);
|
||||
ASSERT_EQ(bytes_read, blob_sizes[0]);
|
||||
|
||||
// MultiGetBlob
|
||||
bytes_read = 0;
|
||||
size_t total_size = 0;
|
||||
autovector<std::reference_wrapper<const Slice>> key_refs;
|
||||
for (const auto& key_ref : keys) {
|
||||
key_refs.emplace_back(std::cref(key_ref));
|
||||
}
|
||||
autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
|
||||
blob_offsets[2]};
|
||||
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
|
||||
std::array<Status, num_blobs> statuses_buf;
|
||||
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
|
||||
&statuses_buf[2]};
|
||||
std::array<PinnableSlice, num_blobs> value_buf;
|
||||
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
|
||||
&value_buf[2]};
|
||||
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
|
||||
values, &bytes_read);
|
||||
for (size_t i = 0; i < num_blobs; ++i) {
|
||||
ASSERT_OK(statuses_buf[i]);
|
||||
ASSERT_EQ(value_buf[i], blobs[i]);
|
||||
total_size += blob_sizes[i];
|
||||
}
|
||||
ASSERT_EQ(bytes_read, total_size);
|
||||
}
|
||||
|
||||
read_options.verify_checksums = true;
|
||||
@ -165,14 +222,15 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
|
||||
PinnableSlice value;
|
||||
uint64_t bytes_read = 0;
|
||||
|
||||
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
|
||||
kNoCompression, &value, &bytes_read));
|
||||
ASSERT_EQ(value, blob);
|
||||
ASSERT_OK(reader->GetBlob(read_options, keys[1], blob_offsets[1],
|
||||
blob_sizes[1], kNoCompression, &value,
|
||||
&bytes_read));
|
||||
ASSERT_EQ(value, blobs[1]);
|
||||
|
||||
constexpr uint64_t key_size = sizeof(key) - 1;
|
||||
const uint64_t key_size = keys[1].size();
|
||||
ASSERT_EQ(bytes_read,
|
||||
BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
|
||||
blob_size);
|
||||
blob_sizes[1]);
|
||||
}
|
||||
|
||||
// Invalid offset (too close to start of file)
|
||||
@ -181,8 +239,9 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
|
||||
uint64_t bytes_read = 0;
|
||||
|
||||
ASSERT_TRUE(reader
|
||||
->GetBlob(read_options, key, blob_offset - 1, blob_size,
|
||||
kNoCompression, &value, &bytes_read)
|
||||
->GetBlob(read_options, keys[0], blob_offsets[0] - 1,
|
||||
blob_sizes[0], kNoCompression, &value,
|
||||
&bytes_read)
|
||||
.IsCorruption());
|
||||
ASSERT_EQ(bytes_read, 0);
|
||||
}
|
||||
@ -193,8 +252,9 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
|
||||
uint64_t bytes_read = 0;
|
||||
|
||||
ASSERT_TRUE(reader
|
||||
->GetBlob(read_options, key, blob_offset + 1, blob_size,
|
||||
kNoCompression, &value, &bytes_read)
|
||||
->GetBlob(read_options, keys[2], blob_offsets[2] + 1,
|
||||
blob_sizes[2], kNoCompression, &value,
|
||||
&bytes_read)
|
||||
.IsCorruption());
|
||||
ASSERT_EQ(bytes_read, 0);
|
||||
}
|
||||
@ -205,8 +265,8 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
|
||||
uint64_t bytes_read = 0;
|
||||
|
||||
ASSERT_TRUE(reader
|
||||
->GetBlob(read_options, key, blob_offset, blob_size, kZSTD,
|
||||
&value, &bytes_read)
|
||||
->GetBlob(read_options, keys[0], blob_offsets[0],
|
||||
blob_sizes[0], kZSTD, &value, &bytes_read)
|
||||
.IsCorruption());
|
||||
ASSERT_EQ(bytes_read, 0);
|
||||
}
|
||||
@ -219,23 +279,82 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
|
||||
|
||||
ASSERT_TRUE(reader
|
||||
->GetBlob(read_options, shorter_key,
|
||||
blob_offset - (sizeof(key) - sizeof(shorter_key)),
|
||||
blob_size, kNoCompression, &value, &bytes_read)
|
||||
blob_offsets[0] -
|
||||
(keys[0].size() - sizeof(shorter_key) + 1),
|
||||
blob_sizes[0], kNoCompression, &value,
|
||||
&bytes_read)
|
||||
.IsCorruption());
|
||||
ASSERT_EQ(bytes_read, 0);
|
||||
|
||||
// MultiGetBlob
|
||||
autovector<std::reference_wrapper<const Slice>> key_refs;
|
||||
for (const auto& key_ref : keys) {
|
||||
key_refs.emplace_back(std::cref(key_ref));
|
||||
}
|
||||
Slice shorter_key_slice(shorter_key, sizeof(shorter_key) - 1);
|
||||
key_refs[1] = std::cref(shorter_key_slice);
|
||||
|
||||
autovector<uint64_t> offsets{
|
||||
blob_offsets[0],
|
||||
blob_offsets[1] - (keys[1].size() - key_refs[1].get().size()),
|
||||
blob_offsets[2]};
|
||||
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
|
||||
std::array<Status, num_blobs> statuses_buf;
|
||||
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
|
||||
&statuses_buf[2]};
|
||||
std::array<PinnableSlice, num_blobs> value_buf;
|
||||
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
|
||||
&value_buf[2]};
|
||||
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
|
||||
values, &bytes_read);
|
||||
for (size_t i = 0; i < num_blobs; ++i) {
|
||||
if (i == 1) {
|
||||
ASSERT_TRUE(statuses_buf[i].IsCorruption());
|
||||
} else {
|
||||
ASSERT_OK(statuses_buf[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Incorrect key
|
||||
{
|
||||
constexpr char incorrect_key[] = "foo";
|
||||
constexpr char incorrect_key[] = "foo1";
|
||||
PinnableSlice value;
|
||||
uint64_t bytes_read = 0;
|
||||
|
||||
ASSERT_TRUE(reader
|
||||
->GetBlob(read_options, incorrect_key, blob_offset,
|
||||
blob_size, kNoCompression, &value, &bytes_read)
|
||||
->GetBlob(read_options, incorrect_key, blob_offsets[0],
|
||||
blob_sizes[0], kNoCompression, &value,
|
||||
&bytes_read)
|
||||
.IsCorruption());
|
||||
ASSERT_EQ(bytes_read, 0);
|
||||
|
||||
// MultiGetBlob
|
||||
autovector<std::reference_wrapper<const Slice>> key_refs;
|
||||
for (const auto& key_ref : keys) {
|
||||
key_refs.emplace_back(std::cref(key_ref));
|
||||
}
|
||||
Slice wrong_key_slice(incorrect_key, sizeof(incorrect_key) - 1);
|
||||
key_refs[2] = std::cref(wrong_key_slice);
|
||||
|
||||
autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
|
||||
blob_offsets[2]};
|
||||
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
|
||||
std::array<Status, num_blobs> statuses_buf;
|
||||
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
|
||||
&statuses_buf[2]};
|
||||
std::array<PinnableSlice, num_blobs> value_buf;
|
||||
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
|
||||
&value_buf[2]};
|
||||
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
|
||||
values, &bytes_read);
|
||||
for (size_t i = 0; i < num_blobs; ++i) {
|
||||
if (i == num_blobs - 1) {
|
||||
ASSERT_TRUE(statuses_buf[i].IsCorruption());
|
||||
} else {
|
||||
ASSERT_OK(statuses_buf[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Incorrect value size
|
||||
@ -244,10 +363,35 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
|
||||
uint64_t bytes_read = 0;
|
||||
|
||||
ASSERT_TRUE(reader
|
||||
->GetBlob(read_options, key, blob_offset, blob_size + 1,
|
||||
kNoCompression, &value, &bytes_read)
|
||||
->GetBlob(read_options, keys[1], blob_offsets[1],
|
||||
blob_sizes[1] + 1, kNoCompression, &value,
|
||||
&bytes_read)
|
||||
.IsCorruption());
|
||||
ASSERT_EQ(bytes_read, 0);
|
||||
|
||||
// MultiGetBlob
|
||||
autovector<std::reference_wrapper<const Slice>> key_refs;
|
||||
for (const auto& key_ref : keys) {
|
||||
key_refs.emplace_back(std::cref(key_ref));
|
||||
}
|
||||
autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
|
||||
blob_offsets[2]};
|
||||
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1] + 1, blob_sizes[2]};
|
||||
std::array<Status, num_blobs> statuses_buf;
|
||||
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
|
||||
&statuses_buf[2]};
|
||||
std::array<PinnableSlice, num_blobs> value_buf;
|
||||
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
|
||||
&value_buf[2]};
|
||||
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
|
||||
values, &bytes_read);
|
||||
for (size_t i = 0; i < num_blobs; ++i) {
|
||||
if (i != 1) {
|
||||
ASSERT_OK(statuses_buf[i]);
|
||||
} else {
|
||||
ASSERT_TRUE(statuses_buf[i].IsCorruption());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -52,6 +52,9 @@ class BlobIndex {
|
||||
|
||||
BlobIndex() : type_(Type::kUnknown) {}
|
||||
|
||||
BlobIndex(const BlobIndex&) = default;
|
||||
BlobIndex& operator=(const BlobIndex&) = default;
|
||||
|
||||
bool IsInlined() const { return type_ == Type::kInlinedTTL; }
|
||||
|
||||
bool HasTTL() const {
|
||||
|
@ -127,6 +127,46 @@ TEST_F(DBBlobBasicTest, MultiGetBlobs) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
|
||||
Options options = GetDefaultOptions();
|
||||
options.enable_blob_files = true;
|
||||
options.min_blob_size = 0;
|
||||
|
||||
Reopen(options);
|
||||
|
||||
constexpr size_t kNumBlobFiles = 3;
|
||||
constexpr size_t kNumBlobsPerFile = 3;
|
||||
constexpr size_t kNumKeys = kNumBlobsPerFile * kNumBlobFiles;
|
||||
|
||||
std::vector<std::string> key_strs;
|
||||
std::vector<std::string> value_strs;
|
||||
for (size_t i = 0; i < kNumBlobFiles; ++i) {
|
||||
for (size_t j = 0; j < kNumBlobsPerFile; ++j) {
|
||||
std::string key = "key" + std::to_string(i) + "_" + std::to_string(j);
|
||||
std::string value =
|
||||
"value_as_blob" + std::to_string(i) + "_" + std::to_string(j);
|
||||
ASSERT_OK(Put(key, value));
|
||||
key_strs.push_back(key);
|
||||
value_strs.push_back(value);
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
}
|
||||
assert(key_strs.size() == kNumKeys);
|
||||
std::array<Slice, kNumKeys> keys;
|
||||
for (size_t i = 0; i < keys.size(); ++i) {
|
||||
keys[i] = key_strs[i];
|
||||
}
|
||||
std::array<PinnableSlice, kNumKeys> values;
|
||||
std::array<Status, kNumKeys> statuses;
|
||||
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), kNumKeys, &keys[0],
|
||||
&values[0], &statuses[0]);
|
||||
|
||||
for (size_t i = 0; i < kNumKeys; ++i) {
|
||||
ASSERT_OK(statuses[i]);
|
||||
ASSERT_EQ(value_strs[i], values[i]);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
|
||||
Options options = GetDefaultOptions();
|
||||
options.enable_blob_files = true;
|
||||
@ -150,6 +190,83 @@ TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
|
||||
.IsCorruption());
|
||||
}
|
||||
|
||||
TEST_F(DBBlobBasicTest, MultiGetBlob_CorruptIndex) {
|
||||
Options options = GetDefaultOptions();
|
||||
options.enable_blob_files = true;
|
||||
options.min_blob_size = 0;
|
||||
options.create_if_missing = true;
|
||||
|
||||
DestroyAndReopen(options);
|
||||
|
||||
constexpr size_t kNumOfKeys = 3;
|
||||
std::array<std::string, kNumOfKeys> key_strs;
|
||||
std::array<std::string, kNumOfKeys> value_strs;
|
||||
std::array<Slice, kNumOfKeys + 1> keys;
|
||||
for (size_t i = 0; i < kNumOfKeys; ++i) {
|
||||
key_strs[i] = "foo" + std::to_string(i);
|
||||
value_strs[i] = "blob_value" + std::to_string(i);
|
||||
ASSERT_OK(Put(key_strs[i], value_strs[i]));
|
||||
keys[i] = key_strs[i];
|
||||
}
|
||||
|
||||
constexpr char key[] = "key";
|
||||
{
|
||||
// Fake a corrupt blob index.
|
||||
const std::string blob_index("foobar");
|
||||
WriteBatch batch;
|
||||
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
|
||||
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
||||
keys[kNumOfKeys] = Slice(static_cast<const char*>(key), sizeof(key) - 1);
|
||||
}
|
||||
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
std::array<PinnableSlice, kNumOfKeys + 1> values;
|
||||
std::array<Status, kNumOfKeys + 1> statuses;
|
||||
db_->MultiGet(ReadOptions(), dbfull()->DefaultColumnFamily(), kNumOfKeys + 1,
|
||||
keys.data(), values.data(), statuses.data(),
|
||||
/*sorted_input=*/false);
|
||||
for (size_t i = 0; i < kNumOfKeys + 1; ++i) {
|
||||
if (i != kNumOfKeys) {
|
||||
ASSERT_OK(statuses[i]);
|
||||
ASSERT_EQ("blob_value" + std::to_string(i), values[i]);
|
||||
} else {
|
||||
ASSERT_TRUE(statuses[i].IsCorruption());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBlobBasicTest, MultiGetBlob_ExceedSoftLimit) {
|
||||
Options options = GetDefaultOptions();
|
||||
options.enable_blob_files = true;
|
||||
options.min_blob_size = 0;
|
||||
|
||||
Reopen(options);
|
||||
|
||||
constexpr size_t kNumOfKeys = 3;
|
||||
std::array<std::string, kNumOfKeys> key_bufs;
|
||||
std::array<std::string, kNumOfKeys> value_bufs;
|
||||
std::array<Slice, kNumOfKeys> keys;
|
||||
for (size_t i = 0; i < kNumOfKeys; ++i) {
|
||||
key_bufs[i] = "foo" + std::to_string(i);
|
||||
value_bufs[i] = "blob_value" + std::to_string(i);
|
||||
ASSERT_OK(Put(key_bufs[i], value_bufs[i]));
|
||||
keys[i] = key_bufs[i];
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
std::array<PinnableSlice, kNumOfKeys> values;
|
||||
std::array<Status, kNumOfKeys> statuses;
|
||||
ReadOptions read_opts;
|
||||
read_opts.value_size_soft_limit = 1;
|
||||
db_->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), kNumOfKeys,
|
||||
keys.data(), values.data(), statuses.data(),
|
||||
/*sorted_input=*/true);
|
||||
for (const auto& s : statuses) {
|
||||
ASSERT_TRUE(s.IsAborted());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBBlobBasicTest, GetBlob_InlinedTTLIndex) {
|
||||
constexpr uint64_t min_blob_size = 10;
|
||||
|
||||
@ -522,11 +639,21 @@ class DBBlobBasicIOErrorTest : public DBBlobBasicTest,
|
||||
std::string sync_point_;
|
||||
};
|
||||
|
||||
class DBBlobBasicIOErrorMultiGetTest : public DBBlobBasicIOErrorTest {
|
||||
public:
|
||||
DBBlobBasicIOErrorMultiGetTest() : DBBlobBasicIOErrorTest() {}
|
||||
};
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorTest,
|
||||
::testing::ValuesIn(std::vector<std::string>{
|
||||
"BlobFileReader::OpenFile:NewRandomAccessFile",
|
||||
"BlobFileReader::GetBlob:ReadFromFile"}));
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorMultiGetTest,
|
||||
::testing::ValuesIn(std::vector<std::string>{
|
||||
"BlobFileReader::OpenFile:NewRandomAccessFile",
|
||||
"BlobFileReader::MultiGetBlob:ReadFromFile"}));
|
||||
|
||||
TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
|
||||
Options options;
|
||||
options.env = fault_injection_env_.get();
|
||||
@ -556,7 +683,7 @@ TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) {
|
||||
TEST_P(DBBlobBasicIOErrorMultiGetTest, MultiGetBlobs_IOError) {
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_injection_env_.get();
|
||||
options.enable_blob_files = true;
|
||||
@ -598,6 +725,53 @@ TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) {
|
||||
ASSERT_TRUE(statuses[1].IsIOError());
|
||||
}
|
||||
|
||||
TEST_P(DBBlobBasicIOErrorMultiGetTest, MultipleBlobFiles) {
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_injection_env_.get();
|
||||
options.enable_blob_files = true;
|
||||
options.min_blob_size = 0;
|
||||
|
||||
Reopen(options);
|
||||
|
||||
constexpr size_t num_keys = 2;
|
||||
|
||||
constexpr char key1[] = "key1";
|
||||
constexpr char value1[] = "blob1";
|
||||
|
||||
ASSERT_OK(Put(key1, value1));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
constexpr char key2[] = "key2";
|
||||
constexpr char value2[] = "blob2";
|
||||
|
||||
ASSERT_OK(Put(key2, value2));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
std::array<Slice, num_keys> keys{{key1, key2}};
|
||||
std::array<PinnableSlice, num_keys> values;
|
||||
std::array<Status, num_keys> statuses;
|
||||
|
||||
bool first_blob_file = true;
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
sync_point_, [&first_blob_file, this](void* /* arg */) {
|
||||
if (first_blob_file) {
|
||||
first_blob_file = false;
|
||||
return;
|
||||
}
|
||||
fault_injection_env_->SetFilesystemActive(false,
|
||||
Status::IOError(sync_point_));
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
|
||||
keys.data(), values.data(), statuses.data());
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
ASSERT_OK(statuses[0]);
|
||||
ASSERT_EQ(value1, values[0]);
|
||||
ASSERT_TRUE(statuses[1].IsIOError());
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class ReadBlobCompactionFilter : public CompactionFilter {
|
||||
|
@ -2162,7 +2162,7 @@ bool DBImpl::MultiCFSnapshot(
|
||||
// consecutive retries, it means the write rate is very high. In that case
|
||||
// its probably ok to take the mutex on the 3rd try so we can succeed for
|
||||
// sure
|
||||
static const int num_retries = 3;
|
||||
constexpr int num_retries = 3;
|
||||
for (int i = 0; i < num_retries; ++i) {
|
||||
last_try = (i == num_retries - 1);
|
||||
bool retry = false;
|
||||
@ -2192,8 +2192,9 @@ bool DBImpl::MultiCFSnapshot(
|
||||
*snapshot = versions_->LastPublishedSequence();
|
||||
}
|
||||
} else {
|
||||
*snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
|
||||
->number_;
|
||||
*snapshot =
|
||||
static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
|
||||
->number_;
|
||||
}
|
||||
for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
|
||||
++cf_iter) {
|
||||
@ -2394,17 +2395,9 @@ void DBImpl::PrepareMultiGetKeys(
|
||||
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
|
||||
if (sorted_input) {
|
||||
#ifndef NDEBUG
|
||||
CompareKeyContext key_context_less;
|
||||
|
||||
for (size_t index = 1; index < sorted_keys->size(); ++index) {
|
||||
const KeyContext* const lhs = (*sorted_keys)[index - 1];
|
||||
const KeyContext* const rhs = (*sorted_keys)[index];
|
||||
|
||||
// lhs should be <= rhs, or in other words, rhs should NOT be < lhs
|
||||
assert(!key_context_less(rhs, lhs));
|
||||
}
|
||||
assert(std::is_sorted(sorted_keys->begin(), sorted_keys->end(),
|
||||
CompareKeyContext()));
|
||||
#endif
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include "db/blob/blob_file_cache.h"
|
||||
#include "db/blob/blob_file_reader.h"
|
||||
#include "db/blob/blob_index.h"
|
||||
#include "db/blob/blob_log_format.h"
|
||||
#include "db/internal_stats.h"
|
||||
#include "db/log_reader.h"
|
||||
#include "db/log_writer.h"
|
||||
@ -1863,6 +1864,112 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
|
||||
return s;
|
||||
}
|
||||
|
||||
void Version::MultiGetBlob(
|
||||
const ReadOptions& read_options, MultiGetRange& range,
|
||||
const std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs) {
|
||||
if (read_options.read_tier == kBlockCacheTier) {
|
||||
Status s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
|
||||
for (const auto& elem : blob_rqs) {
|
||||
for (const auto& blob_rq : elem.second) {
|
||||
const KeyContext& key_context = blob_rq.second;
|
||||
assert(key_context.s);
|
||||
assert(key_context.s->ok());
|
||||
*(key_context.s) = s;
|
||||
assert(key_context.get_context);
|
||||
auto& get_context = *(key_context.get_context);
|
||||
get_context.MarkKeyMayExist();
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
assert(!blob_rqs.empty());
|
||||
Status status;
|
||||
const auto& blob_files = storage_info_.GetBlobFiles();
|
||||
for (auto& elem : blob_rqs) {
|
||||
uint64_t blob_file_number = elem.first;
|
||||
if (blob_files.find(blob_file_number) == blob_files.end()) {
|
||||
auto& blobs_in_file = elem.second;
|
||||
for (const auto& blob : blobs_in_file) {
|
||||
const KeyContext& key_context = blob.second;
|
||||
*(key_context.s) = Status::Corruption("Invalid blob file number");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
CacheHandleGuard<BlobFileReader> blob_file_reader;
|
||||
assert(blob_file_cache_);
|
||||
status = blob_file_cache_->GetBlobFileReader(blob_file_number,
|
||||
&blob_file_reader);
|
||||
assert(!status.ok() || blob_file_reader.GetValue());
|
||||
|
||||
auto& blobs_in_file = elem.second;
|
||||
if (!status.ok()) {
|
||||
for (const auto& blob : blobs_in_file) {
|
||||
const KeyContext& key_context = blob.second;
|
||||
*(key_context.s) = status;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(blob_file_reader.GetValue());
|
||||
const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize();
|
||||
const CompressionType compression =
|
||||
blob_file_reader.GetValue()->GetCompressionType();
|
||||
|
||||
// TODO: sort blobs_in_file by file offset.
|
||||
autovector<std::reference_wrapper<const KeyContext>> blob_read_key_contexts;
|
||||
autovector<std::reference_wrapper<const Slice>> user_keys;
|
||||
autovector<uint64_t> offsets;
|
||||
autovector<uint64_t> value_sizes;
|
||||
autovector<Status*> statuses;
|
||||
autovector<PinnableSlice*> values;
|
||||
for (const auto& blob : blobs_in_file) {
|
||||
const auto& blob_index = blob.first;
|
||||
const KeyContext& key_context = blob.second;
|
||||
if (blob_index.HasTTL() || blob_index.IsInlined()) {
|
||||
*(key_context.s) =
|
||||
Status::Corruption("Unexpected TTL/inlined blob index");
|
||||
continue;
|
||||
}
|
||||
const uint64_t key_size = key_context.ukey_with_ts.size();
|
||||
const uint64_t offset = blob_index.offset();
|
||||
const uint64_t value_size = blob_index.size();
|
||||
if (!IsValidBlobOffset(offset, key_size, value_size, file_size)) {
|
||||
*(key_context.s) = Status::Corruption("Invalid blob offset");
|
||||
continue;
|
||||
}
|
||||
if (blob_index.compression() != compression) {
|
||||
*(key_context.s) =
|
||||
Status::Corruption("Compression type mismatch when reading a blob");
|
||||
continue;
|
||||
}
|
||||
blob_read_key_contexts.emplace_back(std::cref(key_context));
|
||||
user_keys.emplace_back(std::cref(key_context.ukey_with_ts));
|
||||
offsets.push_back(blob_index.offset());
|
||||
value_sizes.push_back(blob_index.size());
|
||||
statuses.push_back(key_context.s);
|
||||
values.push_back(key_context.value);
|
||||
}
|
||||
blob_file_reader.GetValue()->MultiGetBlob(read_options, user_keys, offsets,
|
||||
value_sizes, statuses, values,
|
||||
/*bytes_read=*/nullptr);
|
||||
size_t num = blob_read_key_contexts.size();
|
||||
assert(num == user_keys.size());
|
||||
assert(num == offsets.size());
|
||||
assert(num == value_sizes.size());
|
||||
assert(num == statuses.size());
|
||||
assert(num == values.size());
|
||||
for (size_t i = 0; i < num; ++i) {
|
||||
if (statuses[i]->ok()) {
|
||||
range.AddValueSize(blob_read_key_contexts[i].get().value->size());
|
||||
if (range.GetValueSize() > read_options.value_size_soft_limit) {
|
||||
*(blob_read_key_contexts[i].get().s) = Status::Aborted();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
|
||||
PinnableSlice* value, std::string* timestamp, Status* status,
|
||||
MergeContext* merge_context,
|
||||
@ -2085,6 +2192,10 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
||||
uint64_t num_data_read = 0;
|
||||
uint64_t num_sst_read = 0;
|
||||
|
||||
MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
|
||||
// blob_file => [[blob_idx, it], ...]
|
||||
std::unordered_map<uint64_t, BlobReadRequests> blob_rqs;
|
||||
|
||||
while (f != nullptr) {
|
||||
MultiGetRange file_range = fp.CurrentFileRange();
|
||||
bool timer_enabled =
|
||||
@ -2170,24 +2281,24 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
||||
|
||||
if (iter->is_blob_index) {
|
||||
if (iter->value) {
|
||||
constexpr uint64_t* bytes_read = nullptr;
|
||||
|
||||
*status = GetBlob(read_options, iter->ukey_with_ts, *iter->value,
|
||||
iter->value, bytes_read);
|
||||
if (!status->ok()) {
|
||||
if (status->IsIncomplete()) {
|
||||
get_context.MarkKeyMayExist();
|
||||
}
|
||||
|
||||
continue;
|
||||
const Slice& blob_index_slice = *(iter->value);
|
||||
BlobIndex blob_index;
|
||||
Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
|
||||
if (tmp_s.ok()) {
|
||||
const uint64_t blob_file_num = blob_index.file_number();
|
||||
blob_rqs[blob_file_num].emplace_back(
|
||||
std::make_pair(blob_index, std::cref(*iter)));
|
||||
} else {
|
||||
*(iter->s) = tmp_s;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
file_range.AddValueSize(iter->value->size());
|
||||
if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
|
||||
s = Status::Aborted();
|
||||
break;
|
||||
} else {
|
||||
file_range.AddValueSize(iter->value->size());
|
||||
if (file_range.GetValueSize() >
|
||||
read_options.value_size_soft_limit) {
|
||||
s = Status::Aborted();
|
||||
break;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
case GetContext::kDeleted:
|
||||
@ -2233,6 +2344,10 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
||||
f = fp.GetNextFile();
|
||||
}
|
||||
|
||||
if (s.ok() && !blob_rqs.empty()) {
|
||||
MultiGetBlob(read_options, keys_with_blobs_range, blob_rqs);
|
||||
}
|
||||
|
||||
// Process any left over keys
|
||||
for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
|
||||
GetContext& get_context = *iter->get_context;
|
||||
|
@ -713,6 +713,12 @@ class Version {
|
||||
const BlobIndex& blob_index, PinnableSlice* value,
|
||||
uint64_t* bytes_read) const;
|
||||
|
||||
using BlobReadRequests = std::vector<
|
||||
std::pair<BlobIndex, std::reference_wrapper<const KeyContext>>>;
|
||||
void MultiGetBlob(
|
||||
const ReadOptions& read_options, MultiGetRange& range,
|
||||
const std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs);
|
||||
|
||||
// Loads some stats information from files. Call without mutex held. It needs
|
||||
// to be called before applying the version to the version set.
|
||||
void PrepareApply(const MutableCFOptions& mutable_cf_options,
|
||||
|
@ -97,6 +97,8 @@ class MultiGetContext {
|
||||
// that need to be performed
|
||||
static const int MAX_BATCH_SIZE = 32;
|
||||
|
||||
static_assert(MAX_BATCH_SIZE < 64, "MAX_BATCH_SIZE cannot exceed 63");
|
||||
|
||||
MultiGetContext(autovector<KeyContext*, MAX_BATCH_SIZE>* sorted_keys,
|
||||
size_t begin, size_t num_keys, SequenceNumber snapshot,
|
||||
const ReadOptions& read_opts)
|
||||
@ -104,6 +106,7 @@ class MultiGetContext {
|
||||
value_mask_(0),
|
||||
value_size_(0),
|
||||
lookup_key_ptr_(reinterpret_cast<LookupKey*>(lookup_key_stack_buf)) {
|
||||
assert(num_keys <= MAX_BATCH_SIZE);
|
||||
if (num_keys > MAX_LOOKUP_KEYS_ON_STACK) {
|
||||
lookup_key_heap_buf.reset(new char[sizeof(LookupKey) * num_keys]);
|
||||
lookup_key_ptr_ = reinterpret_cast<LookupKey*>(
|
||||
@ -236,6 +239,10 @@ class MultiGetContext {
|
||||
skip_mask_ |= uint64_t{1} << iter.index_;
|
||||
}
|
||||
|
||||
bool IsKeySkipped(const Iterator& iter) const {
|
||||
return skip_mask_ & (uint64_t{1} << iter.index_);
|
||||
}
|
||||
|
||||
// Update the value_mask_ in MultiGetContext so its
|
||||
// immediately reflected in all the Range Iterators
|
||||
void MarkKeyDone(Iterator& iter) {
|
||||
|
Loading…
Reference in New Issue
Block a user