Implement a basic coroutine MultiGet without async IO
Summary: Implement a basic coroutine version of MultiGet that is still synchronous, but lays most of the groundwork by converting the code into coroutine code, and setting up the dependencies required in order to compile with folly coroutines.
This commit is contained in:
parent
1ee57c5f28
commit
ebd85d1a8b
10
Makefile
10
Makefile
@ -136,6 +136,13 @@ CXXFLAGS += $(PLATFORM_SHARED_CFLAGS) -DROCKSDB_DLL
|
||||
CFLAGS += $(PLATFORM_SHARED_CFLAGS) -DROCKSDB_DLL
|
||||
endif
|
||||
|
||||
ifeq ($(USE_COROUTINES), 1)
|
||||
USE_FOLLY = 1
|
||||
OPT += -DUSE_COROUTINES
|
||||
ROCKSDB_CXX_STANDARD = c++2a
|
||||
USE_RTTI = 1
|
||||
endif
|
||||
|
||||
# if we're compiling for release, compile without debug code (-DNDEBUG)
|
||||
ifeq ($(DEBUG_LEVEL),0)
|
||||
OPT += -DNDEBUG
|
||||
@ -226,6 +233,7 @@ dummy := $(shell (export ROCKSDB_ROOT="$(CURDIR)"; \
|
||||
export ROCKSDB_NO_FBCODE="$(ROCKSDB_NO_FBCODE)"; \
|
||||
export USE_CLANG="$(USE_CLANG)"; \
|
||||
export LIB_MODE="$(LIB_MODE)"; \
|
||||
export ROCKSDB_CXX_STANDARD="$(ROCKSDB_CXX_STANDARD)"; \
|
||||
"$(CURDIR)/build_tools/build_detect_platform" "$(CURDIR)/make_config.mk"))
|
||||
# this file is generated by the previous line to set build flags and sources
|
||||
include make_config.mk
|
||||
@ -2344,7 +2352,7 @@ checkout_folly:
|
||||
cd third-party/folly && git reset --hard 98b9b2c1124e99f50f9085ddee74ce32afffc665
|
||||
@# A hack to remove boost dependency.
|
||||
@# NOTE: this hack is not needed if using FBCODE compiler config
|
||||
perl -pi -e 's/^(#include <boost)/\/\/$$1/' third-party/folly/folly/functional/Invoke.h
|
||||
@#perl -pi -e 's/^(#include <boost)/\/\/$$1/' third-party/folly/folly/functional/Invoke.h
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Build size testing
|
||||
|
22
TARGETS
22
TARGETS
@ -327,7 +327,15 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
|
||||
"utilities/wal_filter.cc",
|
||||
"utilities/write_batch_with_index/write_batch_with_index.cc",
|
||||
"utilities/write_batch_with_index/write_batch_with_index_internal.cc",
|
||||
], deps=["//folly/container:f14_hash"], headers=None, link_whole=False, extra_test_libs=False)
|
||||
],
|
||||
deps=[
|
||||
"//folly/container:f14_hash",
|
||||
"//folly/experimental/coro:blocking_wait",
|
||||
"//folly/experimental/coro:collect",
|
||||
"//folly/experimental/coro:coroutine",
|
||||
"//folly/experimental/coro:task",
|
||||
],
|
||||
headers=None, link_whole=False, extra_test_libs=False)
|
||||
|
||||
cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
|
||||
"cache/cache.cc",
|
||||
@ -648,7 +656,15 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
|
||||
"utilities/wal_filter.cc",
|
||||
"utilities/write_batch_with_index/write_batch_with_index.cc",
|
||||
"utilities/write_batch_with_index/write_batch_with_index_internal.cc",
|
||||
], deps=["//folly/container:f14_hash"], headers=None, link_whole=True, extra_test_libs=False)
|
||||
],
|
||||
deps=[
|
||||
"//folly/container:f14_hash",
|
||||
"//folly/experimental/coro:blocking_wait",
|
||||
"//folly/experimental/coro:collect",
|
||||
"//folly/experimental/coro:coroutine",
|
||||
"//folly/experimental/coro:task",
|
||||
],
|
||||
headers=None, link_whole=True, extra_test_libs=False)
|
||||
|
||||
cpp_library_wrapper(name="rocksdb_test_lib", srcs=[
|
||||
"db/db_test_util.cc",
|
||||
@ -698,6 +714,8 @@ cpp_binary_wrapper(name="ribbon_bench", srcs=["microbench/ribbon_bench.cc"], dep
|
||||
|
||||
cpp_binary_wrapper(name="db_basic_bench", srcs=["microbench/db_basic_bench.cc"], deps=[], extra_preprocessor_flags=[], extra_bench_libs=True)
|
||||
|
||||
cpp_binary_wrapper(name="db_bench", srcs=["tools/db_bench.cc"], deps=[":rocksdb_tools_lib"], extra_preprocessor_flags=[], extra_bench_libs=False)
|
||||
|
||||
add_c_test_wrapper()
|
||||
|
||||
fancy_bench_wrapper(suite_name="rocksdb_microbench_suite_0", binary_to_bench_to_metric_list_map={'db_basic_bench': {'DBGet/comp_style:1/max_data:134217728/per_key_size:256/enable_statistics:1/negative_query:0/enable_filter:1/iterations:10240/threads:1': ['db_size',
|
||||
|
@ -20,3 +20,7 @@ VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/6ae525939ad02e5e676855082fbbc7828d
|
||||
LUA_BASE=/mnt/gvfs/third-party2/lua/162efd9561a3d21f6869f4814011e9cf1b3ff4dc/5.3.4/platform009/a6271c4
|
||||
BENCHMARK_BASE=/mnt/gvfs/third-party2/benchmark/30bf49ad6414325e17f3425b0edcb64239427ae3/1.6.1/platform009/7f3b187
|
||||
BOOST_BASE=/mnt/gvfs/third-party2/boost/201b7d74941e54b436dfa364a063aa6d2cd7de4c/1.69.0/platform009/8a7ffdf
|
||||
GLOG_BASE=/mnt/gvfs/third-party2/glog/32d751bd5673375b438158717ab6a57c1cc57e3d/0.3.2_fb/platform009/10a364d/
|
||||
FMT_BASE=/mnt/gvfs/third-party2/fmt/ce0c25f67165f4d2c22a29b8ef50f5600d7873ca/6.1.1/platform009/7f3b187/
|
||||
DBL_CONV_BASE=/mnt/gvfs/third-party2/double_conversion/109b3d9696d71f1048678cd7da1e22505470543d/20141126/platform009/7f3b187/
|
||||
LIBEVENT_BASE=/mnt/gvfs/third-party2/libevent/4a4d3a79a76c2439b6bd471bf3586b3481dde75e/1.4.14b_hphp/platform009/7f3b187/
|
||||
|
@ -14,7 +14,7 @@ source "$BASEDIR/dependencies_platform009.sh"
|
||||
CFLAGS=""
|
||||
|
||||
# libgcc
|
||||
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/9.3.0"
|
||||
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/9.3.0 -I $LIBGCC_BASE/include/c++/9.3.0/backward"
|
||||
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
|
||||
|
||||
# glibc
|
||||
@ -70,6 +70,18 @@ BENCHMARK_LIBS=" $BENCHMARK_BASE/lib/libbenchmark${MAYBE_PIC}.a"
|
||||
|
||||
BOOST_INCLUDE=" -I $BOOST_BASE/include/"
|
||||
|
||||
GLOG_INCLUDE=" -I $GLOG_BASE/include/"
|
||||
GLOG_LIBS=" $GLOG_BASE/lib/libglog${MAYBE_PIC}.a"
|
||||
|
||||
FMT_INCLUDE=" -I $FMT_BASE/include/"
|
||||
FMT_LIBS=" $FMT_BASE/lib/libfmt${MAYBE_PIC}.a"
|
||||
|
||||
DBL_CONV_INCLUDE=" -I $DBL_CONV_BASE/include/"
|
||||
DBL_CONV_LIBS=" $DBL_CONV_BASE/lib/libdouble-conversion${MAYBE_PIC}.a"
|
||||
|
||||
LIBEVENT_INCLUDE=" -I $LIBEVENT_BASE/include/"
|
||||
LIBEVENT_LIBS=" $LIBEVENT_BASE/lib/libevent${MAYBE_PIC}.a"
|
||||
|
||||
# location of jemalloc
|
||||
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
|
||||
JEMALLOC_LIB=" $JEMALLOC_BASE/lib/libjemalloc${MAYBE_PIC}.a"
|
||||
@ -101,7 +113,7 @@ BINUTILS="$BINUTILS_BASE/bin"
|
||||
AR="$BINUTILS/ar"
|
||||
AS="$BINUTILS/as"
|
||||
|
||||
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE $LIBURING_INCLUDE $BENCHMARK_INCLUDE $BOOST_INCLUDE"
|
||||
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE $LIBURING_INCLUDE $BENCHMARK_INCLUDE $BOOST_INCLUDE $GLOG_INCLUDE $FMT_INCLUDE $DBL_CONV_INCLUDE $LIBEVENT_INCLUDE"
|
||||
|
||||
STDLIBS="-L $GCC_BASE/lib64"
|
||||
|
||||
|
@ -31,7 +31,23 @@
|
||||
#include "util/coding.h"
|
||||
#include "util/stop_watch.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
namespace {
|
||||
template <class T>
|
||||
static void DeleteEntry(const Slice& /*key*/, void* value) {
|
||||
T* typed_value = reinterpret_cast<T*>(value);
|
||||
delete typed_value;
|
||||
}
|
||||
} // namespace
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
// Generate the regular and coroutine versions of some methods by
|
||||
// including table_cache_coro.h twice
|
||||
// clang-format off
|
||||
#define WITHOUT_COROUTINES
|
||||
#include "db/table_cache_coro.h"
|
||||
#undef WITHOUT_COROUTINES
|
||||
#define WITH_COROUTINES
|
||||
#include "db/table_cache_coro.h"
|
||||
// clang-format on
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "rocksdb/table.h"
|
||||
#include "table/table_reader.h"
|
||||
#include "trace_replay/block_cache_tracer.h"
|
||||
#include "util/coro_utils.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
@ -115,8 +116,8 @@ class TableCache {
|
||||
// in the embedded GetContext
|
||||
// @param skip_filters Disables loading/accessing the filter block
|
||||
// @param level The level this table is at, -1 for "not set / don't know"
|
||||
Status MultiGet(
|
||||
const ReadOptions& options,
|
||||
DECLARE_SYNC_AND_ASYNC(
|
||||
Status, MultiGet, const ReadOptions& options,
|
||||
const InternalKeyComparator& internal_comparator,
|
||||
const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
|
||||
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
|
||||
|
@ -3,26 +3,21 @@
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#pragma once
|
||||
#include "util/coro_utils.h"
|
||||
|
||||
#if defined(WITHOUT_COROUTINES) || \
|
||||
(defined(USE_COROUTINES) && defined(WITH_COROUTINES))
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
namespace {
|
||||
|
||||
template <class T>
|
||||
static void DeleteEntry(const Slice& /*key*/, void* value) {
|
||||
T* typed_value = reinterpret_cast<T*>(value);
|
||||
delete typed_value;
|
||||
}
|
||||
}
|
||||
#if defined(WITHOUT_COROUTINES)
|
||||
#endif
|
||||
|
||||
// Batched version of TableCache::MultiGet.
|
||||
Status TableCache::MultiGet(
|
||||
const ReadOptions& options,
|
||||
const InternalKeyComparator& internal_comparator,
|
||||
const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
|
||||
const std::shared_ptr<const SliceTransform>& prefix_extractor,
|
||||
HistogramImpl* file_read_hist, bool skip_filters, int level) {
|
||||
DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
|
||||
(const ReadOptions& options, const InternalKeyComparator& internal_comparator,
|
||||
const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
|
||||
const std::shared_ptr<const SliceTransform>& prefix_extractor,
|
||||
HistogramImpl* file_read_hist, bool skip_filters, int level) {
|
||||
auto& fd = file_meta.fd;
|
||||
Status s;
|
||||
TableReader* t = fd.table_reader;
|
||||
@ -93,7 +88,8 @@ Status TableCache::MultiGet(
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
t->MultiGet(options, &table_range, prefix_extractor.get(), skip_filters);
|
||||
CO_AWAIT(t->MultiGet)
|
||||
(options, &table_range, prefix_extractor.get(), skip_filters);
|
||||
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
|
||||
for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
|
||||
Status* status = iter->s;
|
||||
@ -138,6 +134,7 @@ Status TableCache::MultiGet(
|
||||
if (handle != nullptr) {
|
||||
ReleaseHandle(handle);
|
||||
}
|
||||
return s;
|
||||
CO_RETURN s;
|
||||
}
|
||||
} // ROCKSDB_NAMESPACE
|
||||
#endif
|
||||
|
@ -38,6 +38,10 @@
|
||||
#include "db/table_cache.h"
|
||||
#include "db/version_builder.h"
|
||||
#include "db/version_edit_handler.h"
|
||||
#if USE_COROUTINES
|
||||
#include "folly/experimental/coro/BlockingWait.h"
|
||||
#include "folly/experimental/coro/Collect.h"
|
||||
#endif
|
||||
#include "file/filename.h"
|
||||
#include "file/random_access_file_reader.h"
|
||||
#include "file/read_write_util.h"
|
||||
@ -62,11 +66,18 @@
|
||||
#include "test_util/sync_point.h"
|
||||
#include "util/cast_util.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/coro_utils.h"
|
||||
#include "util/stop_watch.h"
|
||||
#include "util/string_util.h"
|
||||
#include "util/user_comparator_wrapper.h"
|
||||
|
||||
// Generate the regular and coroutine versions of some methods by
|
||||
// including version_set_coro.h twice
|
||||
// clang-format off
|
||||
#define WITHOUT_COROUTINES
|
||||
#include "db/version_set_coro.h"
|
||||
#undef WITHOUT_COROUTINES
|
||||
#define WITH_COROUTINES
|
||||
#include "db/version_set_coro.h"
|
||||
// clang-format on
|
||||
|
||||
@ -2216,15 +2227,49 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
||||
level = fp.GetHitFileLevel();
|
||||
}
|
||||
|
||||
if (f) {
|
||||
// Call MultiGetFromSST for looking up a single file
|
||||
s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
|
||||
fp.GetHitFileLevel(), fp.IsHitFileLastInLevel(), f,
|
||||
blob_rqs, num_filter_read, num_index_read,
|
||||
num_data_read, num_sst_read);
|
||||
// Avoid using the coroutine version if we're looking in an L0 file, since
|
||||
// L0 files won't be parallelized anyway. The regular synchronous version
|
||||
// is faster.
|
||||
if (!read_options.async_io || !using_coroutines() ||
|
||||
fp.GetHitFileLevel() == 0) {
|
||||
if (f) {
|
||||
// Call MultiGetFromSST for looking up a single file
|
||||
s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
|
||||
fp.GetHitFileLevel(), fp.IsHitFileLastInLevel(), f,
|
||||
blob_rqs, num_filter_read, num_index_read,
|
||||
num_data_read, num_sst_read);
|
||||
}
|
||||
if (s.ok()) {
|
||||
f = fp.GetNextFileInLevel();
|
||||
}
|
||||
#if USE_COROUTINES
|
||||
} else {
|
||||
std::vector<folly::coro::Task<Status>> mget_tasks;
|
||||
while (f != nullptr) {
|
||||
mget_tasks.emplace_back(MultiGetFromSSTCoroutine(
|
||||
read_options, fp.CurrentFileRange(), fp.GetHitFileLevel(),
|
||||
fp.IsHitFileLastInLevel(), f, blob_rqs, num_filter_read,
|
||||
num_index_read, num_data_read, num_sst_read));
|
||||
if (fp.KeyMaySpanNextFile()) {
|
||||
break;
|
||||
}
|
||||
f = fp.GetNextFileInLevel();
|
||||
}
|
||||
if (mget_tasks.size() > 0) {
|
||||
// Collect all results so far
|
||||
std::vector<Status> statuses = folly::coro::blockingWait(
|
||||
folly::coro::collectAllRange(std::move(mget_tasks)));
|
||||
for (Status stat : statuses) {
|
||||
if (!stat.ok()) {
|
||||
s = stat;
|
||||
}
|
||||
}
|
||||
|
||||
if (s.ok() && fp.KeyMaySpanNextFile()) {
|
||||
f = fp.GetNextFileInLevel();
|
||||
}
|
||||
}
|
||||
#endif // USE_COROUTINES
|
||||
}
|
||||
// If bad status or we found final result for all the keys
|
||||
if (!s.ok() || file_picker_range.empty()) {
|
||||
|
@ -54,6 +54,7 @@
|
||||
#include "table/get_context.h"
|
||||
#include "table/multiget_context.h"
|
||||
#include "trace_replay/block_cache_tracer.h"
|
||||
#include "util/coro_utils.h"
|
||||
#include "util/hash_containers.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
@ -882,7 +883,8 @@ class Version {
|
||||
// This accumulated stats will be used in compaction.
|
||||
void UpdateAccumulatedStats();
|
||||
|
||||
Status MultiGetFromSST(
|
||||
DECLARE_SYNC_AND_ASYNC(
|
||||
/* ret_type */ Status, /* func_name */ MultiGetFromSST,
|
||||
const ReadOptions& read_options, MultiGetRange file_range,
|
||||
int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f,
|
||||
std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
|
||||
|
@ -3,23 +3,26 @@
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#pragma once
|
||||
#include "util/coro_utils.h"
|
||||
|
||||
#if defined(WITHOUT_COROUTINES) || \
|
||||
(defined(USE_COROUTINES) && defined(WITH_COROUTINES))
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
// Lookup a batch of keys in a single SST file
|
||||
Status Version::MultiGetFromSST(
|
||||
const ReadOptions& read_options, MultiGetRange file_range,
|
||||
int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f,
|
||||
std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
|
||||
uint64_t& num_filter_read, uint64_t& num_index_read,
|
||||
uint64_t& num_data_read, uint64_t& num_sst_read) {
|
||||
DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
|
||||
(const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level,
|
||||
bool is_hit_file_last_in_level, FdWithKeyRange* f,
|
||||
std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
|
||||
uint64_t& num_filter_read, uint64_t& num_index_read, uint64_t& num_data_read,
|
||||
uint64_t& num_sst_read) {
|
||||
bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
|
||||
get_perf_context()->per_level_perf_context_enabled;
|
||||
|
||||
Status s;
|
||||
StopWatchNano timer(clock_, timer_enabled /* auto_start */);
|
||||
s = table_cache_->MultiGet(
|
||||
s = CO_AWAIT(table_cache_->MultiGet)(
|
||||
read_options, *internal_comparator(), *f->file_metadata, &file_range,
|
||||
mutable_cf_options_.prefix_extractor,
|
||||
cfd_->internal_stats()->GetFileReadHist(hit_file_level),
|
||||
@ -37,7 +40,7 @@ Status Version::MultiGetFromSST(
|
||||
*iter->s = s;
|
||||
file_range.MarkKeyDone(iter);
|
||||
}
|
||||
return s;
|
||||
CO_RETURN s;
|
||||
}
|
||||
uint64_t batch_size = 0;
|
||||
for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
|
||||
@ -145,7 +148,7 @@ Status Version::MultiGetFromSST(
|
||||
}
|
||||
|
||||
RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
|
||||
return s;
|
||||
CO_RETURN s;
|
||||
}
|
||||
} // ROCKSDB_NAMESPACE
|
||||
|
||||
#endif
|
||||
|
@ -73,7 +73,25 @@
|
||||
#include "util/stop_watch.h"
|
||||
#include "util/string_util.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
namespace {
|
||||
|
||||
CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) {
|
||||
CacheAllocationPtr heap_buf;
|
||||
heap_buf = AllocateBlock(buf.size(), allocator);
|
||||
memcpy(heap_buf.get(), buf.data(), buf.size());
|
||||
return heap_buf;
|
||||
}
|
||||
} // namespace
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
// Generate the regular and coroutine versions of some methods by
|
||||
// including block_based_table_reader_coro.h twice
|
||||
// clang-format off
|
||||
#define WITHOUT_COROUTINES
|
||||
#include "table/block_based/block_based_table_reader_coro.h"
|
||||
#undef WITHOUT_COROUTINES
|
||||
#define WITH_COROUTINES
|
||||
#include "table/block_based/block_based_table_reader_coro.h"
|
||||
// clang-format on
|
||||
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include "table/table_reader.h"
|
||||
#include "table/two_level_iterator.h"
|
||||
#include "trace_replay/block_cache_tracer.h"
|
||||
#include "util/coro_utils.h"
|
||||
#include "util/hash_containers.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
@ -141,10 +142,11 @@ class BlockBasedTable : public TableReader {
|
||||
GetContext* get_context, const SliceTransform* prefix_extractor,
|
||||
bool skip_filters = false) override;
|
||||
|
||||
void MultiGet(const ReadOptions& readOptions,
|
||||
const MultiGetContext::Range* mget_range,
|
||||
const SliceTransform* prefix_extractor,
|
||||
bool skip_filters = false) override;
|
||||
DECLARE_SYNC_AND_ASYNC_OVERRIDE(void, MultiGet,
|
||||
const ReadOptions& readOptions,
|
||||
const MultiGetContext::Range* mget_range,
|
||||
const SliceTransform* prefix_extractor,
|
||||
bool skip_filters = false);
|
||||
|
||||
// Pre-fetch the disk blocks that correspond to the key range specified by
|
||||
// (kbegin, kend). The call will return error status in the event of
|
||||
@ -366,13 +368,14 @@ class BlockBasedTable : public TableReader {
|
||||
bool for_compaction, bool use_cache,
|
||||
bool wait_for_cache) const;
|
||||
|
||||
void RetrieveMultipleBlocks(
|
||||
const ReadOptions& options, const MultiGetRange* batch,
|
||||
DECLARE_SYNC_AND_ASYNC_CONST(
|
||||
void, RetrieveMultipleBlocks, const ReadOptions& options,
|
||||
const MultiGetRange* batch,
|
||||
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
|
||||
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
|
||||
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>*
|
||||
results,
|
||||
char* scratch, const UncompressionDict& uncompression_dict) const;
|
||||
char* scratch, const UncompressionDict& uncompression_dict);
|
||||
|
||||
// Get the iterator from the index reader.
|
||||
//
|
||||
|
@ -3,20 +3,13 @@
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#pragma once
|
||||
#include "util/coro_utils.h"
|
||||
|
||||
#if defined(WITHOUT_COROUTINES) || \
|
||||
(defined(USE_COROUTINES) && defined(WITH_COROUTINES))
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
namespace {
|
||||
|
||||
CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) {
|
||||
CacheAllocationPtr heap_buf;
|
||||
heap_buf = AllocateBlock(buf.size(), allocator);
|
||||
memcpy(heap_buf.get(), buf.data(), buf.size());
|
||||
return heap_buf;
|
||||
}
|
||||
}
|
||||
|
||||
// This function reads multiple data blocks from disk using Env::MultiRead()
|
||||
// and optionally inserts them into the block cache. It uses the scratch
|
||||
// buffer provided by the caller, which is contiguous. If scratch is a nullptr
|
||||
@ -34,12 +27,12 @@ CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) {
|
||||
// found in cache
|
||||
// handles - A vector of block handles. Some of them may be NULL handles
|
||||
// scratch - An optional contiguous buffer to read compressed blocks into
|
||||
void BlockBasedTable::RetrieveMultipleBlocks(
|
||||
const ReadOptions& options, const MultiGetRange* batch,
|
||||
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
|
||||
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
|
||||
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
|
||||
char* scratch, const UncompressionDict& uncompression_dict) const {
|
||||
DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
|
||||
(const ReadOptions& options, const MultiGetRange* batch,
|
||||
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
|
||||
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
|
||||
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
|
||||
char* scratch, const UncompressionDict& uncompression_dict) const {
|
||||
RandomAccessFileReader* file = rep_->file.get();
|
||||
const Footer& footer = rep_->footer;
|
||||
const ImmutableOptions& ioptions = rep_->ioptions;
|
||||
@ -64,7 +57,7 @@ void BlockBasedTable::RetrieveMultipleBlocks(
|
||||
/* for_compaction */ false, /* use_cache */ true,
|
||||
/* wait_for_cache */ true);
|
||||
}
|
||||
return;
|
||||
CO_RETURN;
|
||||
}
|
||||
|
||||
// In direct IO mode, blocks share the direct io buffer.
|
||||
@ -310,14 +303,13 @@ void BlockBasedTable::RetrieveMultipleBlocks(
|
||||
}
|
||||
|
||||
using MultiGetRange = MultiGetContext::Range;
|
||||
void BlockBasedTable::MultiGet(const ReadOptions& read_options,
|
||||
const MultiGetRange* mget_range,
|
||||
const SliceTransform* prefix_extractor,
|
||||
bool skip_filters) {
|
||||
DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
|
||||
(const ReadOptions& read_options, const MultiGetRange* mget_range,
|
||||
const SliceTransform* prefix_extractor, bool skip_filters) {
|
||||
if (mget_range->empty()) {
|
||||
// Caller should ensure non-empty (performance bug)
|
||||
assert(false);
|
||||
return; // Nothing to do
|
||||
CO_RETURN; // Nothing to do
|
||||
}
|
||||
|
||||
FilterBlockReader* const filter =
|
||||
@ -524,8 +516,9 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
|
||||
block_buf.reset(scratch);
|
||||
}
|
||||
}
|
||||
RetrieveMultipleBlocks(read_options, &data_block_range, &block_handles,
|
||||
&statuses, &results, scratch, dict);
|
||||
CO_AWAIT(RetrieveMultipleBlocks)
|
||||
(read_options, &data_block_range, &block_handles, &statuses, &results,
|
||||
scratch, dict);
|
||||
if (sst_file_range.begin()->get_context) {
|
||||
++(sst_file_range.begin()
|
||||
->get_context->get_context_stats_.num_sst_read);
|
||||
@ -745,4 +738,5 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
|
||||
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
|
||||
}
|
||||
}
|
||||
} // ROCKSDB_NAMESPACE
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
#endif
|
||||
|
@ -10,6 +10,10 @@
|
||||
#pragma once
|
||||
#include <memory>
|
||||
#include "db/range_tombstone_fragmenter.h"
|
||||
#if USE_COROUTINES
|
||||
#include "folly/experimental/coro/Coroutine.h"
|
||||
#include "folly/experimental/coro/Task.h"
|
||||
#endif
|
||||
#include "rocksdb/slice_transform.h"
|
||||
#include "table/get_context.h"
|
||||
#include "table/internal_iterator.h"
|
||||
@ -120,6 +124,15 @@ class TableReader {
|
||||
}
|
||||
}
|
||||
|
||||
#if USE_COROUTINES
|
||||
virtual folly::coro::Task<void> MultiGetCoroutine(
|
||||
const ReadOptions& readOptions, const MultiGetContext::Range* mget_range,
|
||||
const SliceTransform* prefix_extractor, bool skip_filters = false) {
|
||||
MultiGet(readOptions, mget_range, prefix_extractor, skip_filters);
|
||||
co_return;
|
||||
}
|
||||
#endif // USE_COROUTINES
|
||||
|
||||
// Prefetch data corresponding to a given range of keys
|
||||
// Typically this functionality is required for table implementations that
|
||||
// persist the data on a non-volatile storage medium like disk/SSD
|
||||
|
110
util/coro_utils.h
Normal file
110
util/coro_utils.h
Normal file
@ -0,0 +1,110 @@
|
||||
// Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#if defined(USE_COROUTINES)
|
||||
#include "folly/experimental/coro/Coroutine.h"
|
||||
#include "folly/experimental/coro/Task.h"
|
||||
#endif
|
||||
|
||||
// This file has two sections. The first section applies to all instances of
|
||||
// header file inclusion and has an include guard. The second section is
|
||||
// meant for multiple inclusions in the same source file, and is idempotent.
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
#ifndef UTIL_CORO_UTILS_H_
|
||||
#define UTIL_CORO_UTILS_H_
|
||||
|
||||
#if defined(USE_COROUTINES)
|
||||
|
||||
// The following macros expand to regular and coroutine function
|
||||
// declarations for a given function
|
||||
#define DECLARE_SYNC_AND_ASYNC(__ret_type__, __func_name__, ...) \
|
||||
__ret_type__ __func_name__(__VA_ARGS__); \
|
||||
folly::coro::Task<__ret_type__> __func_name__##Coroutine(__VA_ARGS__);
|
||||
|
||||
#define DECLARE_SYNC_AND_ASYNC_OVERRIDE(__ret_type__, __func_name__, ...) \
|
||||
__ret_type__ __func_name__(__VA_ARGS__) override; \
|
||||
folly::coro::Task<__ret_type__> __func_name__##Coroutine(__VA_ARGS__) \
|
||||
override;
|
||||
|
||||
#define DECLARE_SYNC_AND_ASYNC_CONST(__ret_type__, __func_name__, ...) \
|
||||
__ret_type__ __func_name__(__VA_ARGS__) const; \
|
||||
folly::coro::Task<__ret_type__> __func_name__##Coroutine(__VA_ARGS__) const;
|
||||
|
||||
constexpr bool using_coroutines() { return true; }
|
||||
#else // USE_COROUTINES
|
||||
|
||||
// The following macros expand to a regular function declaration for a given
|
||||
// function
|
||||
#define DECLARE_SYNC_AND_ASYNC(__ret_type__, __func_name__, ...) \
|
||||
__ret_type__ __func_name__(__VA_ARGS__);
|
||||
|
||||
#define DECLARE_SYNC_AND_ASYNC_OVERRIDE(__ret_type__, __func_name__, ...) \
|
||||
__ret_type__ __func_name__(__VA_ARGS__) override;
|
||||
|
||||
#define DECLARE_SYNC_AND_ASYNC_CONST(__ret_type__, __func_name__, ...) \
|
||||
__ret_type__ __func_name__(__VA_ARGS__) const;
|
||||
|
||||
constexpr bool using_coroutines() { return false; }
|
||||
#endif // USE_COROUTINES
|
||||
#endif // UTIL_CORO_UTILS_H_
|
||||
|
||||
// The following section of the file is meant to be included twice in a
|
||||
// source file - once defining WITH_COROUTINES and once defining
|
||||
// WITHOUT_COROUTINES
|
||||
#undef DEFINE_SYNC_AND_ASYNC
|
||||
#undef CO_AWAIT
|
||||
#undef CO_RETURN
|
||||
|
||||
#if defined(WITH_COROUTINES) && defined(USE_COROUTINES)
|
||||
|
||||
// This macro should be used in the beginning of the function
|
||||
// definition. The declaration should have been done using one of the
|
||||
// DECLARE_SYNC_AND_ASYNC* macros. It expands to the return type and
|
||||
// the function name with the Coroutine suffix. For example -
|
||||
// DEFINE_SYNC_AND_ASYNC(int, foo)(bool bar) {}
|
||||
// would expand to -
|
||||
// folly::coro::Task<int> fooCoroutine(bool bar) {}
|
||||
#define DEFINE_SYNC_AND_ASYNC(__ret_type__, __func_name__) \
|
||||
folly::coro::Task<__ret_type__> __func_name__##Coroutine
|
||||
|
||||
// This macro should be used to call a function that might be a
|
||||
// coroutine. It expands to the correct function name and prefixes
|
||||
// the co_await operator if necessary. For example -
|
||||
// s = CO_AWAIT(foo)(true);
|
||||
// if the code is compiled WITH_COROUTINES, would expand to
|
||||
// s = co_await fooCoroutine(true);
|
||||
// if compiled WITHOUT_COROUTINES, would expand to
|
||||
// s = foo(true);
|
||||
#define CO_AWAIT(__func_name__) co_await __func_name__##Coroutine
|
||||
|
||||
#define CO_RETURN co_return
|
||||
|
||||
#elif defined(WITHOUT_COROUTINES)
|
||||
|
||||
// This macro should be used in the beginning of the function
|
||||
// definition. The declaration should have been done using one of the
|
||||
// DECLARE_SYNC_AND_ASYNC* macros. It expands to the return type and
|
||||
// the function name without the Coroutine suffix. For example -
|
||||
// DEFINE_SYNC_AND_ASYNC(int, foo)(bool bar) {}
|
||||
// would expand to -
|
||||
// int foo(bool bar) {}
|
||||
#define DEFINE_SYNC_AND_ASYNC(__ret_type__, __func_name__) \
|
||||
__ret_type__ __func_name__
|
||||
|
||||
// This macro should be used to call a function that might be a
|
||||
// coroutine. It expands to the correct function name and prefixes
|
||||
// the co_await operator if necessary. For example -
|
||||
// s = CO_AWAIT(foo)(true);
|
||||
// if the code is compiled WITH_COROUTINES, would expand to
|
||||
// s = co_await fooCoroutine(true);
|
||||
// if compiled WITHOUT_COROUTINES, would expand to
|
||||
// s = foo(true);
|
||||
#define CO_AWAIT(__func_name__) __func_name__
|
||||
|
||||
#define CO_RETURN return
|
||||
|
||||
#endif // WITH_COROUTINES / WITHOUT_COROUTINES
|
||||
} // namespace ROCKSDB_NAMESPACE
|
Loading…
x
Reference in New Issue
Block a user