diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 0e3a0b228..63ad89457 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -99,7 +99,9 @@ class Transaction { virtual Status RollbackToSavePoint() = 0; // This function is similar to DB::Get() except it will also read pending - // changes in this transaction. + // changes in this transaction. Currently, this function will return + // Status::MergeInProgress if the most recent write to the queried key in + // this batch is a Merge. // // If read_options.snapshot is not set, the current version of the key will // be read. Calling SetSnapshot() does not affect the version of the data @@ -131,6 +133,9 @@ class Transaction { // snapshot is set in this transaction). The transaction behavior is the // same regardless of whether the key exists or not. // + // Note: Currently, this function will return Status::MergeInProgress + // if the most recent write to the queried key in this batch is a Merge. + // // The values returned by this function are similar to Transaction::Get(). // If value==nullptr, then this function will not read any data, but will // still ensure that this key cannot be written to by outside of this @@ -146,6 +151,7 @@ class Transaction { // Status::TimedOut() if a lock could not be acquired, // Status::TryAgain() if the memtable history size is not large enough // (See max_write_buffer_number_to_maintain) + // Status::MergeInProgress() if merge operations cannot be resolved. // or other errors if this key could not be read. 
virtual Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 402e3f3a7..5cf0d5d3c 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -120,7 +120,14 @@ class WriteBatchWithIndex : public WriteBatchBase { WBWIIterator* NewIterator(); // Will create a new Iterator that will use WBWIIterator as a delta and - // base_iterator as base + // base_iterator as base. + // + // This function is only supported if the WriteBatchWithIndex was + // constructed with overwrite_key=true. + // + // The returned iterator should be deleted by the caller. + // The base_iterator is now 'owned' by the returned iterator. Deleting the + // returned iterator will also delete the base_iterator. Iterator* NewIteratorWithBase(ColumnFamilyHandle* column_family, Iterator* base_iterator); // default column family @@ -135,7 +142,7 @@ class WriteBatchWithIndex : public WriteBatchBase { // Similar to previous function but does not require a column_family. // Note: An InvalidArgument status will be returned if there are any Merge - // operators for this key. + // operators for this key. Use previous method instead. 
Status GetFromBatch(const DBOptions& options, const Slice& key, std::string* value) { return GetFromBatch(nullptr, options, key, value); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 473d738bb..e009d0c0f 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -12,6 +12,8 @@ #include "rocksdb/utilities/transaction_db.h" #include "util/logging.h" #include "util/testharness.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/string_append/stringappend.h" using std::string; @@ -28,6 +30,7 @@ class TransactionTest : public testing::Test { TransactionTest() { options.create_if_missing = true; options.max_write_buffer_number = 2; + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); dbname = test::TmpDir() + "/transaction_testdb"; DestroyDB(dbname, options); @@ -1722,6 +1725,62 @@ TEST_F(TransactionTest, TimeoutTest) { delete txn2; } +TEST_F(TransactionTest, MergeTest) { + WriteOptions write_options; + ReadOptions read_options; + string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options, TransactionOptions()); + ASSERT_TRUE(txn); + + s = db->Put(write_options, "A", "a0"); + ASSERT_OK(s); + + s = txn->Merge("A", "1"); + ASSERT_OK(s); + + s = txn->Merge("A", "2"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + s = txn->Put("A", "a"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + s = txn->Merge("A", "3"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + TransactionOptions txn_options; + txn_options.lock_timeout = 1; // 1 ms + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + // verify that txn has "A" locked + s = txn2->Merge("A", "4"); + ASSERT_TRUE(s.IsTimedOut()); + + s = 
txn2->Commit(); + ASSERT_OK(s); + delete txn2; + + s = txn->Commit(); + ASSERT_OK(s); + delete txn; + + s = db->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a,3", value); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index e3caa7932..cdc12e8cf 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -619,9 +619,9 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, MergeContext merge_context; WriteBatchWithIndexInternal::Result result = - WriteBatchWithIndexInternal::GetFromBatch(options, this, column_family, - key, &merge_context, - &rep->comparator, value, &s); + WriteBatchWithIndexInternal::GetFromBatch( + options, this, column_family, key, &merge_context, &rep->comparator, + value, rep->overwrite_key, &s); switch (result) { case WriteBatchWithIndexInternal::Result::kFound: @@ -662,8 +662,8 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, std::string batch_value; WriteBatchWithIndexInternal::Result result = WriteBatchWithIndexInternal::GetFromBatch( - options, this, column_family, key, &merge_context, - &rep->comparator, &batch_value, &s); + options, this, column_family, key, &merge_context, &rep->comparator, + &batch_value, rep->overwrite_key, &s); if (result == WriteBatchWithIndexInternal::Result::kFound) { value->assign(batch_value.data(), batch_value.size()); @@ -675,6 +675,14 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, if (result == WriteBatchWithIndexInternal::Result::kError) { return s; } + if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && + rep->overwrite_key == true) { + // Since we've overwritten keys, we do not know what other operations are + // in this batch for this key, so we cannot do a Merge to compute the + // result. 
Instead, we will simply return MergeInProgress. + return Status::MergeInProgress(); + } + assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || result == WriteBatchWithIndexInternal::Result::kNotFound); diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index f5c141121..bc5c3800d 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -132,7 +132,7 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( const DBOptions& options, WriteBatchWithIndex* batch, ColumnFamilyHandle* column_family, const Slice& key, MergeContext* merge_context, WriteBatchEntryComparator* cmp, - std::string* value, Status* s) { + std::string* value, bool overwrite_key, Status* s) { uint32_t cf_id = GetColumnFamilyID(column_family); *s = Status::OK(); WriteBatchWithIndexInternal::Result result = @@ -205,6 +205,13 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( // We can stop iterating once we find a PUT or DELETE break; } + if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && + overwrite_key == true) { + // Since we've overwritten keys, we do not know what other operations are + // in this batch for this key, so we cannot do a Merge to compute the + // result. Instead, we will simply return MergeInProgress. 
+ break; + } iter->Prev(); } diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index 54bbd81da..bba9e8bf3 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -91,7 +91,7 @@ class WriteBatchWithIndexInternal { const DBOptions& options, WriteBatchWithIndex* batch, ColumnFamilyHandle* column_family, const Slice& key, MergeContext* merge_context, WriteBatchEntryComparator* cmp, - std::string* value, Status* s); + std::string* value, bool overwrite_key, Status* s); }; } // namespace rocksdb diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index 3e509ca93..067132272 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -971,7 +971,7 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) { DestroyDB(dbname, options); Status s = DB::Open(options, dbname, &db); - assert(s.ok()); + ASSERT_OK(s); ColumnFamilyHandle* column_family = db->DefaultColumnFamily(); WriteBatchWithIndex batch; @@ -1009,6 +1009,66 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) { DestroyDB(dbname, options); } +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge2) { + DB* db; + Options options; + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + options.create_if_missing = true; + + std::string dbname = test::TmpDir() + "/write_batch_with_index_test"; + + DestroyDB(dbname, options); + Status s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + + ColumnFamilyHandle* column_family = db->DefaultColumnFamily(); + + // Test batch with overwrite_key=true + WriteBatchWithIndex batch(BytewiseComparator(), 0, true); + std::string value; + + s = batch.GetFromBatch(column_family, 
options, "X", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Put(column_family, "X", "x"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("x", value); + + batch.Put(column_family, "X", "x2"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("x2", value); + + batch.Merge(column_family, "X", "aaa"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Merge(column_family, "X", "bbb"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Put(column_family, "X", "x3"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("x3", value); + + batch.Merge(column_family, "X", "ccc"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Delete(column_family, "X"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Merge(column_family, "X", "ddd"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + delete db; + DestroyDB(dbname, options); +} + TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) { DB* db; Options options; @@ -1017,7 +1077,7 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) { DestroyDB(dbname, options); Status s = DB::Open(options, dbname, &db); - assert(s.ok()); + ASSERT_OK(s); WriteBatchWithIndex batch; ReadOptions read_options; @@ -1185,6 +1245,54 @@ TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) { DestroyDB(dbname, options); } +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge2) { + DB* db; + Options options; + + options.create_if_missing = true; + std::string dbname = test::TmpDir() + "/write_batch_with_index_test"; + + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + + DestroyDB(dbname, 
options); + Status s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + + // overwrite_key=true: a trailing Merge must read as MergeInProgress + WriteBatchWithIndex batch(BytewiseComparator(), 0, true); + + ReadOptions read_options; + WriteOptions write_options; + std::string value; + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Merge("A", "xxx"); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Merge("A", "yyy"); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + s = db->Put(write_options, "A", "a0"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Delete("A"); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete db; + DestroyDB(dbname, options); +} + void AssertKey(std::string key, WBWIIterator* iter) { ASSERT_TRUE(iter->Valid()); ASSERT_EQ(key, iter->Entry().key.ToString());