adding a scribe logger in leveldb to log leveldb deploy stats

Summary:
as subject.

A new log is written to scribe via thrift client when a new db is opened and when there is
a compaction.

a new option var scribe_log_db_stats is added.

Test Plan: manually checked using command "ptail -time 0 leveldb_deploy_stats"

Reviewers: dhruba

Differential Revision: https://reviews.facebook.net/D4659
This commit is contained in:
heyongqiang 2012-08-14 15:20:36 -07:00
parent e56b2c5a31
commit 6ba1f17789
22 changed files with 2911 additions and 23 deletions

View File

@ -102,9 +102,14 @@ esac
# prune take effect.
DIRS="util db table"
if test "$USE_THRIFT"; then
DIRS+=" thrift/gen-cpp thrift/server_utils.cpp "
DIRS+=" thrift/server_utils.cpp thrift/gen-cpp "
THRIFTSERVER=leveldb_server
fi
if test "$USE_SCRIBE"; then
DIRS+=" scribe "
fi
set -f # temporarily disable globbing so that our patterns aren't expanded
PRUNE_TEST="-name *test*.cc -prune"
PRUNE_BENCH="-name *_bench.cc -prune"
@ -197,6 +202,11 @@ if test "$USE_THRIFT"; then
PLATFORM_LDFLAGS+=$THRIFT_LDFLAGS
fi
#shall we build with scribe
if test "$USE_SCRIBE"; then
COMMON_FLAGS="$COMMON_FLAGS -I./thrift/lib/ -DUSE_SCRIBE"
fi
PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS"
PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS"

View File

@ -144,6 +144,20 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
versions_ = new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_);
#ifdef USE_SCRIBE
logger_ = new ScribeLogger("localhost", 1456);
#endif
char name[100];
Status st = env_->GetHostName(name, 100);
if(st.ok()) {
host_name_ = name;
} else {
Log(options_.info_log, "Can't get hostname, use localhost as host name.");
host_name_ = "localhost";
}
last_log_ts = 0;
}
DBImpl::~DBImpl() {
@ -536,6 +550,7 @@ Status DBImpl::CompactMemTable() {
imm_ = NULL;
has_imm_.Release_Store(NULL);
DeleteObsoleteFiles();
MaybeScheduleLogDBDeployStats();
}
return s;
@ -559,15 +574,15 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
}
int DBImpl::NumberLevels() {
return options_.num_levels;
return options_.num_levels;
}
int DBImpl::MaxMemCompactionLevel() {
return options_.max_mem_compaction_level;
return options_.max_mem_compaction_level;
}
int DBImpl::Level0StopWriteTrigger() {
return options_.level0_stop_writes_trigger;
return options_.level0_stop_writes_trigger;
}
Status DBImpl::Flush(const FlushOptions& options) {
@ -620,34 +635,33 @@ Status DBImpl::FlushMemTable(const FlushOptions& options) {
}
Status DBImpl::WaitForCompactMemTable() {
Status s;
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
}
return s;
Status s;
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
}
return s;
}
Status DBImpl::TEST_CompactMemTable() {
return FlushMemTable(FlushOptions());
}
Status DBImpl::TEST_WaitForCompactMemTable() {
return WaitForCompactMemTable();
return WaitForCompactMemTable();
}
Status DBImpl::TEST_WaitForCompact() {
// Wait until the compaction completes
MutexLock l(&mutex_);
while (bg_compaction_scheduled_ && bg_error_.ok()) {
bg_cv_.Wait();
}
return bg_error_;
// Wait until the compaction completes
MutexLock l(&mutex_);
while (bg_compaction_scheduled_ && bg_error_.ok()) {
bg_cv_.Wait();
}
return bg_error_;
}
void DBImpl::MaybeScheduleCompaction() {
@ -678,6 +692,8 @@ void DBImpl::BackgroundCall() {
}
bg_compaction_scheduled_ = false;
MaybeScheduleLogDBDeployStats();
// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
MaybeScheduleCompaction();
@ -1482,6 +1498,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
impl->MaybeScheduleLogDBDeployStats();
}
}
impl->mutex_.Unlock();

View File

@ -7,12 +7,20 @@
#include <deque>
#include <set>
#include <atomic>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "port/port.h"
#include "util/stats_logger.h"
#ifdef USE_SCRIBE
#include "scribe/scribe_logger.h"
#endif
#include <boost/lexical_cast.hpp>
namespace leveldb {
@ -107,6 +115,9 @@ class DBImpl : public DB {
// Wait for memtable compaction
Status WaitForCompactMemTable();
void MaybeScheduleLogDBDeployStats();
static void LogDBDeployStats(void* db);
void MaybeScheduleCompaction();
static void BGWork(void* db);
void BackgroundCall();
@ -144,6 +155,8 @@ class DBImpl : public DB {
uint64_t logfile_number_;
log::Writer* log_;
std::string host_name_;
// Queue of writers.
std::deque<Writer*> writers_;
WriteBatch* tmp_batch_;
@ -172,6 +185,10 @@ class DBImpl : public DB {
// Have we encountered a background error in paranoid mode?
Status bg_error_;
StatsLogger* logger_;
std::atomic<int64_t> last_log_ts;
// Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level".
struct CompactionStats {

72
db/db_stats_logger.cc Normal file
View File

@ -0,0 +1,72 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"
#include <string>
#include <stdint.h>
#include "db/version_set.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
namespace leveldb {
void DBImpl::MaybeScheduleLogDBDeployStats() {
// There is a lock in the actual logger.
if (!logger_ || options_.db_stats_log_interval < 0
|| host_name_.empty()) {
return;
}
if (shutting_down_.Acquire_Load()) {
// Already scheduled
} else {
int64_t current_ts = 0;
Status st = env_->GetCurrentTime(&current_ts);
if (!st.ok()) {
return;
}
if ((current_ts - last_log_ts) < options_.db_stats_log_interval) {
return;
}
last_log_ts = current_ts;
env_->Schedule(&DBImpl::LogDBDeployStats, this);
}
}
void DBImpl::LogDBDeployStats(void* db) {
DBImpl* db_inst = reinterpret_cast<DBImpl*>(db);
if (db_inst->shutting_down_.Acquire_Load()) {
return;
}
std::string version_info;
version_info += boost::lexical_cast<std::string>(kMajorVersion);
version_info += ".";
version_info += boost::lexical_cast<std::string>(kMinorVersion);
std::string data_dir;
db_inst->env_->GetAbsolutePath(db_inst->dbname_, &data_dir);
uint64_t file_total_size = 0;
uint32_t file_total_num = 0;
for (int i = 0; i < db_inst->versions_->NumberLevels(); i++) {
file_total_num += db_inst->versions_->NumLevelFiles(i);
file_total_size += db_inst->versions_->NumLevelBytes(i);
}
VersionSet::LevelSummaryStorage scratch;
const char* file_num_summary = db_inst->versions_->LevelSummary(&scratch);
std::string file_num_per_level(file_num_summary);
const char* file_size_summary = db_inst->versions_->LevelDataSizeSummary(
&scratch);
std::string data_size_per_level(file_num_summary);
int64_t unix_ts;
db_inst->env_->GetCurrentTime(&unix_ts);
db_inst->logger_->Log_Deploy_Stats(version_info, db_inst->host_name_,
data_dir, file_total_size, file_total_num, file_num_per_level,
data_size_per_level, unix_ts);
}
}

View File

@ -1145,6 +1145,21 @@ const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
return scratch->buffer;
}
const char* VersionSet::LevelDataSizeSummary(
LevelSummaryStorage* scratch) const {
int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
for (int i = 0; i < NumberLevels(); i++) {
int sz = sizeof(scratch->buffer) - len;
int ret = snprintf(scratch->buffer + len, sz, "%ld ",
NumLevelBytes(i));
if (ret < 0 || ret >= sz)
break;
len += ret;
}
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
return scratch->buffer;
}
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0;
for (int level = 0; level < NumberLevels(); level++) {

View File

@ -241,6 +241,10 @@ class VersionSet {
// printf contents (for debugging)
Status DumpManifest(Options& options, std::string& manifestFileName);
// Return a human-readable short (single-line) summary of the data size
// of files per level. Uses *scratch as backing store.
const char* LevelDataSizeSummary(LevelSummaryStorage* scratch) const;
private:
class Builder;

View File

@ -11,6 +11,10 @@ TOOLCHAIN_LIB_BASE="/mnt/gvfs/third-party/$TOOLCHAIN_REV/gcc-4.6.2-glibc-2.13"
# always build thrift server
export USE_THRIFT=1
if ! test "$NO_SCRIBE"; then
export USE_SCRIBE=1
fi
# location of libhdfs libraries
if test "$USE_HDFS"; then
JAVA_HOME="/usr/local/jdk-6u22-64"

View File

@ -122,6 +122,20 @@ class HdfsEnv : public Env {
posixEnv->SleepForMicroseconds(micros);
}
virtual Status GetHostName(char* name, uint len) {
return posixEnv->GetHostName(name, len);
}
virtual Status GetCurrentTime(int64_t* unix_time) {
return posixEnv->NowUnixTime(unix_time);
}
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) {
return posixEnv->GetAbsolutePath(db_path, output_path);
}
static uint64_t gettid() {
assert(sizeof(pthread_t) <= sizeof(uint64_t));
return (uint64_t)pthread_self();
@ -245,6 +259,14 @@ class HdfsEnv : public Env {
virtual uint64_t NowMicros() {}
virtual void SleepForMicroseconds(int micros) {}
virtual Status GetHostName(char* name, uint len) {}
virtual Status GetCurrentTime(int64_t* unix_time) {}
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* outputpath) {}
};
}

View File

@ -145,6 +145,16 @@ class Env {
// Sleep/delay the thread for the perscribed number of micro-seconds.
virtual void SleepForMicroseconds(int micros) = 0;
// Get the current host name.
virtual Status GetHostName(char* name, uint len) = 0;
// Get the number of seconds since the Epoch, 1970-01-01 00:00:00 (UTC).
virtual Status GetCurrentTime(int64_t* unix_time) = 0;
// Get full directory name for this db.
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) = 0;
private:
// No copying allowed
Env(const Env&);

View File

@ -198,6 +198,12 @@ struct Options {
// Default: false
bool disableDataSync;
// This number controls how often a new scribe log about
// db deploy stats is written out.
// -1 indicates no logging at all.
// Default value is 1800 (half an hour).
int db_stats_log_interval;
// Create an Options object with default values for all fields.
Options();
};

1012
scribe/if/gen-cpp/scribe.cpp Normal file

File diff suppressed because it is too large Load Diff

593
scribe/if/gen-cpp/scribe.h Normal file
View File

@ -0,0 +1,593 @@
/**
* Autogenerated by Thrift
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef _Tleveldb_scribe_H
#define _Tleveldb_scribe_H
#include <TDispatchProcessor.h>
#include "scribe_types.h"
namespace Tleveldb {
class scribeIf {
public:
virtual ~scribeIf() {}
virtual ResultCode Log(const std::vector<LogEntry> & messages) = 0;
virtual void LogMulti(std::vector<ResultCode> & _return, const std::vector<LogEntry> & messages) = 0;
virtual ResultCode LogCompressedMsg(const std::string& compressedMessages) = 0;
};
class scribeIfFactory {
public:
typedef scribeIf Handler;
virtual ~scribeIfFactory() {}
virtual scribeIf* getHandler(::apache::thrift::server::TConnectionContext* ctx) = 0;
virtual void releaseHandler(scribeIf* handler) = 0;
};
class scribeIfSingletonFactory : virtual public scribeIfFactory {
public:
scribeIfSingletonFactory(const boost::shared_ptr<scribeIf>& iface) : iface_(iface) {}
virtual ~scribeIfSingletonFactory() {}
virtual scribeIf* getHandler(::apache::thrift::server::TConnectionContext*) {
return iface_.get();
}
virtual void releaseHandler(scribeIf* handler) {}
protected:
boost::shared_ptr<scribeIf> iface_;
};
class scribeNull : virtual public scribeIf {
public:
virtual ~scribeNull() {}
ResultCode Log(const std::vector<LogEntry> & /* messages */) {
ResultCode _return = (ResultCode)0;
return _return;
}
void LogMulti(std::vector<ResultCode> & /* _return */, const std::vector<LogEntry> & /* messages */) {
return;
}
ResultCode LogCompressedMsg(const std::string& /* compressedMessages */) {
ResultCode _return = (ResultCode)0;
return _return;
}
};
class scribe_Log_args {
public:
static const uint64_t _reflection_id = 5902265217339133004U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_Log_args() {
}
scribe_Log_args(const scribe_Log_args&) = default;
scribe_Log_args& operator=(const scribe_Log_args&) = default;
scribe_Log_args(scribe_Log_args&&) = default;
scribe_Log_args& operator=(scribe_Log_args&&) = default;
void __clear() {
messages.clear();
__isset.__clear();
}
virtual ~scribe_Log_args() throw() {}
std::vector<LogEntry> messages;
struct __isset {
__isset() { __clear(); }
void __clear() {
messages = false;
}
bool messages;
} __isset;
bool operator == (const scribe_Log_args & rhs) const
{
if (!(this->messages == rhs.messages))
return false;
return true;
}
bool operator != (const scribe_Log_args &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_Log_args & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_Log_pargs {
public:
static const uint64_t _reflection_id = 5555604010648986412U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_Log_pargs() throw() {}
const std::vector<LogEntry> * messages;
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_Log_result {
public:
static const uint64_t _reflection_id = 18205781396971565932U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_Log_result() : success(static_cast<ResultCode>(0)) {
}
scribe_Log_result(const scribe_Log_result&) = default;
scribe_Log_result& operator=(const scribe_Log_result&) = default;
scribe_Log_result(scribe_Log_result&&) = default;
scribe_Log_result& operator=(scribe_Log_result&&) = default;
void __clear() {
success = static_cast<ResultCode>(0);
__isset.__clear();
}
virtual ~scribe_Log_result() throw() {}
ResultCode success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
bool operator == (const scribe_Log_result & rhs) const
{
if (!(this->success == rhs.success))
return false;
return true;
}
bool operator != (const scribe_Log_result &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_Log_result & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_Log_presult {
public:
static const uint64_t _reflection_id = 12945584136895385836U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_Log_presult() throw() {}
ResultCode* success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
};
class scribe_LogMulti_args {
public:
static const uint64_t _reflection_id = 7590876486278061516U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_LogMulti_args() {
}
scribe_LogMulti_args(const scribe_LogMulti_args&) = default;
scribe_LogMulti_args& operator=(const scribe_LogMulti_args&) = default;
scribe_LogMulti_args(scribe_LogMulti_args&&) = default;
scribe_LogMulti_args& operator=(scribe_LogMulti_args&&) = default;
void __clear() {
messages.clear();
__isset.__clear();
}
virtual ~scribe_LogMulti_args() throw() {}
std::vector<LogEntry> messages;
struct __isset {
__isset() { __clear(); }
void __clear() {
messages = false;
}
bool messages;
} __isset;
bool operator == (const scribe_LogMulti_args & rhs) const
{
if (!(this->messages == rhs.messages))
return false;
return true;
}
bool operator != (const scribe_LogMulti_args &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_LogMulti_args & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogMulti_pargs {
public:
static const uint64_t _reflection_id = 9124384543551655628U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_LogMulti_pargs() throw() {}
const std::vector<LogEntry> * messages;
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogMulti_result {
public:
static const uint64_t _reflection_id = 4828367046341273164U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_LogMulti_result() {
}
scribe_LogMulti_result(const scribe_LogMulti_result&) = default;
scribe_LogMulti_result& operator=(const scribe_LogMulti_result&) = default;
scribe_LogMulti_result(scribe_LogMulti_result&&) = default;
scribe_LogMulti_result& operator=(scribe_LogMulti_result&&) = default;
void __clear() {
success.clear();
__isset.__clear();
}
virtual ~scribe_LogMulti_result() throw() {}
std::vector<ResultCode> success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
bool operator == (const scribe_LogMulti_result & rhs) const
{
if (!(this->success == rhs.success))
return false;
return true;
}
bool operator != (const scribe_LogMulti_result &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_LogMulti_result & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogMulti_presult {
public:
static const uint64_t _reflection_id = 5642041737363050316U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_LogMulti_presult() throw() {}
std::vector<ResultCode> * success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
};
class scribe_LogCompressedMsg_args {
public:
static const uint64_t _reflection_id = 12705053036625273964U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_LogCompressedMsg_args() : compressedMessages("") {
}
scribe_LogCompressedMsg_args(const scribe_LogCompressedMsg_args&) = default;
scribe_LogCompressedMsg_args& operator=(const scribe_LogCompressedMsg_args&) = default;
scribe_LogCompressedMsg_args(scribe_LogCompressedMsg_args&&) = default;
scribe_LogCompressedMsg_args& operator=(scribe_LogCompressedMsg_args&&) = default;
void __clear() {
compressedMessages = "";
__isset.__clear();
}
virtual ~scribe_LogCompressedMsg_args() throw() {}
std::string compressedMessages;
struct __isset {
__isset() { __clear(); }
void __clear() {
compressedMessages = false;
}
bool compressedMessages;
} __isset;
bool operator == (const scribe_LogCompressedMsg_args & rhs) const
{
if (!(this->compressedMessages == rhs.compressedMessages))
return false;
return true;
}
bool operator != (const scribe_LogCompressedMsg_args &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_LogCompressedMsg_args & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogCompressedMsg_pargs {
public:
static const uint64_t _reflection_id = 13645577436870531500U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_LogCompressedMsg_pargs() throw() {}
const std::string* compressedMessages;
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogCompressedMsg_result {
public:
static const uint64_t _reflection_id = 15026639991904524972U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_LogCompressedMsg_result() : success(static_cast<ResultCode>(0)) {
}
scribe_LogCompressedMsg_result(const scribe_LogCompressedMsg_result&) = default;
scribe_LogCompressedMsg_result& operator=(const scribe_LogCompressedMsg_result&) = default;
scribe_LogCompressedMsg_result(scribe_LogCompressedMsg_result&&) = default;
scribe_LogCompressedMsg_result& operator=(scribe_LogCompressedMsg_result&&) = default;
void __clear() {
success = static_cast<ResultCode>(0);
__isset.__clear();
}
virtual ~scribe_LogCompressedMsg_result() throw() {}
ResultCode success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
bool operator == (const scribe_LogCompressedMsg_result & rhs) const
{
if (!(this->success == rhs.success))
return false;
return true;
}
bool operator != (const scribe_LogCompressedMsg_result &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_LogCompressedMsg_result & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogCompressedMsg_presult {
public:
static const uint64_t _reflection_id = 5311776576442573772U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_LogCompressedMsg_presult() throw() {}
ResultCode* success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
};
class scribeClient : virtual public scribeIf, virtual public apache::thrift::TClientBase {
public:
scribeClient(boost::shared_ptr<apache::thrift::protocol::TProtocol> prot) :
checkSeqid_(true),
nextSendSequenceId_(1),
nextRecvSequenceId_(1),
piprot_(prot),
poprot_(prot) {
iprot_ = prot.get();
oprot_ = prot.get();
}
scribeClient(boost::shared_ptr<apache::thrift::protocol::TProtocol> iprot, boost::shared_ptr<apache::thrift::protocol::TProtocol> oprot) :
checkSeqid_(true),
nextSendSequenceId_(1),
nextRecvSequenceId_(1),
piprot_(iprot),
poprot_(oprot) {
iprot_ = iprot.get();
oprot_ = oprot.get();
}
boost::shared_ptr<apache::thrift::protocol::TProtocol> getInputProtocol() {
return piprot_;
}
boost::shared_ptr<apache::thrift::protocol::TProtocol> getOutputProtocol() {
return poprot_;
}
ResultCode Log(const std::vector<LogEntry> & messages);
void send_Log(const std::vector<LogEntry> & messages);
ResultCode recv_Log();
void LogMulti(std::vector<ResultCode> & _return, const std::vector<LogEntry> & messages);
void send_LogMulti(const std::vector<LogEntry> & messages);
void recv_LogMulti(std::vector<ResultCode> & _return);
ResultCode LogCompressedMsg(const std::string& compressedMessages);
void send_LogCompressedMsg(const std::string& compressedMessages);
ResultCode recv_LogCompressedMsg();
/**
* Disable checking the seqid field in server responses.
*
* This should only be used with broken servers that return incorrect seqid values.
*/
void _disableSequenceIdChecks() {
checkSeqid_ = false;
}
protected:
bool checkSeqid_;
int32_t nextSendSequenceId_;
int32_t nextRecvSequenceId_;
int32_t getNextSendSequenceId();
int32_t getNextRecvSequenceId();
boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot_;
boost::shared_ptr<apache::thrift::protocol::TProtocol> poprot_;
apache::thrift::protocol::TProtocol* iprot_;
apache::thrift::protocol::TProtocol* oprot_;
};
class scribeProcessor : public ::apache::thrift::TDispatchProcessor {
protected:
boost::shared_ptr<scribeIf> iface_;
virtual bool dispatchCall(apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, apache::thrift::server::TConnectionContext* connectionContext);
private:
typedef void (scribeProcessor::*ProcessFunction)(int32_t, apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, apache::thrift::server::TConnectionContext*);
typedef std::map<std::string, ProcessFunction> ProcessMap;
ProcessMap processMap_;
void process_Log(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext);
void process_LogMulti(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext);
void process_LogCompressedMsg(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext);
public:
scribeProcessor(boost::shared_ptr<scribeIf> iface) :
iface_(iface) {
processMap_["Log"] = &scribeProcessor::process_Log;
processMap_["LogMulti"] = &scribeProcessor::process_LogMulti;
processMap_["LogCompressedMsg"] = &scribeProcessor::process_LogCompressedMsg;
}
virtual ~scribeProcessor() {}
boost::shared_ptr<std::set<std::string> > getProcessFunctions() {
boost::shared_ptr<std::set<std::string> > rSet(new std::set<std::string>());
rSet->insert("scribe.Log");
rSet->insert("scribe.LogMulti");
rSet->insert("scribe.LogCompressedMsg");
return rSet;
}
};
class scribeProcessorFactory : public ::apache::thrift::TProcessorFactory {
public:
scribeProcessorFactory(const ::boost::shared_ptr< scribeIfFactory >& handlerFactory) :
handlerFactory_(handlerFactory) {}
::boost::shared_ptr< ::apache::thrift::TProcessor > getProcessor(::apache::thrift::server::TConnectionContext* ctx);
protected:
::boost::shared_ptr< scribeIfFactory > handlerFactory_;
};
class scribeMultiface : virtual public scribeIf {
public:
scribeMultiface(std::vector<boost::shared_ptr<scribeIf> >& ifaces) : ifaces_(ifaces) {
}
virtual ~scribeMultiface() {}
protected:
std::vector<boost::shared_ptr<scribeIf> > ifaces_;
scribeMultiface() {}
void add(boost::shared_ptr<scribeIf> iface) {
ifaces_.push_back(iface);
}
public:
ResultCode Log(const std::vector<LogEntry> & messages) {
uint32_t i;
uint32_t sz = ifaces_.size();
for (i = 0; i < sz - 1; ++i) {
ifaces_[i]->Log(messages);
}
return ifaces_[i]->Log(messages);
}
void LogMulti(std::vector<ResultCode> & _return, const std::vector<LogEntry> & messages) {
uint32_t i;
uint32_t sz = ifaces_.size();
for (i = 0; i < sz; ++i) {
ifaces_[i]->LogMulti(_return, messages);
}
}
ResultCode LogCompressedMsg(const std::string& compressedMessages) {
uint32_t i;
uint32_t sz = ifaces_.size();
for (i = 0; i < sz - 1; ++i) {
ifaces_[i]->LogCompressedMsg(compressedMessages);
}
return ifaces_[i]->LogCompressedMsg(compressedMessages);
}
};
} // namespace
#endif

View File

@ -0,0 +1,17 @@
/**
* Autogenerated by Thrift
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#include "scribe_constants.h"
namespace Tleveldb {
const scribeConstants g_scribe_constants;
scribeConstants::scribeConstants() {
SCRIBE_MAX_MESSAGE_LENGTH = 26214400;
}
} // namespace

View File

@ -0,0 +1,25 @@
/**
* Autogenerated by Thrift
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef scribe_CONSTANTS_H
#define scribe_CONSTANTS_H
#include "scribe_types.h"
namespace Tleveldb {
class scribeConstants {
public:
scribeConstants();
int32_t SCRIBE_MAX_MESSAGE_LENGTH;
};
extern const scribeConstants g_scribe_constants;
} // namespace
#endif

View File

@ -0,0 +1,513 @@
/**
* Autogenerated by Thrift
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#include "scribe_types.h"
#include <thrift/lib/cpp/Reflection.h>
#include <algorithm>
#include <string.h>
namespace Tleveldb {
int _kResultCodeValues[] = {
OK,
TRY_LATER,
ERROR_DECOMPRESS
};
const char* _kResultCodeNames[] = {
"OK",
"TRY_LATER",
"ERROR_DECOMPRESS"
};
const std::map<int, const char*> _ResultCode_VALUES_TO_NAMES(apache::thrift::TEnumIterator<int>(3, _kResultCodeValues, _kResultCodeNames), apache::thrift::TEnumIterator<int>(-1, NULL, NULL));
const std::map<const char*, int, apache::thrift::ltstr> _ResultCode_NAMES_TO_VALUES(apache::thrift::TEnumInverseIterator<int>(3, _kResultCodeValues, _kResultCodeNames), apache::thrift::TEnumInverseIterator<int>(-1, NULL, NULL));
} // namespace
namespace apache { namespace thrift {
template<>
const char* TEnumTraits< ::Tleveldb::ResultCode>::findName( ::Tleveldb::ResultCode value) {
return findName( ::Tleveldb::_ResultCode_VALUES_TO_NAMES, value);
}
template<>
bool TEnumTraits< ::Tleveldb::ResultCode>::findValue(const char* name, ::Tleveldb::ResultCode* out) {
return findValue( ::Tleveldb::_ResultCode_NAMES_TO_VALUES, name, out);
}
}} // apache::thrift
namespace Tleveldb {
// Reflection initializer for struct scribe.SourceInfo
namespace {
void reflectionInitializer_16557823557777806572(::apache::thrift::reflection::Schema& schema) {
const uint64_t id = 16557823557777806572U;
if (schema.dataTypes.count(id)) return;
::apache::thrift::reflection::DataType dt;
dt.name = "struct scribe.SourceInfo";
dt.__isset.fields = true;
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 1U;
f.name = "host";
dt.fields[1] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 5U;
f.name = "port";
dt.fields[2] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 6U;
f.name = "timestamp";
dt.fields[3] = f;
}
schema.dataTypes[id] = dt;
schema.names[dt.name] = id;
}
} // namespace
const uint64_t SourceInfo::_reflection_id;
void SourceInfo::_reflection_register(::apache::thrift::reflection::Schema& schema) {
reflectionInitializer_16557823557777806572(schema);
}
uint32_t SourceInfo::read(apache::thrift::protocol::TProtocol* iprot) {
uint32_t xfer = 0;
std::string fname;
apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == apache::thrift::protocol::T_STRING) {
xfer += iprot->readBinary(this->host);
this->__isset.host = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->port);
this->__isset.port = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 3:
if (ftype == apache::thrift::protocol::T_I64) {
xfer += iprot->readI64(this->timestamp);
this->__isset.timestamp = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t SourceInfo::write(apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
xfer += oprot->writeStructBegin("SourceInfo");
xfer += oprot->writeFieldBegin("host", apache::thrift::protocol::T_STRING, 1);
xfer += oprot->writeBinary(this->host);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("port", apache::thrift::protocol::T_I32, 2);
xfer += oprot->writeI32(this->port);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("timestamp", apache::thrift::protocol::T_I64, 3);
xfer += oprot->writeI64(this->timestamp);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
void swap(SourceInfo &a, SourceInfo &b) {
using ::std::swap;
(void)a;
(void)b;
swap(a.host, b.host);
swap(a.port, b.port);
swap(a.timestamp, b.timestamp);
swap(a.__isset, b.__isset);
}
// Reflection initializer for map<string, string>
namespace {
void reflectionInitializer_9246346592659763371(::apache::thrift::reflection::Schema& schema) {
const uint64_t id = 9246346592659763371U;
if (schema.dataTypes.count(id)) return;
::apache::thrift::reflection::DataType dt;
dt.name = "map<string, string>";
dt.__isset.mapKeyType = true;
dt.mapKeyType = 1U;
dt.__isset.valueType = true;
dt.valueType = 1U;
schema.dataTypes[id] = dt;
schema.names[dt.name] = id;
}
} // namespace
// Reflection initializer for struct scribe.LogEntry
namespace {
void reflectionInitializer_15053466696968532300(::apache::thrift::reflection::Schema& schema) {
const uint64_t id = 15053466696968532300U;
if (schema.dataTypes.count(id)) return;
reflectionInitializer_16557823557777806572(schema); // struct scribe.SourceInfo
reflectionInitializer_9246346592659763371(schema); // map<string, string>
::apache::thrift::reflection::DataType dt;
dt.name = "struct scribe.LogEntry";
dt.__isset.fields = true;
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 1U;
f.name = "category";
dt.fields[1] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 1U;
f.name = "message";
dt.fields[2] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = false;
f.type = 9246346592659763371U;
f.name = "metadata";
dt.fields[3] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = false;
f.type = 5U;
f.name = "checksum";
dt.fields[4] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = false;
f.type = 16557823557777806572U;
f.name = "source";
dt.fields[5] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = false;
f.type = 5U;
f.name = "bucket";
dt.fields[6] = f;
}
schema.dataTypes[id] = dt;
schema.names[dt.name] = id;
}
} // namespace
const uint64_t LogEntry::_reflection_id;
void LogEntry::_reflection_register(::apache::thrift::reflection::Schema& schema) {
reflectionInitializer_15053466696968532300(schema);
}
uint32_t LogEntry::read(apache::thrift::protocol::TProtocol* iprot) {
uint32_t xfer = 0;
std::string fname;
apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == apache::thrift::protocol::T_STRING) {
xfer += iprot->readBinary(this->category);
this->__isset.category = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == apache::thrift::protocol::T_STRING) {
xfer += iprot->readBinary(this->message);
this->__isset.message = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 3:
if (ftype == apache::thrift::protocol::T_MAP) {
{
this->metadata.clear();
uint32_t _size0;
apache::thrift::protocol::TType _ktype1;
apache::thrift::protocol::TType _vtype2;
xfer += iprot->readMapBegin(_ktype1, _vtype2, _size0);
uint32_t _i4;
for (_i4 = 0; _i4 < _size0; ++_i4)
{
std::string _key5;
xfer += iprot->readString(_key5);
std::string& _val6 = this->metadata[_key5];
xfer += iprot->readString(_val6);
}
xfer += iprot->readMapEnd();
}
this->__isset.metadata = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 4:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->checksum);
this->__isset.checksum = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 5:
if (ftype == apache::thrift::protocol::T_STRUCT) {
xfer += this->source.read(iprot);
this->__isset.source = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 6:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->bucket);
this->__isset.bucket = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t LogEntry::write(apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
xfer += oprot->writeStructBegin("LogEntry");
xfer += oprot->writeFieldBegin("category", apache::thrift::protocol::T_STRING, 1);
xfer += oprot->writeBinary(this->category);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("message", apache::thrift::protocol::T_STRING, 2);
xfer += oprot->writeBinary(this->message);
xfer += oprot->writeFieldEnd();
if (this->__isset.metadata) {
xfer += oprot->writeFieldBegin("metadata", apache::thrift::protocol::T_MAP, 3);
{
xfer += oprot->writeMapBegin(apache::thrift::protocol::T_STRING, apache::thrift::protocol::T_STRING, this->metadata.size());
std::map<std::string, std::string> ::const_iterator _iter7;
for (_iter7 = this->metadata.begin(); _iter7 != this->metadata.end(); ++_iter7)
{
xfer += oprot->writeString(_iter7->first);
xfer += oprot->writeString(_iter7->second);
}
xfer += oprot->writeMapEnd();
}
xfer += oprot->writeFieldEnd();
}
if (this->__isset.checksum) {
xfer += oprot->writeFieldBegin("checksum", apache::thrift::protocol::T_I32, 4);
xfer += oprot->writeI32(this->checksum);
xfer += oprot->writeFieldEnd();
}
if (this->__isset.source) {
xfer += oprot->writeFieldBegin("source", apache::thrift::protocol::T_STRUCT, 5);
xfer += this->source.write(oprot);
xfer += oprot->writeFieldEnd();
}
if (this->__isset.bucket) {
xfer += oprot->writeFieldBegin("bucket", apache::thrift::protocol::T_I32, 6);
xfer += oprot->writeI32(this->bucket);
xfer += oprot->writeFieldEnd();
}
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
void swap(LogEntry &a, LogEntry &b) {
using ::std::swap;
(void)a;
(void)b;
swap(a.category, b.category);
swap(a.message, b.message);
swap(a.metadata, b.metadata);
swap(a.checksum, b.checksum);
swap(a.source, b.source);
swap(a.bucket, b.bucket);
swap(a.__isset, b.__isset);
}
// Reflection initializer for list<struct scribe.LogEntry>
namespace {
void reflectionInitializer_10251729064312664553(::apache::thrift::reflection::Schema& schema) {
const uint64_t id = 10251729064312664553U;
if (schema.dataTypes.count(id)) return;
reflectionInitializer_15053466696968532300(schema); // struct scribe.LogEntry
::apache::thrift::reflection::DataType dt;
dt.name = "list<struct scribe.LogEntry>";
dt.__isset.valueType = true;
dt.valueType = 15053466696968532300U;
schema.dataTypes[id] = dt;
schema.names[dt.name] = id;
}
} // namespace
// Reflection initializer for struct scribe.MessageList
namespace {
void reflectionInitializer_5674270912483072844(::apache::thrift::reflection::Schema& schema) {
const uint64_t id = 5674270912483072844U;
if (schema.dataTypes.count(id)) return;
reflectionInitializer_10251729064312664553(schema); // list<struct scribe.LogEntry>
::apache::thrift::reflection::DataType dt;
dt.name = "struct scribe.MessageList";
dt.__isset.fields = true;
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 10251729064312664553U;
f.name = "messages";
dt.fields[1] = f;
}
schema.dataTypes[id] = dt;
schema.names[dt.name] = id;
}
} // namespace
const uint64_t MessageList::_reflection_id;
void MessageList::_reflection_register(::apache::thrift::reflection::Schema& schema) {
reflectionInitializer_5674270912483072844(schema);
}
uint32_t MessageList::read(apache::thrift::protocol::TProtocol* iprot) {
uint32_t xfer = 0;
std::string fname;
apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == apache::thrift::protocol::T_LIST) {
{
this->messages.clear();
uint32_t _size8;
apache::thrift::protocol::TType _etype11;
xfer += iprot->readListBegin(_etype11, _size8);
this->messages.resize(_size8);
uint32_t _i12;
for (_i12 = 0; _i12 < _size8; ++_i12)
{
xfer += this->messages[_i12].read(iprot);
}
xfer += iprot->readListEnd();
}
this->__isset.messages = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t MessageList::write(apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
xfer += oprot->writeStructBegin("MessageList");
xfer += oprot->writeFieldBegin("messages", apache::thrift::protocol::T_LIST, 1);
{
xfer += oprot->writeListBegin(apache::thrift::protocol::T_STRUCT, this->messages.size());
std::vector<LogEntry> ::const_iterator _iter13;
for (_iter13 = this->messages.begin(); _iter13 != this->messages.end(); ++_iter13)
{
xfer += (*_iter13).write(oprot);
}
xfer += oprot->writeListEnd();
}
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
void swap(MessageList &a, MessageList &b) {
using ::std::swap;
(void)a;
(void)b;
swap(a.messages, b.messages);
swap(a.__isset, b.__isset);
}
} // namespace

View File

@ -0,0 +1,247 @@
/**
* Autogenerated by Thrift
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef scribe_TYPES_H
#define scribe_TYPES_H
#include <Thrift.h>
#include <TApplicationException.h>
#include <protocol/TProtocol.h>
#include <transport/TTransport.h>
namespace apache { namespace thrift { namespace reflection {
class Schema;
}}}
namespace Tleveldb {
enum ResultCode {
OK = 0,
TRY_LATER = 1,
ERROR_DECOMPRESS = 2
};
extern const std::map<int, const char*> _ResultCode_VALUES_TO_NAMES;
extern const std::map<const char*, int, apache::thrift::ltstr> _ResultCode_NAMES_TO_VALUES;
} // namespace
namespace apache { namespace thrift {
template<>
inline constexpr ::Tleveldb::ResultCode TEnumTraits< ::Tleveldb::ResultCode>::min() {
return ::Tleveldb::ResultCode::OK;
}
template<>
inline constexpr ::Tleveldb::ResultCode TEnumTraits< ::Tleveldb::ResultCode>::max() {
return ::Tleveldb::ResultCode::ERROR_DECOMPRESS;
}
}} // apache:thrift
namespace Tleveldb {
class SourceInfo {
public:
static const uint64_t _reflection_id = 16557823557777806572U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
SourceInfo() : host(""), port(0), timestamp(0) {
}
SourceInfo(const SourceInfo&) = default;
SourceInfo& operator=(const SourceInfo&) = default;
SourceInfo(SourceInfo&&) = default;
SourceInfo& operator=(SourceInfo&&) = default;
void __clear() {
host = "";
port = 0;
timestamp = 0;
__isset.__clear();
}
virtual ~SourceInfo() throw() {}
std::string host;
int32_t port;
int64_t timestamp;
struct __isset {
__isset() { __clear(); }
void __clear() {
host = false;
port = false;
timestamp = false;
}
bool host;
bool port;
bool timestamp;
} __isset;
bool operator == (const SourceInfo & rhs) const
{
if (!(this->host == rhs.host))
return false;
if (!(this->port == rhs.port))
return false;
if (!(this->timestamp == rhs.timestamp))
return false;
return true;
}
bool operator != (const SourceInfo &rhs) const {
return !(*this == rhs);
}
bool operator < (const SourceInfo & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class SourceInfo;
void swap(SourceInfo &a, SourceInfo &b);
class LogEntry {
public:
static const uint64_t _reflection_id = 15053466696968532300U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
LogEntry() : category(""), message(""), checksum(0), bucket(0) {
}
LogEntry(const LogEntry&) = default;
LogEntry& operator=(const LogEntry&) = default;
LogEntry(LogEntry&&) = default;
LogEntry& operator=(LogEntry&&) = default;
void __clear() {
category = "";
message = "";
metadata.clear();
checksum = 0;
source.__clear();
bucket = 0;
__isset.__clear();
}
virtual ~LogEntry() throw() {}
std::string category;
std::string message;
std::map<std::string, std::string> metadata;
int32_t checksum;
SourceInfo source;
int32_t bucket;
struct __isset {
__isset() { __clear(); }
void __clear() {
category = false;
message = false;
metadata = false;
checksum = false;
source = false;
bucket = false;
}
bool category;
bool message;
bool metadata;
bool checksum;
bool source;
bool bucket;
} __isset;
bool operator == (const LogEntry & rhs) const
{
if (!(this->category == rhs.category))
return false;
if (!(this->message == rhs.message))
return false;
if (__isset.metadata != rhs.__isset.metadata)
return false;
else if (__isset.metadata && !(metadata == rhs.metadata))
return false;
if (__isset.checksum != rhs.__isset.checksum)
return false;
else if (__isset.checksum && !(checksum == rhs.checksum))
return false;
if (__isset.source != rhs.__isset.source)
return false;
else if (__isset.source && !(source == rhs.source))
return false;
if (__isset.bucket != rhs.__isset.bucket)
return false;
else if (__isset.bucket && !(bucket == rhs.bucket))
return false;
return true;
}
bool operator != (const LogEntry &rhs) const {
return !(*this == rhs);
}
bool operator < (const LogEntry & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class LogEntry;
void swap(LogEntry &a, LogEntry &b);
class MessageList {
public:
static const uint64_t _reflection_id = 5674270912483072844U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
MessageList() {
}
MessageList(const MessageList&) = default;
MessageList& operator=(const MessageList&) = default;
MessageList(MessageList&&) = default;
MessageList& operator=(MessageList&&) = default;
void __clear() {
messages.clear();
__isset.__clear();
}
virtual ~MessageList() throw() {}
std::vector<LogEntry> messages;
struct __isset {
__isset() { __clear(); }
void __clear() {
messages = false;
}
bool messages;
} __isset;
bool operator == (const MessageList & rhs) const
{
if (!(this->messages == rhs.messages))
return false;
return true;
}
bool operator != (const MessageList &rhs) const {
return !(*this == rhs);
}
bool operator < (const MessageList & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class MessageList;
void swap(MessageList &a, MessageList &b);
} // namespace
#endif

82
scribe/if/scribe.thrift Normal file
View File

@ -0,0 +1,82 @@
#!/usr/local/bin/thrift --cpp --php
## Copyright (c) 2007-2012 Facebook
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
## See accompanying file LICENSE or visit the Scribe site at:
## http://developers.facebook.com/scribe/
namespace cpp Tleveldb
namespace java Tleveldb
// Max message length allowed to log through scribe
const i32 SCRIBE_MAX_MESSAGE_LENGTH = 26214400;
enum ResultCode
{
OK,
TRY_LATER,
ERROR_DECOMPRESS
}
struct SourceInfo
{
1: binary host,
2: i32 port,
3: i64 timestamp
}
struct LogEntry
{
1: binary category,
2: binary message,
3: optional map<string, string> metadata,
4: optional i32 checksum,
5: optional SourceInfo source,
6: optional i32 bucket
}
struct MessageList
{
1: list<LogEntry> messages
}
service scribe
{
#
# Delivers a list of LogEntry messages to the Scribe server.
# A returned ResultCode of anything other than OK indicates that the
# whole batch was unable to be delivered to the server.
# If data loss is a concern, the caller should buffer and retry the messages.
#
ResultCode Log(1: list<LogEntry> messages);
#
# NOTE: FOR INTERNAL USE ONLY!
#
# Delivers a list of LogEntry messages to the Scribe server, but
# allows partial successes. A list of ResultCodes will be returned to
# indicate the success or failure of each message at the corresponding index.
# If data loss is a concern, the caller should retry only the failed messages.
#
list<ResultCode> LogMulti(1: list<LogEntry> messages);
#
# NOTE: FOR INTERNAL USE ONLY!
#
# The same as Log(...) except that the list of messages must first be
# serialized and compressed in some internal format.
#
ResultCode LogCompressedMsg(1: binary compressedMessages);
}

90
scribe/scribe_logger.cc Normal file
View File

@ -0,0 +1,90 @@
#include "scribe_logger.h"
namespace leveldb {
const std::string ScribeLogger::COL_SEPERATOR = "\x1";
const std::string ScribeLogger::DEPLOY_STATS_CATEGORY = "leveldb_deploy_stats";
ScribeLogger::ScribeLogger(const std::string& host, int port,
int retry_times, uint32_t retry_intervals, int batch_size)
: host_(host),
port_(port),
retry_times_(retry_times),
retry_intervals_ (retry_intervals),
batch_size_ (batch_size) {
shared_ptr<TSocket> socket(new TSocket(host_, port_));
shared_ptr<TFramedTransport> framedTransport(new TFramedTransport(socket));
framedTransport->open();
shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(framedTransport));
scribe_client_ = new scribeClient(protocol);
}
void ScribeLogger::Log(const std::string& category,
const std::string& message) {
LogEntry entry;
entry.category = category;
entry.message = message;
logger_mutex_.Lock();
logs_.push_back(entry);
if (logs_.size() >= batch_size_) {
ResultCode ret = scribe_client_->Log(logs_);
int retries_left = retry_times_;
while (ret == TRY_LATER && retries_left > 0) {
Env::Default()->SleepForMicroseconds(retry_intervals_);
ret = scribe_client_->Log(logs_);
retries_left--;
}
// Clear the local messages if either successfully write out
// or has failed in the last 10 calls.
if (ret == OK || logs_.size() > batch_size_ * 5) {
logs_.clear();
}
}
logger_mutex_.Unlock();
}
void ScribeLogger::MakeScribeMessage(std::string& output,
std::vector<std::string>& cols) {
int sz = cols.size();
int i = 0;
for (; i < sz - 1; i++) {
std::string& col = cols.at(i);
output += col;
output += ScribeLogger::COL_SEPERATOR;
}
std::string& col = cols.at(i);
output+=col;
}
void ScribeLogger::Log_Deploy_Stats(
const std::string& db_version,
const std::string& machine_info,
const std::string& data_dir,
const uint64_t data_size,
const uint32_t file_number,
const std::string& data_size_per_level,
const std::string& file_number_per_level,
const int64_t& ts_unix) {
std::string message;
std::vector<std::string> cols;
cols.push_back(db_version);
cols.push_back(machine_info);
cols.push_back(data_dir);
cols.push_back(boost::lexical_cast<std::string>(data_size));
cols.push_back(boost::lexical_cast<std::string>(file_number));
cols.push_back(data_size_per_level);
cols.push_back(file_number_per_level);
cols.push_back(boost::lexical_cast<std::string>(ts_unix));
MakeScribeMessage(message, cols);
return Log(ScribeLogger::DEPLOY_STATS_CATEGORY, message);
}
ScribeLogger::~ScribeLogger(){
delete scribe_client_;
}
}

71
scribe/scribe_logger.h Normal file
View File

@ -0,0 +1,71 @@
#ifndef SCRIBE_LOGGER_H_
#define SCRIBE_LOGGER_H_
#include "scribe/if/gen-cpp/scribe.h"
#include "scribe/if/gen-cpp/scribe_types.h"
#include "thrift/lib/cpp/protocol/TProtocol.h"
#include "thrift/lib/cpp/transport/TSocket.h"
#include "thrift/lib/cpp/protocol/TBinaryProtocol.h"
#include "thrift/lib/cpp/transport/TBufferTransports.h"
#include "leveldb/env.h"
#include "port/port.h"
#include "util/stats_logger.h"
#include "boost/lexical_cast.hpp"
using namespace Tleveldb;
using Tleveldb::scribeClient;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using boost::shared_ptr;
using namespace ::Tleveldb;
namespace leveldb {
class ScribeLogger : public StatsLogger{
private:
std::string host_;
int port_;
int batch_size_;
scribeClient* scribe_client_;
std::vector<LogEntry> logs_;
port::Mutex logger_mutex_;
int retry_times_;
uint32_t retry_intervals_;
void MakeScribeMessage(std::string& output, std::vector<std::string>& cols);
public:
static const std::string COL_SEPERATOR;
static const std::string DEPLOY_STATS_CATEGORY;
ScribeLogger(const std::string& host, int port,
int retry_times=3, uint32_t retry_intervals=1000000,
int batch_size=1);
virtual ~ScribeLogger();
virtual void Log(const std::string& category, const std::string& message);
virtual void Log_Deploy_Stats(
const std::string& db_version,
const std::string& machine_info,
const std::string& data_dir,
const uint64_t data_size,
const uint32_t file_number,
const std::string& data_size_per_level,
const std::string& file_number_per_level,
const int64_t& ts_unix
);
};
}
#endif /* SCRIBE_LOGGER_H_ */

View File

@ -537,6 +537,43 @@ class PosixEnv : public Env {
usleep(micros);
}
virtual Status GetHostName(char* name, uint len) {
int ret = gethostname(name, len);
if (ret < 0) {
if (errno == EFAULT || errno == EINVAL)
return Status::InvalidArgument(strerror(errno));
else
return IOError("GetHostName", errno);
}
return Status::OK();
}
virtual Status GetCurrentTime(int64_t* unix_time) {
time_t ret = time(NULL);
if (ret == (time_t) -1) {
return IOError("GetCurrentTime", errno);
}
*unix_time = (int64_t) ret;
return Status::OK();
}
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) {
if (db_path.find('/') == 0) {
*output_path = db_path;
return Status::OK();
}
char the_path[256];
char* ret = getcwd(the_path, 256);
if (ret == NULL) {
return Status::IOError(strerror(errno));
}
*output_path = ret;
return Status::OK();
}
private:
void PthreadCall(const char* label, int result) {
if (result != 0) {

View File

@ -35,7 +35,8 @@ Options::Options()
max_grandparent_overlap_factor(10),
filter_policy(NULL),
statistics(NULL),
disableDataSync(false) {
disableDataSync(false),
db_stats_log_interval(1800) {
}
} // namespace leveldb

23
util/stats_logger.h Normal file
View File

@ -0,0 +1,23 @@
#ifndef STATS_LOGGER_H_
#define STATS_LOGGER_H_
namespace leveldb {
class StatsLogger {
public:
virtual void Log_Deploy_Stats(const std::string& db_version,
const std::string& machine_info,
const std::string& data_dir,
const uint64_t data_size,
const uint32_t file_number,
const std::string& data_size_per_level,
const std::string& file_number_per_level,
const int64_t& ts_unix) = 0;
};
}
#endif