Merge branch 'master' of https://github.com/facebook/rocksdb
commit 65fba4b984

db/c.cc (14 lines changed)
@@ -330,6 +330,20 @@ rocksdb_t* rocksdb_open(
   return result;
 }
 
+rocksdb_t* rocksdb_open_for_read_only(
+    const rocksdb_options_t* options,
+    const char* name,
+    unsigned char error_if_log_file_exist,
+    char** errptr) {
+  DB* db;
+  if (SaveError(errptr, DB::OpenForReadOnly(options->rep, std::string(name), &db, error_if_log_file_exist))) {
+    return nullptr;
+  }
+  rocksdb_t* result = new rocksdb_t;
+  result->rep = db;
+  return result;
+}
+
 void rocksdb_close(rocksdb_t* db) {
   delete db->rep;
   delete db;
@@ -5555,9 +5555,6 @@ TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) {
   } while (ChangeCompactOptions());
 }
 
-// TODO(kailiu) disable the in non-linux platforms to temporarily solve
-// // the unit test failure.
-#ifdef OS_LINUX
 TEST(DBTest, TransactionLogIteratorStallAtLastRecord) {
   do {
     Options options = OptionsForLogIterTest();
@@ -5575,7 +5572,6 @@ TEST(DBTest, TransactionLogIteratorStallAtLastRecord) {
     ASSERT_TRUE(iter->Valid());
   } while (ChangeCompactOptions());
 }
-#endif
 
 TEST(DBTest, TransactionLogIteratorJustEmptyFile) {
   do {
@@ -17,7 +17,7 @@
 #include "db/write_batch_internal.h"
 #include "utilities/merge_operators.h"
 #include "util/testharness.h"
-#include "utilities/utility_db.h"
+#include "utilities/db_ttl.h"
 
 using namespace std;
 using namespace rocksdb;
@@ -80,7 +80,6 @@ std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false,
                            const size_t max_successive_merges = 0,
                            const uint32_t min_partial_merge_operands = 2) {
   DB* db;
-  StackableDB* sdb;
   Options options;
   options.create_if_missing = true;
   options.merge_operator = std::make_shared<CountMergeOperator>();
@@ -90,8 +89,9 @@ std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false,
   DestroyDB(dbname, Options());
   if (ttl) {
     cout << "Opening database with TTL\n";
-    s = UtilityDB::OpenTtlDB(options, dbname, &sdb);
-    db = sdb;
+    DBWithTTL* db_with_ttl;
+    s = DBWithTTL::Open(options, dbname, &db_with_ttl);
+    db = db_with_ttl;
   } else {
     s = DB::Open(options, dbname, &db);
   }
@@ -83,6 +83,12 @@ extern rocksdb_t* rocksdb_open(
     const char* name,
     char** errptr);
 
+extern rocksdb_t* rocksdb_open_for_read_only(
+    const rocksdb_options_t* options,
+    const char* name,
+    unsigned char error_if_log_file_exist,
+    char** errptr);
+
 extern void rocksdb_close(rocksdb_t* db);
 
 extern void rocksdb_put(
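The new read-only entry point mirrors rocksdb_open. A minimal sketch of how a caller might use it through the C API, assuming a database already exists at /tmp/testdb (the path and error handling here are illustrative, not part of the patch):

    #include <cstdio>
    #include <cstdlib>
    #include "rocksdb/c.h"

    int main() {
      rocksdb_options_t* options = rocksdb_options_create();
      char* err = nullptr;
      // error_if_log_file_exist = 0: tolerate a leftover WAL file.
      rocksdb_t* db = rocksdb_open_for_read_only(options, "/tmp/testdb", 0, &err);
      if (err != nullptr) {
        fprintf(stderr, "open failed: %s\n", err);
        free(err);  // error strings from the C API are heap-allocated
        rocksdb_options_destroy(options);
        return 1;
      }
      // ... reads only; writes would fail on a read-only handle ...
      rocksdb_close(db);
      rocksdb_options_destroy(options);
      return 0;
    }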
include/utilities/db_ttl.h (new file, 68 lines)
@@ -0,0 +1,68 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <string>
+#include <vector>
+
+#include "utilities/stackable_db.h"
+#include "rocksdb/db.h"
+
+namespace rocksdb {
+
+// Database with TTL support.
+//
+// USE-CASES:
+// This API should be used to open the db when key-values inserted are
+// meant to be removed from the db in a non-strict 'ttl' amount of time
+// Therefore, this guarantees that key-values inserted will remain in the
+// db for >= ttl amount of time and the db will make efforts to remove the
+// key-values as soon as possible after ttl seconds of their insertion.
+//
+// BEHAVIOUR:
+// TTL is accepted in seconds
+// (int32_t)Timestamp(creation) is suffixed to values in Put internally
+// Expired TTL values deleted in compaction only:(Timestamp+ttl<time_now)
+// Get/Iterator may return expired entries(compaction not run on them yet)
+// Different TTL may be used during different Opens
+// Example: Open1 at t=0 with ttl=4 and insert k1,k2, close at t=2
+//          Open2 at t=3 with ttl=5. Now k1,k2 should be deleted at t>=5
+// read_only=true opens in the usual read-only mode. Compactions will not be
+// triggered(neither manual nor automatic), so no expired entries removed
+//
+// CONSTRAINTS:
+// Not specifying/passing or non-positive TTL behaves like TTL = infinity
+//
+// !!!WARNING!!!:
+// Calling DB::Open directly to re-open a db created by this API will get
+// corrupt values(timestamp suffixed) and no ttl effect will be there
+// during the second Open, so use this API consistently to open the db
+// Be careful when passing ttl with a small positive value because the
+// whole database may be deleted in a small amount of time
+
+class DBWithTTL : public StackableDB {
+ public:
+  virtual Status CreateColumnFamilyWithTtl(
+      const ColumnFamilyOptions& options, const std::string& column_family_name,
+      ColumnFamilyHandle** handle, int ttl) = 0;
+
+  static Status Open(const Options& options, const std::string& dbname,
+                     DBWithTTL** dbptr, int32_t ttl = 0,
+                     bool read_only = false);
+
+  static Status Open(const DBOptions& db_options, const std::string& dbname,
+                     const std::vector<ColumnFamilyDescriptor>& column_families,
+                     std::vector<ColumnFamilyHandle*>* handles,
+                     DBWithTTL** dbptr, std::vector<int32_t> ttls,
+                     bool read_only = false);
+
+ protected:
+  explicit DBWithTTL(DB* db) : StackableDB(db) {}
+};
+
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
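The header comment above fixes the TTL semantics at open time. A minimal usage sketch under those rules (the path, TTL value, and error handling are illustrative assumptions, not from the patch):

    #include <cassert>
    #include <string>
    #include "utilities/db_ttl.h"

    using namespace rocksdb;

    int main() {
      Options options;
      options.create_if_missing = true;

      // Entries become eligible for deletion ~600 seconds after insertion;
      // a non-positive ttl would mean "never expire".
      DBWithTTL* db = nullptr;
      Status s = DBWithTTL::Open(options, "/tmp/ttl_db", &db, 600);
      assert(s.ok());

      // Put/Get work as usual; the 4-byte creation-timestamp suffix is
      // added and stripped internally.
      s = db->Put(WriteOptions(), "key", "value");
      assert(s.ok());
      std::string value;
      s = db->Get(ReadOptions(), "key", &value);  // may still see expired data
      assert(s.ok() && value == "value");

      delete db;  // the tests in this patch tear down with plain delete
      return 0;
    }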
@@ -21,6 +21,16 @@ class StackableDB : public DB {
     return db_;
   }
 
+  virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
+                                    const std::string& column_family_name,
+                                    ColumnFamilyHandle** handle) {
+    return db_->CreateColumnFamily(options, column_family_name, handle);
+  }
+
+  virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) {
+    return db_->DropColumnFamily(column_family);
+  }
+
   using DB::Put;
   virtual Status Put(const WriteOptions& options,
                      ColumnFamilyHandle* column_family, const Slice& key,
@@ -8,55 +8,22 @@
 #include <string>
 
 #include "utilities/stackable_db.h"
+#include "utilities/db_ttl.h"
 #include "rocksdb/db.h"
 
 namespace rocksdb {
 
-// This class contains APIs to open rocksdb with specific support eg. TTL
+// Please don't use this class. It's deprecated
 class UtilityDB {
-
- public:
-  // Open the database with TTL support.
-  //
-  // USE-CASES:
-  // This API should be used to open the db when key-values inserted are
-  // meant to be removed from the db in a non-strict 'ttl' amount of time
-  // Therefore, this guarantees that key-values inserted will remain in the
-  // db for >= ttl amount of time and the db will make efforts to remove the
-  // key-values as soon as possible after ttl seconds of their insertion.
-  //
-  // BEHAVIOUR:
-  // TTL is accepted in seconds
-  // (int32_t)Timestamp(creation) is suffixed to values in Put internally
-  // Expired TTL values deleted in compaction only:(Timestamp+ttl<time_now)
-  // Get/Iterator may return expired entries(compaction not run on them yet)
-  // Different TTL may be used during different Opens
-  // Example: Open1 at t=0 with ttl=4 and insert k1,k2, close at t=2
-  //          Open2 at t=3 with ttl=5. Now k1,k2 should be deleted at t>=5
-  // read_only=true opens in the usual read-only mode. Compactions will not be
-  // triggered(neither manual nor automatic), so no expired entries removed
-  //
-  // CONSTRAINTS:
-  // Not specifying/passing or non-positive TTL behaves like TTL = infinity
-  //
-  // !!!WARNING!!!:
-  // Calling DB::Open directly to re-open a db created by this API will get
-  // corrupt values(timestamp suffixed) and no ttl effect will be there
-  // during the second Open, so use this API consistently to open the db
-  // Be careful when passing ttl with a small positive value because the
-  // whole database may be deleted in a small amount of time
-  static Status OpenTtlDB(const Options& options,
-                          const std::string& name,
-                          StackableDB** dbptr,
-                          int32_t ttl = 0,
-                          bool read_only = false);
-
-  // OpenTtlDB with column family support
-  static Status OpenTtlDB(
-      const DBOptions& db_options, const std::string& name,
-      const std::vector<ColumnFamilyDescriptor>& column_families,
-      std::vector<ColumnFamilyHandle*>* handles, StackableDB** dbptr,
-      std::vector<int32_t> ttls, bool read_only = false);
+ public:
+  // This function is here only for backwards compatibility. Please use the
+  // functions defined in DBWithTTl (utilities/db_ttl.h)
+  // (deprecated)
+  __attribute__((deprecated)) static Status OpenTtlDB(const Options& options,
+                                                      const std::string& name,
+                                                      StackableDB** dbptr,
+                                                      int32_t ttl = 0,
+                                                      bool read_only = false);
 };
 
 }  // namespace rocksdb
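For callers still on the old entry point, the migration is mechanical. A hedged before/after fragment (assuming options and dbname are already in scope; names are illustrative):

    // Before: the deprecated UtilityDB entry point hands back a StackableDB*.
    StackableDB* sdb = nullptr;
    Status s_old = UtilityDB::OpenTtlDB(options, dbname, &sdb, /*ttl=*/3600);

    // After: open through DBWithTTL and keep the more specific handle, which
    // also exposes the new CreateColumnFamilyWithTtl() entry point.
    DBWithTTL* db = nullptr;
    Status s_new = DBWithTTL::Open(options, dbname, &db, /*ttl=*/3600);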
@@ -1 +1 @@
-java -Djava.library.path=.:../ -cp "rocksdbjni.jar:.:./*" org.rocksdb.benchmark.DbBenchmark $@
+java -server -d64 -XX:NewSize=4m -XX:+AggressiveOpts -Djava.library.path=.:../ -cp "rocksdbjni.jar:.:./*" org.rocksdb.benchmark.DbBenchmark $@
@@ -93,34 +93,6 @@ public class ReadOptions {
   private native void setFillCache(
       long handle, boolean fillCache);
 
-  /**
-   * If this option is set and memtable implementation allows, Seek
-   * might only return keys with the same prefix as the seek-key
-   * Default: false
-   *
-   * @return true if prefix-seek is enabled.
-   */
-  public boolean prefixSeek() {
-    assert(isInitialized());
-    return prefixSeek(nativeHandle_);
-  }
-  private native boolean prefixSeek(long handle);
-
-  /**
-   * If this option is set and memtable implementation allows, Seek
-   * might only return keys with the same prefix as the seek-key
-   *
-   * @param prefixSeek if true, then prefix-seek will be enabled.
-   * @return the reference to the current ReadOptions.
-   */
-  public ReadOptions setPrefixSeek(boolean prefixSeek) {
-    assert(isInitialized());
-    setPrefixSeek(nativeHandle_, prefixSeek);
-    return this;
-  }
-  private native void setPrefixSeek(
-      long handle, boolean prefixSeek);
-
   /**
    * Specify to create a tailing iterator -- a special iterator that has a
    * view of the complete database (i.e. it can also be used to read newly
@@ -162,6 +162,15 @@ public class DbBenchmark {
     EXISTING
   }
 
+  enum CompressionType {
+    NONE,
+    SNAPPY,
+    ZLIB,
+    BZIP2,
+    LZ4,
+    LZ4HC
+  }
+
   static {
     System.loadLibrary("rocksdbjni");
   }
@@ -435,7 +444,6 @@ public class DbBenchmark {
     databaseDir_ = (String) flags.get(Flag.db);
     writesPerSeconds_ = (Integer) flags.get(Flag.writes_per_second);
     cacheSize_ = (Long) flags.get(Flag.cache_size);
-    gen_ = new RandomGenerator(randSeed_, compressionRatio_);
     memtable_ = (String) flags.get(Flag.memtablerep);
     maxWriteBufferNumber_ = (Integer) flags.get(Flag.max_write_buffer_number);
     prefixSize_ = (Integer) flags.get(Flag.prefix_size);
@@ -446,6 +454,28 @@ public class DbBenchmark {
     finishLock_ = new Object();
     // options.setPrefixSize((Integer)flags_.get(Flag.prefix_size));
     // options.setKeysPerPrefix((Long)flags_.get(Flag.keys_per_prefix));
+    compressionType_ = (String) flags.get(Flag.compression_type);
+    compression_ = CompressionType.NONE;
+    try {
+      if (compressionType_.equals("snappy")) {
+        System.loadLibrary("snappy");
+      } else if (compressionType_.equals("zlib")) {
+        System.loadLibrary("zlib");
+      } else if (compressionType_.equals("bzip2")) {
+        System.loadLibrary("bzip2");
+      } else if (compressionType_.equals("lz4")) {
+        System.loadLibrary("lz4");
+      } else if (compressionType_.equals("lz4hc")) {
+        System.loadLibrary("lz4hc");
+      }
+    } catch (UnsatisfiedLinkError e) {
+      System.err.format("Unable to load %s library:%s%n" +
+                        "No compression is used.%n",
+                        compressionType_, e.toString());
+      compressionType_ = "none";
+      compressionRatio_ = 1.0;
+    }
+    gen_ = new RandomGenerator(randSeed_, compressionRatio_);
   }
 
   private void prepareReadOptions(ReadOptions options) {
@@ -462,6 +492,8 @@ public class DbBenchmark {
     options.setCacheSize(cacheSize_);
     if (!useExisting_) {
       options.setCreateIfMissing(true);
+    } else {
+      options.setCreateIfMissing(false);
     }
     if (memtable_.equals("skip_list")) {
       options.setMemTableConfig(new SkipListMemTableConfig());
@@ -488,6 +520,8 @@ public class DbBenchmark {
       options.setTableFormatConfig(
           new PlainTableConfig().setKeySize(keySize_));
     }
+    options.setWriteBufferSize(
+        (Long)flags_.get(Flag.write_buffer_size));
     options.setMaxWriteBufferNumber(
         (Integer)flags_.get(Flag.max_write_buffer_number));
     options.setMaxBackgroundCompactions(
@@ -513,7 +547,7 @@ public class DbBenchmark {
     options.setDisableSeekCompaction(
         (Boolean)flags_.get(Flag.disable_seek_compaction));
     options.setDeleteObsoleteFilesPeriodMicros(
-        (Long)flags_.get(Flag.delete_obsolete_files_period_micros));
+        (Integer)flags_.get(Flag.delete_obsolete_files_period_micros));
     options.setTableCacheNumshardbits(
         (Integer)flags_.get(Flag.table_cache_numshardbits));
     options.setAllowMmapReads(
@@ -640,12 +674,12 @@ public class DbBenchmark {
     } else if (benchmark.equals("readseq")) {
       for (int t = 0; t < threadNum_; ++t) {
         tasks.add(new ReadSequentialTask(
-            currentTaskId++, randSeed_, reads_, num_));
+            currentTaskId++, randSeed_, reads_ / threadNum_, num_));
       }
     } else if (benchmark.equals("readrandom")) {
       for (int t = 0; t < threadNum_; ++t) {
         tasks.add(new ReadRandomTask(
-            currentTaskId++, randSeed_, reads_, num_));
+            currentTaskId++, randSeed_, reads_ / threadNum_, num_));
       }
     } else if (benchmark.equals("readwhilewriting")) {
       WriteTask writeTask = new WriteRandomTask(
@@ -717,12 +751,12 @@ public class DbBenchmark {
         (int) (valueSize_ * compressionRatio_ + 0.5));
     System.out.printf("Entries: %d\n", num_);
     System.out.printf("RawSize: %.1f MB (estimated)\n",
-        ((kKeySize + valueSize_) * num_) / 1048576.0);
+        ((double)(kKeySize + valueSize_) * num_) / SizeUnit.MB);
     System.out.printf("FileSize: %.1f MB (estimated)\n",
-        (((kKeySize + valueSize_ * compressionRatio_) * num_)
-            / 1048576.0));
+        (((kKeySize + valueSize_ * compressionRatio_) * num_) / SizeUnit.MB));
     System.out.format("Memtable Factory: %s%n", options.memTableFactoryName());
     System.out.format("Prefix: %d bytes%n", prefixSize_);
+    System.out.format("Compression: %s%n", compressionType_);
     printWarnings();
     System.out.printf("------------------------------------------------\n");
   }
@@ -769,7 +803,7 @@ public class DbBenchmark {
 
     System.out.printf(
         "%-16s : %11.5f micros/op; %6.1f MB/s; %d / %d task(s) finished.\n",
-        benchmark, elapsedSeconds * 1e6 / stats.done_,
+        benchmark, (double) elapsedSeconds / stats.done_ * 1e6,
         (stats.bytes_ / 1048576.0) / elapsedSeconds,
         taskFinishedCount, concurrentThreads);
   }
@@ -932,7 +966,7 @@ public class DbBenchmark {
         return Integer.parseInt(value);
       }
     },
-    write_buffer_size(4 << 20,
+    write_buffer_size(4 * SizeUnit.MB,
         "Number of bytes to buffer in memtable before compacting\n" +
         "\t(initialized to default value by 'main'.)") {
       @Override public Object parseValue(String value) {
@@ -1275,11 +1309,17 @@ public class DbBenchmark {
         return Boolean.parseBoolean(value);
       }
     },
-    delete_obsolete_files_period_micros(0L,"Option to delete\n" +
+    delete_obsolete_files_period_micros(0,"Option to delete\n" +
         "\tobsolete files periodically. 0 means that obsolete files are\n" +
         "\tdeleted after every compaction run.") {
       @Override public Object parseValue(String value) {
-        return Long.parseLong(value);
+        return Integer.parseInt(value);
       }
     },
+    compression_type("snappy",
+        "Algorithm used to compress the database.") {
+      @Override public Object parseValue(String value) {
+        return value;
+      }
+    },
     compression_level(-1,
@@ -1512,7 +1552,7 @@ public class DbBenchmark {
   final long cacheSize_;
   final boolean useExisting_;
   final String databaseDir_;
-  final double compressionRatio_;
+  double compressionRatio_;
   RandomGenerator gen_;
   long startTime_;
@@ -1532,4 +1572,6 @@ public class DbBenchmark {
   // as the scope of a static member equals to the scope of the problem,
   // we let its c++ pointer to be disposed in its finalizer.
   static Options defaultOptions_ = new Options();
+  String compressionType_;
+  CompressionType compression_;
 }
@@ -27,12 +27,6 @@ public class ReadOptionsTest {
       assert(opt.fillCache() == boolValue);
     }
 
-    { // PrefixSeek test
-      boolean boolValue = rand.nextBoolean();
-      opt.setPrefixSeek(boolValue);
-      assert(opt.prefixSeek() == boolValue);
-    }
-
     { // Tailing test
       boolean boolValue = rand.nextBoolean();
       opt.setTailing(boolValue);
@@ -1785,27 +1785,6 @@ void Java_org_rocksdb_ReadOptions_setFillCache(
       static_cast<bool>(jfill_cache);
 }
 
-/*
- * Class:     org_rocksdb_ReadOptions
- * Method:    prefixSeek
- * Signature: (J)Z
- */
-jboolean Java_org_rocksdb_ReadOptions_prefixSeek(
-    JNIEnv* env, jobject jobj, jlong jhandle) {
-  return reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->prefix_seek;
-}
-
-/*
- * Class:     org_rocksdb_ReadOptions
- * Method:    setPrefixSeek
- * Signature: (JZ)V
- */
-void Java_org_rocksdb_ReadOptions_setPrefixSeek(
-    JNIEnv* env, jobject jobj, jlong jhandle, jboolean jprefix_seek) {
-  reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->prefix_seek =
-      static_cast<bool>(jprefix_seek);
-}
-
 /*
  * Class:     org_rocksdb_ReadOptions
  * Method:    tailing
@@ -212,7 +212,7 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents(
   rocksdb::Status s =
       rocksdb::WriteBatchInternal::InsertInto(b, &cf_mems_default);
   int count = 0;
-  rocksdb::Iterator* iter = mem->NewIterator();
+  rocksdb::Iterator* iter = mem->NewIterator(rocksdb::ReadOptions());
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     rocksdb::ParsedInternalKey ikey;
     memset(reinterpret_cast<void*>(&ikey), 0, sizeof(ikey));
@@ -28,7 +28,7 @@
 #include "db/version_set.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/cache.h"
-#include "utilities/utility_db.h"
+#include "utilities/db_ttl.h"
 #include "rocksdb/env.h"
 #include "rocksdb/write_batch.h"
 #include "rocksdb/slice.h"
@@ -42,7 +42,6 @@
 #include "util/random.h"
 #include "util/testutil.h"
 #include "util/logging.h"
-#include "utilities/ttl/db_ttl.h"
 #include "hdfs/env_hdfs.h"
 #include "utilities/merge_operators.h"
@@ -1620,9 +1619,9 @@ class StressTest {
       assert(!s.ok() || column_families_.size() ==
              static_cast<size_t>(FLAGS_column_families));
     } else {
-      StackableDB* sdb;
-      s = UtilityDB::OpenTtlDB(options_, FLAGS_db, &sdb, FLAGS_ttl);
-      db_ = sdb;
+      DBWithTTL* db_with_ttl;
+      s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
+      db_ = db_with_ttl;
     }
     if (!s.ok()) {
       fprintf(stderr, "open error: %s\n", s.ToString().c_str());
@@ -182,6 +182,9 @@ class PosixSequentialFile: public SequentialFile {
     if (r < n) {
       if (feof(file_)) {
         // We leave status as ok if we hit the end of the file
+        // We also clear the error so that the reads can continue
+        // if a new data is written to the file
+        clearerr(file_);
       } else {
         // A partial read with an error: return a non-ok status
         s = IOError(filename_, errno);
@@ -14,6 +14,7 @@
 #include "rocksdb/write_batch.h"
 #include "rocksdb/cache.h"
 #include "util/coding.h"
+#include "utilities/ttl/db_ttl_impl.h"
 
 #include <ctime>
 #include <dirent.h>
@@ -909,11 +910,11 @@ void DBDumperCommand::DoCommand() {
   int max_keys = max_keys_;
   int ttl_start;
   if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) {
-    ttl_start = DBWithTTL::kMinTimestamp;  // TTL introduction time
+    ttl_start = DBWithTTLImpl::kMinTimestamp;  // TTL introduction time
   }
   int ttl_end;
   if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) {
-    ttl_end = DBWithTTL::kMaxTimestamp;  // Max time allowed by TTL feature
+    ttl_end = DBWithTTLImpl::kMaxTimestamp;  // Max time allowed by TTL feature
   }
   if (ttl_end < ttl_start) {
     fprintf(stderr, "Error: End time can't be less than start time\n");
@@ -1600,11 +1601,11 @@ void ScanCommand::DoCommand() {
   }
   int ttl_start;
   if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) {
-    ttl_start = DBWithTTL::kMinTimestamp;  // TTL introduction time
+    ttl_start = DBWithTTLImpl::kMinTimestamp;  // TTL introduction time
   }
   int ttl_end;
   if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) {
-    ttl_end = DBWithTTL::kMaxTimestamp;  // Max time allowed by TTL feature
+    ttl_end = DBWithTTLImpl::kMaxTimestamp;  // Max time allowed by TTL feature
   }
   if (ttl_end < ttl_start) {
     fprintf(stderr, "Error: End time can't be less than start time\n");
@@ -19,8 +19,8 @@
 #include "util/logging.h"
 #include "util/ldb_cmd_execute_result.h"
 #include "util/string_util.h"
-#include "utilities/utility_db.h"
-#include "utilities/ttl/db_ttl.h"
+#include "utilities/db_ttl.h"
+#include "utilities/ttl/db_ttl_impl.h"
 
 using std::string;
 using std::map;
@@ -149,7 +149,7 @@ protected:
   LDBCommandExecuteResult exec_state_;
   string db_path_;
   DB* db_;
-  StackableDB* sdb_;
+  DBWithTTL* db_ttl_;
 
   /**
    * true implies that this command can work if the db is opened in read-only
@@ -217,11 +217,11 @@ protected:
     Status st;
     if (is_db_ttl_) {
       if (is_read_only_) {
-        st = UtilityDB::OpenTtlDB(opt, db_path_, &sdb_, 0, true);
+        st = DBWithTTL::Open(opt, db_path_, &db_ttl_, 0, true);
       } else {
-        st = UtilityDB::OpenTtlDB(opt, db_path_, &sdb_);
+        st = DBWithTTL::Open(opt, db_path_, &db_ttl_);
       }
-      db_ = sdb_;
+      db_ = db_ttl_;
     } else if (is_read_only_) {
       st = DB::OpenForReadOnly(opt, db_path_, &db_);
     } else {
@@ -14,7 +14,7 @@
 #include "utilities/merge_operators.h"
 #include "utilities/merge_operators/string_append/stringappend.h"
 #include "utilities/merge_operators/string_append/stringappend2.h"
-#include "utilities/ttl/db_ttl.h"
+#include "utilities/db_ttl.h"
 #include "util/testharness.h"
 #include "util/random.h"
@@ -38,11 +38,11 @@ std::shared_ptr<DB> OpenNormalDb(char delim_char) {
 
 // Open a TtlDB with a non-associative StringAppendTESTOperator
 std::shared_ptr<DB> OpenTtlDb(char delim_char) {
-  StackableDB* db;
+  DBWithTTL* db;
   Options options;
   options.create_if_missing = true;
   options.merge_operator.reset(new StringAppendTESTOperator(delim_char));
-  ASSERT_OK(UtilityDB::OpenTtlDB(options, kDbName, &db, 123456));
+  ASSERT_OK(DBWithTTL::Open(options, kDbName, &db, 123456));
   return std::shared_ptr<DB>(db);
 }
 } // namespace
@@ -3,16 +3,18 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 #ifndef ROCKSDB_LITE
 
-#include "utilities/ttl/db_ttl.h"
+#include "utilities/ttl/db_ttl_impl.h"
 
+#include "utilities/db_ttl.h"
 #include "db/filename.h"
 #include "db/write_batch_internal.h"
 #include "util/coding.h"
-#include "include/rocksdb/env.h"
-#include "include/rocksdb/iterator.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
 
 namespace rocksdb {
 
-void DBWithTTL::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) {
+void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) {
   if (options->compaction_filter) {
     options->compaction_filter =
         new TtlCompactionFilter(ttl, options->compaction_filter);
@@ -28,19 +30,25 @@ void DBWithTTL::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) {
   }
 }
 
-// Open the db inside DBWithTTL because options needs pointer to its ttl
-DBWithTTL::DBWithTTL(DB* db) : StackableDB(db) {}
+// Open the db inside DBWithTTLImpl because options needs pointer to its ttl
+DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db) {}
 
-DBWithTTL::~DBWithTTL() {
-  delete GetOptions().compaction_filter;
-}
+DBWithTTLImpl::~DBWithTTLImpl() { delete GetOptions().compaction_filter; }
 
+Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
+                            StackableDB** dbptr, int32_t ttl, bool read_only) {
+  DBWithTTL* db;
+  Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only);
+  if (s.ok()) {
+    *dbptr = db;
+  } else {
+    *dbptr = nullptr;
+  }
+  return s;
+}
 
-Status UtilityDB::OpenTtlDB(
-    const Options& options,
-    const std::string& dbname,
-    StackableDB** dbptr,
-    int32_t ttl,
-    bool read_only) {
+Status DBWithTTL::Open(const Options& options, const std::string& dbname,
+                       DBWithTTL** dbptr, int32_t ttl, bool read_only) {
 
   DBOptions db_options(options);
   ColumnFamilyOptions cf_options(options);
@@ -48,8 +56,8 @@ Status UtilityDB::OpenTtlDB(
   column_families.push_back(
       ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
   std::vector<ColumnFamilyHandle*> handles;
-  Status s = UtilityDB::OpenTtlDB(db_options, dbname, column_families, &handles,
-                                  dbptr, {ttl}, read_only);
+  Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
+                             dbptr, {ttl}, read_only);
   if (s.ok()) {
     assert(handles.size() == 1);
     // i can delete the handle since DBImpl is always holding a reference to
@@ -59,10 +67,10 @@ Status UtilityDB::OpenTtlDB(
   return s;
 }
 
-Status UtilityDB::OpenTtlDB(
+Status DBWithTTL::Open(
     const DBOptions& db_options, const std::string& dbname,
     const std::vector<ColumnFamilyDescriptor>& column_families,
-    std::vector<ColumnFamilyHandle*>* handles, StackableDB** dbptr,
+    std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
    std::vector<int32_t> ttls, bool read_only) {
 
   if (ttls.size() != column_families.size()) {
@@ -73,7 +81,8 @@ Status UtilityDB::OpenTtlDB(
   std::vector<ColumnFamilyDescriptor> column_families_sanitized =
       column_families;
   for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
-    DBWithTTL::SanitizeOptions(ttls[i], &column_families_sanitized[i].options);
+    DBWithTTLImpl::SanitizeOptions(ttls[i],
+                                   &column_families_sanitized[i].options);
   }
   DB* db;
 
@@ -85,66 +94,81 @@ Status UtilityDB::OpenTtlDB(
     st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
   }
   if (st.ok()) {
-    *dbptr = new DBWithTTL(db);
+    *dbptr = new DBWithTTLImpl(db);
   } else {
     *dbptr = nullptr;
   }
   return st;
 }
 
+Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
+    const ColumnFamilyOptions& options, const std::string& column_family_name,
+    ColumnFamilyHandle** handle, int ttl) {
+  ColumnFamilyOptions sanitized_options = options;
+  DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options);
+
+  return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
+                                       handle);
+}
+
+Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
+                                         const std::string& column_family_name,
+                                         ColumnFamilyHandle** handle) {
+  return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
+}
+
 // Gives back the current time
-Status DBWithTTL::GetCurrentTime(int64_t& curtime) {
-  return Env::Default()->GetCurrentTime(&curtime);
+Status DBWithTTLImpl::GetCurrentTime(int64_t* curtime) {
+  return Env::Default()->GetCurrentTime(curtime);
 }
 
 // Appends the current timestamp to the string.
 // Returns false if could not get the current_time, true if append succeeds
-Status DBWithTTL::AppendTS(const Slice& val, std::string& val_with_ts) {
-  val_with_ts.reserve(kTSLength + val.size());
+Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts) {
+  val_with_ts->reserve(kTSLength + val.size());
   char ts_string[kTSLength];
   int64_t curtime;
-  Status st = GetCurrentTime(curtime);
+  Status st = GetCurrentTime(&curtime);
   if (!st.ok()) {
     return st;
   }
   EncodeFixed32(ts_string, (int32_t)curtime);
-  val_with_ts.append(val.data(), val.size());
-  val_with_ts.append(ts_string, kTSLength);
+  val_with_ts->append(val.data(), val.size());
+  val_with_ts->append(ts_string, kTSLength);
   return st;
 }
 
 // Returns corruption if the length of the string is lesser than timestamp, or
 // timestamp refers to a time lesser than ttl-feature release time
-Status DBWithTTL::SanityCheckTimestamp(const Slice& str) {
+Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
   if (str.size() < kTSLength) {
     return Status::Corruption("Error: value's length less than timestamp's\n");
   }
   // Checks that TS is not lesser than kMinTimestamp
   // Gaurds against corruption & normal database opened incorrectly in ttl mode
-  int32_t timestamp_value =
-      DecodeFixed32(str.data() + str.size() - kTSLength);
-  if (timestamp_value < kMinTimestamp){
+  int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
+  if (timestamp_value < kMinTimestamp) {
     return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
   }
   return Status::OK();
 }
 
 // Checks if the string is stale or not according to TTl provided
-bool DBWithTTL::IsStale(const Slice& value, int32_t ttl) {
-  if (ttl <= 0) { // Data is fresh if TTL is non-positive
+bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl) {
+  if (ttl <= 0) {  // Data is fresh if TTL is non-positive
     return false;
   }
   int64_t curtime;
-  if (!GetCurrentTime(curtime).ok()) {
-    return false; // Treat the data as fresh if could not get current time
+  if (!GetCurrentTime(&curtime).ok()) {
+    return false;  // Treat the data as fresh if could not get current time
   }
   int32_t timestamp_value =
      DecodeFixed32(value.data() + value.size() - kTSLength);
   return (timestamp_value + ttl) < curtime;
 }
 
 // Strips the TS from the end of the string
-Status DBWithTTL::StripTS(std::string* str) {
+Status DBWithTTLImpl::StripTS(std::string* str) {
   Status st;
   if (str->length() < kTSLength) {
     return Status::Corruption("Bad timestamp in key-value");
@@ -154,17 +178,17 @@ Status DBWithTTL::StripTS(std::string* str) {
   return st;
 }
 
-Status DBWithTTL::Put(const WriteOptions& options,
-                      ColumnFamilyHandle* column_family, const Slice& key,
-                      const Slice& val) {
+Status DBWithTTLImpl::Put(const WriteOptions& options,
+                          ColumnFamilyHandle* column_family, const Slice& key,
+                          const Slice& val) {
   WriteBatch batch;
   batch.Put(column_family, key, val);
   return Write(options, &batch);
 }
 
-Status DBWithTTL::Get(const ReadOptions& options,
-                      ColumnFamilyHandle* column_family, const Slice& key,
-                      std::string* value) {
+Status DBWithTTLImpl::Get(const ReadOptions& options,
+                          ColumnFamilyHandle* column_family, const Slice& key,
+                          std::string* value) {
   Status st = db_->Get(options, column_family, key, value);
   if (!st.ok()) {
     return st;
@@ -176,18 +200,18 @@ Status DBWithTTL::Get(const ReadOptions& options,
   return StripTS(value);
 }
 
-std::vector<Status> DBWithTTL::MultiGet(
+std::vector<Status> DBWithTTLImpl::MultiGet(
     const ReadOptions& options,
     const std::vector<ColumnFamilyHandle*>& column_family,
     const std::vector<Slice>& keys, std::vector<std::string>* values) {
-  return std::vector<Status>(keys.size(),
-                             Status::NotSupported("MultiGet not\
-                                                  supported with TTL"));
+  return std::vector<Status>(
+      keys.size(), Status::NotSupported("MultiGet not supported with TTL"));
 }
 
-bool DBWithTTL::KeyMayExist(const ReadOptions& options,
-                            ColumnFamilyHandle* column_family, const Slice& key,
-                            std::string* value, bool* value_found) {
+bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
+                                ColumnFamilyHandle* column_family,
+                                const Slice& key, std::string* value,
+                                bool* value_found) {
   bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
   if (ret && value != nullptr && value_found != nullptr && *value_found) {
     if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
@@ -197,15 +221,15 @@ bool DBWithTTL::KeyMayExist(const ReadOptions& options,
   return ret;
 }
 
-Status DBWithTTL::Merge(const WriteOptions& options,
-                        ColumnFamilyHandle* column_family, const Slice& key,
-                        const Slice& value) {
+Status DBWithTTLImpl::Merge(const WriteOptions& options,
+                            ColumnFamilyHandle* column_family,
+                            const Slice& key, const Slice& value) {
   WriteBatch batch;
   batch.Merge(column_family, key, value);
   return Write(options, &batch);
 }
 
-Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
+Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
   class Handler : public WriteBatch::Handler {
    public:
     WriteBatch updates_ttl;
@@ -213,7 +237,7 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
     virtual Status PutCF(uint32_t column_family_id, const Slice& key,
                          const Slice& value) {
       std::string value_with_ts;
-      Status st = AppendTS(value, value_with_ts);
+      Status st = AppendTS(value, &value_with_ts);
      if (!st.ok()) {
        batch_rewrite_status = st;
      } else {
@@ -225,7 +249,7 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
     virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
                            const Slice& value) {
       std::string value_with_ts;
-      Status st = AppendTS(value, value_with_ts);
+      Status st = AppendTS(value, &value_with_ts);
      if (!st.ok()) {
        batch_rewrite_status = st;
      } else {
@@ -238,9 +262,7 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
       WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
       return Status::OK();
     }
-    virtual void LogData(const Slice& blob) {
-      updates_ttl.PutLogData(blob);
-    }
+    virtual void LogData(const Slice& blob) { updates_ttl.PutLogData(blob); }
   };
   Handler handler;
   updates->Iterate(&handler);
@@ -251,8 +273,8 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
   }
 }
 
-Iterator* DBWithTTL::NewIterator(const ReadOptions& opts,
-                                 ColumnFamilyHandle* column_family) {
+Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
+                                     ColumnFamilyHandle* column_family) {
   return new TtlIterator(db_->NewIterator(opts, column_family));
 }
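The AppendTS/StripTS/IsStale trio above pins down the on-disk value layout: the user's bytes followed by a 4-byte fixed-width creation timestamp. A standalone sketch of that layout, using plain memcpy instead of RocksDB's EncodeFixed32/DecodeFixed32 helpers purely for illustration (the function names here are hypothetical):

    #include <cassert>
    #include <cstdint>
    #include <cstring>
    #include <ctime>
    #include <string>

    static const size_t kTSLength = sizeof(int32_t);

    // Mirror of AppendTS: stored value = user value || 4-byte timestamp.
    std::string AppendTimestamp(const std::string& value, int32_t now) {
      std::string with_ts = value;
      char ts[kTSLength];
      memcpy(ts, &now, kTSLength);  // EncodeFixed32 is little-endian; same idea
      with_ts.append(ts, kTSLength);
      return with_ts;
    }

    // Mirror of IsStale: expired when creation + ttl < now.
    bool IsStale(const std::string& with_ts, int32_t ttl, int32_t now) {
      if (ttl <= 0 || with_ts.size() < kTSLength) return false;
      int32_t created;
      memcpy(&created, with_ts.data() + with_ts.size() - kTSLength, kTSLength);
      return created + ttl < now;
    }

    int main() {
      int32_t now = static_cast<int32_t>(time(nullptr));
      std::string v = AppendTimestamp("value", now);
      assert(v.size() == 5 + kTSLength);
      assert(!IsStale(v, 600, now));       // fresh within its TTL
      assert(IsStale(v, 600, now + 601));  // stale 601 seconds later
      return 0;
    }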
@@ -14,17 +14,27 @@
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/merge_operator.h"
-#include "utilities/utility_db.h"
+#include "utilities/db_ttl.h"
 #include "db/db_impl.h"
 
 namespace rocksdb {
 
-class DBWithTTL : public StackableDB {
+class DBWithTTLImpl : public DBWithTTL {
  public:
   static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options);
 
-  explicit DBWithTTL(DB* db);
+  explicit DBWithTTLImpl(DB* db);
 
-  virtual ~DBWithTTL();
+  virtual ~DBWithTTLImpl();
 
+  Status CreateColumnFamilyWithTtl(const ColumnFamilyOptions& options,
+                                   const std::string& column_family_name,
+                                   ColumnFamilyHandle** handle,
+                                   int ttl) override;
+
+  Status CreateColumnFamily(const ColumnFamilyOptions& options,
+                            const std::string& column_family_name,
+                            ColumnFamilyHandle** handle) override;
+
   using StackableDB::Put;
   virtual Status Put(const WriteOptions& options,
@@ -60,83 +70,60 @@ class DBWithTTL : public StackableDB {
   virtual Iterator* NewIterator(const ReadOptions& opts,
                                 ColumnFamilyHandle* column_family) override;
 
-  virtual DB* GetBaseDB() {
-    return db_;
-  }
+  virtual DB* GetBaseDB() { return db_; }
 
   static bool IsStale(const Slice& value, int32_t ttl);
 
-  static Status AppendTS(const Slice& val, std::string& val_with_ts);
+  static Status AppendTS(const Slice& val, std::string* val_with_ts);
 
   static Status SanityCheckTimestamp(const Slice& str);
 
   static Status StripTS(std::string* str);
 
-  static Status GetCurrentTime(int64_t& curtime);
+  static Status GetCurrentTime(int64_t* curtime);
 
-  static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp
+  static const uint32_t kTSLength = sizeof(int32_t);  // size of timestamp
 
-  static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8
+  static const int32_t kMinTimestamp = 1368146402;  // 05/09/2013:5:40PM GMT-8
 
-  static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8
+  static const int32_t kMaxTimestamp = 2147483647;  // 01/18/2038:7:14PM GMT-8
 };
 
 class TtlIterator : public Iterator {
-
  public:
-  explicit TtlIterator(Iterator* iter)
-    : iter_(iter) {
-    assert(iter_);
-  }
+  explicit TtlIterator(Iterator* iter) : iter_(iter) { assert(iter_); }
 
-  ~TtlIterator() {
-    delete iter_;
-  }
+  ~TtlIterator() { delete iter_; }
 
-  bool Valid() const {
-    return iter_->Valid();
-  }
+  bool Valid() const { return iter_->Valid(); }
 
-  void SeekToFirst() {
-    iter_->SeekToFirst();
-  }
+  void SeekToFirst() { iter_->SeekToFirst(); }
 
-  void SeekToLast() {
-    iter_->SeekToLast();
-  }
+  void SeekToLast() { iter_->SeekToLast(); }
 
-  void Seek(const Slice& target) {
-    iter_->Seek(target);
-  }
+  void Seek(const Slice& target) { iter_->Seek(target); }
 
-  void Next() {
-    iter_->Next();
-  }
+  void Next() { iter_->Next(); }
 
-  void Prev() {
-    iter_->Prev();
-  }
+  void Prev() { iter_->Prev(); }
 
-  Slice key() const {
-    return iter_->key();
-  }
+  Slice key() const { return iter_->key(); }
 
   int32_t timestamp() const {
-    return DecodeFixed32(
-      iter_->value().data() + iter_->value().size() - DBWithTTL::kTSLength);
+    return DecodeFixed32(iter_->value().data() + iter_->value().size() -
+                         DBWithTTLImpl::kTSLength);
   }
 
   Slice value() const {
-    //TODO: handle timestamp corruption like in general iterator semantics
-    assert(DBWithTTL::SanityCheckTimestamp(iter_->value()).ok());
+    // TODO: handle timestamp corruption like in general iterator semantics
+    assert(DBWithTTLImpl::SanityCheckTimestamp(iter_->value()).ok());
     Slice trimmed_value = iter_->value();
-    trimmed_value.size_ -= DBWithTTL::kTSLength;
+    trimmed_value.size_ -= DBWithTTLImpl::kTSLength;
     return trimmed_value;
   }
 
-  Status status() const {
-    return iter_->status();
-  }
+  Status status() const { return iter_->status(); }
 
  private:
   Iterator* iter_;
@@ -146,13 +133,13 @@ class TtlCompactionFilter : public CompactionFilter {
 
  public:
   TtlCompactionFilter(
-      int32_t ttl,
-      const CompactionFilter* user_comp_filter,
-      std::unique_ptr<const CompactionFilter>
-        user_comp_filter_from_factory = nullptr)
-    : ttl_(ttl),
-      user_comp_filter_(user_comp_filter),
-      user_comp_filter_from_factory_(std::move(user_comp_filter_from_factory)) {
+      int32_t ttl, const CompactionFilter* user_comp_filter,
+      std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
+          nullptr)
+      : ttl_(ttl),
+        user_comp_filter_(user_comp_filter),
+        user_comp_filter_from_factory_(
+            std::move(user_comp_filter_from_factory)) {
     // Unlike the merge operator, compaction filter is necessary for TTL, hence
     // this would be called even if user doesn't specify any compaction-filter
     if (!user_comp_filter_) {
@@ -160,34 +147,31 @@ class TtlCompactionFilter : public CompactionFilter {
     }
   }
 
-  virtual bool Filter(int level,
-                      const Slice& key,
-                      const Slice& old_val,
-                      std::string* new_val,
-                      bool* value_changed) const override {
-    if (DBWithTTL::IsStale(old_val, ttl_)) {
+  virtual bool Filter(int level, const Slice& key, const Slice& old_val,
+                      std::string* new_val, bool* value_changed) const
+      override {
+    if (DBWithTTLImpl::IsStale(old_val, ttl_)) {
       return true;
     }
     if (user_comp_filter_ == nullptr) {
       return false;
     }
-    assert(old_val.size() >= DBWithTTL::kTSLength);
+    assert(old_val.size() >= DBWithTTLImpl::kTSLength);
     Slice old_val_without_ts(old_val.data(),
-                             old_val.size() - DBWithTTL::kTSLength);
+                             old_val.size() - DBWithTTLImpl::kTSLength);
     if (user_comp_filter_->Filter(level, key, old_val_without_ts, new_val,
                                   value_changed)) {
       return true;
     }
     if (*value_changed) {
-      new_val->append(old_val.data() + old_val.size() - DBWithTTL::kTSLength,
-                      DBWithTTL::kTSLength);
+      new_val->append(
+          old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
+          DBWithTTLImpl::kTSLength);
     }
     return false;
   }
 
-  virtual const char* Name() const override {
-    return "Delete By TTL";
-  }
+  virtual const char* Name() const override { return "Delete By TTL"; }
 
  private:
   int32_t ttl_;
@@ -196,47 +180,40 @@ class TtlCompactionFilter : public CompactionFilter {
 };
 
 class TtlCompactionFilterFactory : public CompactionFilterFactory {
-  public:
-    TtlCompactionFilterFactory(
-        int32_t ttl,
-        std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
-    : ttl_(ttl),
-      user_comp_filter_factory_(comp_filter_factory) { }
+ public:
+  TtlCompactionFilterFactory(
+      int32_t ttl, std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
+      : ttl_(ttl), user_comp_filter_factory_(comp_filter_factory) {}
 
-    virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-        const CompactionFilter::Context& context) {
-      return std::unique_ptr<TtlCompactionFilter>(
-        new TtlCompactionFilter(
-          ttl_,
-          nullptr,
-          std::move(user_comp_filter_factory_->CreateCompactionFilter(context))
-        )
-      );
-    }
+  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const CompactionFilter::Context& context) {
+    return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
+        ttl_, nullptr,
+        std::move(user_comp_filter_factory_->CreateCompactionFilter(context))));
+  }
 
-    virtual const char* Name() const override {
-      return "TtlCompactionFilterFactory";
-    }
+  virtual const char* Name() const override {
+    return "TtlCompactionFilterFactory";
+  }
 
-  private:
-    int32_t ttl_;
-    std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
+ private:
+  int32_t ttl_;
+  std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
 };
 
 class TtlMergeOperator : public MergeOperator {
-
  public:
   explicit TtlMergeOperator(const std::shared_ptr<MergeOperator> merge_op)
-    : user_merge_op_(merge_op) {
+      : user_merge_op_(merge_op) {
     assert(merge_op);
   }
 
-  virtual bool FullMerge(const Slice& key,
-                         const Slice* existing_value,
+  virtual bool FullMerge(const Slice& key, const Slice* existing_value,
                          const std::deque<std::string>& operands,
-                         std::string* new_value,
-                         Logger* logger) const override {
-    const uint32_t ts_len = DBWithTTL::kTSLength;
+                         std::string* new_value, Logger* logger) const
+      override {
+    const uint32_t ts_len = DBWithTTLImpl::kTSLength;
     if (existing_value && existing_value->size() < ts_len) {
       Log(logger, "Error: Could not remove timestamp from existing value.");
       return false;
@@ -244,7 +221,7 @@ class TtlMergeOperator : public MergeOperator {
 
     // Extract time-stamp from each operand to be passed to user_merge_op_
     std::deque<std::string> operands_without_ts;
-    for (const auto &operand : operands) {
+    for (const auto& operand : operands) {
       if (operand.size() < ts_len) {
         Log(logger, "Error: Could not remove timestamp from operand value.");
         return false;
@@ -271,9 +248,10 @@ class TtlMergeOperator : public MergeOperator {
 
     // Augment the *new_value with the ttl time-stamp
     int64_t curtime;
-    if (!DBWithTTL::GetCurrentTime(curtime).ok()) {
-      Log(logger, "Error: Could not get current time to be attached internally "
-          "to the new value.");
+    if (!DBWithTTLImpl::GetCurrentTime(&curtime).ok()) {
+      Log(logger,
+          "Error: Could not get current time to be attached internally "
+          "to the new value.");
       return false;
     } else {
       char ts_string[ts_len];
@@ -287,7 +265,7 @@ class TtlMergeOperator : public MergeOperator {
                            const std::deque<Slice>& operand_list,
                            std::string* new_value, Logger* logger) const
       override {
-    const uint32_t ts_len = DBWithTTL::kTSLength;
+    const uint32_t ts_len = DBWithTTLImpl::kTSLength;
     std::deque<Slice> operands_without_ts;
 
     for (const auto& operand : operand_list) {
@@ -309,9 +287,10 @@ class TtlMergeOperator : public MergeOperator {
 
     // Augment the *new_value with the ttl time-stamp
     int64_t curtime;
-    if (!DBWithTTL::GetCurrentTime(curtime).ok()) {
-      Log(logger, "Error: Could not get current time to be attached internally "
-          "to the new value.");
+    if (!DBWithTTLImpl::GetCurrentTime(&curtime).ok()) {
+      Log(logger,
+          "Error: Could not get current time to be attached internally "
+          "to the new value.");
      return false;
    } else {
      char ts_string[ts_len];
@@ -319,16 +298,12 @@ class TtlMergeOperator : public MergeOperator {
       new_value->append(ts_string, ts_len);
       return true;
     }
-
   }
 
-  virtual const char* Name() const override {
-    return "Merge By TTL";
-  }
+  virtual const char* Name() const override { return "Merge By TTL"; }
 
  private:
   std::shared_ptr<MergeOperator> user_merge_op_;
 };
 
 }
 #endif  // ROCKSDB_LITE
@@ -4,7 +4,7 @@
 
 #include <memory>
 #include "rocksdb/compaction_filter.h"
-#include "utilities/utility_db.h"
+#include "utilities/db_ttl.h"
 #include "util/testharness.h"
 #include "util/logging.h"
 #include <map>
@@ -45,13 +45,13 @@ class TtlTest {
   void OpenTtl() {
     ASSERT_TRUE(db_ttl_ ==
                 nullptr);  // db should be closed before opening again
-    ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_));
+    ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_));
   }
 
   // Open database with TTL support when TTL provided with db_ttl_ pointer
   void OpenTtl(int32_t ttl) {
     ASSERT_TRUE(db_ttl_ == nullptr);
-    ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl));
+    ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_, ttl));
   }
 
   // Open with TestFilter compaction filter
@@ -65,7 +65,7 @@ class TtlTest {
   // Open database with TTL support in read_only mode
   void OpenReadOnlyTtl(int32_t ttl) {
     ASSERT_TRUE(db_ttl_ == nullptr);
-    ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl, true));
+    ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_, ttl, true));
   }
 
   void CloseTtl() {
@@ -317,7 +317,7 @@ class TtlTest {
   // Choose carefully so that Put, Gets & Compaction complete in 1 second buffer
   const int64_t kSampleSize_ = 100;
   std::string dbname_;
-  StackableDB* db_ttl_;
+  DBWithTTL* db_ttl_;
 
  private:
   Options options_;
@@ -532,25 +532,33 @@ TEST(TtlTest, ColumnFamiliesTest) {
 
   std::vector<ColumnFamilyHandle*> handles;
 
-  ASSERT_OK(UtilityDB::OpenTtlDB(DBOptions(options), dbname_, column_families,
-                                 &handles, &db_ttl_, {2, 4}, false));
+  ASSERT_OK(DBWithTTL::Open(DBOptions(options), dbname_, column_families,
+                            &handles, &db_ttl_, {2, 4}, false));
   ASSERT_EQ(handles.size(), 2U);
+  ColumnFamilyHandle* new_handle;
+  ASSERT_OK(db_ttl_->CreateColumnFamilyWithTtl(options, "ttl_column_family_2",
+                                               &new_handle, 2));
+  handles.push_back(new_handle);
 
   MakeKVMap(kSampleSize_);
   PutValues(0, kSampleSize_, false, handles[0]);
   PutValues(0, kSampleSize_, false, handles[1]);
+  PutValues(0, kSampleSize_, false, handles[2]);
 
   // everything should be there after 1 second
   SleepCompactCheck(1, 0, kSampleSize_, true, false, handles[0]);
   SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]);
+  SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[2]);
 
   // only column family 1 should be alive after 3 seconds
   SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]);
   SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]);
+  SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[2]);
 
   // nothing should be there after 5 seconds
   SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]);
   SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[1]);
+  SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[2]);
 
   for (auto h : handles) {
     delete h;
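The test above exercises the new per-column-family TTL API end to end. A condensed sketch of just the open-and-create flow it relies on (the path and TTL values are illustrative assumptions):

    #include <cassert>
    #include "utilities/db_ttl.h"

    using namespace rocksdb;

    int main() {
      Options options;
      options.create_if_missing = true;

      // Open the default column family with a 2-second TTL.
      DBWithTTL* db = nullptr;
      Status s = DBWithTTL::Open(options, "/tmp/ttl_cf_db", &db, 2);
      assert(s.ok());

      // Add another column family whose entries expire after 4 seconds.
      ColumnFamilyHandle* cf = nullptr;
      s = db->CreateColumnFamilyWithTtl(ColumnFamilyOptions(options),
                                        "ttl_column_family", &cf, 4);
      assert(s.ok());

      delete cf;  // safe: DBImpl holds its own reference, per the patch
      delete db;
      return 0;
    }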