Address review comments
This commit is contained in:
parent
7c6d3fddec
commit
12d6c4ee80
@ -42,13 +42,15 @@ static void DeleteEntry(const Slice& /*key*/, void* value) {
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
// Generate the regular and coroutine versions of some methods by
|
||||
// including table_cache_coro.h twice
|
||||
// including table_cache_sync_and_async.h twice
|
||||
// Macros in the header will expand differently based on whether
|
||||
// WITH_COROUTINES or WITHOUT_COROUTINES is defined
|
||||
// clang-format off
|
||||
#define WITHOUT_COROUTINES
|
||||
#include "db/table_cache_coro.h"
|
||||
#include "db/table_cache_sync_and_async.h"
|
||||
#undef WITHOUT_COROUTINES
|
||||
#define WITH_COROUTINES
|
||||
#include "db/table_cache_coro.h"
|
||||
#include "db/table_cache_sync_and_async.h"
|
||||
// clang-format on
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
@ -72,13 +72,15 @@
|
||||
#include "util/user_comparator_wrapper.h"
|
||||
|
||||
// Generate the regular and coroutine versions of some methods by
|
||||
// including version_set_coro.h twice
|
||||
// including version_set_sync_and_async.h twice
|
||||
// Macros in the header will expand differently based on whether
|
||||
// WITH_COROUTINES or WITHOUT_COROUTINES is defined
|
||||
// clang-format off
|
||||
#define WITHOUT_COROUTINES
|
||||
#include "db/version_set_coro.h"
|
||||
#include "db/version_set_sync_and_async.h"
|
||||
#undef WITHOUT_COROUTINES
|
||||
#define WITH_COROUTINES
|
||||
#include "db/version_set_coro.h"
|
||||
#include "db/version_set_sync_and_async.h"
|
||||
// clang-format on
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
@ -86,13 +86,15 @@ CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) {
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
// Generate the regular and coroutine versions of some methods by
|
||||
// including block_based_table_reader_coro.h twice
|
||||
// including block_based_table_reader_sync_and_async.h twice
|
||||
// Macros in the header will expand differently based on whether
|
||||
// WITH_COROUTINES or WITHOUT_COROUTINES is defined
|
||||
// clang-format off
|
||||
#define WITHOUT_COROUTINES
|
||||
#include "table/block_based/block_based_table_reader_coro.h"
|
||||
#include "table/block_based/block_based_table_reader_sync_and_async.h"
|
||||
#undef WITHOUT_COROUTINES
|
||||
#define WITH_COROUTINES
|
||||
#include "table/block_based/block_based_table_reader_coro.h"
|
||||
#include "table/block_based/block_based_table_reader_sync_and_async.h"
|
||||
// clang-format on
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
@ -36,7 +36,7 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void AsyncFileReader::Poll() {
|
||||
void AsyncFileReader::Wait() {
|
||||
if (!head_) {
|
||||
return;
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ class SingleThreadExecutor;
|
||||
// coroutines to co_await it. When the AsyncFileReader Awaitable is
|
||||
// resumed, it initiates the fie reads requested by the awaiting caller
|
||||
// by calling RandomAccessFileReader's ReadAsync. It then suspends the
|
||||
// awaiting coroutine. The suspended awaiter is later resumed by Poll().
|
||||
// awaiting coroutine. The suspended awaiter is later resumed by Wait().
|
||||
class AsyncFileReader {
|
||||
class ReadAwaiter;
|
||||
template <typename Awaiter>
|
||||
@ -84,6 +84,11 @@ class AsyncFileReader {
|
||||
autovector<void*, 32> io_handle_;
|
||||
autovector<IOHandleDeleter, 32> del_fn_;
|
||||
std::experimental::coroutine_handle<> awaiting_coro_;
|
||||
// Use this to link to the next ReadAwaiter in the suspended coroutine
|
||||
// list. The head and tail of the list are tracked by AsyncFileReader.
|
||||
// We use this approach rather than an STL container in order to avoid
|
||||
// extra memory allocations. The coroutine call already allocates a
|
||||
// ReadAwaiter object.
|
||||
ReadAwaiter* next_;
|
||||
};
|
||||
|
||||
@ -123,7 +128,7 @@ class AsyncFileReader {
|
||||
|
||||
// Called by the SingleThreadExecutor to poll for async IO completion.
|
||||
// This also resumes the awaiting coroutines.
|
||||
void Poll();
|
||||
void Wait();
|
||||
|
||||
// Head of the queue of awaiters waiting for async IO completion
|
||||
ReadAwaiter* head_ = nullptr;
|
||||
|
@ -37,9 +37,9 @@ class SingleThreadExecutor : public folly::Executor {
|
||||
q.pop();
|
||||
|
||||
if (q.empty()) {
|
||||
// Prevent recursion, as the Poll may queue resumed coroutines
|
||||
// Prevent recursion, as the Wait may queue resumed coroutines
|
||||
busy_ = true;
|
||||
reader_.Poll();
|
||||
reader_.Wait();
|
||||
busy_ = false;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user