diff --git a/Makefile b/Makefile index e30a19826..4dfe99606 100644 --- a/Makefile +++ b/Makefile @@ -33,8 +33,6 @@ MEMENVOBJECTS = $(MEMENV_SOURCES:.cc=.o) TESTUTIL = ./util/testutil.o TESTHARNESS = ./util/testharness.o $(TESTUTIL) -TOOLS = \ - leveldb_shell TESTS = \ arena_test \ @@ -55,10 +53,17 @@ TESTS = \ table_test \ version_edit_test \ version_set_test \ - write_batch_test + write_batch_test \ + filelock_test -PROGRAMS = db_bench $(TESTS) + +TOOLS = \ + manifest_dump \ + leveldb_shell + +PROGRAMS = db_bench $(TESTS) $(TOOLS) BENCHMARKS = db_bench_sqlite3 db_bench_tree_db +VERSIONFILE=util/build_version.cc LIBRARY = libleveldb.a MEMENVLIBRARY = libmemenv.a @@ -82,21 +87,20 @@ $(SHARED1): $(SHARED3) ln -fs $(SHARED3) $(SHARED1) endif -all: $(SHARED) $(LIBRARY) $(THRIFTSERVER) $(TOOLS) -check: all $(PROGRAMS) $(TESTS) +all: $(VERSIONFILE) $(SHARED) $(LIBRARY) $(THRIFTSERVER) $(TOOLS) + +check: all $(PROGRAMS) $(TESTS) $(TOOLS) for t in $(TESTS); do echo "***** Running $$t"; ./$$t || exit 1; done clean: - -rm -f $(PROGRAMS) $(BENCHMARKS) $(LIBRARY) $(SHARED) $(MEMENVLIBRARY) $(THRIFTSERVER) */*.o */*/*.o ios-x86/*/*.o ios-arm/*/*.o build_config.mk + -rm -f $(PROGRAMS) $(BENCHMARKS) $(LIBRARY) $(SHARED) $(MEMENVLIBRARY) $(THRIFTSERVER) */*.o */*/*.o ios-x86/*/*.o ios-arm/*/*.o build_config.mk $(VERSIONFILE) -rm -rf ios-x86/* ios-arm/* $(LIBRARY): $(LIBOBJECTS) rm -f $@ $(AR) -rs $@ $(LIBOBJECTS) -leveldb_shell: tools/shell/ShellContext.o tools/shell/ShellState.o tools/shell/LeveldbShell.o tools/shell/DBClientProxy.o tools/shell/ShellContext.h tools/shell/ShellState.h tools/shell/DBClientProxy.h $(LIBOBJECTS) - $(CXX) tools/shell/ShellContext.o tools/shell/ShellState.o tools/shell/LeveldbShell.o tools/shell/DBClientProxy.o $(LIBOBJECTS) -o $@ $(LDFLAGS) db_bench: db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) $(CXX) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) @@ -174,9 +178,22 @@ leveldb_server: thrift/server.o $(LIBRARY) leveldb_server_test: thrift/test/simpletest.o $(LIBRARY) $(CXX) thrift/test/simpletest.o $(LIBRARY) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) +leveldb_shell: tools/shell/ShellContext.o tools/shell/ShellState.o tools/shell/LeveldbShell.o tools/shell/DBClientProxy.o tools/shell/ShellContext.h tools/shell/ShellState.h tools/shell/DBClientProxy.h $(LIBOBJECTS) + $(CXX) tools/shell/ShellContext.o tools/shell/ShellState.o tools/shell/LeveldbShell.o tools/shell/DBClientProxy.o $(LIBOBJECTS) -o $@ $(LDFLAGS) + DBClientProxy_test: tools/shell/test/DBClientProxyTest.o tools/shell/DBClientProxy.o $(LIBRARY) $(CXX) tools/shell/test/DBClientProxyTest.o tools/shell/DBClientProxy.o $(LIBRARY) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) +manifest_dump: tools/manifest_dump.o $(LIBOBJECTS) + $(CXX) tools/manifest_dump.o $(LIBOBJECTS) -o $@ $(LDFLAGS) + +filelock_test: util/filelock_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) util/filelock_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS) + +# recreate the version file with the latest git revision +$(VERSIONFILE): build_detect_version + $(shell ./build_detect_platform build_config.mk) + ifeq ($(PLATFORM), IOS) # For iOS, create universal object files to be used on both the simulator and # a device. diff --git a/build_detect_platform b/build_detect_platform index def5e52e2..ce33e2c92 100755 --- a/build_detect_platform +++ b/build_detect_platform @@ -96,15 +96,22 @@ case "$TARGET_OS" in exit 1 esac +./build_detect_version + # We want to make a list of all cc files within util, db, table, and helpers # except for the test and benchmark files. By default, find will output a list # of all files matching either rule, so we need to append -print to make the # prune take effect. DIRS="util db table" if test "$USE_THRIFT"; then - DIRS+=" thrift/gen-cpp thrift/server_utils.cpp " + DIRS+=" thrift/server_utils.cpp thrift/gen-cpp " THRIFTSERVER=leveldb_server fi + +if test "$USE_SCRIBE"; then + DIRS+=" scribe " +fi + set -f # temporarily disable globbing so that our patterns aren't expanded PRUNE_TEST="-name *test*.cc -prune" PRUNE_BENCH="-name *_bench.cc -prune" @@ -197,6 +204,11 @@ if test "$USE_THRIFT"; then PLATFORM_LDFLAGS+=$THRIFT_LDFLAGS fi +#shall we build with scribe +if test "$USE_SCRIBE"; then + COMMON_FLAGS="$COMMON_FLAGS -I./thrift/lib/ -DUSE_SCRIBE" +fi + PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS" PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS" diff --git a/build_detect_version b/build_detect_version new file mode 100755 index 000000000..ab56eadc6 --- /dev/null +++ b/build_detect_version @@ -0,0 +1,25 @@ +#!/bin/sh +# +# Record the version of the source that we are compiling. +# We keep a record of the git revision in util/version.cc. This source file +# is then built as a regular source file as part of the compilation process. +# One can run "strings executable_filename | grep _build_" to find the version of +# the source that we used to build the executable file. +# + +# create git version file +VFILE=util/build_version.cc + +# check to see if git is in the path +which git > /dev/null + +if [ "$?" = 0 ]; then + git rev-parse HEAD | awk ' BEGIN {print "#include \"build_version.h\""} {print "const char * leveldb_build_git_sha = \"leveldb_build_git_sha:" $0"\";"} END {}' > ${VFILE} +else + echo "git not found"| awk ' BEGIN {print "#include \"build_version.h\""} {print "const char * leveldb_build_git_sha = \"leveldb_build_git_sha:git not found\";"} END {}' > ${VFILE} +fi + +date | awk 'BEGIN {} {print "const char * leveldb_build_git_datetime = \"leveldb_build_git_datetime:"$0"\";"} END {} ' >> ${VFILE} +echo "const char * leveldb_build_compile_date = __DATE__;" >> ${VFILE} +echo "const char * leveldb_build_compile_time = __TIME__;" >> ${VFILE} + diff --git a/db/builder.cc b/db/builder.cc index f41988219..7b03d3c14 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -53,7 +53,7 @@ Status BuildTable(const std::string& dbname, delete builder; // Finish and check for file errors - if (s.ok()) { + if (s.ok() && !options.disableDataSync) { s = file->Sync(); } if (s.ok()) { diff --git a/db/db_bench.cc b/db/db_bench.cc index fffbe9928..2ee030c90 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -120,9 +120,35 @@ static class leveldb::DBStatistics* dbstats = NULL; // Number of write operations to do. If negative, do FLAGS_num reads. static long FLAGS_writes = -1; +// These default values might change if the hardcoded + // Sync all writes to disk static bool FLAGS_sync = false; +// If true, do not wait until data is synced to disk. +static bool FLAGS_disable_data_sync = false; + +// If true, do not write WAL for write. +static bool FLAGS_disable_wal = false; + +// Target level-0 file size for compaction +static int FLAGS_target_file_size_base = 2 * 1048576; + +// A multiplier to compute targe level-N file size +static int FLAGS_target_file_size_multiplier = 1; + +// Max bytes for level-0 +static int FLAGS_max_bytes_for_level_base = 10 * 1048576; + +// A multiplier to compute max bytes for level-N +static int FLAGS_max_bytes_for_level_multiplier = 10; + +// Number of files in level-0 that will trigger put stop. +static int FLAGS_level0_stop_writes_trigger = 12; + +// Number of files in level-0 that will slow down writes. +static int FLAGS_level0_slowdown_writes_trigger = 8; + // posix or hdfs environment static leveldb::Env* FLAGS_env = leveldb::Env::Default(); @@ -485,6 +511,8 @@ class Benchmark { write_options_.sync = true; } + write_options_.disableWAL = FLAGS_disable_wal; + void (Benchmark::*method)(ThreadState*) = NULL; bool fresh_db = false; int num_threads = FLAGS_threads; @@ -745,6 +773,15 @@ class Benchmark { options.max_open_files = FLAGS_open_files; options.statistics = dbstats; options.env = FLAGS_env; + options.disableDataSync = FLAGS_disable_data_sync; + options.target_file_size_base = FLAGS_target_file_size_base; + options.target_file_size_multiplier = FLAGS_target_file_size_multiplier; + options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base; + options.max_bytes_for_level_multiplier = + FLAGS_max_bytes_for_level_multiplier; + options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger; + options.level0_slowdown_writes_trigger = + FLAGS_level0_slowdown_writes_trigger; Status s = DB::Open(options, FLAGS_db, &db_); if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); @@ -1030,8 +1067,32 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--sync=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { FLAGS_sync = n; + } else if (sscanf(argv[i], "--disable_data_sync=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_disable_data_sync = n; + } else if (sscanf(argv[i], "--disable_wal=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_disable_wal = n; } else if (sscanf(argv[i], "--hdfs=%s", &hdfsname) == 1) { FLAGS_env = new leveldb::HdfsEnv(hdfsname); + } else if (sscanf(argv[i], "--target_file_size_base=%d%c", + &n, &junk) == 1) { + FLAGS_target_file_size_base = n; + } else if ( sscanf(argv[i], "--target_file_size_multiplier=%d%c", + &n, &junk) == 1) { + FLAGS_target_file_size_multiplier = n; + } else if ( + sscanf(argv[i], "--max_bytes_for_level_base=%d%c", &n, &junk) == 1) { + FLAGS_max_bytes_for_level_base = n; + } else if (sscanf(argv[i], "--max_bytes_for_level_multiplier=%d%c", + &n, &junk) == 1) { + FLAGS_max_bytes_for_level_multiplier = n; + } else if (sscanf(argv[i],"--level0_stop_writes_trigger=%d%c", + &n, &junk) == 1) { + FLAGS_level0_stop_writes_trigger = n; + } else if (sscanf(argv[i],"--level0_slowdown_writes_trigger=%d%c", + &n, &junk) == 1) { + FLAGS_level0_slowdown_writes_trigger = n; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/db/db_impl.cc b/db/db_impl.cc index 72e85f936..0c709255e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -10,6 +10,7 @@ #include #include #include +#include #include "db/builder.h" #include "db/db_iter.h" #include "db/dbformat.h" @@ -32,6 +33,7 @@ #include "util/coding.h" #include "util/logging.h" #include "util/mutexlock.h" +#include "util/build_version.h" namespace leveldb { @@ -99,7 +101,8 @@ Options SanitizeOptions(const std::string& dbname, if (result.info_log == NULL) { // Open a log file in the same directory as the db src.env->CreateDir(dbname); // In case it does not exist - src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname)); + src.env->RenameFile(InfoLogFileName(dbname), + OldInfoLogFileName(dbname, src.env->NowMicros())); Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log); if (!s.ok()) { // No place suitable for logging @@ -131,17 +134,34 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) log_(NULL), tmp_batch_(new WriteBatch), bg_compaction_scheduled_(false), - manual_compaction_(NULL) { + manual_compaction_(NULL), + logger_(NULL) { mem_->Ref(); has_imm_.Release_Store(NULL); stats_ = new CompactionStats[options.num_levels]; // Reserve ten files or so for other uses and give the rest to TableCache. - const int table_cache_size = options.max_open_files - 10; + const int table_cache_size = options_.max_open_files - 10; table_cache_ = new TableCache(dbname_, &options_, table_cache_size); versions_ = new VersionSet(dbname_, &options_, table_cache_, &internal_comparator_); + + options_.Dump(options_.info_log); + +#ifdef USE_SCRIBE + logger_ = new ScribeLogger("localhost", 1456); +#endif + + char name[100]; + Status st = env_->GetHostName(name, 100); + if(st.ok()) { + host_name_ = name; + } else { + Log(options_.info_log, "Can't get hostname, use localhost as host name."); + host_name_ = "localhost"; + } + last_log_ts = 0; } DBImpl::~DBImpl() { @@ -224,6 +244,7 @@ void DBImpl::DeleteObsoleteFiles() { env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose uint64_t number; FileType type; + std::vector old_log_files_ts; for (size_t i = 0; i < filenames.size(); i++) { if (ParseFileName(filenames[i], &number, &type)) { bool keep = true; @@ -245,9 +266,14 @@ void DBImpl::DeleteObsoleteFiles() { // be recorded in pending_outputs_, which is inserted into "live" keep = (live.find(number) != live.end()); break; + case kInfoLogFile: + keep = true; + if (number != 0) { + old_log_files_ts.push_back(number); + } + break; case kCurrentFile: case kDBLockFile: - case kInfoLogFile: keep = true; break; } @@ -263,6 +289,20 @@ void DBImpl::DeleteObsoleteFiles() { } } } + + // Delete old log files. + int old_log_file_count = old_log_files_ts.size(); + if (old_log_file_count >= KEEP_LOG_FILE_NUM) { + std::sort(old_log_files_ts.begin(), old_log_files_ts.end()); + for (int i = 0; i >= (old_log_file_count - KEEP_LOG_FILE_NUM); i++) { + uint64_t ts = old_log_files_ts.at(i); + std::string to_delete = OldInfoLogFileName(dbname_, ts); + Log(options_.info_log, "Delete type=%d #%lld\n", + int(kInfoLogFile), + static_cast(ts)); + env_->DeleteFile(dbname_ + "/" + to_delete); + } + } } Status DBImpl::Recover(VersionEdit* edit) { @@ -514,6 +554,7 @@ Status DBImpl::CompactMemTable() { imm_ = NULL; has_imm_.Release_Store(NULL); DeleteObsoleteFiles(); + MaybeScheduleLogDBDeployStats(); } return s; @@ -537,15 +578,15 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) { } int DBImpl::NumberLevels() { - return options_.num_levels; + return options_.num_levels; } int DBImpl::MaxMemCompactionLevel() { - return options_.max_mem_compaction_level; + return options_.max_mem_compaction_level; } int DBImpl::Level0StopWriteTrigger() { - return options_.level0_stop_writes_trigger; + return options_.level0_stop_writes_trigger; } Status DBImpl::Flush(const FlushOptions& options) { @@ -598,34 +639,33 @@ Status DBImpl::FlushMemTable(const FlushOptions& options) { } Status DBImpl::WaitForCompactMemTable() { - Status s; - // Wait until the compaction completes - MutexLock l(&mutex_); - while (imm_ != NULL && bg_error_.ok()) { - bg_cv_.Wait(); - } - if (imm_ != NULL) { - s = bg_error_; - } - return s; + Status s; + // Wait until the compaction completes + MutexLock l(&mutex_); + while (imm_ != NULL && bg_error_.ok()) { + bg_cv_.Wait(); + } + if (imm_ != NULL) { + s = bg_error_; + } + return s; } - Status DBImpl::TEST_CompactMemTable() { return FlushMemTable(FlushOptions()); } Status DBImpl::TEST_WaitForCompactMemTable() { - return WaitForCompactMemTable(); + return WaitForCompactMemTable(); } Status DBImpl::TEST_WaitForCompact() { - // Wait until the compaction completes - MutexLock l(&mutex_); - while (bg_compaction_scheduled_ && bg_error_.ok()) { - bg_cv_.Wait(); - } - return bg_error_; + // Wait until the compaction completes + MutexLock l(&mutex_); + while (bg_compaction_scheduled_ && bg_error_.ok()) { + bg_cv_.Wait(); + } + return bg_error_; } void DBImpl::MaybeScheduleCompaction() { @@ -656,6 +696,8 @@ void DBImpl::BackgroundCall() { } bg_compaction_scheduled_ = false; + MaybeScheduleLogDBDeployStats(); + // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. MaybeScheduleCompaction(); @@ -868,6 +910,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->compaction->level(), compact->compaction->num_input_files(1), compact->compaction->level() + 1); + char scratch[200]; + compact->compaction->Summary(scratch, 256); + Log(options_.info_log, "Compaction start summary: %s\n", scratch); assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); assert(compact->builder == NULL); @@ -1457,6 +1502,7 @@ Status DB::Open(const Options& options, const std::string& dbname, if (s.ok()) { impl->DeleteObsoleteFiles(); impl->MaybeScheduleCompaction(); + impl->MaybeScheduleLogDBDeployStats(); } } impl->mutex_.Unlock(); @@ -1502,4 +1548,13 @@ Status DestroyDB(const std::string& dbname, const Options& options) { return result; } +// +// A global method that can dump out the build version +void printLeveldbBuildVersion() { + printf("Git sha %s", leveldb_build_git_sha); + printf("Git datetime %s", leveldb_build_git_datetime); + printf("Compile time %s", leveldb_build_compile_time); + printf("Compile date %s", leveldb_build_compile_date); +} + } // namespace leveldb diff --git a/db/db_impl.h b/db/db_impl.h index 9ff82caf5..e1e7578bc 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -13,6 +13,13 @@ #include "leveldb/db.h" #include "leveldb/env.h" #include "port/port.h" +#include "util/stats_logger.h" + +#ifdef USE_SCRIBE +#include "scribe/scribe_logger.h" +#endif + +#include namespace leveldb { @@ -107,6 +114,9 @@ class DBImpl : public DB { // Wait for memtable compaction Status WaitForCompactMemTable(); + void MaybeScheduleLogDBDeployStats(); + static void LogDBDeployStats(void* db); + void MaybeScheduleCompaction(); static void BGWork(void* db); void BackgroundCall(); @@ -144,6 +154,8 @@ class DBImpl : public DB { uint64_t logfile_number_; log::Writer* log_; + std::string host_name_; + // Queue of writers. std::deque writers_; WriteBatch* tmp_batch_; @@ -172,6 +184,10 @@ class DBImpl : public DB { // Have we encountered a background error in paranoid mode? Status bg_error_; + StatsLogger* logger_; + + int64_t volatile last_log_ts; + // Per level compaction stats. stats_[level] stores the stats for // compactions that produced data for the specified "level". struct CompactionStats { @@ -189,6 +205,8 @@ class DBImpl : public DB { }; CompactionStats* stats_; + static const int KEEP_LOG_FILE_NUM = 1000; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_stats_logger.cc b/db/db_stats_logger.cc new file mode 100644 index 000000000..4cac0c95b --- /dev/null +++ b/db/db_stats_logger.cc @@ -0,0 +1,72 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_impl.h" +#include +#include +#include "db/version_set.h" +#include "leveldb/db.h" +#include "leveldb/env.h" + +namespace leveldb { + +void DBImpl::MaybeScheduleLogDBDeployStats() { + + // There is a lock in the actual logger. + if (!logger_ || options_.db_stats_log_interval < 0 + || host_name_.empty()) { + return; + } + if (shutting_down_.Acquire_Load()) { + // Already scheduled + } else { + int64_t current_ts = 0; + Status st = env_->GetCurrentTime(¤t_ts); + if (!st.ok()) { + return; + } + if ((current_ts - last_log_ts) < options_.db_stats_log_interval) { + return; + } + last_log_ts = current_ts; + env_->Schedule(&DBImpl::LogDBDeployStats, this); + } +} + +void DBImpl::LogDBDeployStats(void* db) { + DBImpl* db_inst = reinterpret_cast(db); + + if (db_inst->shutting_down_.Acquire_Load()) { + return; + } + + std::string version_info; + version_info += boost::lexical_cast(kMajorVersion); + version_info += "."; + version_info += boost::lexical_cast(kMinorVersion); + std::string data_dir; + db_inst->env_->GetAbsolutePath(db_inst->dbname_, &data_dir); + + uint64_t file_total_size = 0; + uint32_t file_total_num = 0; + for (int i = 0; i < db_inst->versions_->NumberLevels(); i++) { + file_total_num += db_inst->versions_->NumLevelFiles(i); + file_total_size += db_inst->versions_->NumLevelBytes(i); + } + + VersionSet::LevelSummaryStorage scratch; + const char* file_num_summary = db_inst->versions_->LevelSummary(&scratch); + std::string file_num_per_level(file_num_summary); + const char* file_size_summary = db_inst->versions_->LevelDataSizeSummary( + &scratch); + std::string data_size_per_level(file_num_summary); + int64_t unix_ts; + db_inst->env_->GetCurrentTime(&unix_ts); + + db_inst->logger_->Log_Deploy_Stats(version_info, db_inst->host_name_, + data_dir, file_total_size, file_total_num, file_num_per_level, + data_size_per_level, unix_ts); +} + +} diff --git a/db/db_test.cc b/db/db_test.cc index 105a8879b..674667c4d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -213,6 +213,10 @@ class DBTest { ASSERT_OK(TryReopen(options)); } + Status PureReopen(Options* options, DB** db) { + return DB::Open(*options, dbname_, db); + } + Status TryReopen(Options* options) { delete db_; db_ = NULL; @@ -779,6 +783,22 @@ TEST(DBTest, Recover) { } while (ChangeOptions()); } +TEST(DBTest, RollLog) { + do { + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put("baz", "v5")); + + Reopen(); + for (int i = 0; i < 10; i++) { + Reopen(); + } + ASSERT_OK(Put("foo", "v4")); + for (int i = 0; i < 10; i++) { + Reopen(); + } + } while (ChangeOptions()); +} + TEST(DBTest, WAL) { Options options = CurrentOptions(); WriteOptions writeOpt = WriteOptions(); @@ -812,6 +832,13 @@ TEST(DBTest, WAL) { ASSERT_EQ("v3", Get("foo")); } +TEST(DBTest, CheckLock) { + DB* localdb; + Options options = CurrentOptions(); + ASSERT_TRUE(TryReopen(&options).ok()); + ASSERT_TRUE(!(PureReopen(&options, &localdb).ok())); // second open should fail +} + TEST(DBTest, FLUSH) { Options options = CurrentOptions(); WriteOptions writeOpt = WriteOptions(); diff --git a/db/filename.cc b/db/filename.cc index 3c4d49f64..cc9bc5994 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -60,8 +60,10 @@ std::string InfoLogFileName(const std::string& dbname) { } // Return the name of the old info log file for "dbname". -std::string OldInfoLogFileName(const std::string& dbname) { - return dbname + "/LOG.old"; +std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts) { + char buf[50]; + snprintf(buf, sizeof(buf), "%llu", static_cast(ts)); + return dbname + "/LOG.old." + buf; } @@ -69,7 +71,7 @@ std::string OldInfoLogFileName(const std::string& dbname) { // dbname/CURRENT // dbname/LOCK // dbname/LOG -// dbname/LOG.old +// dbname/LOG.old.[0-9]+ // dbname/MANIFEST-[0-9]+ // dbname/[0-9]+.(log|sst) bool ParseFileName(const std::string& fname, @@ -82,9 +84,17 @@ bool ParseFileName(const std::string& fname, } else if (rest == "LOCK") { *number = 0; *type = kDBLockFile; - } else if (rest == "LOG" || rest == "LOG.old") { + } else if (rest == "LOG") { *number = 0; *type = kInfoLogFile; + } else if (rest.starts_with("LOG.old.")) { + uint64_t ts_suffix; + rest.remove_prefix(sizeof("LOG.old.")); + if (!ConsumeDecimalNumber(&rest, &ts_suffix)) { + return false; + } + *number = ts_suffix; + *type = kInfoLogFile; } else if (rest.starts_with("MANIFEST-")) { rest.remove_prefix(strlen("MANIFEST-")); uint64_t num; diff --git a/db/filename.h b/db/filename.h index d5d09b114..e6d6b19b5 100644 --- a/db/filename.h +++ b/db/filename.h @@ -60,7 +60,7 @@ extern std::string TempFileName(const std::string& dbname, uint64_t number); extern std::string InfoLogFileName(const std::string& dbname); // Return the name of the old info log file for "dbname". -extern std::string OldInfoLogFileName(const std::string& dbname); +extern std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts); // If filename is a leveldb file, store the type of the file in *type. // The number encoded in the filename is stored in *number. If the diff --git a/db/version_set.cc b/db/version_set.cc index 015faa815..78b82343d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -702,15 +702,16 @@ VersionSet::VersionSet(const std::string& dbname, compact_pointer_ = new std::string[options_->num_levels]; max_file_size_ = new uint64_t[options_->num_levels]; level_max_bytes_ = new uint64_t[options->num_levels]; - max_file_size_[0] = options_->target_file_size_base; - level_max_bytes_[0] = options_->max_bytes_for_level_base; int target_file_size_multiplier = options_->target_file_size_multiplier; int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; - int i = 1; - while (i < options_->num_levels) { - max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; - level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier; - i++; + for (int i = 0; i < options_->num_levels; i++) { + if (i > 1) { + max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; + level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier; + } else { + max_file_size_[i] = options_->target_file_size_base; + level_max_bytes_[i] = options_->max_bytes_for_level_base; + } } AppendVersion(new Version(this)); } @@ -939,6 +940,119 @@ Status VersionSet::Recover() { return s; } +Status VersionSet::DumpManifest(Options& options, std::string& dscname) { + struct LogReporter : public log::Reader::Reporter { + Status* status; + virtual void Corruption(size_t bytes, const Status& s) { + if (this->status->ok()) *this->status = s; + } + }; + + // Open the specified manifest file. + SequentialFile* file; + Status s = options.env->NewSequentialFile(dscname, &file); + if (!s.ok()) { + return s; + } + + bool have_log_number = false; + bool have_prev_log_number = false; + bool have_next_file = false; + bool have_last_sequence = false; + uint64_t next_file = 0; + uint64_t last_sequence = 0; + uint64_t log_number = 0; + uint64_t prev_log_number = 0; + VersionSet::Builder builder(this, current_); + + { + LogReporter reporter; + reporter.status = &s; + log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/); + Slice record; + std::string scratch; + while (reader.ReadRecord(&record, &scratch) && s.ok()) { + VersionEdit edit(NumberLevels()); + s = edit.DecodeFrom(record); + if (s.ok()) { + if (edit.has_comparator_ && + edit.comparator_ != icmp_.user_comparator()->Name()) { + s = Status::InvalidArgument( + edit.comparator_ + "does not match existing comparator ", + icmp_.user_comparator()->Name()); + } + } + + if (s.ok()) { + builder.Apply(&edit); + } + + if (edit.has_log_number_) { + log_number = edit.log_number_; + have_log_number = true; + } + + if (edit.has_prev_log_number_) { + prev_log_number = edit.prev_log_number_; + have_prev_log_number = true; + } + + if (edit.has_next_file_number_) { + next_file = edit.next_file_number_; + have_next_file = true; + } + + if (edit.has_last_sequence_) { + last_sequence = edit.last_sequence_; + have_last_sequence = true; + } + } + } + delete file; + file = NULL; + + if (s.ok()) { + if (!have_next_file) { + s = Status::Corruption("no meta-nextfile entry in descriptor"); + printf("no meta-nextfile entry in descriptor"); + } else if (!have_log_number) { + s = Status::Corruption("no meta-lognumber entry in descriptor"); + printf("no meta-lognumber entry in descriptor"); + } else if (!have_last_sequence) { + printf("no last-sequence-number entry in descriptor"); + s = Status::Corruption("no last-sequence-number entry in descriptor"); + } + + if (!have_prev_log_number) { + prev_log_number = 0; + } + + MarkFileNumberUsed(prev_log_number); + MarkFileNumberUsed(log_number); + } + + if (s.ok()) { + Version* v = new Version(this); + builder.SaveTo(v); + // Install recovered version + Finalize(v); + AppendVersion(v); + manifest_file_number_ = next_file; + next_file_number_ = next_file + 1; + last_sequence_ = last_sequence; + log_number_ = log_number; + prev_log_number_ = prev_log_number; + + printf("manifest_file_number %d next_file_number %d last_sequence %d log_number %d prev_log_number %d\n", + manifest_file_number_, next_file_number_, + last_sequence, log_number, prev_log_number); + printf("%s \n", v->DebugString().c_str()); + } + + + return s; +} + void VersionSet::MarkFileNumberUsed(uint64_t number) { if (next_file_number_ <= number) { next_file_number_ = number + 1; @@ -1032,6 +1146,21 @@ const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { return scratch->buffer; } +const char* VersionSet::LevelDataSizeSummary( + LevelSummaryStorage* scratch) const { + int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); + for (int i = 0; i < NumberLevels(); i++) { + int sz = sizeof(scratch->buffer) - len; + int ret = snprintf(scratch->buffer + len, sz, "%ld ", + NumLevelBytes(i)); + if (ret < 0 || ret >= sz) + break; + len += ret; + } + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); + return scratch->buffer; +} + uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { uint64_t result = 0; for (int level = 0; level < NumberLevels(); level++) { @@ -1443,4 +1572,37 @@ void Compaction::ReleaseInputs() { } } +static void InputSummary(std::vector& files, + char* output, + int len) { + int write = 0; + for (int i = 0; i < files.size(); i++) { + int sz = len - write; + int ret = snprintf(output + write, sz, "%llu(%llu) ", + files.at(i)->number, + files.at(i)->file_size); + if (ret < 0 || ret >= sz) + break; + write += ret; + } +} + +void Compaction::Summary(char* output, int len) { + int write = snprintf(output, len, "Base level %d, inputs:", level_); + if(write < 0 || write > len) + return; + + char level_low_summary[100]; + InputSummary(inputs_[0], level_low_summary, 100); + char level_up_summary[100]; + if (inputs_[1].size()) { + InputSummary(inputs_[1], level_up_summary, 100); + } else { + level_up_summary[0] = '\0'; + } + + snprintf(output + write, len - write, "[%s],[%s]", + level_low_summary, level_up_summary); +} + } // namespace leveldb diff --git a/db/version_set.h b/db/version_set.h index c0cbe7f80..2dc889708 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -238,6 +238,13 @@ class VersionSet { }; const char* LevelSummary(LevelSummaryStorage* scratch) const; + // printf contents (for debugging) + Status DumpManifest(Options& options, std::string& manifestFileName); + + // Return a human-readable short (single-line) summary of the data size + // of files per level. Uses *scratch as backing store. + const char* LevelDataSizeSummary(LevelSummaryStorage* scratch) const; + private: class Builder; @@ -344,6 +351,8 @@ class Compaction { // is successful. void ReleaseInputs(); + void Summary(char* output, int len); + private: friend class Version; friend class VersionSet; diff --git a/fbcode.sh b/fbcode.sh index 0f8cc3853..f21381f94 100644 --- a/fbcode.sh +++ b/fbcode.sh @@ -8,6 +8,13 @@ TOOLCHAIN_REV=d28c90311ca14f9f0b2bb720f4e34b285513d4f4 TOOLCHAIN_EXECUTABLES="/mnt/gvfs/third-party/$TOOLCHAIN_REV/centos5.2-native" TOOLCHAIN_LIB_BASE="/mnt/gvfs/third-party/$TOOLCHAIN_REV/gcc-4.6.2-glibc-2.13" +# always build thrift server +export USE_THRIFT=1 + +if ! test "$NO_SCRIBE"; then +export USE_SCRIBE=1 +fi + # location of libhdfs libraries if test "$USE_HDFS"; then JAVA_HOME="/usr/local/jdk-6u22-64" diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 6ee67dcfa..4f2809871 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -122,6 +122,20 @@ class HdfsEnv : public Env { posixEnv->SleepForMicroseconds(micros); } + virtual Status GetHostName(char* name, uint len) { + return posixEnv->GetHostName(name, len); + } + + virtual Status GetCurrentTime(int64_t* unix_time) { + return posixEnv->NowUnixTime(unix_time); + } + + virtual Status GetAbsolutePath(const std::string& db_path, + std::string* output_path) { + return posixEnv->GetAbsolutePath(db_path, output_path); + } + + static uint64_t gettid() { assert(sizeof(pthread_t) <= sizeof(uint64_t)); return (uint64_t)pthread_self(); @@ -245,6 +259,14 @@ class HdfsEnv : public Env { virtual uint64_t NowMicros() {} virtual void SleepForMicroseconds(int micros) {} + + virtual Status GetHostName(char* name, uint len) {} + + virtual Status GetCurrentTime(int64_t* unix_time) {} + + virtual Status GetAbsolutePath(const std::string& db_path, + std::string* outputpath) {} + }; } diff --git a/include/leveldb/env.h b/include/leveldb/env.h index 272066718..378212ab0 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -145,6 +145,16 @@ class Env { // Sleep/delay the thread for the perscribed number of micro-seconds. virtual void SleepForMicroseconds(int micros) = 0; + // Get the current host name. + virtual Status GetHostName(char* name, uint len) = 0; + + // Get the number of seconds since the Epoch, 1970-01-01 00:00:00 (UTC). + virtual Status GetCurrentTime(int64_t* unix_time) = 0; + + // Get full directory name for this db. + virtual Status GetAbsolutePath(const std::string& db_path, + std::string* output_path) = 0; + private: // No copying allowed Env(const Env&); @@ -314,6 +324,17 @@ class EnvWrapper : public Env { void SleepForMicroseconds(int micros) { target_->SleepForMicroseconds(micros); } + Status GetHostName(char* name, uint len) { + return target_->GetHostName(name, len); + } + Status GetCurrentTime(int64_t* unix_time) { + return target_->GetCurrentTime(unix_time); + } + Status GetAbsolutePath(const std::string& db_path, + std::string* output_path) { + return target_->GetAbsolutePath(db_path, output_path); + } + private: Env* target_; }; diff --git a/include/leveldb/options.h b/include/leveldb/options.h index d0b5a4cf9..a271905fc 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -152,17 +152,17 @@ struct Options { // (max_bytes_for_level_base)^(max_bytes_for_level_multiplier). int max_bytes_for_level_base; - int max_bytes_for_level_multiplier; + int max_bytes_for_level_multiplier; - // Maximum number of bytes in all compacted files. We avoid expanding - // the lower level file set of a compaction if it would make the - // total compaction cover more than - // (expanded_compaction_factor * targetFileSizeLevel()) many bytes. - int expanded_compaction_factor; + // Maximum number of bytes in all compacted files. We avoid expanding + // the lower level file set of a compaction if it would make the + // total compaction cover more than + // (expanded_compaction_factor * targetFileSizeLevel()) many bytes. + int expanded_compaction_factor; - // Control maximum bytes of overlaps in grandparent (i.e., level+2) before we - // stop building a single file in a level->level+1 compaction. - int max_grandparent_overlap_factor; + // Control maximum bytes of overlaps in grandparent (i.e., level+2) before we + // stop building a single file in a level->level+1 compaction. + int max_grandparent_overlap_factor; // Compress blocks using the specified compression algorithm. This // parameter can be changed dynamically. @@ -198,8 +198,16 @@ struct Options { // Default: false bool disableDataSync; + // This number controls how often a new scribe log about + // db deploy stats is written out. + // -1 indicates no logging at all. + // Default value is 1800 (half an hour). + int db_stats_log_interval; + // Create an Options object with default values for all fields. Options(); + + void Dump(Logger * log) const; }; // Options that control read operations diff --git a/scribe/if/gen-cpp/scribe.cpp b/scribe/if/gen-cpp/scribe.cpp new file mode 100644 index 000000000..c632e5a16 --- /dev/null +++ b/scribe/if/gen-cpp/scribe.cpp @@ -0,0 +1,1012 @@ +/** + * Autogenerated by Thrift + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#include "scribe.h" +#include "folly/ScopeGuard.h" + +namespace Tleveldb { + +uint32_t scribe_Log_args::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == apache::thrift::protocol::T_LIST) { + { + this->messages.clear(); + uint32_t _size14; + apache::thrift::protocol::TType _etype17; + xfer += iprot->readListBegin(_etype17, _size14); + this->messages.resize(_size14); + uint32_t _i18; + for (_i18 = 0; _i18 < _size14; ++_i18) + { + xfer += this->messages[_i18].read(iprot); + } + xfer += iprot->readListEnd(); + } + this->__isset.messages = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t scribe_Log_args::write(apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("scribe_Log_args"); + xfer += oprot->writeFieldBegin("messages", apache::thrift::protocol::T_LIST, 1); + { + xfer += oprot->writeListBegin(apache::thrift::protocol::T_STRUCT, this->messages.size()); + std::vector ::const_iterator _iter19; + for (_iter19 = this->messages.begin(); _iter19 != this->messages.end(); ++_iter19) + { + xfer += (*_iter19).write(oprot); + } + xfer += oprot->writeListEnd(); + } + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t scribe_Log_pargs::write(apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("scribe_Log_pargs"); + xfer += oprot->writeFieldBegin("messages", apache::thrift::protocol::T_LIST, 1); + { + xfer += oprot->writeListBegin(apache::thrift::protocol::T_STRUCT, (*(this->messages)).size()); + std::vector ::const_iterator _iter20; + for (_iter20 = (*(this->messages)).begin(); _iter20 != (*(this->messages)).end(); ++_iter20) + { + xfer += (*_iter20).write(oprot); + } + xfer += oprot->writeListEnd(); + } + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t scribe_Log_result::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == apache::thrift::protocol::T_I32) { + int32_t ecast21; + xfer += iprot->readI32(ecast21); + this->success = (ResultCode)ecast21; + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t scribe_Log_result::write(apache::thrift::protocol::TProtocol* oprot) const { + + uint32_t xfer = 0; + + xfer += oprot->writeStructBegin("scribe_Log_result"); + + if (this->__isset.success) { + xfer += oprot->writeFieldBegin("success", apache::thrift::protocol::T_I32, 0); + xfer += oprot->writeI32((int32_t)this->success); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t scribe_Log_presult::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == apache::thrift::protocol::T_I32) { + int32_t ecast22; + xfer += iprot->readI32(ecast22); + (*(this->success)) = (ResultCode)ecast22; + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t scribe_LogMulti_args::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == apache::thrift::protocol::T_LIST) { + { + this->messages.clear(); + uint32_t _size23; + apache::thrift::protocol::TType _etype26; + xfer += iprot->readListBegin(_etype26, _size23); + this->messages.resize(_size23); + uint32_t _i27; + for (_i27 = 0; _i27 < _size23; ++_i27) + { + xfer += this->messages[_i27].read(iprot); + } + xfer += iprot->readListEnd(); + } + this->__isset.messages = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t scribe_LogMulti_args::write(apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("scribe_LogMulti_args"); + xfer += oprot->writeFieldBegin("messages", apache::thrift::protocol::T_LIST, 1); + { + xfer += oprot->writeListBegin(apache::thrift::protocol::T_STRUCT, this->messages.size()); + std::vector ::const_iterator _iter28; + for (_iter28 = this->messages.begin(); _iter28 != this->messages.end(); ++_iter28) + { + xfer += (*_iter28).write(oprot); + } + xfer += oprot->writeListEnd(); + } + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t scribe_LogMulti_pargs::write(apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("scribe_LogMulti_pargs"); + xfer += oprot->writeFieldBegin("messages", apache::thrift::protocol::T_LIST, 1); + { + xfer += oprot->writeListBegin(apache::thrift::protocol::T_STRUCT, (*(this->messages)).size()); + std::vector ::const_iterator _iter29; + for (_iter29 = (*(this->messages)).begin(); _iter29 != (*(this->messages)).end(); ++_iter29) + { + xfer += (*_iter29).write(oprot); + } + xfer += oprot->writeListEnd(); + } + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t scribe_LogMulti_result::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == apache::thrift::protocol::T_LIST) { + { + this->success.clear(); + uint32_t _size30; + apache::thrift::protocol::TType _etype33; + xfer += iprot->readListBegin(_etype33, _size30); + this->success.resize(_size30); + uint32_t _i34; + for (_i34 = 0; _i34 < _size30; ++_i34) + { + int32_t ecast35; + xfer += iprot->readI32(ecast35); + this->success[_i34] = (ResultCode)ecast35; + } + xfer += iprot->readListEnd(); + } + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t scribe_LogMulti_result::write(apache::thrift::protocol::TProtocol* oprot) const { + + uint32_t xfer = 0; + + xfer += oprot->writeStructBegin("scribe_LogMulti_result"); + + if (this->__isset.success) { + xfer += oprot->writeFieldBegin("success", apache::thrift::protocol::T_LIST, 0); + { + xfer += oprot->writeListBegin(apache::thrift::protocol::T_I32, this->success.size()); + std::vector ::const_iterator _iter36; + for (_iter36 = this->success.begin(); _iter36 != this->success.end(); ++_iter36) + { + xfer += oprot->writeI32((int32_t)(*_iter36)); + } + xfer += oprot->writeListEnd(); + } + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t scribe_LogMulti_presult::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == apache::thrift::protocol::T_LIST) { + { + (*(this->success)).clear(); + uint32_t _size37; + apache::thrift::protocol::TType _etype40; + xfer += iprot->readListBegin(_etype40, _size37); + (*(this->success)).resize(_size37); + uint32_t _i41; + for (_i41 = 0; _i41 < _size37; ++_i41) + { + int32_t ecast42; + xfer += iprot->readI32(ecast42); + (*(this->success))[_i41] = (ResultCode)ecast42; + } + xfer += iprot->readListEnd(); + } + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t scribe_LogCompressedMsg_args::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == apache::thrift::protocol::T_STRING) { + xfer += iprot->readBinary(this->compressedMessages); + this->__isset.compressedMessages = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t scribe_LogCompressedMsg_args::write(apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("scribe_LogCompressedMsg_args"); + xfer += oprot->writeFieldBegin("compressedMessages", apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeBinary(this->compressedMessages); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t scribe_LogCompressedMsg_pargs::write(apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("scribe_LogCompressedMsg_pargs"); + xfer += oprot->writeFieldBegin("compressedMessages", apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeBinary((*(this->compressedMessages))); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t scribe_LogCompressedMsg_result::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == apache::thrift::protocol::T_I32) { + int32_t ecast43; + xfer += iprot->readI32(ecast43); + this->success = (ResultCode)ecast43; + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t scribe_LogCompressedMsg_result::write(apache::thrift::protocol::TProtocol* oprot) const { + + uint32_t xfer = 0; + + xfer += oprot->writeStructBegin("scribe_LogCompressedMsg_result"); + + if (this->__isset.success) { + xfer += oprot->writeFieldBegin("success", apache::thrift::protocol::T_I32, 0); + xfer += oprot->writeI32((int32_t)this->success); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +uint32_t scribe_LogCompressedMsg_presult::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == apache::thrift::protocol::T_I32) { + int32_t ecast44; + xfer += iprot->readI32(ecast44); + (*(this->success)) = (ResultCode)ecast44; + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +int32_t scribeClient::getNextSendSequenceId() +{ + return nextSendSequenceId_++; +} + +int32_t scribeClient::getNextRecvSequenceId() +{ + return nextRecvSequenceId_++; +} + +ResultCode scribeClient::Log(const std::vector & messages) +{ + folly::ScopeGuard g = folly::makeGuard([&] { this->clearClientContextStack(); }); + this->generateClientContextStack("scribe.Log", NULL); + + try { + send_Log(messages); + return recv_Log(); + } catch(apache::thrift::transport::TTransportException& ex) { + this->handlerError(this->getClientContextStack(), "scribe.Log"); + iprot_->getTransport()->close(); + oprot_->getTransport()->close(); + throw; + } catch(apache::thrift::TApplicationException& ex) { + if (ex.getType() == apache::thrift::TApplicationException::BAD_SEQUENCE_ID) { + this->handlerError(this->getClientContextStack(), "scribe.Log"); + iprot_->getTransport()->close(); + oprot_->getTransport()->close(); + } + throw; + } +} + +void scribeClient::send_Log(const std::vector & messages) +{ + apache::thrift::ContextStack* ctx = this->getClientContextStack(); + this->preWrite(ctx, "scribe.Log"); + oprot_->writeMessageBegin("Log", apache::thrift::protocol::T_CALL, getNextSendSequenceId()); + + scribe_Log_pargs args; + args.messages = &messages; + args.write(oprot_); + + oprot_->writeMessageEnd(); + uint32_t _bytes45 = oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); + this->postWrite(ctx, "scribe.Log", _bytes45); + return; +} + +ResultCode scribeClient::recv_Log() +{ + apache::thrift::ContextStack* ctx = this->getClientContextStack(); + uint32_t bytes; + int32_t rseqid = 0; + int32_t eseqid = getNextRecvSequenceId(); + std::string fname; + apache::thrift::protocol::TMessageType mtype; + this->preRead(ctx, "scribe.Log"); + + iprot_->readMessageBegin(fname, mtype, rseqid); + if (this->checkSeqid_ && rseqid != eseqid) { + iprot_->skip(apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::BAD_SEQUENCE_ID); + } + if (mtype == apache::thrift::protocol::T_EXCEPTION) { + apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw x; + } + if (mtype != apache::thrift::protocol::T_REPLY) { + iprot_->skip(apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE); + } + if (fname.compare("Log") != 0) { + iprot_->skip(apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::WRONG_METHOD_NAME); + } + ResultCode _return; + scribe_Log_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + bytes = iprot_->getTransport()->readEnd(); + this->postRead(ctx, "scribe.Log", bytes); + + if (result.__isset.success) { + return _return; + } + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::MISSING_RESULT, "Log failed: unknown result"); +} + +void scribeClient::LogMulti(std::vector & _return, const std::vector & messages) +{ + folly::ScopeGuard g = folly::makeGuard([&] { this->clearClientContextStack(); }); + this->generateClientContextStack("scribe.LogMulti", NULL); + + try { + send_LogMulti(messages); + recv_LogMulti(_return); + } catch(apache::thrift::transport::TTransportException& ex) { + this->handlerError(this->getClientContextStack(), "scribe.LogMulti"); + iprot_->getTransport()->close(); + oprot_->getTransport()->close(); + throw; + } catch(apache::thrift::TApplicationException& ex) { + if (ex.getType() == apache::thrift::TApplicationException::BAD_SEQUENCE_ID) { + this->handlerError(this->getClientContextStack(), "scribe.LogMulti"); + iprot_->getTransport()->close(); + oprot_->getTransport()->close(); + } + throw; + } +} + +void scribeClient::send_LogMulti(const std::vector & messages) +{ + apache::thrift::ContextStack* ctx = this->getClientContextStack(); + this->preWrite(ctx, "scribe.LogMulti"); + oprot_->writeMessageBegin("LogMulti", apache::thrift::protocol::T_CALL, getNextSendSequenceId()); + + scribe_LogMulti_pargs args; + args.messages = &messages; + args.write(oprot_); + + oprot_->writeMessageEnd(); + uint32_t _bytes46 = oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); + this->postWrite(ctx, "scribe.LogMulti", _bytes46); + return; +} + +void scribeClient::recv_LogMulti(std::vector & _return) +{ + apache::thrift::ContextStack* ctx = this->getClientContextStack(); + uint32_t bytes; + int32_t rseqid = 0; + int32_t eseqid = getNextRecvSequenceId(); + std::string fname; + apache::thrift::protocol::TMessageType mtype; + this->preRead(ctx, "scribe.LogMulti"); + + iprot_->readMessageBegin(fname, mtype, rseqid); + if (this->checkSeqid_ && rseqid != eseqid) { + iprot_->skip(apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::BAD_SEQUENCE_ID); + } + if (mtype == apache::thrift::protocol::T_EXCEPTION) { + apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw x; + } + if (mtype != apache::thrift::protocol::T_REPLY) { + iprot_->skip(apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE); + } + if (fname.compare("LogMulti") != 0) { + iprot_->skip(apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::WRONG_METHOD_NAME); + } + scribe_LogMulti_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + bytes = iprot_->getTransport()->readEnd(); + this->postRead(ctx, "scribe.LogMulti", bytes); + + if (result.__isset.success) { + // _return pointer has now been filled + return; + } + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::MISSING_RESULT, "LogMulti failed: unknown result"); +} + +ResultCode scribeClient::LogCompressedMsg(const std::string& compressedMessages) +{ + folly::ScopeGuard g = folly::makeGuard([&] { this->clearClientContextStack(); }); + this->generateClientContextStack("scribe.LogCompressedMsg", NULL); + + try { + send_LogCompressedMsg(compressedMessages); + return recv_LogCompressedMsg(); + } catch(apache::thrift::transport::TTransportException& ex) { + this->handlerError(this->getClientContextStack(), "scribe.LogCompressedMsg"); + iprot_->getTransport()->close(); + oprot_->getTransport()->close(); + throw; + } catch(apache::thrift::TApplicationException& ex) { + if (ex.getType() == apache::thrift::TApplicationException::BAD_SEQUENCE_ID) { + this->handlerError(this->getClientContextStack(), "scribe.LogCompressedMsg"); + iprot_->getTransport()->close(); + oprot_->getTransport()->close(); + } + throw; + } +} + +void scribeClient::send_LogCompressedMsg(const std::string& compressedMessages) +{ + apache::thrift::ContextStack* ctx = this->getClientContextStack(); + this->preWrite(ctx, "scribe.LogCompressedMsg"); + oprot_->writeMessageBegin("LogCompressedMsg", apache::thrift::protocol::T_CALL, getNextSendSequenceId()); + + scribe_LogCompressedMsg_pargs args; + args.compressedMessages = &compressedMessages; + args.write(oprot_); + + oprot_->writeMessageEnd(); + uint32_t _bytes47 = oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); + this->postWrite(ctx, "scribe.LogCompressedMsg", _bytes47); + return; +} + +ResultCode scribeClient::recv_LogCompressedMsg() +{ + apache::thrift::ContextStack* ctx = this->getClientContextStack(); + uint32_t bytes; + int32_t rseqid = 0; + int32_t eseqid = getNextRecvSequenceId(); + std::string fname; + apache::thrift::protocol::TMessageType mtype; + this->preRead(ctx, "scribe.LogCompressedMsg"); + + iprot_->readMessageBegin(fname, mtype, rseqid); + if (this->checkSeqid_ && rseqid != eseqid) { + iprot_->skip(apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::BAD_SEQUENCE_ID); + } + if (mtype == apache::thrift::protocol::T_EXCEPTION) { + apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw x; + } + if (mtype != apache::thrift::protocol::T_REPLY) { + iprot_->skip(apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE); + } + if (fname.compare("LogCompressedMsg") != 0) { + iprot_->skip(apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::WRONG_METHOD_NAME); + } + ResultCode _return; + scribe_LogCompressedMsg_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + bytes = iprot_->getTransport()->readEnd(); + this->postRead(ctx, "scribe.LogCompressedMsg", bytes); + + if (result.__isset.success) { + return _return; + } + throw apache::thrift::TApplicationException(apache::thrift::TApplicationException::MISSING_RESULT, "LogCompressedMsg failed: unknown result"); +} + +bool scribeProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, apache::thrift::server::TConnectionContext* connectionContext) { + ProcessMap::iterator pfn; + pfn = processMap_.find(fname); + if (pfn == processMap_.end()) { + iprot->skip(apache::thrift::protocol::T_STRUCT); + iprot->readMessageEnd(); + iprot->getTransport()->readEnd(); + apache::thrift::TApplicationException x(apache::thrift::TApplicationException::UNKNOWN_METHOD, "Invalid method name: '"+fname+"'"); + oprot->writeMessageBegin(fname, apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + return true; + } + const ProcessFunction& pf = pfn->second; + (this->*pf)(seqid, iprot, oprot, connectionContext); + return true; +} + +void scribeProcessor::process_Log(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext) +{ + std::unique_ptr ctx(this->getContextStack("scribe.Log", connectionContext)); + + this->preRead(ctx.get(), "scribe.Log"); + scribe_Log_args args; + args.read(iprot); + iprot->readMessageEnd(); + uint32_t bytes = iprot->getTransport()->readEnd(); + + this->postRead(ctx.get(), "scribe.Log", bytes); + + scribe_Log_result result; + try { + result.success = iface_->Log(args.messages); + result.__isset.success = true; + } catch (const std::exception& e) { + this->handlerError(ctx.get(), "scribe.Log"); + + + apache::thrift::TApplicationException x(e.what()); + oprot->writeMessageBegin("Log", apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + return; + } + + this->preWrite(ctx.get(), "scribe.Log"); + oprot->writeMessageBegin("Log", apache::thrift::protocol::T_REPLY, seqid); + result.write(oprot); + oprot->writeMessageEnd(); + bytes = oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + + this->postWrite(ctx.get(), "scribe.Log", bytes); + +} + +void scribeProcessor::process_LogMulti(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext) +{ + std::unique_ptr ctx(this->getContextStack("scribe.LogMulti", connectionContext)); + + this->preRead(ctx.get(), "scribe.LogMulti"); + scribe_LogMulti_args args; + args.read(iprot); + iprot->readMessageEnd(); + uint32_t bytes = iprot->getTransport()->readEnd(); + + this->postRead(ctx.get(), "scribe.LogMulti", bytes); + + scribe_LogMulti_result result; + try { + iface_->LogMulti(result.success, args.messages); + result.__isset.success = true; + } catch (const std::exception& e) { + this->handlerError(ctx.get(), "scribe.LogMulti"); + + + apache::thrift::TApplicationException x(e.what()); + oprot->writeMessageBegin("LogMulti", apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + return; + } + + this->preWrite(ctx.get(), "scribe.LogMulti"); + oprot->writeMessageBegin("LogMulti", apache::thrift::protocol::T_REPLY, seqid); + result.write(oprot); + oprot->writeMessageEnd(); + bytes = oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + + this->postWrite(ctx.get(), "scribe.LogMulti", bytes); + +} + +void scribeProcessor::process_LogCompressedMsg(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext) +{ + std::unique_ptr ctx(this->getContextStack("scribe.LogCompressedMsg", connectionContext)); + + this->preRead(ctx.get(), "scribe.LogCompressedMsg"); + scribe_LogCompressedMsg_args args; + args.read(iprot); + iprot->readMessageEnd(); + uint32_t bytes = iprot->getTransport()->readEnd(); + + this->postRead(ctx.get(), "scribe.LogCompressedMsg", bytes); + + scribe_LogCompressedMsg_result result; + try { + result.success = iface_->LogCompressedMsg(args.compressedMessages); + result.__isset.success = true; + } catch (const std::exception& e) { + this->handlerError(ctx.get(), "scribe.LogCompressedMsg"); + + + apache::thrift::TApplicationException x(e.what()); + oprot->writeMessageBegin("LogCompressedMsg", apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + return; + } + + this->preWrite(ctx.get(), "scribe.LogCompressedMsg"); + oprot->writeMessageBegin("LogCompressedMsg", apache::thrift::protocol::T_REPLY, seqid); + result.write(oprot); + oprot->writeMessageEnd(); + bytes = oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + + this->postWrite(ctx.get(), "scribe.LogCompressedMsg", bytes); + +} + +::boost::shared_ptr< ::apache::thrift::TProcessor > scribeProcessorFactory::getProcessor(::apache::thrift::server::TConnectionContext* ctx) { + ::apache::thrift::ReleaseHandler< scribeIfFactory > cleanup(handlerFactory_); + ::boost::shared_ptr< scribeIf > handler(handlerFactory_->getHandler(ctx), cleanup); + ::boost::shared_ptr< ::apache::thrift::TProcessor > processor(new scribeProcessor(handler)); + return processor; +} +} // namespace diff --git a/scribe/if/gen-cpp/scribe.h b/scribe/if/gen-cpp/scribe.h new file mode 100644 index 000000000..5f30edfbe --- /dev/null +++ b/scribe/if/gen-cpp/scribe.h @@ -0,0 +1,593 @@ +/** + * Autogenerated by Thrift + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#ifndef _Tleveldb_scribe_H +#define _Tleveldb_scribe_H + +#include +#include "scribe_types.h" + +namespace Tleveldb { + +class scribeIf { + public: + virtual ~scribeIf() {} + virtual ResultCode Log(const std::vector & messages) = 0; + virtual void LogMulti(std::vector & _return, const std::vector & messages) = 0; + virtual ResultCode LogCompressedMsg(const std::string& compressedMessages) = 0; +}; + +class scribeIfFactory { + public: + typedef scribeIf Handler; + + virtual ~scribeIfFactory() {} + + virtual scribeIf* getHandler(::apache::thrift::server::TConnectionContext* ctx) = 0; + virtual void releaseHandler(scribeIf* handler) = 0; +}; + +class scribeIfSingletonFactory : virtual public scribeIfFactory { + public: + scribeIfSingletonFactory(const boost::shared_ptr& iface) : iface_(iface) {} + virtual ~scribeIfSingletonFactory() {} + + virtual scribeIf* getHandler(::apache::thrift::server::TConnectionContext*) { + return iface_.get(); + } + virtual void releaseHandler(scribeIf* handler) {} + + protected: + boost::shared_ptr iface_; +}; + +class scribeNull : virtual public scribeIf { + public: + virtual ~scribeNull() {} + ResultCode Log(const std::vector & /* messages */) { + ResultCode _return = (ResultCode)0; + return _return; + } + void LogMulti(std::vector & /* _return */, const std::vector & /* messages */) { + return; + } + ResultCode LogCompressedMsg(const std::string& /* compressedMessages */) { + ResultCode _return = (ResultCode)0; + return _return; + } +}; + +class scribe_Log_args { + public: + + static const uint64_t _reflection_id = 5902265217339133004U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + scribe_Log_args() { + } + + scribe_Log_args(const scribe_Log_args&) = default; + scribe_Log_args& operator=(const scribe_Log_args&) = default; + scribe_Log_args(scribe_Log_args&&) = default; + scribe_Log_args& operator=(scribe_Log_args&&) = default; + + void __clear() { + messages.clear(); + __isset.__clear(); + } + + virtual ~scribe_Log_args() throw() {} + + std::vector messages; + + struct __isset { + __isset() { __clear(); } + void __clear() { + messages = false; + } + bool messages; + } __isset; + + bool operator == (const scribe_Log_args & rhs) const + { + if (!(this->messages == rhs.messages)) + return false; + return true; + } + bool operator != (const scribe_Log_args &rhs) const { + return !(*this == rhs); + } + + bool operator < (const scribe_Log_args & ) const; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class scribe_Log_pargs { + public: + + static const uint64_t _reflection_id = 5555604010648986412U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + + virtual ~scribe_Log_pargs() throw() {} + + const std::vector * messages; + + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class scribe_Log_result { + public: + + static const uint64_t _reflection_id = 18205781396971565932U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + scribe_Log_result() : success(static_cast(0)) { + } + + scribe_Log_result(const scribe_Log_result&) = default; + scribe_Log_result& operator=(const scribe_Log_result&) = default; + scribe_Log_result(scribe_Log_result&&) = default; + scribe_Log_result& operator=(scribe_Log_result&&) = default; + + void __clear() { + success = static_cast(0); + __isset.__clear(); + } + + virtual ~scribe_Log_result() throw() {} + + ResultCode success; + + struct __isset { + __isset() { __clear(); } + void __clear() { + success = false; + } + bool success; + } __isset; + + bool operator == (const scribe_Log_result & rhs) const + { + if (!(this->success == rhs.success)) + return false; + return true; + } + bool operator != (const scribe_Log_result &rhs) const { + return !(*this == rhs); + } + + bool operator < (const scribe_Log_result & ) const; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class scribe_Log_presult { + public: + + static const uint64_t _reflection_id = 12945584136895385836U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + + virtual ~scribe_Log_presult() throw() {} + + ResultCode* success; + + struct __isset { + __isset() { __clear(); } + void __clear() { + success = false; + } + bool success; + } __isset; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + +}; + +class scribe_LogMulti_args { + public: + + static const uint64_t _reflection_id = 7590876486278061516U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + scribe_LogMulti_args() { + } + + scribe_LogMulti_args(const scribe_LogMulti_args&) = default; + scribe_LogMulti_args& operator=(const scribe_LogMulti_args&) = default; + scribe_LogMulti_args(scribe_LogMulti_args&&) = default; + scribe_LogMulti_args& operator=(scribe_LogMulti_args&&) = default; + + void __clear() { + messages.clear(); + __isset.__clear(); + } + + virtual ~scribe_LogMulti_args() throw() {} + + std::vector messages; + + struct __isset { + __isset() { __clear(); } + void __clear() { + messages = false; + } + bool messages; + } __isset; + + bool operator == (const scribe_LogMulti_args & rhs) const + { + if (!(this->messages == rhs.messages)) + return false; + return true; + } + bool operator != (const scribe_LogMulti_args &rhs) const { + return !(*this == rhs); + } + + bool operator < (const scribe_LogMulti_args & ) const; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class scribe_LogMulti_pargs { + public: + + static const uint64_t _reflection_id = 9124384543551655628U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + + virtual ~scribe_LogMulti_pargs() throw() {} + + const std::vector * messages; + + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class scribe_LogMulti_result { + public: + + static const uint64_t _reflection_id = 4828367046341273164U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + scribe_LogMulti_result() { + } + + scribe_LogMulti_result(const scribe_LogMulti_result&) = default; + scribe_LogMulti_result& operator=(const scribe_LogMulti_result&) = default; + scribe_LogMulti_result(scribe_LogMulti_result&&) = default; + scribe_LogMulti_result& operator=(scribe_LogMulti_result&&) = default; + + void __clear() { + success.clear(); + __isset.__clear(); + } + + virtual ~scribe_LogMulti_result() throw() {} + + std::vector success; + + struct __isset { + __isset() { __clear(); } + void __clear() { + success = false; + } + bool success; + } __isset; + + bool operator == (const scribe_LogMulti_result & rhs) const + { + if (!(this->success == rhs.success)) + return false; + return true; + } + bool operator != (const scribe_LogMulti_result &rhs) const { + return !(*this == rhs); + } + + bool operator < (const scribe_LogMulti_result & ) const; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class scribe_LogMulti_presult { + public: + + static const uint64_t _reflection_id = 5642041737363050316U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + + virtual ~scribe_LogMulti_presult() throw() {} + + std::vector * success; + + struct __isset { + __isset() { __clear(); } + void __clear() { + success = false; + } + bool success; + } __isset; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + +}; + +class scribe_LogCompressedMsg_args { + public: + + static const uint64_t _reflection_id = 12705053036625273964U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + scribe_LogCompressedMsg_args() : compressedMessages("") { + } + + scribe_LogCompressedMsg_args(const scribe_LogCompressedMsg_args&) = default; + scribe_LogCompressedMsg_args& operator=(const scribe_LogCompressedMsg_args&) = default; + scribe_LogCompressedMsg_args(scribe_LogCompressedMsg_args&&) = default; + scribe_LogCompressedMsg_args& operator=(scribe_LogCompressedMsg_args&&) = default; + + void __clear() { + compressedMessages = ""; + __isset.__clear(); + } + + virtual ~scribe_LogCompressedMsg_args() throw() {} + + std::string compressedMessages; + + struct __isset { + __isset() { __clear(); } + void __clear() { + compressedMessages = false; + } + bool compressedMessages; + } __isset; + + bool operator == (const scribe_LogCompressedMsg_args & rhs) const + { + if (!(this->compressedMessages == rhs.compressedMessages)) + return false; + return true; + } + bool operator != (const scribe_LogCompressedMsg_args &rhs) const { + return !(*this == rhs); + } + + bool operator < (const scribe_LogCompressedMsg_args & ) const; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class scribe_LogCompressedMsg_pargs { + public: + + static const uint64_t _reflection_id = 13645577436870531500U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + + virtual ~scribe_LogCompressedMsg_pargs() throw() {} + + const std::string* compressedMessages; + + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class scribe_LogCompressedMsg_result { + public: + + static const uint64_t _reflection_id = 15026639991904524972U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + scribe_LogCompressedMsg_result() : success(static_cast(0)) { + } + + scribe_LogCompressedMsg_result(const scribe_LogCompressedMsg_result&) = default; + scribe_LogCompressedMsg_result& operator=(const scribe_LogCompressedMsg_result&) = default; + scribe_LogCompressedMsg_result(scribe_LogCompressedMsg_result&&) = default; + scribe_LogCompressedMsg_result& operator=(scribe_LogCompressedMsg_result&&) = default; + + void __clear() { + success = static_cast(0); + __isset.__clear(); + } + + virtual ~scribe_LogCompressedMsg_result() throw() {} + + ResultCode success; + + struct __isset { + __isset() { __clear(); } + void __clear() { + success = false; + } + bool success; + } __isset; + + bool operator == (const scribe_LogCompressedMsg_result & rhs) const + { + if (!(this->success == rhs.success)) + return false; + return true; + } + bool operator != (const scribe_LogCompressedMsg_result &rhs) const { + return !(*this == rhs); + } + + bool operator < (const scribe_LogCompressedMsg_result & ) const; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class scribe_LogCompressedMsg_presult { + public: + + static const uint64_t _reflection_id = 5311776576442573772U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + + virtual ~scribe_LogCompressedMsg_presult() throw() {} + + ResultCode* success; + + struct __isset { + __isset() { __clear(); } + void __clear() { + success = false; + } + bool success; + } __isset; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + +}; + +class scribeClient : virtual public scribeIf, virtual public apache::thrift::TClientBase { + public: + scribeClient(boost::shared_ptr prot) : + checkSeqid_(true), + nextSendSequenceId_(1), + nextRecvSequenceId_(1), + piprot_(prot), + poprot_(prot) { + iprot_ = prot.get(); + oprot_ = prot.get(); + } + scribeClient(boost::shared_ptr iprot, boost::shared_ptr oprot) : + checkSeqid_(true), + nextSendSequenceId_(1), + nextRecvSequenceId_(1), + piprot_(iprot), + poprot_(oprot) { + iprot_ = iprot.get(); + oprot_ = oprot.get(); + } + boost::shared_ptr getInputProtocol() { + return piprot_; + } + boost::shared_ptr getOutputProtocol() { + return poprot_; + } + ResultCode Log(const std::vector & messages); + void send_Log(const std::vector & messages); + ResultCode recv_Log(); + void LogMulti(std::vector & _return, const std::vector & messages); + void send_LogMulti(const std::vector & messages); + void recv_LogMulti(std::vector & _return); + ResultCode LogCompressedMsg(const std::string& compressedMessages); + void send_LogCompressedMsg(const std::string& compressedMessages); + ResultCode recv_LogCompressedMsg(); + + /** + * Disable checking the seqid field in server responses. + * + * This should only be used with broken servers that return incorrect seqid values. + */ + void _disableSequenceIdChecks() { + checkSeqid_ = false; + } + + protected: + bool checkSeqid_; + int32_t nextSendSequenceId_; + int32_t nextRecvSequenceId_; + int32_t getNextSendSequenceId(); + int32_t getNextRecvSequenceId(); + boost::shared_ptr piprot_; + boost::shared_ptr poprot_; + apache::thrift::protocol::TProtocol* iprot_; + apache::thrift::protocol::TProtocol* oprot_; +}; + +class scribeProcessor : public ::apache::thrift::TDispatchProcessor { + protected: + boost::shared_ptr iface_; + virtual bool dispatchCall(apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, apache::thrift::server::TConnectionContext* connectionContext); + private: + typedef void (scribeProcessor::*ProcessFunction)(int32_t, apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, apache::thrift::server::TConnectionContext*); + typedef std::map ProcessMap; + ProcessMap processMap_; + void process_Log(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext); + void process_LogMulti(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext); + void process_LogCompressedMsg(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext); + public: + scribeProcessor(boost::shared_ptr iface) : + iface_(iface) { + processMap_["Log"] = &scribeProcessor::process_Log; + processMap_["LogMulti"] = &scribeProcessor::process_LogMulti; + processMap_["LogCompressedMsg"] = &scribeProcessor::process_LogCompressedMsg; + } + + virtual ~scribeProcessor() {} + + boost::shared_ptr > getProcessFunctions() { + boost::shared_ptr > rSet(new std::set()); + rSet->insert("scribe.Log"); + rSet->insert("scribe.LogMulti"); + rSet->insert("scribe.LogCompressedMsg"); + return rSet; + } +}; + +class scribeProcessorFactory : public ::apache::thrift::TProcessorFactory { + public: + scribeProcessorFactory(const ::boost::shared_ptr< scribeIfFactory >& handlerFactory) : + handlerFactory_(handlerFactory) {} + + ::boost::shared_ptr< ::apache::thrift::TProcessor > getProcessor(::apache::thrift::server::TConnectionContext* ctx); + + protected: + ::boost::shared_ptr< scribeIfFactory > handlerFactory_; +}; + +class scribeMultiface : virtual public scribeIf { + public: + scribeMultiface(std::vector >& ifaces) : ifaces_(ifaces) { + } + virtual ~scribeMultiface() {} + protected: + std::vector > ifaces_; + scribeMultiface() {} + void add(boost::shared_ptr iface) { + ifaces_.push_back(iface); + } + public: + ResultCode Log(const std::vector & messages) { + uint32_t i; + uint32_t sz = ifaces_.size(); + for (i = 0; i < sz - 1; ++i) { + ifaces_[i]->Log(messages); + } + return ifaces_[i]->Log(messages); + } + + void LogMulti(std::vector & _return, const std::vector & messages) { + uint32_t i; + uint32_t sz = ifaces_.size(); + for (i = 0; i < sz; ++i) { + ifaces_[i]->LogMulti(_return, messages); + } + } + + ResultCode LogCompressedMsg(const std::string& compressedMessages) { + uint32_t i; + uint32_t sz = ifaces_.size(); + for (i = 0; i < sz - 1; ++i) { + ifaces_[i]->LogCompressedMsg(compressedMessages); + } + return ifaces_[i]->LogCompressedMsg(compressedMessages); + } + +}; + +} // namespace + +#endif diff --git a/scribe/if/gen-cpp/scribe_constants.cpp b/scribe/if/gen-cpp/scribe_constants.cpp new file mode 100644 index 000000000..048069b5a --- /dev/null +++ b/scribe/if/gen-cpp/scribe_constants.cpp @@ -0,0 +1,17 @@ +/** + * Autogenerated by Thrift + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#include "scribe_constants.h" + +namespace Tleveldb { + +const scribeConstants g_scribe_constants; + +scribeConstants::scribeConstants() { + SCRIBE_MAX_MESSAGE_LENGTH = 26214400; +} + +} // namespace diff --git a/scribe/if/gen-cpp/scribe_constants.h b/scribe/if/gen-cpp/scribe_constants.h new file mode 100644 index 000000000..f12108086 --- /dev/null +++ b/scribe/if/gen-cpp/scribe_constants.h @@ -0,0 +1,25 @@ +/** + * Autogenerated by Thrift + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#ifndef scribe_CONSTANTS_H +#define scribe_CONSTANTS_H + +#include "scribe_types.h" + +namespace Tleveldb { + +class scribeConstants { + public: + scribeConstants(); + + int32_t SCRIBE_MAX_MESSAGE_LENGTH; +}; + +extern const scribeConstants g_scribe_constants; + +} // namespace + +#endif diff --git a/scribe/if/gen-cpp/scribe_types.cpp b/scribe/if/gen-cpp/scribe_types.cpp new file mode 100644 index 000000000..a72d36f2f --- /dev/null +++ b/scribe/if/gen-cpp/scribe_types.cpp @@ -0,0 +1,513 @@ +/** + * Autogenerated by Thrift + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#include "scribe_types.h" + +#include +#include +#include + +namespace Tleveldb { + +int _kResultCodeValues[] = { + OK, + TRY_LATER, + ERROR_DECOMPRESS +}; + +const char* _kResultCodeNames[] = { + "OK", + "TRY_LATER", + "ERROR_DECOMPRESS" +}; + +const std::map _ResultCode_VALUES_TO_NAMES(apache::thrift::TEnumIterator(3, _kResultCodeValues, _kResultCodeNames), apache::thrift::TEnumIterator(-1, NULL, NULL)); + +const std::map _ResultCode_NAMES_TO_VALUES(apache::thrift::TEnumInverseIterator(3, _kResultCodeValues, _kResultCodeNames), apache::thrift::TEnumInverseIterator(-1, NULL, NULL)); + +} // namespace +namespace apache { namespace thrift { +template<> +const char* TEnumTraits< ::Tleveldb::ResultCode>::findName( ::Tleveldb::ResultCode value) { +return findName( ::Tleveldb::_ResultCode_VALUES_TO_NAMES, value); +} + +template<> +bool TEnumTraits< ::Tleveldb::ResultCode>::findValue(const char* name, ::Tleveldb::ResultCode* out) { +return findValue( ::Tleveldb::_ResultCode_NAMES_TO_VALUES, name, out); +} +}} // apache::thrift + +namespace Tleveldb { +// Reflection initializer for struct scribe.SourceInfo +namespace { +void reflectionInitializer_16557823557777806572(::apache::thrift::reflection::Schema& schema) { + const uint64_t id = 16557823557777806572U; + if (schema.dataTypes.count(id)) return; + ::apache::thrift::reflection::DataType dt; + dt.name = "struct scribe.SourceInfo"; + dt.__isset.fields = true; + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 1U; + f.name = "host"; + dt.fields[1] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 5U; + f.name = "port"; + dt.fields[2] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 6U; + f.name = "timestamp"; + dt.fields[3] = f; + } + schema.dataTypes[id] = dt; + schema.names[dt.name] = id; +} +} // namespace + +const uint64_t SourceInfo::_reflection_id; +void SourceInfo::_reflection_register(::apache::thrift::reflection::Schema& schema) { + reflectionInitializer_16557823557777806572(schema); +} +uint32_t SourceInfo::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == apache::thrift::protocol::T_STRING) { + xfer += iprot->readBinary(this->host); + this->__isset.host = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->port); + this->__isset.port = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->timestamp); + this->__isset.timestamp = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t SourceInfo::write(apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("SourceInfo"); + xfer += oprot->writeFieldBegin("host", apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeBinary(this->host); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("port", apache::thrift::protocol::T_I32, 2); + xfer += oprot->writeI32(this->port); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("timestamp", apache::thrift::protocol::T_I64, 3); + xfer += oprot->writeI64(this->timestamp); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(SourceInfo &a, SourceInfo &b) { + using ::std::swap; + (void)a; + (void)b; + swap(a.host, b.host); + swap(a.port, b.port); + swap(a.timestamp, b.timestamp); + swap(a.__isset, b.__isset); +} + +// Reflection initializer for map +namespace { +void reflectionInitializer_9246346592659763371(::apache::thrift::reflection::Schema& schema) { + const uint64_t id = 9246346592659763371U; + if (schema.dataTypes.count(id)) return; + ::apache::thrift::reflection::DataType dt; + dt.name = "map"; + dt.__isset.mapKeyType = true; + dt.mapKeyType = 1U; + dt.__isset.valueType = true; + dt.valueType = 1U; + schema.dataTypes[id] = dt; + schema.names[dt.name] = id; +} +} // namespace + +// Reflection initializer for struct scribe.LogEntry +namespace { +void reflectionInitializer_15053466696968532300(::apache::thrift::reflection::Schema& schema) { + const uint64_t id = 15053466696968532300U; + if (schema.dataTypes.count(id)) return; + reflectionInitializer_16557823557777806572(schema); // struct scribe.SourceInfo + reflectionInitializer_9246346592659763371(schema); // map + ::apache::thrift::reflection::DataType dt; + dt.name = "struct scribe.LogEntry"; + dt.__isset.fields = true; + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 1U; + f.name = "category"; + dt.fields[1] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 1U; + f.name = "message"; + dt.fields[2] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = false; + f.type = 9246346592659763371U; + f.name = "metadata"; + dt.fields[3] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = false; + f.type = 5U; + f.name = "checksum"; + dt.fields[4] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = false; + f.type = 16557823557777806572U; + f.name = "source"; + dt.fields[5] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = false; + f.type = 5U; + f.name = "bucket"; + dt.fields[6] = f; + } + schema.dataTypes[id] = dt; + schema.names[dt.name] = id; +} +} // namespace + +const uint64_t LogEntry::_reflection_id; +void LogEntry::_reflection_register(::apache::thrift::reflection::Schema& schema) { + reflectionInitializer_15053466696968532300(schema); +} +uint32_t LogEntry::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == apache::thrift::protocol::T_STRING) { + xfer += iprot->readBinary(this->category); + this->__isset.category = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == apache::thrift::protocol::T_STRING) { + xfer += iprot->readBinary(this->message); + this->__isset.message = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == apache::thrift::protocol::T_MAP) { + { + this->metadata.clear(); + uint32_t _size0; + apache::thrift::protocol::TType _ktype1; + apache::thrift::protocol::TType _vtype2; + xfer += iprot->readMapBegin(_ktype1, _vtype2, _size0); + uint32_t _i4; + for (_i4 = 0; _i4 < _size0; ++_i4) + { + std::string _key5; + xfer += iprot->readString(_key5); + std::string& _val6 = this->metadata[_key5]; + xfer += iprot->readString(_val6); + } + xfer += iprot->readMapEnd(); + } + this->__isset.metadata = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->checksum); + this->__isset.checksum = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 5: + if (ftype == apache::thrift::protocol::T_STRUCT) { + xfer += this->source.read(iprot); + this->__isset.source = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 6: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->bucket); + this->__isset.bucket = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t LogEntry::write(apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("LogEntry"); + xfer += oprot->writeFieldBegin("category", apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeBinary(this->category); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("message", apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeBinary(this->message); + xfer += oprot->writeFieldEnd(); + if (this->__isset.metadata) { + xfer += oprot->writeFieldBegin("metadata", apache::thrift::protocol::T_MAP, 3); + { + xfer += oprot->writeMapBegin(apache::thrift::protocol::T_STRING, apache::thrift::protocol::T_STRING, this->metadata.size()); + std::map ::const_iterator _iter7; + for (_iter7 = this->metadata.begin(); _iter7 != this->metadata.end(); ++_iter7) + { + xfer += oprot->writeString(_iter7->first); + xfer += oprot->writeString(_iter7->second); + } + xfer += oprot->writeMapEnd(); + } + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.checksum) { + xfer += oprot->writeFieldBegin("checksum", apache::thrift::protocol::T_I32, 4); + xfer += oprot->writeI32(this->checksum); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.source) { + xfer += oprot->writeFieldBegin("source", apache::thrift::protocol::T_STRUCT, 5); + xfer += this->source.write(oprot); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.bucket) { + xfer += oprot->writeFieldBegin("bucket", apache::thrift::protocol::T_I32, 6); + xfer += oprot->writeI32(this->bucket); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(LogEntry &a, LogEntry &b) { + using ::std::swap; + (void)a; + (void)b; + swap(a.category, b.category); + swap(a.message, b.message); + swap(a.metadata, b.metadata); + swap(a.checksum, b.checksum); + swap(a.source, b.source); + swap(a.bucket, b.bucket); + swap(a.__isset, b.__isset); +} + +// Reflection initializer for list +namespace { +void reflectionInitializer_10251729064312664553(::apache::thrift::reflection::Schema& schema) { + const uint64_t id = 10251729064312664553U; + if (schema.dataTypes.count(id)) return; + reflectionInitializer_15053466696968532300(schema); // struct scribe.LogEntry + ::apache::thrift::reflection::DataType dt; + dt.name = "list"; + dt.__isset.valueType = true; + dt.valueType = 15053466696968532300U; + schema.dataTypes[id] = dt; + schema.names[dt.name] = id; +} +} // namespace + +// Reflection initializer for struct scribe.MessageList +namespace { +void reflectionInitializer_5674270912483072844(::apache::thrift::reflection::Schema& schema) { + const uint64_t id = 5674270912483072844U; + if (schema.dataTypes.count(id)) return; + reflectionInitializer_10251729064312664553(schema); // list + ::apache::thrift::reflection::DataType dt; + dt.name = "struct scribe.MessageList"; + dt.__isset.fields = true; + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 10251729064312664553U; + f.name = "messages"; + dt.fields[1] = f; + } + schema.dataTypes[id] = dt; + schema.names[dt.name] = id; +} +} // namespace + +const uint64_t MessageList::_reflection_id; +void MessageList::_reflection_register(::apache::thrift::reflection::Schema& schema) { + reflectionInitializer_5674270912483072844(schema); +} +uint32_t MessageList::read(apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == apache::thrift::protocol::T_LIST) { + { + this->messages.clear(); + uint32_t _size8; + apache::thrift::protocol::TType _etype11; + xfer += iprot->readListBegin(_etype11, _size8); + this->messages.resize(_size8); + uint32_t _i12; + for (_i12 = 0; _i12 < _size8; ++_i12) + { + xfer += this->messages[_i12].read(iprot); + } + xfer += iprot->readListEnd(); + } + this->__isset.messages = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t MessageList::write(apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("MessageList"); + xfer += oprot->writeFieldBegin("messages", apache::thrift::protocol::T_LIST, 1); + { + xfer += oprot->writeListBegin(apache::thrift::protocol::T_STRUCT, this->messages.size()); + std::vector ::const_iterator _iter13; + for (_iter13 = this->messages.begin(); _iter13 != this->messages.end(); ++_iter13) + { + xfer += (*_iter13).write(oprot); + } + xfer += oprot->writeListEnd(); + } + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(MessageList &a, MessageList &b) { + using ::std::swap; + (void)a; + (void)b; + swap(a.messages, b.messages); + swap(a.__isset, b.__isset); +} + +} // namespace diff --git a/scribe/if/gen-cpp/scribe_types.h b/scribe/if/gen-cpp/scribe_types.h new file mode 100644 index 000000000..bf75a43e6 --- /dev/null +++ b/scribe/if/gen-cpp/scribe_types.h @@ -0,0 +1,247 @@ +/** + * Autogenerated by Thrift + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#ifndef scribe_TYPES_H +#define scribe_TYPES_H + +#include +#include +#include +#include + +namespace apache { namespace thrift { namespace reflection { +class Schema; +}}} + + +namespace Tleveldb { + +enum ResultCode { + OK = 0, + TRY_LATER = 1, + ERROR_DECOMPRESS = 2 +}; + +extern const std::map _ResultCode_VALUES_TO_NAMES; + +extern const std::map _ResultCode_NAMES_TO_VALUES; + +} // namespace +namespace apache { namespace thrift { +template<> +inline constexpr ::Tleveldb::ResultCode TEnumTraits< ::Tleveldb::ResultCode>::min() { +return ::Tleveldb::ResultCode::OK; +} +template<> +inline constexpr ::Tleveldb::ResultCode TEnumTraits< ::Tleveldb::ResultCode>::max() { +return ::Tleveldb::ResultCode::ERROR_DECOMPRESS; +} +}} // apache:thrift + +namespace Tleveldb { +class SourceInfo { + public: + + static const uint64_t _reflection_id = 16557823557777806572U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + SourceInfo() : host(""), port(0), timestamp(0) { + } + + SourceInfo(const SourceInfo&) = default; + SourceInfo& operator=(const SourceInfo&) = default; + SourceInfo(SourceInfo&&) = default; + SourceInfo& operator=(SourceInfo&&) = default; + + void __clear() { + host = ""; + port = 0; + timestamp = 0; + __isset.__clear(); + } + + virtual ~SourceInfo() throw() {} + + std::string host; + int32_t port; + int64_t timestamp; + + struct __isset { + __isset() { __clear(); } + void __clear() { + host = false; + port = false; + timestamp = false; + } + bool host; + bool port; + bool timestamp; + } __isset; + + bool operator == (const SourceInfo & rhs) const + { + if (!(this->host == rhs.host)) + return false; + if (!(this->port == rhs.port)) + return false; + if (!(this->timestamp == rhs.timestamp)) + return false; + return true; + } + bool operator != (const SourceInfo &rhs) const { + return !(*this == rhs); + } + + bool operator < (const SourceInfo & ) const; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class SourceInfo; +void swap(SourceInfo &a, SourceInfo &b); + +class LogEntry { + public: + + static const uint64_t _reflection_id = 15053466696968532300U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + LogEntry() : category(""), message(""), checksum(0), bucket(0) { + } + + LogEntry(const LogEntry&) = default; + LogEntry& operator=(const LogEntry&) = default; + LogEntry(LogEntry&&) = default; + LogEntry& operator=(LogEntry&&) = default; + + void __clear() { + category = ""; + message = ""; + metadata.clear(); + checksum = 0; + source.__clear(); + bucket = 0; + __isset.__clear(); + } + + virtual ~LogEntry() throw() {} + + std::string category; + std::string message; + std::map metadata; + int32_t checksum; + SourceInfo source; + int32_t bucket; + + struct __isset { + __isset() { __clear(); } + void __clear() { + category = false; + message = false; + metadata = false; + checksum = false; + source = false; + bucket = false; + } + bool category; + bool message; + bool metadata; + bool checksum; + bool source; + bool bucket; + } __isset; + + bool operator == (const LogEntry & rhs) const + { + if (!(this->category == rhs.category)) + return false; + if (!(this->message == rhs.message)) + return false; + if (__isset.metadata != rhs.__isset.metadata) + return false; + else if (__isset.metadata && !(metadata == rhs.metadata)) + return false; + if (__isset.checksum != rhs.__isset.checksum) + return false; + else if (__isset.checksum && !(checksum == rhs.checksum)) + return false; + if (__isset.source != rhs.__isset.source) + return false; + else if (__isset.source && !(source == rhs.source)) + return false; + if (__isset.bucket != rhs.__isset.bucket) + return false; + else if (__isset.bucket && !(bucket == rhs.bucket)) + return false; + return true; + } + bool operator != (const LogEntry &rhs) const { + return !(*this == rhs); + } + + bool operator < (const LogEntry & ) const; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class LogEntry; +void swap(LogEntry &a, LogEntry &b); + +class MessageList { + public: + + static const uint64_t _reflection_id = 5674270912483072844U; + static void _reflection_register(::apache::thrift::reflection::Schema&); + MessageList() { + } + + MessageList(const MessageList&) = default; + MessageList& operator=(const MessageList&) = default; + MessageList(MessageList&&) = default; + MessageList& operator=(MessageList&&) = default; + + void __clear() { + messages.clear(); + __isset.__clear(); + } + + virtual ~MessageList() throw() {} + + std::vector messages; + + struct __isset { + __isset() { __clear(); } + void __clear() { + messages = false; + } + bool messages; + } __isset; + + bool operator == (const MessageList & rhs) const + { + if (!(this->messages == rhs.messages)) + return false; + return true; + } + bool operator != (const MessageList &rhs) const { + return !(*this == rhs); + } + + bool operator < (const MessageList & ) const; + + uint32_t read(apache::thrift::protocol::TProtocol* iprot); + uint32_t write(apache::thrift::protocol::TProtocol* oprot) const; + +}; + +class MessageList; +void swap(MessageList &a, MessageList &b); + +} // namespace + +#endif diff --git a/scribe/if/scribe.thrift b/scribe/if/scribe.thrift new file mode 100644 index 000000000..df09a5e78 --- /dev/null +++ b/scribe/if/scribe.thrift @@ -0,0 +1,82 @@ +#!/usr/local/bin/thrift --cpp --php + +## Copyright (c) 2007-2012 Facebook +## +## Licensed under the Apache License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## +## See accompanying file LICENSE or visit the Scribe site at: +## http://developers.facebook.com/scribe/ + +namespace cpp Tleveldb +namespace java Tleveldb + +// Max message length allowed to log through scribe +const i32 SCRIBE_MAX_MESSAGE_LENGTH = 26214400; + +enum ResultCode +{ + OK, + TRY_LATER, + ERROR_DECOMPRESS +} + +struct SourceInfo +{ + 1: binary host, + 2: i32 port, + 3: i64 timestamp +} + +struct LogEntry +{ + 1: binary category, + 2: binary message, + 3: optional map metadata, + 4: optional i32 checksum, + 5: optional SourceInfo source, + 6: optional i32 bucket +} + +struct MessageList +{ + 1: list messages +} + +service scribe +{ + # + # Delivers a list of LogEntry messages to the Scribe server. + # A returned ResultCode of anything other than OK indicates that the + # whole batch was unable to be delivered to the server. + # If data loss is a concern, the caller should buffer and retry the messages. + # + ResultCode Log(1: list messages); + + # + # NOTE: FOR INTERNAL USE ONLY! + # + # Delivers a list of LogEntry messages to the Scribe server, but + # allows partial successes. A list of ResultCodes will be returned to + # indicate the success or failure of each message at the corresponding index. + # If data loss is a concern, the caller should retry only the failed messages. + # + list LogMulti(1: list messages); + + # + # NOTE: FOR INTERNAL USE ONLY! + # + # The same as Log(...) except that the list of messages must first be + # serialized and compressed in some internal format. + # + ResultCode LogCompressedMsg(1: binary compressedMessages); +} diff --git a/scribe/scribe_logger.cc b/scribe/scribe_logger.cc new file mode 100644 index 000000000..5c10e93fb --- /dev/null +++ b/scribe/scribe_logger.cc @@ -0,0 +1,90 @@ +#include "scribe_logger.h" + +namespace leveldb { + +const std::string ScribeLogger::COL_SEPERATOR = "\x1"; +const std::string ScribeLogger::DEPLOY_STATS_CATEGORY = "leveldb_deploy_stats"; + +ScribeLogger::ScribeLogger(const std::string& host, int port, + int retry_times, uint32_t retry_intervals, int batch_size) + : host_(host), + port_(port), + retry_times_(retry_times), + retry_intervals_ (retry_intervals), + batch_size_ (batch_size) { + shared_ptr socket(new TSocket(host_, port_)); + shared_ptr framedTransport(new TFramedTransport(socket)); + framedTransport->open(); + shared_ptr protocol(new TBinaryProtocol(framedTransport)); + scribe_client_ = new scribeClient(protocol); +} + +void ScribeLogger::Log(const std::string& category, + const std::string& message) { + LogEntry entry; + entry.category = category; + entry.message = message; + + logger_mutex_.Lock(); + logs_.push_back(entry); + + if (logs_.size() >= batch_size_) { + ResultCode ret = scribe_client_->Log(logs_); + int retries_left = retry_times_; + while (ret == TRY_LATER && retries_left > 0) { + Env::Default()->SleepForMicroseconds(retry_intervals_); + ret = scribe_client_->Log(logs_); + retries_left--; + } + + // Clear the local messages if either successfully write out + // or has failed in the last 10 calls. + if (ret == OK || logs_.size() > batch_size_ * 5) { + logs_.clear(); + } + } + + logger_mutex_.Unlock(); +} + +void ScribeLogger::MakeScribeMessage(std::string& output, + std::vector& cols) { + int sz = cols.size(); + int i = 0; + for (; i < sz - 1; i++) { + std::string& col = cols.at(i); + output += col; + output += ScribeLogger::COL_SEPERATOR; + } + std::string& col = cols.at(i); + output+=col; +} + +void ScribeLogger::Log_Deploy_Stats( + const std::string& db_version, + const std::string& machine_info, + const std::string& data_dir, + const uint64_t data_size, + const uint32_t file_number, + const std::string& data_size_per_level, + const std::string& file_number_per_level, + const int64_t& ts_unix) { + std::string message; + std::vector cols; + cols.push_back(db_version); + cols.push_back(machine_info); + cols.push_back(data_dir); + cols.push_back(boost::lexical_cast(data_size)); + cols.push_back(boost::lexical_cast(file_number)); + cols.push_back(data_size_per_level); + cols.push_back(file_number_per_level); + cols.push_back(boost::lexical_cast(ts_unix)); + MakeScribeMessage(message, cols); + return Log(ScribeLogger::DEPLOY_STATS_CATEGORY, message); +} + +ScribeLogger::~ScribeLogger(){ + delete scribe_client_; +} + +} diff --git a/scribe/scribe_logger.h b/scribe/scribe_logger.h new file mode 100644 index 000000000..2e56a6c18 --- /dev/null +++ b/scribe/scribe_logger.h @@ -0,0 +1,71 @@ +#ifndef SCRIBE_LOGGER_H_ +#define SCRIBE_LOGGER_H_ + +#include "scribe/if/gen-cpp/scribe.h" +#include "scribe/if/gen-cpp/scribe_types.h" +#include "thrift/lib/cpp/protocol/TProtocol.h" +#include "thrift/lib/cpp/transport/TSocket.h" +#include "thrift/lib/cpp/protocol/TBinaryProtocol.h" +#include "thrift/lib/cpp/transport/TBufferTransports.h" + +#include "leveldb/env.h" +#include "port/port.h" +#include "util/stats_logger.h" + +#include "boost/lexical_cast.hpp" + +using namespace Tleveldb; +using Tleveldb::scribeClient; + +using namespace apache::thrift; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; +using boost::shared_ptr; + + +using namespace ::Tleveldb; + +namespace leveldb { + +class ScribeLogger : public StatsLogger{ +private: + std::string host_; + int port_; + int batch_size_; + + scribeClient* scribe_client_; + std::vector logs_; + port::Mutex logger_mutex_; + + int retry_times_; + uint32_t retry_intervals_; + + void MakeScribeMessage(std::string& output, std::vector& cols); + +public: + + static const std::string COL_SEPERATOR; + static const std::string DEPLOY_STATS_CATEGORY; + + ScribeLogger(const std::string& host, int port, + int retry_times=3, uint32_t retry_intervals=1000000, + int batch_size=1); + virtual ~ScribeLogger(); + + virtual void Log(const std::string& category, const std::string& message); + + virtual void Log_Deploy_Stats( + const std::string& db_version, + const std::string& machine_info, + const std::string& data_dir, + const uint64_t data_size, + const uint32_t file_number, + const std::string& data_size_per_level, + const std::string& file_number_per_level, + const int64_t& ts_unix + ); + +}; +} + +#endif /* SCRIBE_LOGGER_H_ */ diff --git a/thrift/README b/thrift/README index 6405a063a..f76c2bbf9 100644 --- a/thrift/README +++ b/thrift/README @@ -21,5 +21,5 @@ You can run the leveldb server unit tests by You can regenerate the thrift cpp files by doing the following cd ./thrift -thrift --gen cpp if/leveldb.thrift +bin/thrift --gen cpp if/leveldb.thrift diff --git a/thrift/bin/thrift b/thrift/bin/thrift new file mode 100755 index 000000000..a28684503 Binary files /dev/null and b/thrift/bin/thrift differ diff --git a/thrift/gen-cpp/leveldb_types.cpp b/thrift/gen-cpp/leveldb_types.cpp index 14465f87b..4654ee213 100644 --- a/thrift/gen-cpp/leveldb_types.cpp +++ b/thrift/gen-cpp/leveldb_types.cpp @@ -422,6 +422,76 @@ void reflectionInitializer_6731746507948871532(::apache::thrift::reflection::Sch f.name = "compression"; dt.fields[7] = f; } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 5U; + f.name = "num_levels"; + dt.fields[8] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 5U; + f.name = "level0_file_num_compaction_trigger"; + dt.fields[9] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 5U; + f.name = "level0_slowdown_writes_trigger"; + dt.fields[10] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 5U; + f.name = "level0_stop_writes_trigger"; + dt.fields[11] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 5U; + f.name = "target_file_size_base"; + dt.fields[12] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 5U; + f.name = "target_file_size_multiplier"; + dt.fields[13] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 5U; + f.name = "max_bytes_for_level_base"; + dt.fields[14] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 5U; + f.name = "max_bytes_for_level_multiplier"; + dt.fields[15] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 5U; + f.name = "max_grandparent_overlap_factor"; + dt.fields[16] = f; + } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 2U; + f.name = "disableDataSync"; + dt.fields[17] = f; + } schema.dataTypes[id] = dt; schema.names[dt.name] = id; } @@ -509,6 +579,86 @@ uint32_t DBOptions::read(apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 8: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->num_levels); + this->__isset.num_levels = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 9: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->level0_file_num_compaction_trigger); + this->__isset.level0_file_num_compaction_trigger = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 10: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->level0_slowdown_writes_trigger); + this->__isset.level0_slowdown_writes_trigger = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 11: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->level0_stop_writes_trigger); + this->__isset.level0_stop_writes_trigger = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 12: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->target_file_size_base); + this->__isset.target_file_size_base = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 13: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->target_file_size_multiplier); + this->__isset.target_file_size_multiplier = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 14: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->max_bytes_for_level_base); + this->__isset.max_bytes_for_level_base = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 15: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->max_bytes_for_level_multiplier); + this->__isset.max_bytes_for_level_multiplier = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 16: + if (ftype == apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->max_grandparent_overlap_factor); + this->__isset.max_grandparent_overlap_factor = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 17: + if (ftype == apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->disableDataSync); + this->__isset.disableDataSync = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -545,6 +695,36 @@ uint32_t DBOptions::write(apache::thrift::protocol::TProtocol* oprot) const { xfer += oprot->writeFieldBegin("compression", apache::thrift::protocol::T_I32, 7); xfer += oprot->writeI32((int32_t)this->compression); xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("num_levels", apache::thrift::protocol::T_I32, 8); + xfer += oprot->writeI32(this->num_levels); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("level0_file_num_compaction_trigger", apache::thrift::protocol::T_I32, 9); + xfer += oprot->writeI32(this->level0_file_num_compaction_trigger); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("level0_slowdown_writes_trigger", apache::thrift::protocol::T_I32, 10); + xfer += oprot->writeI32(this->level0_slowdown_writes_trigger); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("level0_stop_writes_trigger", apache::thrift::protocol::T_I32, 11); + xfer += oprot->writeI32(this->level0_stop_writes_trigger); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("target_file_size_base", apache::thrift::protocol::T_I32, 12); + xfer += oprot->writeI32(this->target_file_size_base); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("target_file_size_multiplier", apache::thrift::protocol::T_I32, 13); + xfer += oprot->writeI32(this->target_file_size_multiplier); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("max_bytes_for_level_base", apache::thrift::protocol::T_I32, 14); + xfer += oprot->writeI32(this->max_bytes_for_level_base); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("max_bytes_for_level_multiplier", apache::thrift::protocol::T_I32, 15); + xfer += oprot->writeI32(this->max_bytes_for_level_multiplier); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("max_grandparent_overlap_factor", apache::thrift::protocol::T_I32, 16); + xfer += oprot->writeI32(this->max_grandparent_overlap_factor); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("disableDataSync", apache::thrift::protocol::T_BOOL, 17); + xfer += oprot->writeBool(this->disableDataSync); + xfer += oprot->writeFieldEnd(); xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -561,6 +741,16 @@ void swap(DBOptions &a, DBOptions &b) { swap(a.block_size, b.block_size); swap(a.block_restart_interval, b.block_restart_interval); swap(a.compression, b.compression); + swap(a.num_levels, b.num_levels); + swap(a.level0_file_num_compaction_trigger, b.level0_file_num_compaction_trigger); + swap(a.level0_slowdown_writes_trigger, b.level0_slowdown_writes_trigger); + swap(a.level0_stop_writes_trigger, b.level0_stop_writes_trigger); + swap(a.target_file_size_base, b.target_file_size_base); + swap(a.target_file_size_multiplier, b.target_file_size_multiplier); + swap(a.max_bytes_for_level_base, b.max_bytes_for_level_base); + swap(a.max_bytes_for_level_multiplier, b.max_bytes_for_level_multiplier); + swap(a.max_grandparent_overlap_factor, b.max_grandparent_overlap_factor); + swap(a.disableDataSync, b.disableDataSync); swap(a.__isset, b.__isset); } @@ -579,6 +769,13 @@ void reflectionInitializer_8830325115029814540(::apache::thrift::reflection::Sch f.name = "sync"; dt.fields[1] = f; } + { + ::apache::thrift::reflection::StructField f; + f.isRequired = true; + f.type = 2U; + f.name = "disableWAL"; + dt.fields[2] = f; + } schema.dataTypes[id] = dt; schema.names[dt.name] = id; } @@ -616,6 +813,14 @@ uint32_t WriteOptions::read(apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 2: + if (ftype == apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->disableWAL); + this->__isset.disableWAL = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -634,6 +839,9 @@ uint32_t WriteOptions::write(apache::thrift::protocol::TProtocol* oprot) const { xfer += oprot->writeFieldBegin("sync", apache::thrift::protocol::T_BOOL, 1); xfer += oprot->writeBool(this->sync); xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("disableWAL", apache::thrift::protocol::T_BOOL, 2); + xfer += oprot->writeBool(this->disableWAL); + xfer += oprot->writeFieldEnd(); xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -644,6 +852,7 @@ void swap(WriteOptions &a, WriteOptions &b) { (void)a; (void)b; swap(a.sync, b.sync); + swap(a.disableWAL, b.disableWAL); swap(a.__isset, b.__isset); } diff --git a/thrift/gen-cpp/leveldb_types.h b/thrift/gen-cpp/leveldb_types.h index 6b85d4d20..117ed4f71 100644 --- a/thrift/gen-cpp/leveldb_types.h +++ b/thrift/gen-cpp/leveldb_types.h @@ -238,7 +238,7 @@ class DBOptions { static const uint64_t _reflection_id = 6731746507948871532U; static void _reflection_register(::apache::thrift::reflection::Schema&); - DBOptions() : create_if_missing(0), error_if_exists(0), write_buffer_size(0), max_open_files(0), block_size(0), block_restart_interval(0), compression(static_cast(0)) { + DBOptions() : create_if_missing(0), error_if_exists(0), write_buffer_size(0), max_open_files(0), block_size(0), block_restart_interval(0), compression(static_cast(0)), num_levels(0), level0_file_num_compaction_trigger(0), level0_slowdown_writes_trigger(0), level0_stop_writes_trigger(0), target_file_size_base(0), target_file_size_multiplier(0), max_bytes_for_level_base(0), max_bytes_for_level_multiplier(0), max_grandparent_overlap_factor(0), disableDataSync(0) { } DBOptions(const DBOptions&) = default; @@ -254,6 +254,16 @@ class DBOptions { block_size = 0; block_restart_interval = 0; compression = static_cast(0); + num_levels = 0; + level0_file_num_compaction_trigger = 0; + level0_slowdown_writes_trigger = 0; + level0_stop_writes_trigger = 0; + target_file_size_base = 0; + target_file_size_multiplier = 0; + max_bytes_for_level_base = 0; + max_bytes_for_level_multiplier = 0; + max_grandparent_overlap_factor = 0; + disableDataSync = 0; __isset.__clear(); } @@ -266,6 +276,16 @@ class DBOptions { int32_t block_size; int32_t block_restart_interval; CompressionType compression; + int32_t num_levels; + int32_t level0_file_num_compaction_trigger; + int32_t level0_slowdown_writes_trigger; + int32_t level0_stop_writes_trigger; + int32_t target_file_size_base; + int32_t target_file_size_multiplier; + int32_t max_bytes_for_level_base; + int32_t max_bytes_for_level_multiplier; + int32_t max_grandparent_overlap_factor; + bool disableDataSync; struct __isset { __isset() { __clear(); } @@ -277,6 +297,16 @@ class DBOptions { block_size = false; block_restart_interval = false; compression = false; + num_levels = false; + level0_file_num_compaction_trigger = false; + level0_slowdown_writes_trigger = false; + level0_stop_writes_trigger = false; + target_file_size_base = false; + target_file_size_multiplier = false; + max_bytes_for_level_base = false; + max_bytes_for_level_multiplier = false; + max_grandparent_overlap_factor = false; + disableDataSync = false; } bool create_if_missing; bool error_if_exists; @@ -285,6 +315,16 @@ class DBOptions { bool block_size; bool block_restart_interval; bool compression; + bool num_levels; + bool level0_file_num_compaction_trigger; + bool level0_slowdown_writes_trigger; + bool level0_stop_writes_trigger; + bool target_file_size_base; + bool target_file_size_multiplier; + bool max_bytes_for_level_base; + bool max_bytes_for_level_multiplier; + bool max_grandparent_overlap_factor; + bool disableDataSync; } __isset; bool operator == (const DBOptions & rhs) const @@ -303,6 +343,26 @@ class DBOptions { return false; if (!(this->compression == rhs.compression)) return false; + if (!(this->num_levels == rhs.num_levels)) + return false; + if (!(this->level0_file_num_compaction_trigger == rhs.level0_file_num_compaction_trigger)) + return false; + if (!(this->level0_slowdown_writes_trigger == rhs.level0_slowdown_writes_trigger)) + return false; + if (!(this->level0_stop_writes_trigger == rhs.level0_stop_writes_trigger)) + return false; + if (!(this->target_file_size_base == rhs.target_file_size_base)) + return false; + if (!(this->target_file_size_multiplier == rhs.target_file_size_multiplier)) + return false; + if (!(this->max_bytes_for_level_base == rhs.max_bytes_for_level_base)) + return false; + if (!(this->max_bytes_for_level_multiplier == rhs.max_bytes_for_level_multiplier)) + return false; + if (!(this->max_grandparent_overlap_factor == rhs.max_grandparent_overlap_factor)) + return false; + if (!(this->disableDataSync == rhs.disableDataSync)) + return false; return true; } bool operator != (const DBOptions &rhs) const { @@ -324,7 +384,7 @@ class WriteOptions { static const uint64_t _reflection_id = 8830325115029814540U; static void _reflection_register(::apache::thrift::reflection::Schema&); - WriteOptions() : sync(0) { + WriteOptions() : sync(0), disableWAL(0) { } WriteOptions(const WriteOptions&) = default; @@ -334,25 +394,31 @@ class WriteOptions { void __clear() { sync = 0; + disableWAL = 0; __isset.__clear(); } virtual ~WriteOptions() throw() {} bool sync; + bool disableWAL; struct __isset { __isset() { __clear(); } void __clear() { sync = false; + disableWAL = false; } bool sync; + bool disableWAL; } __isset; bool operator == (const WriteOptions & rhs) const { if (!(this->sync == rhs.sync)) return false; + if (!(this->disableWAL == rhs.disableWAL)) + return false; return true; } bool operator != (const WriteOptions &rhs) const { diff --git a/thrift/if/leveldb.thrift b/thrift/if/leveldb.thrift index c78c04541..1768bb897 100644 --- a/thrift/if/leveldb.thrift +++ b/thrift/if/leveldb.thrift @@ -47,12 +47,23 @@ struct DBOptions { 4:i32 max_open_files; 5:i32 block_size; 6:i32 block_restart_interval; - 7:CompressionType compression + 7:CompressionType compression, + 8:i32 num_levels, + 9:i32 level0_file_num_compaction_trigger, + 10:i32 level0_slowdown_writes_trigger, + 11:i32 level0_stop_writes_trigger, + 12:i32 target_file_size_base, + 13:i32 target_file_size_multiplier, + 14:i32 max_bytes_for_level_base, + 15:i32 max_bytes_for_level_multiplier, + 16:i32 max_grandparent_overlap_factor, + 17:bool disableDataSync } // Options for writing struct WriteOptions { - 1:bool sync + 1:bool sync, + 2:bool disableWAL } struct Snapshot { diff --git a/thrift/server_options.h b/thrift/server_options.h index 40a16edd1..797c9a566 100644 --- a/thrift/server_options.h +++ b/thrift/server_options.h @@ -73,10 +73,10 @@ public: cache_size_ = l; } else if (sscanf(argv[i], "--cache_numshardbits=%d%c", &n, &junk) == 1) { cache_numshardbits_ = n; - } else if (strncmp(argv[i], "--hostname=", 10) == 0) { - hostname_ = argv[i] + 10; - } else if (strncmp(argv[i], "--rootdir=", 9) == 0) { - rootdir_ = argv[i] + 9; + } else if (strncmp(argv[i], "--hostname=", 11) == 0) { + hostname_ = argv[i] + 11; + } else if (strncmp(argv[i], "--rootdir=", 10) == 0) { + rootdir_ = argv[i] + 10; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); return false; diff --git a/thrift/server_utils.cpp b/thrift/server_utils.cpp index d155bb74f..120737f8b 100644 --- a/thrift/server_utils.cpp +++ b/thrift/server_utils.cpp @@ -75,6 +75,26 @@ class DBHandler : virtual public DBIf { } else if (dboptions.compression == kSnappyCompression) { options.compression = leveldb::kSnappyCompression; } + if (dboptions.num_levels > 0) + options.num_levels = dboptions.num_levels; + if (dboptions.level0_file_num_compaction_trigger > 0) + options.level0_file_num_compaction_trigger = dboptions.level0_file_num_compaction_trigger; + if (dboptions.level0_slowdown_writes_trigger > 0) + options.level0_slowdown_writes_trigger = dboptions.level0_slowdown_writes_trigger; + if (dboptions.level0_stop_writes_trigger) + options.level0_stop_writes_trigger = dboptions.level0_stop_writes_trigger; + if (dboptions.target_file_size_base > 0) + options.target_file_size_base = dboptions.target_file_size_base; + if (dboptions.target_file_size_multiplier > 0) + options.target_file_size_multiplier = dboptions.target_file_size_multiplier; + if (dboptions.max_bytes_for_level_base) + options.max_bytes_for_level_base = dboptions.max_bytes_for_level_base; + if (dboptions.max_bytes_for_level_multiplier) + options.max_bytes_for_level_multiplier = dboptions.max_bytes_for_level_multiplier; + if (dboptions.max_grandparent_overlap_factor) + options.max_grandparent_overlap_factor = dboptions.max_grandparent_overlap_factor; + if (dboptions.disableDataSync) + options.disableDataSync = dboptions.disableDataSync; openHandles->add(options, dbname, dbdir); _return.dbname = dbname; } @@ -91,6 +111,7 @@ class DBHandler : virtual public DBIf { const WriteOptions& options) { leveldb::WriteOptions woptions; woptions.sync = options.sync; + woptions.disableWAL = options.disableWAL; leveldb::Slice key, value; key.data_ = kv.key.data.data(); key.size_ = kv.key.size; @@ -111,6 +132,7 @@ class DBHandler : virtual public DBIf { const WriteOptions& options) { leveldb::WriteOptions woptions; woptions.sync = options.sync; + woptions.disableWAL = options.disableWAL; leveldb::Slice key; key.data_ = kv.data.data(); key.size_ = kv.size; @@ -130,6 +152,7 @@ class DBHandler : virtual public DBIf { leveldb::WriteOptions woptions; leveldb::WriteBatch lbatch; woptions.sync = options.sync; + woptions.disableWAL = options.disableWAL; leveldb::Slice key, value; for (unsigned int i = 0; i < batch.size(); i++) { kv one = batch[i]; diff --git a/tools/manifest_dump.cc b/tools/manifest_dump.cc new file mode 100644 index 000000000..5f4d272c6 --- /dev/null +++ b/tools/manifest_dump.cc @@ -0,0 +1,71 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/version_set.h" + +#include +#include +#include "db/filename.h" +#include "db/log_reader.h" +#include "db/log_writer.h" +#include "db/memtable.h" +#include "db/table_cache.h" +#include "leveldb/env.h" +#include "leveldb/table_builder.h" +#include "table/merger.h" +#include "table/two_level_iterator.h" +#include "util/coding.h" +#include "util/logging.h" + +static int verbose = 0; + +using namespace leveldb; + +// +// Takes a manifest file and dumps out all metedata +// +int main(int argc, char** argv) { + + // parse command line options + int n; + char junk; + int foundfile = 0; + std::string manifestfile; + for (int i = 1; i < argc; i++) { + std::string param(argv[i]); + if ((n = param.find("--file=")) != std::string::npos) { + manifestfile = param.substr(strlen("--file=")); + foundfile = 1; + } else if (sscanf(argv[i], "--verbose=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + verbose = n; + } + } + if (!foundfile) { + fprintf(stderr, "%s [--verbose=0|1] [--file=pathname of manifest file\n", + argv[0]); + abort(); + } + + if (verbose) { + printf("Processing Manifest file %s\n", manifestfile.c_str()); + } + + Options options; + std::string file(manifestfile); + std::string dbname("dummy"); + TableCache* tc = new TableCache(dbname, &options, 10); + const InternalKeyComparator* cmp = new InternalKeyComparator(options.comparator); + + VersionSet* versions = new VersionSet(dbname, &options, + tc, cmp); + Status s = versions->DumpManifest(options, file); + if (!s.ok()) { + printf("Error in processing file %s %s\n", manifestfile.c_str(), + s.ToString().c_str()); + } + if (verbose) { + printf("Processing Manifest file %s done\n", manifestfile.c_str()); + } +} diff --git a/util/build_version.h b/util/build_version.h new file mode 100644 index 000000000..bcf869372 --- /dev/null +++ b/util/build_version.h @@ -0,0 +1,14 @@ +/*version.h*/ +#ifndef VERSION_H_ +#define VERSION_H_ + +// these variables tell us about the git config and time +extern const char* leveldb_build_git_sha; +extern const char* leveldb_build_git_datetime; + +// these variables tell us when the compilation occured +extern const char* leveldb_build_compile_time; +extern const char* leveldb_build_compile_date; + + +#endif /* VERSION_H_ */ diff --git a/util/env_posix.cc b/util/env_posix.cc index 95d324602..b343e4c70 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -3,6 +3,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include +#include #include #include #include @@ -31,6 +32,10 @@ namespace leveldb { namespace { +// list of pathnames that are locked +static std::set lockedFiles; +static port::Mutex mutex_lockedFiles; + static Status IOError(const std::string& context, int err_number) { return Status::IOError(context, strerror(err_number)); } @@ -291,7 +296,28 @@ class PosixMmapFile : public WritableFile { } }; -static int LockOrUnlock(int fd, bool lock) { +static int LockOrUnlock(const std::string& fname, int fd, bool lock) { + mutex_lockedFiles.Lock(); + if (lock) { + // If it already exists in the lockedFiles set, then it is already locked, + // and fail this lock attempt. Otherwise, insert it into lockedFiles. + // This check is needed because fcntl() does not detect lock conflict + // if the fcntl is issued by the same thread that earlier acquired + // this lock. + if (lockedFiles.insert(fname).second == false) { + mutex_lockedFiles.Unlock(); + errno = ENOLCK; + return -1; + } + } else { + // If we are unlocking, then verify that we had locked it earlier, + // it should already exist in lockedFiles. Remove it from lockedFiles. + if (lockedFiles.erase(fname) != 1) { + mutex_lockedFiles.Unlock(); + errno = ENOLCK; + return -1; + } + } errno = 0; struct flock f; memset(&f, 0, sizeof(f)); @@ -299,12 +325,19 @@ static int LockOrUnlock(int fd, bool lock) { f.l_whence = SEEK_SET; f.l_start = 0; f.l_len = 0; // Lock/unlock entire file - return fcntl(fd, F_SETLK, &f); + int value = fcntl(fd, F_SETLK, &f); + if (value == -1 && lock) { + // if there is an error in locking, then remove the pathname from lockedfiles + lockedFiles.erase(fname); + } + mutex_lockedFiles.Unlock(); + return value; } class PosixFileLock : public FileLock { public: int fd_; + std::string filename; }; class PosixEnv : public Env { @@ -435,12 +468,13 @@ class PosixEnv : public Env { int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644); if (fd < 0) { result = IOError(fname, errno); - } else if (LockOrUnlock(fd, true) == -1) { + } else if (LockOrUnlock(fname, fd, true) == -1) { result = IOError("lock " + fname, errno); close(fd); } else { PosixFileLock* my_lock = new PosixFileLock; my_lock->fd_ = fd; + my_lock->filename = fname; *lock = my_lock; } return result; @@ -449,7 +483,7 @@ class PosixEnv : public Env { virtual Status UnlockFile(FileLock* lock) { PosixFileLock* my_lock = reinterpret_cast(lock); Status result; - if (LockOrUnlock(my_lock->fd_, false) == -1) { + if (LockOrUnlock(my_lock->filename, my_lock->fd_, false) == -1) { result = IOError("unlock", errno); } close(my_lock->fd_); @@ -503,6 +537,43 @@ class PosixEnv : public Env { usleep(micros); } + virtual Status GetHostName(char* name, uint len) { + int ret = gethostname(name, len); + if (ret < 0) { + if (errno == EFAULT || errno == EINVAL) + return Status::InvalidArgument(strerror(errno)); + else + return IOError("GetHostName", errno); + } + return Status::OK(); + } + + virtual Status GetCurrentTime(int64_t* unix_time) { + time_t ret = time(NULL); + if (ret == (time_t) -1) { + return IOError("GetCurrentTime", errno); + } + *unix_time = (int64_t) ret; + return Status::OK(); + } + + virtual Status GetAbsolutePath(const std::string& db_path, + std::string* output_path) { + if (db_path.find('/') == 0) { + *output_path = db_path; + return Status::OK(); + } + + char the_path[256]; + char* ret = getcwd(the_path, 256); + if (ret == NULL) { + return Status::IOError(strerror(errno)); + } + + *output_path = ret; + return Status::OK(); + } + private: void PthreadCall(const char* label, int result) { if (result != 0) { diff --git a/util/filelock_test.cc b/util/filelock_test.cc new file mode 100644 index 000000000..ea24527c3 --- /dev/null +++ b/util/filelock_test.cc @@ -0,0 +1,57 @@ +// Copyright (c) 2012 Facebook. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "leveldb/status.h" +#include "leveldb/env.h" + +#include +#include "util/coding.h" +#include "util/testharness.h" + +namespace leveldb { + +class LockTest { + public: + static LockTest* current_; + std::string file_; + leveldb::Env* env_; + + LockTest() : file_(test::TmpDir() + "/db_testlock_file"), + env_(leveldb::Env::Default()) { + current_ = this; + } + + ~LockTest() { + } + + Status LockFile(FileLock** db_lock) { + return env_->LockFile(file_, db_lock); + } + + Status UnlockFile(FileLock* db_lock) { + return env_->UnlockFile(db_lock); + } +}; +LockTest* LockTest::current_; + +TEST(LockTest, LockBySameThread) { + FileLock* lock1; + FileLock* lock2; + + // acquire a lock on a file + ASSERT_OK(LockFile(&lock1)); + + // re-acquire the lock on the same file. This should fail. + ASSERT_TRUE(LockFile(&lock2).IsIOError()); + + // release the lock + ASSERT_OK(UnlockFile(lock1)); + +} + +} // namespace leveldb + +int main(int argc, char** argv) { + return leveldb::test::RunAllTests(); +} diff --git a/util/options.cc b/util/options.cc index 50ed00f9c..c9306387d 100644 --- a/util/options.cc +++ b/util/options.cc @@ -6,6 +6,7 @@ #include "leveldb/comparator.h" #include "leveldb/env.h" +#include "leveldb/filter_policy.h" namespace leveldb { @@ -28,14 +29,36 @@ Options::Options() level0_stop_writes_trigger(12), max_mem_compaction_level(2), target_file_size_base(2 * 1048576), - target_file_size_multiplier(10), + target_file_size_multiplier(1), max_bytes_for_level_base(10 * 1048576), max_bytes_for_level_multiplier(10), expanded_compaction_factor(25), max_grandparent_overlap_factor(10), filter_policy(NULL), statistics(NULL), - disableDataSync(false) { + disableDataSync(false), + db_stats_log_interval(1800) { } +void +Options::Dump( + Logger * log) const +{ + Log(log," Options.comparator: %s", comparator->Name()); + Log(log," Options.create_if_missing: %d", create_if_missing); + Log(log," Options.error_if_exists: %d", error_if_exists); + Log(log," Options.paranoid_checks: %d", paranoid_checks); + Log(log," Options.env: %p", env); + Log(log," Options.info_log: %p", info_log); + Log(log," Options.write_buffer_size: %zd", write_buffer_size); + Log(log," Options.max_open_files: %d", max_open_files); + Log(log," Options.block_cache: %p", block_cache); + Log(log," Options.block_size: %zd", block_size); + Log(log,"Options.block_restart_interval: %d", block_restart_interval); + Log(log," Options.compression: %d", compression); + Log(log," Options.filter_policy: %s", filter_policy == NULL ? "NULL" : filter_policy->Name()); + +} // Options::Dump + + } // namespace leveldb diff --git a/util/stats_logger.h b/util/stats_logger.h new file mode 100644 index 000000000..a14362c78 --- /dev/null +++ b/util/stats_logger.h @@ -0,0 +1,23 @@ +#ifndef STATS_LOGGER_H_ +#define STATS_LOGGER_H_ + +namespace leveldb { + +class StatsLogger { + + public: + + virtual void Log_Deploy_Stats(const std::string& db_version, + const std::string& machine_info, + const std::string& data_dir, + const uint64_t data_size, + const uint32_t file_number, + const std::string& data_size_per_level, + const std::string& file_number_per_level, + const int64_t& ts_unix) = 0; + +}; + +} + +#endif