Compare commits

...

22 Commits
main ... 7.1.fb

Author SHA1 Message Date
Peter Dillinger
91f175888a Remove slack CircleCI hook (#9982)
Summary:
Our Slack site is deprecated

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9982

Test Plan: CircleCI

Reviewed By: siying

Differential Revision: D36322050

Pulled By: pdillinger

fbshipit-source-id: 678202404d307e1547e4203d7e6bd467803ccd5e
2022-05-12 09:30:59 -07:00
Hui Xiao
00724f43bc Update version.h and HISTORY.md for 7.1.2 2022-04-19 02:55:12 -07:00
Hui Xiao
3d15d07740 Conditionally declare and define variable that is unused in LITE mode (#9854)
Summary:
Context:
As mentioned in https://github.com/facebook/rocksdb/issues/9701, we have the following in LITE=1 make static_lib for v7.0.2
```
  CC       file/sequence_file_reader.o
  CC       file/sst_file_manager_impl.o
  CC       file/writable_file_writer.o
In file included from file/writable_file_writer.cc:10:
./file/writable_file_writer.h:163:15: error: private field 'temperature_' is not used [-Werror,-Wunused-private-field]
  Temperature temperature_;
              ^
1 error generated.
make: *** [file/writable_file_writer.o] Error 1
```

 as titled

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9854

Test Plan:
- Local `LITE=1 make static_lib` reveals the same error and error is gone after this fix
- CI

Reviewed By: ajkr, jay-zhuang

Differential Revision: D35706585

Pulled By: hx235

fbshipit-source-id: 7743310298231ad6866304ffa2225c8abdc91d9a
2022-04-19 01:23:49 -07:00
Andrew Kryczka
3345d12a1d Remove explicit padding from CacheAlignedInstrumentedMutex (#9809)
Summary:
Fixes https://github.com/facebook/rocksdb/issues/9779.

The padding at the end of a struct is added implicitly according to the
sizeof spec: "When applied to a class, the result is the
number of bytes in an object of that class including any padding
required for placing objects of that type in an array"
(https://eel.is/c++draft/expr.sizeof#2.sentence-2). We should drop the
explicit padding since it assumed support for zero-length arrays, which
is non-standard.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9809

Test Plan: rely on CI

Reviewed By: riversand963

Differential Revision: D35413496

Pulled By: ajkr

fbshipit-source-id: 25d52ca45e648ad0d5657149f26f6adecbed1cb4
2022-04-18 10:45:02 -07:00
Andrew Kryczka
9ed2144ebd Fix GetMergeOperands() heap-use-after-free on flushed memtable (#9805)
Summary:
Fixes https://github.com/facebook/rocksdb/issues/9066.

Prior to the fix in this PR, this PR's unit test reported the following error under ASAN:

```
==2175705==ERROR: AddressSanitizer: heap-use-after-free on address 0x61f0000012a5 at pc 0x7f0fc36e76ce bp 0x7ffc103e9ca0 sp 0x7ffc103e9450
READ of size 5 at 0x61f0000012a5 thread T0
    #0 0x7f0fc36e76cd in __interceptor_memcpy /home/engshare/third-party2/gcc/9.x/src/gcc-10.x/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:790
    https://github.com/facebook/rocksdb/issues/1 0x7f0fc35a207e in std::char_traits<char>::copy(char*, char const*, unsigned long) /home/engshare/third-party2/libgcc/9.x/src/gcc-9.x/x86_64-facebook-linux/libstdc++-v3/include/bits/char_traits.h:365
    https://github.com/facebook/rocksdb/issues/2 0x7f0fc35a207e in std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::_S_copy(char*, char const*, unsigned long) /home/engshare/third-party2/libgcc/9.x/src/gcc-9.x/x86_64-facebook-linux/libstdc++-v3/include/bits/basic_string.h:351
    https://github.com/facebook/rocksdb/issues/3 0x7f0fc35a207e in std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::_M_replace(unsigned long, unsigned long, char const*, unsigned long) /home/engshare/third-party2/libgcc/9.x/src/gcc-9.x/x86_64-facebook-linux/libstdc++-v3/include/bits/basic_string.tcc:440
    https://github.com/facebook/rocksdb/issues/4 0x8679ca in std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::assign(char const*, unsigned long) /mnt/gvfs/third-party2/libgcc/4959b39cfbe5965a37c861c4c327fa7c5c759b87/9.x/platform009/9202ce7/include/c++/9.3.0/bits/basic_string.h:1422
    https://github.com/facebook/rocksdb/issues/5 0x8679ca in rocksdb::PinnableSlice::PinSelf(rocksdb::Slice const&) include/rocksdb/slice.h:171
    https://github.com/facebook/rocksdb/issues/6 0x8679ca in rocksdb::DBImpl::GetImpl(rocksdb::ReadOptions const&, rocksdb::Slice const&, rocksdb::DBImpl::GetImplOptions&) db/db_impl/db_impl.cc:1930
    https://github.com/facebook/rocksdb/issues/7 0x547324 in rocksdb::DBImpl::GetMergeOperands(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::PinnableSlice*, rocksdb::GetMergeOperandsOptions*, int*) db/db_impl/db_impl.h:203
    https://github.com/facebook/rocksdb/issues/8 0x547324 in rocksdb::DBMergeOperandTest_FlushedMergeOperandReadAfterFreeBug_Test::TestBody() db/db_merge_operand_test.cc:117
    https://github.com/facebook/rocksdb/issues/9 0x7241da in void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3899
    https://github.com/facebook/rocksdb/issues/10 0x7241da in void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3935
    https://github.com/facebook/rocksdb/issues/11 0x701a47 in testing::Test::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3973
    https://github.com/facebook/rocksdb/issues/12 0x702040 in testing::Test::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3965
    https://github.com/facebook/rocksdb/issues/13 0x702040 in testing::TestInfo::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:4149
    https://github.com/facebook/rocksdb/issues/14 0x7025f7 in testing::TestInfo::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:4124
    https://github.com/facebook/rocksdb/issues/15 0x7025f7 in testing::TestCase::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:4267
    https://github.com/facebook/rocksdb/issues/16 0x704217 in testing::TestCase::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:4253
    https://github.com/facebook/rocksdb/issues/17 0x704217 in testing::internal::UnitTestImpl::RunAllTests() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:6633
    https://github.com/facebook/rocksdb/issues/18 0x72505a in bool testing::internal::HandleSehExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3899
    https://github.com/facebook/rocksdb/issues/19 0x72505a in bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3935
    https://github.com/facebook/rocksdb/issues/20 0x704aa1 in testing::UnitTest::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:6242
    https://github.com/facebook/rocksdb/issues/21 0x4c4aff in RUN_ALL_TESTS() third-party/gtest-1.8.1/fused-src/gtest/gtest.h:22110
    https://github.com/facebook/rocksdb/issues/22 0x4c4aff in main db/db_merge_operand_test.cc:404
    https://github.com/facebook/rocksdb/issues/23 0x7f0fc3108dc4 in __libc_start_main ../csu/libc-start.c:308
    https://github.com/facebook/rocksdb/issues/24 0x5445fd in _start (/data/users/andrewkr/rocksdb/db_merge_operand_test+0x5445fd)

0x61f0000012a5 is located 1061 bytes inside of 3264-byte region [0x61f000000e80,0x61f000001b40)
freed by thread T0 here:
    #0 0x7f0fc375b6af in operator delete(void*, unsigned long) /home/engshare/third-party2/gcc/9.x/src/gcc-10.x/libsanitizer/asan/asan_new_delete.cc:177
    https://github.com/facebook/rocksdb/issues/1 0x743be8 in rocksdb::SuperVersion::~SuperVersion() db/column_family.cc:432
    https://github.com/facebook/rocksdb/issues/2 0x8052aa in rocksdb::DBImpl::CleanupSuperVersion(rocksdb::SuperVersion*) db/db_impl/db_impl.cc:3534
    https://github.com/facebook/rocksdb/issues/3 0x8676c2 in rocksdb::DBImpl::ReturnAndCleanupSuperVersion(rocksdb::ColumnFamilyData*, rocksdb::SuperVersion*) db/db_impl/db_impl.cc:3544
    https://github.com/facebook/rocksdb/issues/4 0x8676c2 in rocksdb::DBImpl::GetImpl(rocksdb::ReadOptions const&, rocksdb::Slice const&, rocksdb::DBImpl::GetImplOptions&) db/db_impl/db_impl.cc:1911
    https://github.com/facebook/rocksdb/issues/5 0x547324 in rocksdb::DBImpl::GetMergeOperands(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::PinnableSlice*, rocksdb::GetMergeOperandsOptions*, int*) db/db_impl/db_impl.h:203
    https://github.com/facebook/rocksdb/issues/6 0x547324 in rocksdb::DBMergeOperandTest_FlushedMergeOperandReadAfterFreeBug_Test::TestBody() db/db_merge_operand_test.cc:117
    https://github.com/facebook/rocksdb/issues/7 0x7241da in void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3899
    https://github.com/facebook/rocksdb/issues/8 0x7241da in void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3935
    https://github.com/facebook/rocksdb/issues/9 0x701a47 in testing::Test::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3973
    https://github.com/facebook/rocksdb/issues/10 0x702040 in testing::Test::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3965
    https://github.com/facebook/rocksdb/issues/11 0x702040 in testing::TestInfo::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:4149
    https://github.com/facebook/rocksdb/issues/12 0x7025f7 in testing::TestInfo::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:4124
    https://github.com/facebook/rocksdb/issues/13 0x7025f7 in testing::TestCase::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:4267
    https://github.com/facebook/rocksdb/issues/14 0x704217 in testing::TestCase::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:4253
    https://github.com/facebook/rocksdb/issues/15 0x704217 in testing::internal::UnitTestImpl::RunAllTests() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:6633
    https://github.com/facebook/rocksdb/issues/16 0x72505a in bool testing::internal::HandleSehExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3899
    https://github.com/facebook/rocksdb/issues/17 0x72505a in bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:3935
    https://github.com/facebook/rocksdb/issues/18 0x704aa1 in testing::UnitTest::Run() third-party/gtest-1.8.1/fused-src/gtest/gtest-all.cc:6242
    https://github.com/facebook/rocksdb/issues/19 0x4c4aff in RUN_ALL_TESTS() third-party/gtest-1.8.1/fused-src/gtest/gtest.h:22110
    https://github.com/facebook/rocksdb/issues/20 0x4c4aff in main db/db_merge_operand_test.cc:404
    https://github.com/facebook/rocksdb/issues/21 0x7f0fc3108dc4 in __libc_start_main ../csu/libc-start.c:308
    https://github.com/facebook/rocksdb/issues/22 0x5445fd in _start (/data/users/andrewkr/rocksdb/db_merge_operand_test+0x5445fd)
...
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9805

Test Plan: following the fix in this PR, the new unit test passes

Reviewed By: jay-zhuang

Differential Revision: D35388415

Pulled By: ajkr

fbshipit-source-id: b39c5d002155906c8abc4a3429eca696dbf916d0
2022-04-18 10:44:33 -07:00
Yanqin Jin
7e42a5ba89 Encode min_log_number_to_keep and delete_wals_before in one version edit (#9766)
Summary:
min_log_number_to_keep denotes that the WALs whose numbers are below
this value **will** be deleted by RocksDB.
delete_wals_before will be used by RocksDB if
track_and_verify_wals_in_manifest is set to true. During recovery,
RocksDB uses the info encoded in delete_wals_before to reconstruct its
knowledge about what WALs to expect existing.
If these two tags are not encoded in the same VersionEdit, then it's
possible for min_log_number_to_keep=100 to exist, but
delete_wals_before=100 to be lost due to power failure. Subsequent
recovery will delete 99.log. If the db crashes again, the following
recovery will expect to see 99.log since there is no
delete_wals_before=100 in the MANIFEST, but the WAL is already deleted.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9766

Test Plan:
First of all, make check.
Second, format compatibility.
SHORT_TEST=1 ./tools/check_format_compatible.sh

Reviewed By: ltamasi

Differential Revision: D35203623

Pulled By: riversand963

fbshipit-source-id: 45623fc4b4b50d299d5e0f9559a3a4c5e9522c8f
2022-04-18 10:43:05 -07:00
sdong
8654ac5d9a Fix DB::Open() error logging (#9784)
Summary:
Right now we log a wrong error when DB::Open() fails. Fix it.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9784

Test Plan: CI runs should pass

Reviewed By: ajkr, riversand963

Differential Revision: D35290203

fbshipit-source-id: ffc640afa27f6b0a2382ee153dc43f28d9e242be
2022-04-18 10:42:37 -07:00
bbkot
e07d836391 fixing issue #8345 RocksDB does not work when using UNC network paths (#9384)
Summary:
Fix https://github.com/facebook/rocksdb/issues/8345
RocksDB does not work with network filesystem paths on Windows, e.g. "\\hostname\folder\..."

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9384

Reviewed By: mrambacher

Differential Revision: D33830622

Pulled By: riversand963

fbshipit-source-id: 2a99dc3c94415eb1460e110784b97d71600218f1
2022-04-18 10:41:55 -07:00
Peter Dillinger
e6bca1a54e Fix FileStorageInfo fields from GetLiveFilesMetaData (#9769)
Summary:
In making `SstFileMetaData` inherit from `FileStorageInfo`, I
overlooked setting some `FileStorageInfo` fields when then default
`SstFileMetaData()` ctor is used. This affected `GetLiveFilesMetaData()`.

Also removed some buggy `static_cast<size_t>`

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9769

Test Plan: Updated tests

Reviewed By: jay-zhuang

Differential Revision: D35220383

Pulled By: pdillinger

fbshipit-source-id: 05b4ee468258dbd3699517e1124838bf405fe7f8
2022-04-18 10:40:43 -07:00
Jack Robison
c815f31ad3 Fix broken zlib dependency, update it from 1.2.11 to 1.2.12 (#9764)
Summary:
Zlib (https://www.zlib.net/) has been updated to 1.2.12 due to CVE-2018-25032

- https://nvd.nist.gov/vuln/detail/CVE-2018-25032
- https://github.com/madler/zlib/issues/605

The source .tar.gz is no longer available, and the Makefile for rocksdb now fails as a result. This PR updates the dependency to the newer (and available) version, 1.2.12

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9764

Reviewed By: ajkr

Differential Revision: D35220367

Pulled By: jay-zhuang

fbshipit-source-id: 1f68ff8f048a6dba42077f048ac143468f0e2478
2022-04-18 10:39:47 -07:00
myasuka
6c8620451c Enable READ_BLOCK_COMPACTION_MICROS to track stats (#9722)
Summary:
After commit [d642c60](d642c60bdc), the stats `READ_BLOCK_COMPACTION_MICROS` cannot record any compaction read duration, and it always report zero.

This PR targets to distinguish `READ_BLOCK_COMPACTION_MICROS` with `READ_BLOCK_GET_MICROS` so that `READ_BLOCK_COMPACTION_MICROS` could record the correct stats.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9722

Reviewed By: ajkr

Differential Revision: D35021870

Pulled By: jay-zhuang

fbshipit-source-id: f1a804994265e51465de64c2a08f2e0eeb6fc5a3
2022-04-18 10:36:11 -07:00
Peter Dillinger
fa99e8afbc Fix heap use-after-free race with DropColumnFamily (#9730)
Summary:
Although ColumnFamilySet comments say that DB mutex can be
freed during iteration, as long as you hold a ref while releasing DB
mutex, this is not quite true because UnrefAndTryDelete might delete cfd
right before it is needed to get ->next_ for the next iteration of the
loop.

This change solves the problem by making a wrapper class that makes such
iteration easier while handling the tricky details of UnrefAndTryDelete
on the previous cfd only after getting next_ in operator++.

FreeDeadColumnFamilies should already have been obsolete; this removes
it for good. Similarly, ColumnFamilySet::iterator doesn't need to check
for cfd with 0 refs, because those are immediately deleted.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9730

Test Plan:
was reported with ASAN on unit tests like
DBLogicalBlockSizeCacheTest.CreateColumnFamily (very rare); keep watching

Reviewed By: ltamasi

Differential Revision: D35038143

Pulled By: pdillinger

fbshipit-source-id: 0a5478d5be96c135343a00603711b7df43ae19c9
2022-04-18 10:35:14 -07:00
Yanqin Jin
43890774ed Fix a race condition in WAL tracking causing DB open failure (#9715)
Summary:
There is a race condition if WAL tracking in the MANIFEST is enabled in a database that disables 2PC.

The race condition is between two background flush threads trying to install flush results to the MANIFEST.

Consider an example database with two column families: "default" (cfd0) and "cf1" (cfd1). Initially,
both column families have one mutable (active) memtable whose data backed by 6.log.

1. Trigger a manual flush for "cf1", creating a 7.log
2. Insert another key to "default", and trigger flush for "default", creating 8.log
3. BgFlushThread1 finishes writing 9.sst
4. BgFlushThread2 finishes writing 10.sst

```
Time  BgFlushThread1                                    BgFlushThread2
 |    mutex_.Lock()
 |    precompute min_wal_to_keep as 6
 |    mutex_.Unlock()
 |                                                     mutex_.Lock()
 |                                                     precompute min_wal_to_keep as 6
 |                                                     join MANIFEST write queue and mutex_.Unlock()
 |    write to MANIFEST
 |    mutex_.Lock()
 |    cfd1->log_number = 7
 |    Signal bg_flush_2 and mutex_.Unlock()
 |                                                     wake up and mutex_.Lock()
 |                                                     cfd0->log_number = 8
 |                                                     FindObsoleteFiles() with job_context->log_number == 7
 |                                                     mutex_.Unlock()
 |                                                     PurgeObsoleteFiles() deletes 6.log
 V
```

As shown in the above, BgFlushThread2 thinks that the min wal to keep is 6.log because "cf1" has unflushed data in 6.log (cf1.log_number=6).
Similarly, BgThread1 thinks that min wal to keep is also 6.log because "default" has unflushed data (default.log_number=6).
No WAL deletion will be written to MANIFEST because 6 is equal to `versions_->wals_.min_wal_number_to_keep`,
due to https://github.com/facebook/rocksdb/blob/7.1.fb/db/memtable_list.cc#L513:L514.
The bg flush thread that finishes last will perform file purging. `job_context.log_number` will be evaluated as 7, i.e.
the min wal that contains unflushed data, causing 6.log to be deleted. However, MANIFEST thinks 6.log should still exist.
If you close the db at this point, you won't be able to re-open it if `track_and_verify_wal_in_manifest` is true.

We must handle the case of multiple bg flush threads, and it is difficult for one bg flush thread to know
the correct min wal number until the other bg flush threads have finished committing to the manifest and updated
the `cfd::log_number`.
To fix this issue, we rename an existing variable `min_log_number_to_keep_2pc` to `min_log_number_to_keep`,
and use it to track WAL file deletion in non-2pc mode as well.
This variable is updated only 1) during recovery with mutex held, or 2) in the MANIFEST write thread.
`min_log_number_to_keep` means RocksDB will delete WALs below it, although there may be WALs
above it which are also obsolete. Formally, we will have [min_wal_to_keep, max_obsolete_wal]. During recovery, we
make sure that only WALs above max_obsolete_wal are checked and added back to `alive_log_files_`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9715

Test Plan:
```
make check
```
Also ran stress test below (with asan) to make sure it completes successfully.
```
TEST_TMPDIR=/dev/shm/rocksdb OPT=-g ASAN_OPTIONS=disable_coredump=0 \
CRASH_TEST_EXT_ARGS=--compression_type=zstd SKIP_FORMAT_BUCK_CHECKS=1 \
make J=52 -j52 blackbox_asan_crash_test
```

Reviewed By: ltamasi

Differential Revision: D34984412

Pulled By: riversand963

fbshipit-source-id: c7b21a8d84751bb55ea79c9f387103d21b231005
2022-04-18 10:34:13 -07:00
akankshamahajan
ee6ee5ff08 Update version.h and HISTORY.md for 7.1.1 2022-04-07 11:38:53 -07:00
Akanksha Mahajan
80bea2ba8e Update stats for Read and ReadAsync in random_access_file_reader for async prefetching (#9810)
Summary:
Update stats in random_access_file_reader for Read and
ReadAsync API to take into account the read latency for async
prefetching.

It also fixes ERROR_HANDLER_AUTORESUME_RETRY_COUNT stat whose value was
incorrect in portal.h

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9810

Test Plan: Update unit test

Reviewed By: anand1976

Differential Revision: D35433081

Pulled By: akankshamahajan15

fbshipit-source-id: aeec3901270e58a003ce6b5214bd25ddcb3a12a9
2022-04-07 09:15:37 -07:00
Akanksha Mahajan
71158842aa Fix reseting of async_read_in_progress_ variable in FilePrefetchBuffer to call Poll API (#9815)
Summary:
Currently RocksDB reset async_read_in_progress_ in callback
due to which underlying filesystem relying on Poll API won't be called
leading to stale memory access.
In order to fix it, async_read_in_progress_ will be reset after Poll API
is called to make sure underlying file_system waiting on Poll can clear
its state or take appropriate action.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9815

Test Plan: CircleCI tests

Reviewed By: anand1976

Differential Revision: D35451534

Pulled By: akankshamahajan15

fbshipit-source-id: b70ef6251a7aa9ed4876ba5e5100baa33d7d474c
2022-04-07 09:03:53 -07:00
Akanksha Mahajan
78641c0930 Fix segfault in FilePrefetchBuffer with async_io enabled (#9777)
Summary:
If FilePrefetchBuffer object is destroyed and then later Poll() calls callback on object which has been destroyed, it gives segfault on accessing destroyed object. It was caught after adding unit tests that tests Posix implementation of ReadAsync and Poll APIs.
This PR also updates and fixes existing IOURing tests which were not running locally because RocksDbIOUringEnable function wasn't defined and IOUring was disabled for those tests

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9777

Test Plan: Added new unit test

Reviewed By: anand1976

Differential Revision: D35254002

Pulled By: akankshamahajan15

fbshipit-source-id: 68e80054ffb14ae25c255920ebc6548ca5f130a1
2022-04-06 21:02:55 -07:00
Adam Retter
9d3d5e72e0
Retrieve ZLib from the archive (#9790) 2022-04-04 18:07:30 -07:00
Akanksha Mahajan
45770b5d23 Fix some errors in async prefetching in FilePrefetchBuffer (#9734)
Summary:
In ReadOption `async_io` which prefetches the data asynchronously, db_bench and db_stress runs were failing  because wrong data was prefetched which resulted in Error: Checksum mismatched. Wrong data was copied because capacity was less than actual size needed. It has been fixed in this PR.

Since there are two separate methods for async and sync prefetching, these changes are in async prefetching methods and any changes would not effect normal prefetching. I ran the regressions to make sure normal prefetching is fine.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9734

Test Plan:
1. CircleCI jobs

2.  Ran db_bench
```
. /db_bench -use_existing_db=true
-db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32
-value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680
-duration=120 -ops_between_duration_checks=1 -async_io=1 -adaptive_readahead=1

```
3. Ran db_stress test
```
export CRASH_TEST_EXT_ARGS=" --async_io=1 --adaptive_readahead=1"
make crash_test -j
```

4. Run regressions for async_io disabled.

Old flow without any async changes:
```
./db_bench -use_existing_db=true -db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB:    version 7.0
Date:       Thu Mar 17 13:11:34 2022
CPU:        24 * Intel Core Processor (Broadwell)
CPUCache:   16384 KB
Keys:       32 bytes each (+ 0 bytes user-defined timestamp)
Values:     512 bytes each (256 bytes after compression)
Entries:    5000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    2594.0 MB (estimated)
FileSize:   1373.3 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
------------------------------------------------
DB path: [/tmp/prefix_scan_prefetch_main]
seekrandom   :  483618.390 micros/op 2 ops/sec;  338.9 MB/s (249 of 249 found)
```

With async prefetching changes and async_io disabled to make sure in normal prefetching there is no regression.
 ```
 ./db_bench -use_existing_db=true -db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1 --async_io=0
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB:    version 7.1
Date:       Wed Mar 23 15:56:37 2022
CPU:        24 * Intel Core Processor (Broadwell)
CPUCache:   16384 KB
Keys:       32 bytes each (+ 0 bytes user-defined timestamp)
Values:     512 bytes each (256 bytes after compression)
Entries:    5000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    2594.0 MB (estimated)
FileSize:   1373.3 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
------------------------------------------------
DB path: [/tmp/prefix_scan_prefetch_main]
seekrandom   :  481819.816 micros/op 2 ops/sec;  340.2 MB/s (250 of 250 found)
```

Reviewed By: riversand963

Differential Revision: D35058471

Pulled By: akankshamahajan15

fbshipit-source-id: 9233a1e6d97cea0c7a8111bfb9e8ac3251c341ce
2022-03-25 18:36:49 -07:00
Peter Dillinger
4411488585 Tidy up HISTORY.md for 7.1.0 2022-03-23 16:38:06 -07:00
Peter Dillinger
8cc712c0eb Revise history of 7.1.0 for patch 2022-03-23 15:42:35 -07:00
Peter Dillinger
62b71b3094 Fix a major performance bug in 7.0 re: filter compatibility (#9736)
Summary:
Bloom filters generated by pre-7.0 releases are not read by
7.0.x releases (and vice-versa) due to changes to FilterPolicy::Name()
in https://github.com/facebook/rocksdb/issues/9590. This can severely impact read performance and read I/O on
upgrade or downgrade with existing DB, but not data correctness.

To fix, we go back using the old, unified name in SST metadata but (for
a while anyway) recognize the aliases that could be generated by early
7.0.x releases. This unfortunately requires a public API change to avoid
interfering with all the good changes from https://github.com/facebook/rocksdb/issues/9590, but the API change
only affects users with custom FilterPolicy, which should be very few.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9736

Test Plan:
manual

Generate DBs with
```
./db_bench.7.0 -db=/dev/shm/rocksdb.7.0 -bloom_bits=10 -cache_index_and_filter_blocks=1 -benchmarks=fillrandom -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=10000 -fifo_compaction_allow_compaction=0
```
and similar. Compare with
```
for IMPL in 6.29 7.0 fixed; do for DB in 6.29 7.0 fixed; do echo "Testing $IMPL on $DB:"; ./db_bench.$IMPL -db=/dev/shm/rocksdb.$DB -use_existing_db -readonly -bloom_bits=10 -benchmarks=readrandom -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=10000 -fifo_compaction_allow_compaction=0 -duration=10 2>&1 | grep micros/op; done; done
```

Results:
```
Testing 6.29 on 6.29:
readrandom   :      34.381 micros/op 29085 ops/sec;    3.2 MB/s (291999 of 291999 found)
Testing 6.29 on 7.0:
readrandom   :     190.443 micros/op 5249 ops/sec;    0.6 MB/s (52999 of 52999 found)
Testing 6.29 on fixed:
readrandom   :      40.148 micros/op 24907 ops/sec;    2.8 MB/s (249999 of 249999 found)
Testing 7.0 on 6.29:
readrandom   :     229.430 micros/op 4357 ops/sec;    0.5 MB/s (43999 of 43999 found)
Testing 7.0 on 7.0:
readrandom   :      33.348 micros/op 29986 ops/sec;    3.3 MB/s (299999 of 299999 found)
Testing 7.0 on fixed:
readrandom   :     152.734 micros/op 6546 ops/sec;    0.7 MB/s (65999 of 65999 found)
Testing fixed on 6.29:
readrandom   :      32.024 micros/op 31224 ops/sec;    3.5 MB/s (312999 of 312999 found)
Testing fixed on 7.0:
readrandom   :      33.990 micros/op 29390 ops/sec;    3.3 MB/s (294999 of 294999 found)
Testing fixed on fixed:
readrandom   :      28.714 micros/op 34825 ops/sec;    3.9 MB/s (348999 of 348999 found)
```

Just paying attention to order of magnitude of ops/sec (short test
durations, lots of noise), it's clear that with the fix we can read <= 6.29
& >= 7.0 at full speed, where neither 6.29 nor 7.0 can on both. And 6.29
release can properly read fixed DB at full speed.

Reviewed By: siying, ajkr

Differential Revision: D35057844

Pulled By: pdillinger

fbshipit-source-id: a46893a6af4bf084375ebe4728066d00eb08f050
2022-03-23 15:00:14 -07:00
54 changed files with 846 additions and 281 deletions

View File

@ -2,7 +2,6 @@ version: 2.1
orbs:
win: circleci/windows@2.4.0
slack: circleci/slack@3.4.2
aliases:
- &notify-on-main-failure
@ -57,7 +56,6 @@ commands:
post-steps:
steps:
- slack/status: *notify-on-main-failure
- store_test_results: # store test result if there's any
path: /tmp/test-results
- store_artifacts: # store LOG for debugging if there's any

View File

@ -1,8 +1,18 @@
# Rocksdb Change Log
## 7.1.0 (03/21/2022)
### Public API changes
* Add DB::OpenAndTrimHistory API. This API will open DB and trim data to the timestamp specified by trim_ts (The data with timestamp larger than specified trim bound will be removed). This API should only be used at a timestamp-enabled column families recovery. If the column family doesn't have timestamp enabled, this API won't trim any data on that column family. This API is not compatible with avoid_flush_during_recovery option.
## 7.1.2 (04/19/2022)
### Bug Fixes
* Fixed bug which caused rocksdb failure in the situation when rocksdb was accessible using UNC path
* Fixed a race condition when 2PC is disabled and WAL tracking in the MANIFEST is enabled. The race condition is between two background flush threads trying to install flush results, causing a WAL deletion not tracked in the MANIFEST. A future DB open may fail.
* Fixed a heap use-after-free race with DropColumnFamily.
* Fixed a bug that `rocksdb.read.block.compaction.micros` cannot track compaction stats (#9722).
* Fixed `file_type`, `relative_filename` and `directory` fields returned by `GetLiveFilesMetaData()`, which were added in inheriting from `FileStorageInfo`.
* Fixed a bug affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#9766).
## 7.1.1 (04/07/2022)
### Bug Fixes
* Fix segfault in FilePrefetchBuffer with async_io as it doesn't wait for pending jobs to complete on destruction.
## 7.1.0 (03/23/2022)
### New Features
* Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamp.
* Add support for user-defined timestamps to write-committed transaction without API change. The `TransactionDB` layer APIs do not allow timestamps because we require that all user-defined-timestamps-aware operations go through the `Transaction` APIs.
@ -13,10 +23,12 @@
* Experimental support for async_io in ReadOptions which is used by FilePrefetchBuffer to prefetch some of the data asynchronously, if reads are sequential and auto readahead is enabled by rocksdb internally.
### Bug Fixes
* Fixed a major performance bug in which Bloom filters generated by pre-7.0 releases are not read by early 7.0.x releases (and vice-versa) due to changes to FilterPolicy::Name() in #9590. This can severely impact read performance and read I/O on upgrade or downgrade with existing DB, but not data correctness.
* Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)
* Fixed a bug caused by race among flush, incoming writes and taking snapshots. Queries to snapshots created with these race condition can return incorrect result, e.g. resurfacing deleted data.
* Fixed a bug that DB flush uses `options.compression` even `options.compression_per_level` is set.
* Fixed a bug that DisableManualCompaction may assert when disable an unscheduled manual compaction.
* Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread.
* Fixed a potential timer crash when open close DB concurrently.
* Fixed a race condition for `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in WriteToWAL() and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled.
* Fixed a bug that `Iterator::Refresh()` reads stale keys after DeleteRange() performed.
@ -25,12 +37,11 @@
* Fixed a race condition when mmaping a WritableFile on POSIX.
### Public API changes
* Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.
* Added pure virtual FilterPolicy::CompatibilityName(), which is needed for fixing major performance bug involving FilterPolicy naming in SST metadata without affecting Customizable aspect of FilterPolicy. This change only affects those with their own custom or wrapper FilterPolicy classes.
* `options.compression_per_level` is dynamically changeable with `SetOptions()`.
* Added `WriteOptions::rate_limiter_priority`. When set to something other than `Env::IO_TOTAL`, the internal rate limiter (`DBOptions::rate_limiter`) will be charged at the specified priority for writes associated with the API to which the `WriteOptions` was provided. Currently the support covers automatic WAL flushes, which happen during live updates (`Put()`, `Write()`, `Delete()`, etc.) when `WriteOptions::disableWAL == false` and `DBOptions::manual_wal_flush == false`.
### Bug Fixes
* Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread.
* Add DB::OpenAndTrimHistory API. This API will open DB and trim data to the timestamp specified by trim_ts (The data with timestamp larger than specified trim bound will be removed). This API should only be used at a timestamp-enabled column families recovery. If the column family doesn't have timestamp enabled, this API won't trim any data on that column family. This API is not compatible with avoid_flush_during_recovery option.
* Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.
## 7.0.0 (02/20/2022)
### Bug Fixes

View File

@ -2044,8 +2044,8 @@ ROCKSDB_JAVADOCS_JAR = rocksdbjni-$(ROCKSDB_JAVA_VERSION)-javadoc.jar
ROCKSDB_SOURCES_JAR = rocksdbjni-$(ROCKSDB_JAVA_VERSION)-sources.jar
SHA256_CMD = sha256sum
ZLIB_VER ?= 1.2.11
ZLIB_SHA256 ?= c3e5e9fdd5004dcb542feda5ee4f0ff0744628baf8ed2dd5d66f8ca1197cb1a1
ZLIB_VER ?= 1.2.12
ZLIB_SHA256 ?= 91844808532e5ce316b3c010929493c0244f3d37593afd6de04f71821d5136d9
ZLIB_DOWNLOAD_BASE ?= http://zlib.net
BZIP2_VER ?= 1.0.8
BZIP2_SHA256 ?= ab5a03176ee106d3f0fa90e381da478ddae405918153cca248e682cd0c4a2269

View File

@ -3752,6 +3752,9 @@ rocksdb_filterpolicy_t* rocksdb_filterpolicy_create_bloom_format(
const FilterPolicy* rep_;
~Wrapper() override { delete rep_; }
const char* Name() const override { return rep_->Name(); }
const char* CompatibilityName() const override {
return rep_->CompatibilityName();
}
// No need to override GetFilterBitsBuilder if this one is overridden
ROCKSDB_NAMESPACE::FilterBitsBuilder* GetBuilderWithContext(
const ROCKSDB_NAMESPACE::FilterBuildingContext& context)
@ -3789,6 +3792,9 @@ rocksdb_filterpolicy_t* rocksdb_filterpolicy_create_ribbon_format(
const FilterPolicy* rep_;
~Wrapper() override { delete rep_; }
const char* Name() const override { return rep_->Name(); }
const char* CompatibilityName() const override {
return rep_->CompatibilityName();
}
ROCKSDB_NAMESPACE::FilterBitsBuilder* GetBuilderWithContext(
const ROCKSDB_NAMESPACE::FilterBuildingContext& context)
const override {

View File

@ -1562,20 +1562,6 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
return new_cfd;
}
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
autovector<ColumnFamilyData*> to_delete;
for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
to_delete.push_back(cfd);
}
}
for (auto cfd : to_delete) {
// this is very rare, so it's not a problem that we do it under a mutex
delete cfd;
}
}
// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
auto cfd_iter = column_family_data_.find(cfd->GetID());

View File

@ -520,9 +520,10 @@ class ColumnFamilyData {
ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
WriteBufferManager* write_buffer_mgr() { return write_buffer_manager_; }
static const uint32_t kDummyColumnFamilyDataId;
private:
friend class ColumnFamilySet;
static const uint32_t kDummyColumnFamilyDataId;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
@ -628,10 +629,8 @@ class ColumnFamilyData {
// held and it needs to be executed from the write thread. SetDropped() also
// guarantees that it will be called only from single-threaded LogAndApply(),
// but this condition is not that important.
// * Iteration -- hold DB mutex, but you can release it in the body of
// iteration. If you release DB mutex in body, reference the column
// family before the mutex and unreference after you unlock, since the column
// family might get dropped when the DB mutex is released
// * Iteration -- hold DB mutex. If you want to release the DB mutex in the
// body of the iteration, wrap in a RefedColumnFamilySet.
// * GetDefault() -- thread safe
// * GetColumnFamily() -- either inside of DB mutex or from a write thread
// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily(),
@ -643,17 +642,12 @@ class ColumnFamilySet {
public:
explicit iterator(ColumnFamilyData* cfd)
: current_(cfd) {}
// NOTE: minimum operators for for-loop iteration
iterator& operator++() {
// dropped column families might still be included in this iteration
// (we're only removing them when client drops the last reference to the
// column family).
// dummy is never dead, so this will never be infinite
do {
current_ = current_->next_;
} while (current_->refs_.load(std::memory_order_relaxed) == 0);
current_ = current_->next_;
return *this;
}
bool operator!=(const iterator& other) {
bool operator!=(const iterator& other) const {
return this->current_ != other.current_;
}
ColumnFamilyData* operator*() { return current_; }
@ -692,10 +686,6 @@ class ColumnFamilySet {
iterator begin() { return iterator(dummy_cfd_->next_); }
iterator end() { return iterator(dummy_cfd_); }
// REQUIRES: DB mutex held
// Don't call while iterating over ColumnFamilySet
void FreeDeadColumnFamilies();
Cache* get_table_cache() { return table_cache_; }
WriteBufferManager* write_buffer_manager() { return write_buffer_manager_; }
@ -738,6 +728,55 @@ class ColumnFamilySet {
std::string db_session_id_;
};
// A wrapper for ColumnFamilySet that supports releasing DB mutex during each
// iteration over the iterator, because the cfd is Refed and Unrefed during
// each iteration to prevent concurrent CF drop from destroying it (until
// Unref).
class RefedColumnFamilySet {
public:
explicit RefedColumnFamilySet(ColumnFamilySet* cfs) : wrapped_(cfs) {}
class iterator {
public:
explicit iterator(ColumnFamilySet::iterator wrapped) : wrapped_(wrapped) {
MaybeRef(*wrapped_);
}
~iterator() { MaybeUnref(*wrapped_); }
inline void MaybeRef(ColumnFamilyData* cfd) {
if (cfd->GetID() != ColumnFamilyData::kDummyColumnFamilyDataId) {
cfd->Ref();
}
}
inline void MaybeUnref(ColumnFamilyData* cfd) {
if (cfd->GetID() != ColumnFamilyData::kDummyColumnFamilyDataId) {
cfd->UnrefAndTryDelete();
}
}
// NOTE: minimum operators for for-loop iteration
inline iterator& operator++() {
ColumnFamilyData* old = *wrapped_;
++wrapped_;
// Can only unref & potentially free cfd after accessing its next_
MaybeUnref(old);
MaybeRef(*wrapped_);
return *this;
}
inline bool operator!=(const iterator& other) const {
return this->wrapped_ != other.wrapped_;
}
inline ColumnFamilyData* operator*() { return *wrapped_; }
private:
ColumnFamilySet::iterator wrapped_;
};
iterator begin() { return iterator(wrapped_->begin()); }
iterator end() { return iterator(wrapped_->end()); }
private:
ColumnFamilySet* wrapped_;
};
// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access
// memtables of different column families (specified by ID in the write batch)
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {

View File

@ -1638,9 +1638,15 @@ class LevelAndStyleCustomFilterPolicy : public FilterPolicy {
policy_l0_other_(NewBloomFilterPolicy(bpk_l0_other)),
policy_otherwise_(NewBloomFilterPolicy(bpk_otherwise)) {}
const char* Name() const override {
return "LevelAndStyleCustomFilterPolicy";
}
// OK to use built-in policy name because we are deferring to a
// built-in builder. We aren't changing the serialized format.
const char* Name() const override { return policy_fifo_->Name(); }
const char* CompatibilityName() const override {
return policy_fifo_->CompatibilityName();
}
FilterBitsBuilder* GetBuilderWithContext(
const FilterBuildingContext& context) const override {

View File

@ -45,17 +45,15 @@ Status DBImpl::FlushForGetLiveFiles() {
}
mutex_.Lock();
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
for (auto cfd : versions_->GetRefedColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions(), FlushReason::kGetLiveFiles);
TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");
mutex_.Lock();
cfd->UnrefAndTryDelete();
if (!status.ok() && !status.IsColumnFamilyDropped()) {
break;
} else if (status.IsColumnFamilyDropped()) {
@ -63,7 +61,6 @@ Status DBImpl::FlushForGetLiveFiles() {
}
}
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
return status;
}

View File

@ -2271,7 +2271,7 @@ TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
// The recovered min log number with prepared data should be non-zero.
// In 2pc mode, MinLogNumberToKeep returns the
// VersionSet::min_log_number_to_keep_2pc recovered from MANIFEST, if it's 0,
// VersionSet::min_log_number_to_keep recovered from MANIFEST, if it's 0,
// it means atomic flush didn't write the min_log_number_to_keep to MANIFEST.
cfs.push_back(kDefaultColumnFamilyName);
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));

View File

@ -375,15 +375,12 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) {
s = AtomicFlushMemTables(cfds, flush_opts, context.flush_reason);
mutex_.Lock();
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
for (auto cfd : versions_->GetRefedColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
mutex_.Unlock();
InstrumentedMutexUnlock u(&mutex_);
s = FlushMemTable(cfd, flush_opts, context.flush_reason);
mutex_.Lock();
cfd->UnrefAndTryDelete();
if (!s.ok()) {
break;
}
@ -495,18 +492,14 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
s.PermitUncheckedError(); //**TODO: What to do on error?
mutex_.Lock();
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
for (auto cfd : versions_->GetRefedColumnFamilySet()) {
if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
cfd->Ref();
mutex_.Unlock();
InstrumentedMutexUnlock u(&mutex_);
Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
s.PermitUncheckedError(); //**TODO: What to do on error?
mutex_.Lock();
cfd->UnrefAndTryDelete();
}
}
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
}
shutting_down_.store(true, std::memory_order_release);
@ -969,18 +962,13 @@ void DBImpl::DumpStats() {
TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
{
InstrumentedMutexLock l(&mutex_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
for (auto cfd : versions_->GetRefedColumnFamilySet()) {
if (cfd->initialized()) {
// Release DB mutex for gathering cache entry stats. Pass over all
// column families for this first so that other stats are dumped
// near-atomically.
// Get a ref before unlocking
cfd->Ref();
{
InstrumentedMutexUnlock u(&mutex_);
cfd->internal_stats()->CollectCacheEntryStats(/*foreground=*/false);
}
cfd->UnrefAndTryDelete();
InstrumentedMutexUnlock u(&mutex_);
cfd->internal_stats()->CollectCacheEntryStats(/*foreground=*/false);
}
}
@ -1900,6 +1888,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
return s;
}
}
TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:0");
TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:1");
PinnedIteratorsManager pinned_iters_mgr;
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
@ -1917,8 +1907,6 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
{
PERF_TIMER_GUARD(get_post_process_time);
ReturnAndCleanupSuperVersion(cfd, sv);
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = 0;
if (s.ok()) {
@ -1945,6 +1933,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
PERF_COUNTER_ADD(get_read_bytes, size);
}
RecordInHistogram(stats_, BYTES_PER_READ, size);
ReturnAndCleanupSuperVersion(cfd, sv);
}
return s;
}
@ -3490,15 +3480,13 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property,
// Needs mutex to protect the list of column families.
InstrumentedMutexLock l(&mutex_);
uint64_t value;
for (auto* cfd : *versions_->GetColumnFamilySet()) {
for (auto* cfd : versions_->GetRefedColumnFamilySet()) {
if (!cfd->initialized()) {
continue;
}
cfd->Ref();
ret = GetIntPropertyInternal(cfd, *property_info, true, &value);
// GetIntPropertyInternal may release db mutex and re-acquire it.
mutex_.AssertHeld();
cfd->UnrefAndTryDelete();
if (ret) {
sum += value;
} else {
@ -5089,6 +5077,7 @@ Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
}
}
// TODO: simplify using GetRefedColumnFamilySet?
std::vector<ColumnFamilyData*> cfd_list;
{
InstrumentedMutexLock l(&mutex_);

View File

@ -2973,8 +2973,6 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
bg_bottom_compaction_scheduled_--;
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();

View File

@ -23,11 +23,7 @@
namespace ROCKSDB_NAMESPACE {
uint64_t DBImpl::MinLogNumberToKeep() {
if (allow_2pc()) {
return versions_->min_log_number_to_keep_2pc();
} else {
return versions_->MinLogNumberWithUnflushedData();
}
return versions_->min_log_number_to_keep();
}
uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
@ -224,7 +220,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}
// Add log files in wal_dir
if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) {
std::vector<std::string> log_files;
Status s = env_->GetChildren(immutable_db_options_.wal_dir, &log_files);
@ -234,6 +229,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
log_file, immutable_db_options_.wal_dir);
}
}
// Add info log files in db_log_dir
if (!immutable_db_options_.db_log_dir.empty() &&
immutable_db_options_.db_log_dir != dbname_) {

View File

@ -866,6 +866,11 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
bool flushed = false;
uint64_t corrupted_wal_number = kMaxSequenceNumber;
uint64_t min_wal_number = MinLogNumberToKeep();
if (!allow_2pc()) {
// In non-2pc mode, we skip WALs that do not back unflushed data.
min_wal_number =
std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData());
}
for (auto wal_number : wal_numbers) {
if (wal_number < min_wal_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
@ -1270,9 +1275,16 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
}
std::unique_ptr<VersionEdit> wal_deletion;
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(max_wal_number + 1);
if (flushed) {
wal_deletion = std::make_unique<VersionEdit>();
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
wal_deletion->DeleteWalsBefore(max_wal_number + 1);
}
if (!allow_2pc()) {
// In non-2pc mode, flushing the memtables of the column families
// means we can advance min_log_number_to_keep.
wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1);
}
edit_lists.back().push_back(wal_deletion.get());
}
@ -1351,7 +1363,14 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
// FindObsoleteFiles()
total_log_size_ = 0;
log_empty_ = false;
uint64_t min_wal_with_unflushed_data =
versions_->MinLogNumberWithUnflushedData();
for (auto wal_number : wal_numbers) {
if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) {
// In non-2pc mode, the WAL files not backing unflushed data are not
// alive, thus should not be added to the alive_log_files_.
continue;
}
// We preallocate space for wals, but then after a crash and restart, those
// preallocated space are not needed anymore. It is likely only the last
// log has such preallocated space, so we only truncate for the last log.
@ -1952,10 +1971,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
"DB::Open() failed --- Unable to persist Options file",
persist_options_status.ToString());
}
} else {
}
if (!s.ok()) {
ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
"Persisting Option File error: %s",
persist_options_status.ToString().c_str());
"DB::Open() failed: %s", s.ToString().c_str());
}
if (s.ok()) {
s = impl->StartPeriodicWorkScheduler();

View File

@ -47,7 +47,7 @@ class DBMergeOperandTest : public DBTestBase {
: DBTestBase("db_merge_operand_test", /*env_do_fsync=*/true) {}
};
TEST_F(DBMergeOperandTest, MergeOperandReadAfterFreeBug) {
TEST_F(DBMergeOperandTest, CacheEvictedMergeOperandReadAfterFreeBug) {
// There was a bug of reading merge operands after they are mistakely freed
// in DB::GetMergeOperands, which is surfaced by cache full.
// See PR#9507 for more.
@ -86,6 +86,42 @@ TEST_F(DBMergeOperandTest, MergeOperandReadAfterFreeBug) {
ASSERT_EQ(values[3].ToString(), "v4");
}
TEST_F(DBMergeOperandTest, FlushedMergeOperandReadAfterFreeBug) {
// Repro for a bug where a memtable containing a merge operand could be
// deleted before the merge operand was saved to the result.
auto options = CurrentOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
Reopen(options);
ASSERT_OK(Merge("key", "value"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::GetImpl:PostMemTableGet:0",
"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush"},
{"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush",
"DBImpl::GetImpl:PostMemTableGet:1"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
auto flush_thread = port::Thread([&]() {
TEST_SYNC_POINT(
"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush");
ASSERT_OK(Flush());
TEST_SYNC_POINT(
"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush");
});
PinnableSlice value;
GetMergeOperandsOptions merge_operands_info;
merge_operands_info.expected_max_number_of_operands = 1;
int number_of_operands;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
"key", &value, &merge_operands_info,
&number_of_operands));
ASSERT_EQ(1, number_of_operands);
flush_thread.join();
}
TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
Options options;
options.create_if_missing = true;

View File

@ -1078,6 +1078,11 @@ void CheckColumnFamilyMeta(
ASSERT_LE(file_meta_from_cf.file_creation_time, end_time);
ASSERT_GE(file_meta_from_cf.oldest_ancester_time, start_time);
ASSERT_LE(file_meta_from_cf.oldest_ancester_time, end_time);
// More from FileStorageInfo
ASSERT_EQ(file_meta_from_cf.file_type, kTableFile);
ASSERT_EQ(file_meta_from_cf.name,
"/" + file_meta_from_cf.relative_filename);
ASSERT_EQ(file_meta_from_cf.directory, file_meta_from_cf.db_path);
}
ASSERT_EQ(level_meta_from_cf.size, level_size);
@ -1122,6 +1127,11 @@ void CheckLiveFilesMeta(
ASSERT_EQ(meta.oldest_blob_file_number,
expected_meta.oldest_blob_file_number);
// More from FileStorageInfo
ASSERT_EQ(meta.file_type, kTableFile);
ASSERT_EQ(meta.name, "/" + meta.relative_filename);
ASSERT_EQ(meta.directory, meta.db_path);
++i;
}
}

View File

@ -1491,6 +1491,93 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
}
TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) {
Options options = CurrentOptions();
options.env = env_;
options.track_and_verify_wals_in_manifest = true;
// The following make sure there are two bg flush threads.
options.max_background_jobs = 8;
const std::string cf1_name("cf1");
CreateAndReopenWithCF({cf1_name}, options);
assert(handles_.size() == 2);
{
dbfull()->TEST_LockMutex();
ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes);
dbfull()->TEST_UnlockMutex();
}
ASSERT_OK(dbfull()->PauseBackgroundWork());
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value"));
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[1]));
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[0]));
bool called = false;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// This callback will be called when the first bg flush thread reaches the
// point before entering the MANIFEST write queue after flushing the SST
// file.
// The purpose of the sync points here is to ensure both bg flush threads
// finish computing `min_wal_number_to_keep` before any of them updates the
// `log_number` for the column family that's being flushed.
SyncPoint::GetInstance()->SetCallBack(
"MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep",
[&](void* /*arg*/) {
dbfull()->mutex()->AssertHeld();
if (!called) {
// We are the first bg flush thread in the MANIFEST write queue.
// We set up the dependency between sync points for two threads that
// will be executing the same code.
// For the interleaving of events, see
// https://github.com/facebook/rocksdb/pull/9715.
// bg flush thread1 will release the db mutex while in the MANIFEST
// write queue. In the meantime, bg flush thread2 locks db mutex and
// computes the min_wal_number_to_keep (before thread1 writes to
// MANIFEST thus before cf1->log_number is updated). Bg thread2 joins
// the MANIFEST write queue afterwards and bg flush thread1 proceeds
// with writing to MANIFEST.
called = true;
SyncPoint::GetInstance()->LoadDependency({
{"VersionSet::LogAndApply:WriteManifestStart",
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"},
{"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2",
"VersionSet::LogAndApply:WriteManifest"},
});
} else {
// The other bg flush thread has already been in the MANIFEST write
// queue, and we are after.
TEST_SYNC_POINT(
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2");
}
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->ContinueBackgroundWork());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_TRUE(called);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
DB* db1 = nullptr;
Status s = DB::OpenForReadOnly(options, dbname_, &db1);
ASSERT_OK(s);
assert(db1);
delete db1;
}
// Test scope:
// - We expect to open data store under all circumstances
// - We expect only data upto the point where the first error was encountered

View File

@ -95,8 +95,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
jwriter << "cf_name" << cf_name << "job" << job_id << "event"
<< "table_file_creation"
<< "file_number" << fd.GetNumber() << "file_size"
<< fd.GetFileSize() << "file_checksum" << file_checksum
<< "file_checksum_func_name" << file_checksum_func_name;
<< fd.GetFileSize() << "file_checksum"
<< Slice(file_checksum).ToString(true) << "file_checksum_func_name"
<< file_checksum_func_name;
// table_properties
{

View File

@ -171,6 +171,69 @@ TEST_F(FileNameTest, Construction) {
ASSERT_EQ(kMetaDatabase, type);
}
TEST_F(FileNameTest, NormalizePath) {
// No leading slash
const std::string sep = std::string(1, kFilePathSeparator);
std::string expected = "FOLDER" + sep + "filename.ext";
std::string given = "FOLDER" + sep + "filename.ext";
ASSERT_EQ(expected, NormalizePath(given));
// Two chars /a
expected = sep + "a";
given = expected;
ASSERT_EQ(expected, NormalizePath(given));
// Two chars a/
expected = "a" + sep;
given = expected;
ASSERT_EQ(expected, NormalizePath(given));
// Server only
expected = sep + sep + "a";
given = expected;
ASSERT_EQ(expected, NormalizePath(given));
// Two slashes after character
expected = "a" + sep;
given = "a" + sep + sep;
ASSERT_EQ(expected, NormalizePath(given));
// slash only /
expected = sep;
given = expected;
ASSERT_EQ(expected, NormalizePath(given));
// UNC only //
expected = sep;
given = sep + sep;
ASSERT_EQ(expected, NormalizePath(given));
// 3 slashesy //
expected = sep + sep;
given = sep + sep + sep;
ASSERT_EQ(expected, NormalizePath(given));
// 3 slashes //
expected = sep + sep + "a" + sep;
given = sep + sep + sep + "a" + sep;
ASSERT_EQ(expected, NormalizePath(given));
// 2 separators in the middle
expected = "a" + sep + "b";
given = "a" + sep + sep + "b";
ASSERT_EQ(expected, NormalizePath(given));
// UNC with duplicate slashes
expected = sep + sep + "SERVER" + sep + "a" + sep + "b" + sep + "c";
given = sep + sep + "SERVER" + sep + "a" + sep + sep + "b" + sep + "c";
ASSERT_EQ(expected, NormalizePath(given));
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -489,8 +489,8 @@ Status MemTableList::TryInstallMemtableFlushResults(
// TODO(myabandeh): Not sure how batch_count could be 0 here.
if (batch_count > 0) {
uint64_t min_wal_number_to_keep = 0;
assert(edit_list.size() > 0);
if (vset->db_options()->allow_2pc) {
assert(edit_list.size() > 0);
// Note that if mempurge is successful, the edit_list will
// not be applicable (contains info of new min_log number to keep,
// and level 0 file path of SST file created during normal flush,
@ -499,24 +499,26 @@ Status MemTableList::TryInstallMemtableFlushResults(
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
// We piggyback the information of earliest log file to keep in the
// We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed.
edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
}
std::unique_ptr<VersionEdit> wal_deletion;
VersionEdit wal_deletion;
wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep);
if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (!vset->db_options()->allow_2pc) {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
}
if (min_wal_number_to_keep >
vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
edit_list.push_back(wal_deletion.get());
wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
}
TEST_SYNC_POINT_CALLBACK(
"MemTableList::TryInstallMemtableFlushResults:"
"AfterComputeMinWalToKeep",
nullptr);
}
edit_list.push_back(&wal_deletion);
const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
to_delete, mu](const Status& status) {
@ -798,22 +800,19 @@ Status InstallMemtableAtomicFlushResults(
if (vset->db_options()->allow_2pc) {
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, cfds, edit_lists, mems_list, prep_tracker);
edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
}
std::unique_ptr<VersionEdit> wal_deletion;
if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (!vset->db_options()->allow_2pc) {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
}
if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
edit_lists.back().push_back(wal_deletion.get());
++num_entries;
}
VersionEdit wal_deletion;
wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep);
if (vset->db_options()->track_and_verify_wals_in_manifest &&
min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
}
edit_lists.back().push_back(&wal_deletion);
++num_entries;
// Mark the version edits as an atomic group if the number of version edits
// exceeds 1.

View File

@ -120,6 +120,9 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
if (has_max_column_family_) {
PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_);
}
if (has_min_log_number_to_keep_) {
PutVarint32Varint64(dst, kMinLogNumberToKeep, min_log_number_to_keep_);
}
if (has_last_sequence_) {
PutVarint32Varint64(dst, kLastSequence, last_sequence_);
}

View File

@ -394,7 +394,7 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
if (s->ok()) {
version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily(
version_edit_params_.max_column_family_);
version_set_->MarkMinLogNumberToKeep2PC(
version_set_->MarkMinLogNumberToKeep(
version_edit_params_.min_log_number_to_keep_);
version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_);
version_set_->MarkFileNumberUsed(version_edit_params_.log_number_);
@ -970,12 +970,11 @@ void DumpManifestHandler::CheckIterationResult(const log::Reader& reader,
fprintf(stdout,
"next_file_number %" PRIu64 " last_sequence %" PRIu64
" prev_log_number %" PRIu64 " max_column_family %" PRIu32
" min_log_number_to_keep "
"%" PRIu64 "\n",
" min_log_number_to_keep %" PRIu64 "\n",
version_set_->current_next_file_number(),
version_set_->LastSequence(), version_set_->prev_log_number(),
version_set_->column_family_set_->GetMaxColumnFamily(),
version_set_->min_log_number_to_keep_2pc());
version_set_->min_log_number_to_keep());
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -1475,7 +1475,7 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
const uint64_t file_number = file->fd.GetNumber();
files.emplace_back(
MakeTableFileName("", file_number), file_number, file_path,
static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
file->fd.GetFileSize(), file->fd.smallest_seqno,
file->fd.largest_seqno, file->smallest.user_key().ToString(),
file->largest.user_key().ToString(),
file->stats.num_reads_sampled.load(std::memory_order_relaxed),
@ -4141,7 +4141,7 @@ void VersionSet::Reset() {
}
db_id_.clear();
next_file_number_.store(2);
min_log_number_to_keep_2pc_.store(0);
min_log_number_to_keep_.store(0);
manifest_file_number_ = 0;
options_file_number_ = 0;
pending_manifest_file_number_ = 0;
@ -4610,8 +4610,7 @@ Status VersionSet::ProcessManifestWrites(
}
if (last_min_log_number_to_keep != 0) {
// Should only be set in 2PC mode.
MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
MarkMinLogNumberToKeep(last_min_log_number_to_keep);
}
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
@ -4965,7 +4964,7 @@ Status VersionSet::Recover(
",min_log_number_to_keep is %" PRIu64 "\n",
manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
last_sequence_.load(), log_number, prev_log_number_,
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep());
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
@ -5378,9 +5377,9 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
}
// Called only either from ::LogAndApply which is protected by mutex or during
// recovery which is single-threaded.
void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
void VersionSet::MarkMinLogNumberToKeep(uint64_t number) {
if (min_log_number_to_keep_.load(std::memory_order_relaxed) < number) {
min_log_number_to_keep_.store(number, std::memory_order_relaxed);
}
}
@ -5506,7 +5505,7 @@ Status VersionSet::WriteCurrentStateToManifest(
// min_log_number_to_keep is for the whole db, not for specific column family.
// So it does not need to be set for every column family, just need to be set once.
// Since default CF can never be dropped, we set the min_log to the default CF here.
uint64_t min_log = min_log_number_to_keep_2pc();
uint64_t min_log = min_log_number_to_keep();
if (min_log != 0) {
edit.SetMinLogNumberToKeep(min_log);
}
@ -5894,11 +5893,13 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
assert(!cfd->ioptions()->cf_paths.empty());
filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
}
filemetadata.directory = filemetadata.db_path;
const uint64_t file_number = file->fd.GetNumber();
filemetadata.name = MakeTableFileName("", file_number);
filemetadata.relative_filename = filemetadata.name.substr(1);
filemetadata.file_number = file_number;
filemetadata.level = level;
filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
filemetadata.size = file->fd.GetFileSize();
filemetadata.smallestkey = file->smallest.user_key().ToString();
filemetadata.largestkey = file->largest.user_key().ToString();
filemetadata.smallest_seqno = file->fd.smallest_seqno;

View File

@ -1128,8 +1128,8 @@ class VersionSet {
uint64_t current_next_file_number() const { return next_file_number_.load(); }
uint64_t min_log_number_to_keep_2pc() const {
return min_log_number_to_keep_2pc_.load();
uint64_t min_log_number_to_keep() const {
return min_log_number_to_keep_.load();
}
// Allocate and return a new file number
@ -1187,7 +1187,7 @@ class VersionSet {
// Mark the specified log number as deleted
// REQUIRED: this is only called during single-threaded recovery or repair, or
// from ::LogAndApply where the global mutex is held.
void MarkMinLogNumberToKeep2PC(uint64_t number);
void MarkMinLogNumberToKeep(uint64_t number);
// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
@ -1196,10 +1196,12 @@ class VersionSet {
// Returns the minimum log number which still has data not flushed to any SST
// file.
// In non-2PC mode, all the log numbers smaller than this number can be safely
// deleted.
// deleted, although we still use `min_log_number_to_keep_` to determine when
// to delete a WAL file.
uint64_t MinLogNumberWithUnflushedData() const {
return PreComputeMinLogNumberWithUnflushedData(nullptr);
}
// Returns the minimum log number which still has data not flushed to any SST
// file.
// Empty column families' log number is considered to be
@ -1297,6 +1299,10 @@ class VersionSet {
uint64_t min_pending_output);
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
RefedColumnFamilySet GetRefedColumnFamilySet() {
return RefedColumnFamilySet(GetColumnFamilySet());
}
const FileOptions& file_options() { return file_options_; }
void ChangeFileOptions(const MutableDBOptions& new_options) {
file_options_.writable_file_max_buffer_size =
@ -1399,9 +1405,8 @@ class VersionSet {
const ImmutableDBOptions* const db_options_;
std::atomic<uint64_t> next_file_number_;
// Any WAL number smaller than this should be ignored during recovery,
// and is qualified for being deleted in 2PC mode. In non-2PC mode, this
// number is ignored.
std::atomic<uint64_t> min_log_number_to_keep_2pc_ = {0};
// and is qualified for being deleted.
std::atomic<uint64_t> min_log_number_to_keep_ = {0};
uint64_t manifest_file_number_;
uint64_t options_file_number_;
uint64_t options_file_size_;

View File

@ -3424,6 +3424,7 @@ TEST_F(VersionSetTestMissingFiles, NoFileMissing) {
}
TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
db_options_.allow_2pc = true;
NewDB();
SstInfo sst(100, kDefaultColumnFamilyName, "a");
@ -3435,12 +3436,12 @@ TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
edit.AddFile(0, file_metas[0]);
edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC);
ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC);
for (int i = 0; i < 3; i++) {
CreateNewManifest();
ReopenDB();
ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC);
ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC);
}
}

7
env/env_test.cc vendored
View File

@ -85,6 +85,8 @@ struct Deleter {
void (*fn_)(void*);
};
extern "C" bool RocksDbIOUringEnable() { return true; }
std::unique_ptr<char, Deleter> NewAligned(const size_t size, const char ch) {
char* ptr = nullptr;
#ifdef OS_WIN
@ -1256,7 +1258,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) {
// Random Read
Random rnd(301 + attempt);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"UpdateResults:io_uring_result", [&](void* arg) {
"UpdateResults::io_uring_result", [&](void* arg) {
if (attempt > 0) {
// No failure in the first attempt.
size_t& bytes_read = *static_cast<size_t*>(arg);
@ -1326,7 +1328,7 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) {
const int num_reads = rnd.Uniform(512) + 1;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"UpdateResults:io_uring_result", [&](void* arg) {
"UpdateResults::io_uring_result", [&](void* arg) {
if (attempt > 5) {
// Improve partial result rates in second half of the run to
// cover the case of repeated partial results.
@ -3308,7 +3310,6 @@ TEST_F(TestAsyncRead, ReadAsync) {
}
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

9
env/fs_posix.cc vendored
View File

@ -1045,9 +1045,12 @@ class PosixFileSystem : public FileSystem {
// EXPERIMENTAL
//
// TODO akankshamahajan: Update Poll API to take into account min_completions
// TODO akankshamahajan:
// 1. Update Poll API to take into account min_completions
// and returns if number of handles in io_handles (any order) completed is
// equal to atleast min_completions.
// 2. Currently in case of direct_io, Read API is called because of which call
// to Poll API fails as it expects IOHandle to be populated.
virtual IOStatus Poll(std::vector<void*>& io_handles,
size_t /*min_completions*/) override {
#if defined(ROCKSDB_IOURING_PRESENT)
@ -1094,12 +1097,14 @@ class PosixFileSystem : public FileSystem {
req.offset = posix_handle->offset;
req.len = posix_handle->len;
size_t finished_len = 0;
size_t bytes_read = 0;
UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
true /*async_read*/, finished_len, &req);
true /*async_read*/, finished_len, &req, bytes_read);
posix_handle->is_finished = true;
io_uring_cqe_seen(iu, cqe);
posix_handle->cb(req, posix_handle->cb_arg);
(void)finished_len;
(void)bytes_read;
if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
break;

53
env/io_posix.cc vendored
View File

@ -744,31 +744,36 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
wrap_cache.erase(wrap_check);
FSReadRequest* req = req_wrap->req;
size_t bytes_read = 0;
UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
false /*async_read*/, req_wrap->finished_len, req);
false /*async_read*/, req_wrap->finished_len, req,
bytes_read);
int32_t res = cqe->res;
if (res == 0) {
/// cqe->res == 0 can means EOF, or can mean partial results. See
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
if (use_direct_io() && !IsSectorAligned(req_wrap->finished_len,
GetRequiredBufferAlignment())) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
req->result = Slice(req->scratch, req_wrap->finished_len);
req->status = IOStatus::OK();
} else {
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
if (res >= 0) {
if (bytes_read == 0) {
/// cqe->res == 0 can means EOF, or can mean partial results. See
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
if (use_direct_io() &&
!IsSectorAligned(req_wrap->finished_len,
GetRequiredBufferAlignment())) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
req->result = Slice(req->scratch, req_wrap->finished_len);
req->status = IOStatus::OK();
} else {
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
}
} else if (bytes_read < req_wrap->iov.iov_len) {
incomplete_rq_list.push_back(req_wrap);
}
} else if (res > 0 && res < static_cast<int32_t>(req_wrap->iov.iov_len)) {
incomplete_rq_list.push_back(req_wrap);
}
io_uring_cqe_seen(iu, cqe);
}
@ -896,8 +901,8 @@ IOStatus PosixRandomAccessFile::ReadAsync(
// Initialize Posix_IOHandle.
posix_handle->iu = iu;
posix_handle->iov.iov_base = posix_handle->scratch;
posix_handle->iov.iov_len = posix_handle->len;
posix_handle->iov.iov_base = req.scratch;
posix_handle->iov.iov_len = req.len;
posix_handle->cb = cb;
posix_handle->cb_arg = cb_arg;
posix_handle->offset = req.offset;

5
env/io_posix.h vendored
View File

@ -66,12 +66,13 @@ struct Posix_IOHandle {
inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
size_t len, size_t iov_len, bool async_read,
size_t& finished_len, FSReadRequest* req) {
size_t& finished_len, FSReadRequest* req,
size_t& bytes_read) {
if (cqe->res < 0) {
req->result = Slice(req->scratch, 0);
req->status = IOError("Req failed", file_name, cqe->res);
} else {
size_t bytes_read = static_cast<size_t>(cqe->res);
bytes_read = static_cast<size_t>(cqe->res);
TEST_SYNC_POINT_CALLBACK("UpdateResults::io_uring_result", &bytes_read);
if (bytes_read == iov_len) {
req->result = Slice(req->scratch, req->len);

View File

@ -66,6 +66,17 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment,
// chunk_len is greater than 0.
bufs_[index].buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
static_cast<size_t>(chunk_len));
} else if (chunk_len > 0) {
// For async prefetching, it doesn't call RefitTail with chunk_len > 0.
// Allocate new buffer if needed because aligned buffer calculate remaining
// buffer as capacity_ - cursize_ which might not be the case in this as we
// are not refitting.
// TODO akanksha: Update the condition when asynchronous prefetching is
// stable.
bufs_[index].buffer_.Alignment(alignment);
bufs_[index].buffer_.AllocateNewBuffer(
static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
chunk_offset_in_buffer, static_cast<size_t>(chunk_len));
}
}
@ -100,13 +111,6 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
Env::IOPriority rate_limiter_priority,
uint64_t read_len, uint64_t chunk_len,
uint64_t rounddown_start, uint32_t index) {
// Reset io_handle.
if (io_handle_ != nullptr && del_fn_ != nullptr) {
del_fn_(io_handle_);
io_handle_ = nullptr;
del_fn_ = nullptr;
}
// callback for async read request.
auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this,
std::placeholders::_1, std::placeholders::_2);
@ -118,6 +122,7 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
req.scratch = bufs_[index].buffer_.BufferStart() + chunk_len;
Status s = reader->ReadAsync(req, opts, fp, nullptr /*cb_arg*/, &io_handle_,
&del_fn_, rate_limiter_priority);
req.status.PermitUncheckedError();
if (s.ok()) {
async_read_in_progress_ = true;
}
@ -210,24 +215,34 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset,
// the asynchronous prefetching in second buffer.
Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
RandomAccessFileReader* reader,
FileSystem* fs, uint64_t offset,
size_t length, size_t readahead_size,
uint64_t offset, size_t length,
size_t readahead_size,
Env::IOPriority rate_limiter_priority,
bool& copy_to_third_buffer) {
if (!enable_) {
return Status::OK();
}
if (async_read_in_progress_ && fs != nullptr) {
if (async_read_in_progress_ && fs_ != nullptr) {
// Wait for prefetch data to complete.
// No mutex is needed as PrefetchAsyncCallback updates the result in second
// buffer and FilePrefetchBuffer should wait for Poll before accessing the
// second buffer.
std::vector<void*> handles;
handles.emplace_back(io_handle_);
fs->Poll(handles, 1).PermitUncheckedError();
fs_->Poll(handles, 1).PermitUncheckedError();
}
// TODO akanksha: Update TEST_SYNC_POINT after new tests are added.
// Reset and Release io_handle_ after the Poll API as request has been
// completed.
async_read_in_progress_ = false;
if (io_handle_ != nullptr && del_fn_ != nullptr) {
del_fn_(io_handle_);
io_handle_ = nullptr;
del_fn_ = nullptr;
}
// TODO akanksha: Update TEST_SYNC_POINT after Async APIs are merged with
// normal prefetching.
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
Status s;
size_t prefetch_size = length + readahead_size;
@ -236,34 +251,47 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// Index of second buffer.
uint32_t second = curr_ ^ 1;
// First clear the buffers if it contains outdated data. Outdated data can be
// because previous sequential reads were read from the cache instead of these
// buffer.
{
if (bufs_[curr_].buffer_.CurrentSize() > 0 &&
offset >= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
bufs_[curr_].buffer_.Clear();
}
if (bufs_[second].buffer_.CurrentSize() > 0 &&
offset >= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
bufs_[second].buffer_.Clear();
}
}
// If data is in second buffer, make it curr_. Second buffer can be either
// partial filled or full.
if (bufs_[second].buffer_.CurrentSize() > 0 &&
offset >= bufs_[second].offset_ &&
offset <= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
offset < bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
// Clear the curr_ as buffers have been swapped and curr_ contains the
// outdated data.
// outdated data and switch the buffers.
bufs_[curr_].buffer_.Clear();
// Switch the buffers.
curr_ = curr_ ^ 1;
second = curr_ ^ 1;
}
// If second buffer contains outdated data, clear it for async prefetching.
// Outdated can be because previous sequential reads were read from the cache
// instead of this buffer.
if (bufs_[second].buffer_.CurrentSize() > 0 &&
offset >= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
bufs_[second].buffer_.Clear();
// After swap check if all the requested bytes are in curr_, it will go for
// async prefetching only.
if (bufs_[curr_].buffer_.CurrentSize() > 0 &&
offset + length <=
bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
offset += length;
length = 0;
prefetch_size -= length;
}
// Data is overlapping i.e. some of the data is in curr_ buffer and remaining
// in second buffer.
if (bufs_[curr_].buffer_.CurrentSize() > 0 &&
bufs_[second].buffer_.CurrentSize() > 0 &&
offset >= bufs_[curr_].offset_ &&
offset < bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() &&
offset + prefetch_size > bufs_[second].offset_) {
offset + length > bufs_[second].offset_) {
// Allocate new buffer to third buffer;
bufs_[2].buffer_.Clear();
bufs_[2].buffer_.Alignment(alignment);
@ -273,12 +301,10 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// Move data from curr_ buffer to third.
CopyDataToBuffer(curr_, offset, length);
if (length == 0) {
// Requested data has been copied and curr_ still has unconsumed data.
return s;
}
CopyDataToBuffer(second, offset, length);
// Length == 0: All the requested data has been copied to third buffer. It
// should go for only async prefetching.
@ -306,6 +332,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
if (length > 0) {
CalculateOffsetAndLen(alignment, offset, roundup_len1, curr_,
false /*refit_tail*/, chunk_len1);
assert(roundup_len1 >= chunk_len1);
read_len1 = static_cast<size_t>(roundup_len1 - chunk_len1);
}
{
@ -316,7 +343,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
Roundup(rounddown_start2 + readahead_size, alignment);
// For length == 0, do the asynchronous prefetching in second instead of
// synchronous prefetching of remaining prefetch_size.
// synchronous prefetching in curr_.
if (length == 0) {
rounddown_start2 =
bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize();
@ -330,8 +357,8 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// Update the buffer offset.
bufs_[second].offset_ = rounddown_start2;
assert(roundup_len2 >= chunk_len2);
uint64_t read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
ReadAsync(opts, reader, rate_limiter_priority, read_len2, chunk_len2,
rounddown_start2, second)
.PermitUncheckedError();
@ -344,7 +371,6 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
return s;
}
}
// Copy remaining requested bytes to third_buffer.
if (copy_to_third_buffer && length > 0) {
CopyDataToBuffer(curr_, offset, length);
@ -416,8 +442,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
bool FilePrefetchBuffer::TryReadFromCacheAsync(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */,
FileSystem* fs) {
Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */
) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
@ -452,7 +478,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
if (implicit_auto_readahead_ && async_io_) {
// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously.
s = PrefetchAsync(opts, reader, fs, offset, n, readahead_size_ / 2,
s = PrefetchAsync(opts, reader, offset, n, readahead_size_ / 2,
rate_limiter_priority, copy_to_third_buffer);
} else {
s = Prefetch(opts, reader, offset, n + readahead_size_,
@ -489,7 +515,6 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
void* /*cb_arg*/) {
async_read_in_progress_ = false;
uint32_t index = curr_ ^ 1;
if (req.status.ok()) {
if (req.offset + req.result.size() <=
@ -505,12 +530,5 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
size_t current_size = bufs_[index].buffer_.CurrentSize();
bufs_[index].buffer_.Size(current_size + req.result.size());
}
// Release io_handle_.
if (io_handle_ != nullptr && del_fn_ != nullptr) {
del_fn_(io_handle_);
io_handle_ = nullptr;
del_fn_ = nullptr;
}
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -65,7 +65,7 @@ class FilePrefetchBuffer {
FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false,
bool implicit_auto_readahead = false,
bool async_io = false)
bool async_io = false, FileSystem* fs = nullptr)
: curr_(0),
readahead_size_(readahead_size),
max_readahead_size_(max_readahead_size),
@ -79,13 +79,29 @@ class FilePrefetchBuffer {
io_handle_(nullptr),
del_fn_(nullptr),
async_read_in_progress_(false),
async_io_(async_io) {
async_io_(async_io),
fs_(fs) {
// If async_io_ is enabled, data is asynchronously filled in second buffer
// while curr_ is being consumed. If data is overlapping in two buffers,
// data is copied to third buffer to return continuous buffer.
bufs_.resize(3);
}
~FilePrefetchBuffer() {
// Wait for any pending async job before destroying the class object.
if (async_read_in_progress_ && fs_ != nullptr) {
std::vector<void*> handles;
handles.emplace_back(io_handle_);
fs_->Poll(handles, 1).PermitUncheckedError();
}
// Release io_handle_.
if (io_handle_ != nullptr && del_fn_ != nullptr) {
del_fn_(io_handle_);
io_handle_ = nullptr;
del_fn_ = nullptr;
}
}
// Load data into the buffer from a file.
// reader : the file reader.
// offset : the file offset to start reading from.
@ -100,8 +116,7 @@ class FilePrefetchBuffer {
Env::IOPriority rate_limiter_priority);
Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader,
FileSystem* fs, uint64_t offset, size_t length,
size_t readahead_size,
uint64_t offset, size_t length, size_t readahead_size,
Env::IOPriority rate_limiter_priority,
bool& copy_to_third_buffer);
@ -129,7 +144,7 @@ class FilePrefetchBuffer {
RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
Env::IOPriority rate_limiter_priority,
bool for_compaction /* = false */, FileSystem* fs);
bool for_compaction /* = false */);
// The minimum `offset` ever passed to TryReadFromCache(). This will nly be
// tracked if track_min_offset = true.
@ -256,5 +271,6 @@ class FilePrefetchBuffer {
IOHandleDeleter del_fn_;
bool async_read_in_progress_;
bool async_io_;
FileSystem* fs_;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -491,6 +491,12 @@ Status GetInfoLogFiles(const std::shared_ptr<FileSystem>& fs,
std::string NormalizePath(const std::string& path) {
std::string dst;
if (path.length() > 2 && path[0] == kFilePathSeparator &&
path[1] == kFilePathSeparator) { // Handle UNC names
dst.append(2, kFilePathSeparator);
}
for (auto c : path) {
if (!dst.empty() && (c == kFilePathSeparator || c == '/') &&
(dst.back() == kFilePathSeparator || dst.back() == '/')) {

View File

@ -694,8 +694,10 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.statistics = CreateDBStatistics();
options.env = env.get();
if (std::get<0>(GetParam())) {
bool use_direct_io = std::get<0>(GetParam());
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
@ -708,8 +710,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (std::get<0>(GetParam()) &&
(s.IsNotSupported() || s.IsInvalidArgument())) {
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
@ -766,6 +767,8 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
// TODO akanksha: Remove after adding new units.
ro.async_io = true;
}
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@ -773,15 +776,25 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
num_keys++;
}
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
buff_prefetch_count = 0;
// For index and data blocks.
if (is_adaptive_readahead) {
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
} else {
ASSERT_EQ(readahead_carry_over_count, 0);
}
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (ro.async_io && !use_direct_io) {
ASSERT_GT(async_read_bytes.count, 0);
} else {
ASSERT_EQ(async_read_bytes.count, 0);
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
@ -902,6 +915,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB
table_options.block_cache = cache;
@ -948,7 +963,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.adaptive_readahead = true;
// TODO akanksha: Remove after adding new units.
ro.async_io = true;
{
/*
@ -964,7 +978,9 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
iter->Seek(BuildKey(1019));
buff_prefetch_count = 0;
}
{
ASSERT_OK(options.statistics->Reset());
// After caching, blocks will be read from cache (Sequential blocks)
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->Seek(BuildKey(0));
@ -1008,11 +1024,118 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
ASSERT_EQ(buff_prefetch_count, 2);
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (GetParam()) {
ASSERT_EQ(async_read_bytes.count, 0);
} else {
ASSERT_GT(async_read_bytes.count, 0);
}
}
buff_prefetch_count = 0;
}
Close();
}
extern "C" bool RocksDbIOUringEnable() { return true; }
// Tests the default implementation of ReadAsync API with PosixFileSystem.
TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 1000;
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
FileSystem::Default(), /*support_prefetch=*/false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = false;
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
options.statistics = CreateDBStatistics();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
int total_keys = 0;
// Write the keys.
{
WriteBatch batch;
Random rnd(309);
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
total_keys++;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
}
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Read the keys.
{
ReadOptions ro;
ro.adaptive_readahead = true;
ro.async_io = true;
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
#if defined(ROCKSDB_IOURING_PRESENT)
ASSERT_GT(async_read_bytes.count, 0);
#else
ASSERT_EQ(async_read_bytes.count, 0);
#endif
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -425,19 +425,75 @@ IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
}
}
// TODO akanksha: Add perf_times etc.
// TODO akanksha:
// 1. Handle use_direct_io case which currently calls Read API.
IOStatus RandomAccessFileReader::ReadAsync(
FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn,
Env::IOPriority rate_limiter_priority) {
if (use_direct_io()) {
// For direct_io, it calls Read API.
req.status = Read(opts, req.offset, req.len, &(req.result), req.scratch,
nullptr /*dbg*/, rate_limiter_priority);
cb(req, cb_arg);
return IOStatus::OK();
}
return file_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn,
nullptr /*dbg*/);
// Create a callback and populate info.
auto read_async_callback =
std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
std::placeholders::_1, std::placeholders::_2);
ReadAsyncInfo* read_async_info = new ReadAsyncInfo;
read_async_info->cb_ = cb;
read_async_info->cb_arg_ = cb_arg;
read_async_info->start_time_ = clock_->NowMicros();
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
}
#endif
IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
io_handle, del_fn, nullptr /*dbg*/);
if (!s.ok()) {
delete read_async_info;
}
return s;
}
void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
void* cb_arg) {
ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
assert(read_async_info);
assert(read_async_info->cb_);
read_async_info->cb_(req, read_async_info->cb_arg_);
// Update stats and notify listeners.
if (stats_ != nullptr && file_read_hist_ != nullptr) {
// elapsed doesn't take into account delay and overwrite as StopWatch does
// in Read.
uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
file_read_hist_->Add(elapsed);
}
if (req.status.ok()) {
RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(req.offset, req.result.size(),
read_async_info->fs_start_ts_, finish_ts,
req.status);
}
if (!req.status.ok()) {
NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
req.result.size(), req.offset);
}
#endif
RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
delete read_async_info;
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -92,6 +92,15 @@ class RandomAccessFileReader {
const Temperature file_temperature_;
const bool is_last_level_;
struct ReadAsyncInfo {
#ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint fs_start_ts_;
#endif
uint64_t start_time_;
std::function<void(const FSReadRequest&, void*)> cb_;
void* cb_arg_;
};
public:
explicit RandomAccessFileReader(
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
@ -179,5 +188,7 @@ class RandomAccessFileReader {
std::function<void(const FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
Env::IOPriority rate_limiter_priority);
void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -160,7 +160,9 @@ class WritableFileWriter {
bool perform_data_verification_;
uint32_t buffered_data_crc32c_checksum_;
bool buffered_data_with_checksum_;
#ifndef ROCKSDB_LITE
Temperature temperature_;
#endif // ROCKSDB_LITE
public:
WritableFileWriter(
@ -191,8 +193,10 @@ class WritableFileWriter {
checksum_finalized_(false),
perform_data_verification_(perform_data_verification),
buffered_data_crc32c_checksum_(0),
buffered_data_with_checksum_(buffered_data_with_checksum),
temperature_(options.temperature) {
buffered_data_with_checksum_(buffered_data_with_checksum) {
#ifndef ROCKSDB_LITE
temperature_ = options.temperature;
#endif // ROCKSDB_LITE
assert(!use_direct_io() || max_buffer_size_ > 0);
TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
reinterpret_cast<void*>(max_buffer_size_));

View File

@ -90,6 +90,19 @@ class FilterPolicy : public Customizable {
virtual ~FilterPolicy();
static const char* Type() { return "FilterPolicy"; }
// The name used for identifying whether a filter on disk is readable
// by this FilterPolicy. If this FilterPolicy is part of a family that
// can read each others filters, such as built-in BloomFilterPolcy and
// RibbonFilterPolicy, the CompatibilityName is a shared family name,
// while kinds of filters in the family can have distinct Customizable
// Names. This function is pure virtual so that wrappers around built-in
// policies are prompted to defer to CompatibilityName() of the wrapped
// policy, which is important for compatibility.
//
// For custom filter policies that are not part of a read-compatible
// family (rare), implementations may return Name().
virtual const char* CompatibilityName() const = 0;
// Creates a new FilterPolicy based on the input value string and returns the
// result The value might be an ID, and ID with properties, or an old-style
// policy string.

View File

@ -72,10 +72,10 @@ struct LiveFileStorageInfo : public FileStorageInfo {
// The metadata that describes an SST file. (Does not need to extend
// LiveFileStorageInfo because SST files are always immutable.)
struct SstFileMetaData : public FileStorageInfo {
SstFileMetaData() {}
SstFileMetaData() { file_type = kTableFile; }
SstFileMetaData(const std::string& _file_name, uint64_t _file_number,
const std::string& _directory, size_t _size,
const std::string& _directory, uint64_t _size,
SequenceNumber _smallest_seqno, SequenceNumber _largest_seqno,
const std::string& _smallestkey,
const std::string& _largestkey, uint64_t _num_reads_sampled,

View File

@ -534,6 +534,8 @@ enum Histograms : uint32_t {
// Error handler statistics
ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
ASYNC_READ_BYTES,
HISTOGRAM_ENUM_MAX,
};

View File

@ -11,7 +11,7 @@
#define ROCKSDB_MAJOR 7
#define ROCKSDB_MINOR 1
#define ROCKSDB_PATCH 0
#define ROCKSDB_PATCH 2
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

View File

@ -5551,7 +5551,9 @@ class HistogramTypeJni {
case ROCKSDB_NAMESPACE::Histograms::NUM_SST_READ_PER_LEVEL:
return 0x31;
case ROCKSDB_NAMESPACE::Histograms::ERROR_HANDLER_AUTORESUME_RETRY_COUNT:
return 0x31;
return 0x32;
case ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES:
return 0x33;
case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX:
// 0x1F for backwards compatibility on current minor version.
return 0x1F;
@ -5669,6 +5671,8 @@ class HistogramTypeJni {
case 0x32:
return ROCKSDB_NAMESPACE::Histograms::
ERROR_HANDLER_AUTORESUME_RETRY_COUNT;
case 0x33:
return ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES;
case 0x1F:
// 0x1F for backwards compatibility on current minor version.
return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX;

View File

@ -180,6 +180,8 @@ public enum HistogramType {
*/
ERROR_HANDLER_AUTORESUME_RETRY_COUNT((byte) 0x32),
ASYNC_READ_BYTES((byte) 0x33),
// 0x1F for backwards compatibility on current minor version.
HISTOGRAM_ENUM_MAX((byte) 0x1F);

View File

@ -54,10 +54,9 @@ class InstrumentedMutex {
class ALIGN_AS(CACHE_LINE_SIZE) CacheAlignedInstrumentedMutex
: public InstrumentedMutex {
using InstrumentedMutex::InstrumentedMutex;
char padding[(CACHE_LINE_SIZE - sizeof(InstrumentedMutex) % CACHE_LINE_SIZE) %
CACHE_LINE_SIZE] ROCKSDB_FIELD_UNUSED;
};
static_assert(sizeof(CacheAlignedInstrumentedMutex) % CACHE_LINE_SIZE == 0);
static_assert(alignof(CacheAlignedInstrumentedMutex) != CACHE_LINE_SIZE ||
sizeof(CacheAlignedInstrumentedMutex) % CACHE_LINE_SIZE == 0);
// RAII wrapper for InstrumentedMutex
class InstrumentedMutexLock {

View File

@ -283,6 +283,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{NUM_SST_READ_PER_LEVEL, "rocksdb.num.sst.read.per.level"},
{ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
"rocksdb.error.handler.autoresume.retry.count"},
{ASYNC_READ_BYTES, "rocksdb.async.read.bytes"},
};
std::shared_ptr<Statistics> CreateDBStatistics() {

View File

@ -1487,6 +1487,7 @@ class MockFilterPolicy : public FilterPolicy {
public:
static const char* kClassName() { return "MockFilterPolicy"; }
const char* Name() const override { return kClassName(); }
const char* CompatibilityName() const override { return Name(); }
FilterBitsBuilder* GetBuilderWithContext(
const FilterBuildingContext&) const override {
return nullptr;

View File

@ -1605,7 +1605,7 @@ void BlockBasedTableBuilder::WriteFilterBlock(
? BlockBasedTable::kPartitionedFilterBlockPrefix
: BlockBasedTable::kFullFilterBlockPrefix;
}
key.append(rep_->table_options.filter_policy->Name());
key.append(rep_->table_options.filter_policy->CompatibilityName());
meta_index_builder->Add(key, filter_block_handle);
}
}

View File

@ -12,6 +12,7 @@
#include <array>
#include <limits>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
@ -50,6 +51,7 @@
#include "table/block_based/block_prefix_index.h"
#include "table/block_based/block_type.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/filter_policy_internal.h"
#include "table/block_based/full_filter_block.h"
#include "table/block_based/hash_index_reader.h"
#include "table/block_based/partitioned_filter_block.h"
@ -897,33 +899,59 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
const BlockBasedTableOptions& table_options, const int level,
size_t file_size, size_t max_file_size_for_l0_meta_pin,
BlockCacheLookupContext* lookup_context) {
Status s;
// Find filter handle and filter type
if (rep_->filter_policy) {
for (auto filter_type :
{Rep::FilterType::kFullFilter, Rep::FilterType::kPartitionedFilter,
Rep::FilterType::kBlockFilter}) {
std::string prefix;
switch (filter_type) {
case Rep::FilterType::kFullFilter:
prefix = kFullFilterBlockPrefix;
auto name = rep_->filter_policy->CompatibilityName();
bool builtin_compatible =
strcmp(name, BuiltinFilterPolicy::kCompatibilityName()) == 0;
for (const auto& [filter_type, prefix] :
{std::make_pair(Rep::FilterType::kFullFilter, kFullFilterBlockPrefix),
std::make_pair(Rep::FilterType::kPartitionedFilter,
kPartitionedFilterBlockPrefix),
std::make_pair(Rep::FilterType::kBlockFilter, kFilterBlockPrefix)}) {
if (builtin_compatible) {
// This code is only here to deal with a hiccup in early 7.0.x where
// there was an unintentional name change in the SST files metadata.
// It should be OK to remove this in the future (late 2022) and just
// have the 'else' code.
// NOTE: the test:: names below are likely not needed but included
// out of caution
static const std::unordered_set<std::string> kBuiltinNameAndAliases = {
BuiltinFilterPolicy::kCompatibilityName(),
test::LegacyBloomFilterPolicy::kClassName(),
test::FastLocalBloomFilterPolicy::kClassName(),
test::Standard128RibbonFilterPolicy::kClassName(),
DeprecatedBlockBasedBloomFilterPolicy::kClassName(),
BloomFilterPolicy::kClassName(),
RibbonFilterPolicy::kClassName(),
};
// For efficiency, do a prefix seek and see if the first match is
// good.
meta_iter->Seek(prefix);
if (meta_iter->status().ok() && meta_iter->Valid()) {
Slice key = meta_iter->key();
if (key.starts_with(prefix)) {
key.remove_prefix(prefix.size());
if (kBuiltinNameAndAliases.find(key.ToString()) !=
kBuiltinNameAndAliases.end()) {
Slice v = meta_iter->value();
Status s = rep_->filter_handle.DecodeFrom(&v);
if (s.ok()) {
rep_->filter_type = filter_type;
break;
}
}
}
}
} else {
std::string filter_block_key = prefix + name;
if (FindMetaBlock(meta_iter, filter_block_key, &rep_->filter_handle)
.ok()) {
rep_->filter_type = filter_type;
break;
case Rep::FilterType::kPartitionedFilter:
prefix = kPartitionedFilterBlockPrefix;
break;
case Rep::FilterType::kBlockFilter:
prefix = kFilterBlockPrefix;
break;
default:
assert(0);
}
std::string filter_block_key = prefix;
filter_block_key.append(rep_->filter_policy->Name());
if (FindMetaBlock(meta_iter, filter_block_key, &rep_->filter_handle)
.ok()) {
rep_->filter_type = filter_type;
break;
}
}
}
}
@ -932,8 +960,8 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
rep_->index_type == BlockBasedTableOptions::kTwoLevelIndexSearch);
// Find compression dictionary handle
s = FindOptionalMetaBlock(meta_iter, kCompressionDictBlockName,
&rep_->compression_dict_handle);
Status s = FindOptionalMetaBlock(meta_iter, kCompressionDictBlockName,
&rep_->compression_dict_handle);
if (!s.ok()) {
return s;
}
@ -1437,9 +1465,10 @@ template <typename TBlocklike>
Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
const bool wait, CachableEntry<TBlocklike>* block_entry,
BlockType block_type, GetContext* get_context,
BlockCacheLookupContext* lookup_context, BlockContents* contents) const {
const bool wait, const bool for_compaction,
CachableEntry<TBlocklike>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
BlockContents* contents) const {
assert(block_entry != nullptr);
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep_->table_options.block_cache.get();
@ -1492,7 +1521,9 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
CompressionType raw_block_comp_type;
BlockContents raw_block_contents;
if (!contents) {
StopWatch sw(rep_->ioptions.clock, statistics, READ_BLOCK_GET_MICROS);
Histograms histogram = for_compaction ? READ_BLOCK_COMPACTION_MICROS
: READ_BLOCK_GET_MICROS;
StopWatch sw(rep_->ioptions.clock, statistics, histogram);
BlockFetcher block_fetcher(
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle,
&raw_block_contents, rep_->ioptions, do_uncompress,
@ -1850,8 +1881,9 @@ void BlockBasedTable::RetrieveMultipleBlocks(
// avoid looking up the block cache
s = MaybeReadBlockAndLoadToCache(
nullptr, options, handle, uncompression_dict, /*wait=*/true,
block_entry, BlockType::kData, mget_iter->get_context,
&lookup_data_block_context, &raw_block_contents);
/*for_compaction=*/false, block_entry, BlockType::kData,
mget_iter->get_context, &lookup_data_block_context,
&raw_block_contents);
// block_entry value could be null if no block cache is present, i.e
// BlockBasedTableOptions::no_block_cache is true and no compressed
@ -1905,7 +1937,7 @@ Status BlockBasedTable::RetrieveBlock(
if (use_cache) {
s = MaybeReadBlockAndLoadToCache(
prefetch_buffer, ro, handle, uncompression_dict, wait_for_cache,
block_entry, block_type, get_context, lookup_context,
for_compaction, block_entry, block_type, get_context, lookup_context,
/*contents=*/nullptr);
if (!s.ok()) {
@ -1934,8 +1966,9 @@ Status BlockBasedTable::RetrieveBlock(
std::unique_ptr<TBlocklike> block;
{
StopWatch sw(rep_->ioptions.clock, rep_->ioptions.stats,
READ_BLOCK_GET_MICROS);
Histograms histogram =
for_compaction ? READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS;
StopWatch sw(rep_->ioptions.clock, rep_->ioptions.stats, histogram);
s = ReadBlockFromFile(
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle, &block,
rep_->ioptions, do_uncompress, maybe_compressed, block_type,

View File

@ -343,9 +343,10 @@ class BlockBasedTable : public TableReader {
Status MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
const bool wait, CachableEntry<TBlocklike>* block_entry,
BlockType block_type, GetContext* get_context,
BlockCacheLookupContext* lookup_context, BlockContents* contents) const;
const bool wait, const bool for_compaction,
CachableEntry<TBlocklike>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
BlockContents* contents) const;
// Similar to the above, with one crucial difference: it will retrieve the
// block from the file even if there are no caches configured (assuming the
@ -654,10 +655,10 @@ struct BlockBasedTable::Rep {
std::unique_ptr<FilePrefetchBuffer>* fpb,
bool implicit_auto_readahead,
bool async_io) const {
fpb->reset(new FilePrefetchBuffer(readahead_size, max_readahead_size,
!ioptions.allow_mmap_reads /* enable */,
false /* track_min_offset */,
implicit_auto_readahead, async_io));
fpb->reset(new FilePrefetchBuffer(
readahead_size, max_readahead_size,
!ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
implicit_auto_readahead, async_io, ioptions.fs.get()));
}
void CreateFilePrefetchBufferIfNotExists(

View File

@ -1325,6 +1325,16 @@ bool BuiltinFilterPolicy::IsInstanceOf(const std::string& name) const {
}
}
static const char* kBuiltinFilterMetadataName = "rocksdb.BuiltinBloomFilter";
const char* BuiltinFilterPolicy::kCompatibilityName() {
return kBuiltinFilterMetadataName;
}
const char* BuiltinFilterPolicy::CompatibilityName() const {
return kBuiltinFilterMetadataName;
}
BloomLikeFilterPolicy::BloomLikeFilterPolicy(double bits_per_key)
: warned_(false), aggregate_rounding_balance_(0) {
// Sanitize bits_per_key
@ -1372,7 +1382,7 @@ bool BloomLikeFilterPolicy::IsInstanceOf(const std::string& name) const {
}
const char* ReadOnlyBuiltinFilterPolicy::kClassName() {
return "rocksdb.BuiltinBloomFilter";
return kBuiltinFilterMetadataName;
}
const char* DeprecatedBlockBasedBloomFilterPolicy::kClassName() {

View File

@ -135,6 +135,9 @@ class BuiltinFilterPolicy : public FilterPolicy {
FilterBitsReader* GetFilterBitsReader(const Slice& contents) const override;
static const char* kClassName();
bool IsInstanceOf(const std::string& id) const override;
// All variants of BuiltinFilterPolicy can read each others filters.
const char* CompatibilityName() const override;
static const char* kCompatibilityName();
public: // new
// An internal function for the implementation of

View File

@ -84,6 +84,7 @@ class TestFilterBitsReader : public FilterBitsReader {
class TestHashFilter : public FilterPolicy {
public:
const char* Name() const override { return "TestHashFilter"; }
const char* CompatibilityName() const override { return Name(); }
FilterBitsBuilder* GetBuilderWithContext(
const FilterBuildingContext&) const override {

View File

@ -520,8 +520,8 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
// filter blocks
s = table()->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
/* wait */ true, &block, BlockType::kFilter, nullptr /* get_context */,
&lookup_context, nullptr /* contents */);
/* wait */ true, /* for_compaction */ false, &block, BlockType::kFilter,
nullptr /* get_context */, &lookup_context, nullptr /* contents */);
if (!s.ok()) {
return s;
}

View File

@ -182,8 +182,8 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
// filter blocks
Status s = table()->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
/*wait=*/true, &block, BlockType::kIndex, /*get_context=*/nullptr,
&lookup_context, /*contents=*/nullptr);
/*wait=*/true, /*for_compaction=*/false, &block, BlockType::kIndex,
/*get_context=*/nullptr, &lookup_context, /*contents=*/nullptr);
if (!s.ok()) {
return s;

View File

@ -74,8 +74,7 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
if (read_options_.async_io) {
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCacheAsync(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
&io_s, read_options_.rate_limiter_priority, for_compaction_,
ioptions_.fs.get());
&io_s, read_options_.rate_limiter_priority, for_compaction_);
} else {
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,

View File

@ -93,7 +93,7 @@ void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
for (auto bfile_pair : blob_files_) {
auto blob_file = bfile_pair.second;
LiveFileMetaData filemetadata;
filemetadata.size = static_cast<size_t>(blob_file->GetFileSize());
filemetadata.size = blob_file->GetFileSize();
const uint64_t file_number = blob_file->BlobFileNumber();
// Path should be relative to db_name, but begin with slash.
filemetadata.name = BlobFileName("", bdb_options_.blob_dir, file_number);