Merge branch 'master' into columnfamilies

This commit is contained in:
Igor Canadi 2014-02-25 17:04:48 -08:00
commit 8895526308
4 changed files with 126 additions and 4 deletions

82
db/c.cc
View File

@ -51,6 +51,7 @@ using rocksdb::Status;
using rocksdb::WritableFile; using rocksdb::WritableFile;
using rocksdb::WriteBatch; using rocksdb::WriteBatch;
using rocksdb::WriteOptions; using rocksdb::WriteOptions;
using rocksdb::LiveFileMetaData;
using std::shared_ptr; using std::shared_ptr;
@ -70,6 +71,7 @@ struct rocksdb_writablefile_t { WritableFile* rep; };
struct rocksdb_filelock_t { FileLock* rep; }; struct rocksdb_filelock_t { FileLock* rep; };
struct rocksdb_logger_t { shared_ptr<Logger> rep; }; struct rocksdb_logger_t { shared_ptr<Logger> rep; };
struct rocksdb_cache_t { shared_ptr<Cache> rep; }; struct rocksdb_cache_t { shared_ptr<Cache> rep; };
struct rocksdb_livefiles_t { std::vector<LiveFileMetaData> rep; };
struct rocksdb_comparator_t : public Comparator { struct rocksdb_comparator_t : public Comparator {
void* state_; void* state_;
@ -435,6 +437,19 @@ void rocksdb_approximate_sizes(
delete[] ranges; delete[] ranges;
} }
void rocksdb_delete_file(
rocksdb_t* db,
const char* name) {
db->rep->DeleteFile(name);
}
const rocksdb_livefiles_t* rocksdb_livefiles(
rocksdb_t* db) {
rocksdb_livefiles_t* result = new rocksdb_livefiles_t;
db->rep->GetLiveFilesMetaData(&result->rep);
return result;
}
void rocksdb_compact_range( void rocksdb_compact_range(
rocksdb_t* db, rocksdb_t* db,
const char* start_key, size_t start_key_len, const char* start_key, size_t start_key_len,
@ -537,6 +552,10 @@ void rocksdb_writebatch_clear(rocksdb_writebatch_t* b) {
b->rep.Clear(); b->rep.Clear();
} }
int rocksdb_writebatch_count(rocksdb_writebatch_t* b) {
return b->rep.Count();
}
void rocksdb_writebatch_put( void rocksdb_writebatch_put(
rocksdb_writebatch_t* b, rocksdb_writebatch_t* b,
const char* key, size_t klen, const char* key, size_t klen,
@ -581,6 +600,11 @@ void rocksdb_writebatch_iterate(
b->rep.Iterate(&handler); b->rep.Iterate(&handler);
} }
const char* rocksdb_writebatch_data(rocksdb_writebatch_t* b, size_t* size) {
*size = b->rep.GetDataSize();
return b->rep.Data().c_str();
}
rocksdb_options_t* rocksdb_options_create() { rocksdb_options_t* rocksdb_options_create() {
return new rocksdb_options_t; return new rocksdb_options_t;
} }
@ -983,7 +1007,6 @@ DB::GetSortedWalFiles
DB::GetLatestSequenceNumber DB::GetLatestSequenceNumber
DB::GetUpdatesSince DB::GetUpdatesSince
DB::DeleteFile DB::DeleteFile
DB::GetLiveFilesMetaData
DB::GetDbIdentity DB::GetDbIdentity
DB::RunManualCompaction DB::RunManualCompaction
custom cache custom cache
@ -1304,4 +1327,61 @@ void rocksdb_universal_compaction_options_destroy(
delete uco; delete uco;
} }
void rocksdb_options_set_min_level_to_compress(rocksdb_options_t* opt, int level) {
if (level >= 0) {
assert(level <= opt->rep.num_levels);
opt->rep.compression_per_level.resize(opt->rep.num_levels);
for (int i = 0; i < level; i++) {
opt->rep.compression_per_level[i] = rocksdb::kNoCompression;
}
for (int i = level; i < opt->rep.num_levels; i++) {
opt->rep.compression_per_level[i] = opt->rep.compression;
}
}
}
int rocksdb_livefiles_count(
const rocksdb_livefiles_t* lf) {
return lf->rep.size();
}
const char* rocksdb_livefiles_name(
const rocksdb_livefiles_t* lf,
int index) {
return lf->rep[index].name.c_str();
}
int rocksdb_livefiles_level(
const rocksdb_livefiles_t* lf,
int index) {
return lf->rep[index].level;
}
size_t rocksdb_livefiles_size(
const rocksdb_livefiles_t* lf,
int index) {
return lf->rep[index].size;
}
const char* rocksdb_livefiles_smallestkey(
const rocksdb_livefiles_t* lf,
int index,
size_t* size) {
*size = lf->rep[index].smallestkey.size();
return lf->rep[index].smallestkey.data();
}
const char* rocksdb_livefiles_largestkey(
const rocksdb_livefiles_t* lf,
int index,
size_t* size) {
*size = lf->rep[index].largestkey.size();
return lf->rep[index].largestkey.data();
}
extern void rocksdb_livefiles_destroy(
const rocksdb_livefiles_t* lf) {
delete lf;
}
} // end extern "C" } // end extern "C"

View File

@ -265,6 +265,8 @@ DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");
DEFINE_bool(disable_wal, false, "If true, do not write WAL for write."); DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL");
DEFINE_bool(use_snapshot, false, "If true, create a snapshot per query when" DEFINE_bool(use_snapshot, false, "If true, create a snapshot per query when"
" randomread benchmark is used"); " randomread benchmark is used");
@ -1478,6 +1480,7 @@ class Benchmark {
options.env = FLAGS_env; options.env = FLAGS_env;
options.disableDataSync = FLAGS_disable_data_sync; options.disableDataSync = FLAGS_disable_data_sync;
options.use_fsync = FLAGS_use_fsync; options.use_fsync = FLAGS_use_fsync;
options.wal_dir = FLAGS_wal_dir;
options.num_levels = FLAGS_num_levels; options.num_levels = FLAGS_num_levels;
options.target_file_size_base = FLAGS_target_file_size_base; options.target_file_size_base = FLAGS_target_file_size_base;
options.target_file_size_multiplier = FLAGS_target_file_size_multiplier; options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;

View File

@ -3477,9 +3477,10 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
} else if (cfd->imm()->size() == } else if (cfd->imm()->size() ==
cfd->options()->max_write_buffer_number - 1) { cfd->options()->max_write_buffer_number - 1) {
// We have filled up the current memtable, but the previous // We have filled up the current memtable, but the previous
// ones are still being compacted, so we wait. // ones are still being flushed, so we wait.
DelayLoggingAndReset(); DelayLoggingAndReset();
Log(options_.info_log, "wait for memtable compaction...\n"); Log(options_.info_log, "wait for memtable flush...\n");
MaybeScheduleFlushOrCompaction();
uint64_t stall; uint64_t stall;
{ {
StopWatch sw(env_, options_.statistics.get(), StopWatch sw(env_, options_.statistics.get(),
@ -3554,7 +3555,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
unique_ptr<WritableFile> lfile; unique_ptr<WritableFile> lfile;
MemTable* new_mem = nullptr; MemTable* new_mem = nullptr;
// Attempt to switch to a new memtable and trigger compaction of old. // Attempt to switch to a new memtable and trigger flush of old.
// Do this without holding the dbmutex lock. // Do this without holding the dbmutex lock.
assert(versions_->PrevLogNumber() == 0); assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber(); uint64_t new_log_number = versions_->NewFileNumber();

View File

@ -74,6 +74,7 @@ typedef struct rocksdb_writablefile_t rocksdb_writablefile_t;
typedef struct rocksdb_writebatch_t rocksdb_writebatch_t; typedef struct rocksdb_writebatch_t rocksdb_writebatch_t;
typedef struct rocksdb_writeoptions_t rocksdb_writeoptions_t; typedef struct rocksdb_writeoptions_t rocksdb_writeoptions_t;
typedef struct rocksdb_universal_compaction_options_t rocksdb_universal_compaction_options_t; typedef struct rocksdb_universal_compaction_options_t rocksdb_universal_compaction_options_t;
typedef struct rocksdb_livefiles_t rocksdb_livefiles_t;
/* DB operations */ /* DB operations */
@ -148,6 +149,13 @@ extern void rocksdb_compact_range(
const char* start_key, size_t start_key_len, const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len); const char* limit_key, size_t limit_key_len);
extern void rocksdb_delete_file(
rocksdb_t* db,
const char* name);
extern const rocksdb_livefiles_t* rocksdb_livefiles(
rocksdb_t* db);
extern void rocksdb_flush( extern void rocksdb_flush(
rocksdb_t* db, rocksdb_t* db,
const rocksdb_flushoptions_t* options, const rocksdb_flushoptions_t* options,
@ -192,6 +200,7 @@ extern void rocksdb_iter_get_error(const rocksdb_iterator_t*, char** errptr);
extern rocksdb_writebatch_t* rocksdb_writebatch_create(); extern rocksdb_writebatch_t* rocksdb_writebatch_create();
extern void rocksdb_writebatch_destroy(rocksdb_writebatch_t*); extern void rocksdb_writebatch_destroy(rocksdb_writebatch_t*);
extern void rocksdb_writebatch_clear(rocksdb_writebatch_t*); extern void rocksdb_writebatch_clear(rocksdb_writebatch_t*);
extern int rocksdb_writebatch_count(rocksdb_writebatch_t*);
extern void rocksdb_writebatch_put( extern void rocksdb_writebatch_put(
rocksdb_writebatch_t*, rocksdb_writebatch_t*,
const char* key, size_t klen, const char* key, size_t klen,
@ -208,6 +217,7 @@ extern void rocksdb_writebatch_iterate(
void* state, void* state,
void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen), void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen),
void (*deleted)(void*, const char* k, size_t klen)); void (*deleted)(void*, const char* k, size_t klen));
extern const char* rocksdb_writebatch_data(rocksdb_writebatch_t*, size_t *size);
/* Options */ /* Options */
@ -336,6 +346,12 @@ extern void rocksdb_options_set_delete_obsolete_files_period_micros(
extern void rocksdb_options_set_source_compaction_factor(rocksdb_options_t*, int); extern void rocksdb_options_set_source_compaction_factor(rocksdb_options_t*, int);
extern void rocksdb_options_prepare_for_bulk_load(rocksdb_options_t*); extern void rocksdb_options_prepare_for_bulk_load(rocksdb_options_t*);
extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*); extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*);
extern void rocksdb_options_set_max_bytes_for_level_base(rocksdb_options_t* opt, uint64_t n);
extern void rocksdb_options_set_stats_dump_period_sec(rocksdb_options_t* opt, unsigned int sec);
extern void rocksdb_options_set_min_level_to_compress(rocksdb_options_t* opt, int level);
extern void rocksdb_options_set_memtable_prefix_bloom_bits( extern void rocksdb_options_set_memtable_prefix_bloom_bits(
rocksdb_options_t*, uint32_t); rocksdb_options_t*, uint32_t);
extern void rocksdb_options_set_memtable_prefix_bloom_probes( extern void rocksdb_options_set_memtable_prefix_bloom_probes(
@ -508,6 +524,28 @@ extern void rocksdb_universal_compaction_options_set_stop_style(
extern void rocksdb_universal_compaction_options_destroy( extern void rocksdb_universal_compaction_options_destroy(
rocksdb_universal_compaction_options_t*); rocksdb_universal_compaction_options_t*);
extern int rocksdb_livefiles_count(
const rocksdb_livefiles_t*);
extern const char* rocksdb_livefiles_name(
const rocksdb_livefiles_t*,
int index);
extern int rocksdb_livefiles_level(
const rocksdb_livefiles_t*,
int index);
extern size_t rocksdb_livefiles_size(
const rocksdb_livefiles_t*,
int index);
extern const char* rocksdb_livefiles_smallestkey(
const rocksdb_livefiles_t*,
int index,
size_t* size);
extern const char* rocksdb_livefiles_largestkey(
const rocksdb_livefiles_t*,
int index,
size_t* size);
extern void rocksdb_livefiles_destroy(
const rocksdb_livefiles_t*);
#ifdef __cplusplus #ifdef __cplusplus
} /* end extern "C" */ } /* end extern "C" */
#endif #endif