fix a bug, c api, if enable inplace_update_support, and use create sn… (#9471)

Summary:
c api release snapshot will core dump when enable inplace_update_support and create snapshot

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9471

Reviewed By: akankshamahajan15

Differential Revision: D34965103

Pulled By: riversand963

fbshipit-source-id: c3aeeb9ea7126c2eda1466102794fecf57b6ab77
This commit is contained in:
duyuqi 2022-03-21 12:04:33 -07:00 committed by Andrew Kryczka
parent 7789c45856
commit fdd0e3a875
4 changed files with 72 additions and 8 deletions

View File

@ -7,12 +7,13 @@
#ifndef ROCKSDB_LITE // Lite does not support C API #ifndef ROCKSDB_LITE // Lite does not support C API
#include "rocksdb/c.h" #include <assert.h>
#include <stddef.h> #include <stddef.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/types.h> #include <sys/types.h>
#include "rocksdb/c.h"
#ifndef OS_WIN #ifndef OS_WIN
#include <unistd.h> #include <unistd.h>
#endif #endif
@ -89,10 +90,8 @@ static void CheckEqual(const char* expected, const char* v, size_t n) {
// ok // ok
return; return;
} else { } else {
fprintf(stderr, "%s: expected '%s', got '%s'\n", fprintf(stderr, "%s: expected '%s', got '%s'\n", phase,
phase, (expected ? expected : "(null)"), (v ? v : "(null)"));
(expected ? expected : "(null)"),
(v ? v : "(null"));
abort(); abort();
} }
} }
@ -989,7 +988,36 @@ int main(int argc, char** argv) {
CheckGet(db, roptions, "foo", NULL); CheckGet(db, roptions, "foo", NULL);
rocksdb_release_snapshot(db, snap); rocksdb_release_snapshot(db, snap);
} }
StartPhase("snapshot_with_memtable_inplace_update");
{
rocksdb_close(db);
const rocksdb_snapshot_t* snap = NULL;
const char* s_key = "foo_snap";
const char* value1 = "hello_s1";
const char* value2 = "hello_s2";
rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
rocksdb_options_set_inplace_update_support(options, 1);
rocksdb_options_set_error_if_exists(options, 0);
db = rocksdb_open(options, dbname, &err);
CheckNoError(err);
rocksdb_put(db, woptions, s_key, 8, value1, 8, &err);
snap = rocksdb_create_snapshot(db);
assert(snap != NULL);
rocksdb_put(db, woptions, s_key, 8, value2, 8, &err);
CheckNoError(err);
rocksdb_readoptions_set_snapshot(roptions, snap);
CheckGet(db, roptions, "foo", NULL);
// snapshot syntax is invalid, because of inplace update supported is set
CheckGet(db, roptions, s_key, value2);
// restore the data and options
rocksdb_delete(db, woptions, s_key, 8, &err);
CheckGet(db, roptions, s_key, NULL);
rocksdb_release_snapshot(db, snap);
rocksdb_readoptions_set_snapshot(roptions, NULL);
rocksdb_options_set_inplace_update_support(options, 0);
rocksdb_options_set_allow_concurrent_memtable_write(options, 1);
rocksdb_options_set_error_if_exists(options, 1);
}
StartPhase("repair"); StartPhase("repair");
{ {
// If we do not compact here, then the lazy deletion of // If we do not compact here, then the lazy deletion of

View File

@ -3207,6 +3207,12 @@ bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
} // namespace } // namespace
void DBImpl::ReleaseSnapshot(const Snapshot* s) { void DBImpl::ReleaseSnapshot(const Snapshot* s) {
if (s == nullptr) {
// DBImpl::GetSnapshot() can return nullptr when snapshot
// not supported by specifying the condition:
// inplace_update_support enabled.
return;
}
const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s); const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);

View File

@ -169,6 +169,36 @@ TEST_F(DBTestInPlaceUpdate, InPlaceUpdateCallbackNoAction) {
ASSERT_EQ(Get(1, "key"), "NOT_FOUND"); ASSERT_EQ(Get(1, "key"), "NOT_FOUND");
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
TEST_F(DBTestInPlaceUpdate, InPlaceUpdateAndSnapshot) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.inplace_update_support = true;
options.env = env_;
options.write_buffer_size = 100000;
options.allow_concurrent_memtable_write = false;
Reopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// Update key with values of smaller size, and
// run GetSnapshot and ReleaseSnapshot
int numValues = 2;
for (int i = numValues; i > 0; i--) {
const Snapshot* s = db_->GetSnapshot();
ASSERT_EQ(nullptr, s);
std::string value = DummyString(i, 'a');
ASSERT_OK(Put(1, "key", value));
ASSERT_EQ(value, Get(1, "key"));
// release s (nullptr)
db_->ReleaseSnapshot(s);
}
// Only 1 instance for that key.
validateNumberOfEntries(1, 1);
} while (ChangeCompactOptions());
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -780,7 +780,7 @@ class DB {
// snapshot is no longer needed. // snapshot is no longer needed.
// //
// nullptr will be returned if the DB fails to take a snapshot or does // nullptr will be returned if the DB fails to take a snapshot or does
// not support snapshot. // not support snapshot (eg: inplace_update_support enabled).
virtual const Snapshot* GetSnapshot() = 0; virtual const Snapshot* GetSnapshot() = 0;
// Release a previously acquired snapshot. The caller must not // Release a previously acquired snapshot. The caller must not