diff --git a/build_detect_platform b/build_detect_platform index b71bf02c6..2264ee9cb 100755 --- a/build_detect_platform +++ b/build_detect_platform @@ -148,6 +148,21 @@ EOF fi fi +# shall we use HDFS? + +if test "$USE_HDFS"; then + if test -z "$JAVA_HOME"; then + echo "JAVA_HOME has to be set for HDFS usage." + exit 1 + fi + HDFS_CCFLAGS+=" -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS" + HDFS_LDFLAGS+=" -Wl,--no-whole-archive hdfs/libhdfs.a -L$JAVA_HOME/jre/lib/amd64" + HDFS_LDFLAGS+=" -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib" + HDFS_LDFLAGS+=" -ldl -lverify -ljava -ljvm" + COMMON_FLAGS+=$HDFS_CCFLAGS + PLATFORM_LDFLAGS+=$HDFS_LDFLAGS +fi + PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS" PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS" diff --git a/db/db_bench.cc b/db/db_bench.cc index ea82f11e5..fffbe9928 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -19,6 +19,7 @@ #include "util/mutexlock.h" #include "util/random.h" #include "util/testutil.h" +#include "hdfs/env_hdfs.h" // Comma-separated list of operations to run in the specified order // Actual benchmarks: @@ -122,6 +123,9 @@ static long FLAGS_writes = -1; // Sync all writes to disk static bool FLAGS_sync = false; +// posix or hdfs environment +static leveldb::Env* FLAGS_env = leveldb::Env::Default(); + extern bool useOsBuffer; namespace leveldb { @@ -202,7 +206,7 @@ class Stats { done_ = 0; bytes_ = 0; seconds_ = 0; - start_ = Env::Default()->NowMicros(); + start_ = FLAGS_env->NowMicros(); finish_ = start_; message_.clear(); } @@ -220,7 +224,7 @@ class Stats { } void Stop() { - finish_ = Env::Default()->NowMicros(); + finish_ = FLAGS_env->NowMicros(); seconds_ = (finish_ - start_) * 1e-6; } @@ -230,7 +234,7 @@ class Stats { void FinishedSingleOp() { if (FLAGS_histogram) { - double now = Env::Default()->NowMicros(); + double now = FLAGS_env->NowMicros(); double micros = now - last_op_finish_; hist_.Add(micros); if (micros > 20000) { @@ -437,10 +441,10 @@ class Benchmark { writes_(FLAGS_writes < 0 ? 
       FLAGS_num : FLAGS_writes),
     heap_counter_(0) {
     std::vector<std::string> files;
-    Env::Default()->GetChildren(FLAGS_db, &files);
+    FLAGS_env->GetChildren(FLAGS_db, &files);
     for (int i = 0; i < files.size(); i++) {
       if (Slice(files[i]).starts_with("heap-")) {
-        Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
+        FLAGS_env->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
       }
     }
     if (!FLAGS_use_existing_db) {
@@ -623,7 +627,7 @@ class Benchmark {
       arg[i].shared = &shared;
       arg[i].thread = new ThreadState(i);
       arg[i].thread->shared = &shared;
-      Env::Default()->StartThread(ThreadBody, &arg[i]);
+      FLAGS_env->StartThread(ThreadBody, &arg[i]);
     }
 
     shared.mu.Lock();
@@ -740,6 +744,7 @@ class Benchmark {
     options.filter_policy = filter_policy_;
     options.max_open_files = FLAGS_open_files;
     options.statistics = dbstats;
+    options.env = FLAGS_env;
     Status s = DB::Open(options, FLAGS_db, &db_);
     if (!s.ok()) {
       fprintf(stderr, "open error: %s\n", s.ToString().c_str());
@@ -758,7 +763,7 @@ class Benchmark {
   void DoWrite(ThreadState* thread, bool seq) {
     if (num_ != FLAGS_num) {
       char msg[100];
-      snprintf(msg, sizeof(msg), "(%d ops)", num_);
+      snprintf(msg, sizeof(msg), "(%ld ops)", num_);
       thread->stats.AddMessage(msg);
     }
@@ -952,7 +957,7 @@ class Benchmark {
     char fname[100];
     snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
     WritableFile* file;
-    Status s = Env::Default()->NewWritableFile(fname, &file);
+    Status s = FLAGS_env->NewWritableFile(fname, &file);
     if (!s.ok()) {
       fprintf(stderr, "%s\n", s.ToString().c_str());
       return;
     }
@@ -961,7 +966,7 @@ class Benchmark {
     delete file;
     if (!ok) {
       fprintf(stderr, "heap profiling not supported\n");
-      Env::Default()->DeleteFile(fname);
+      FLAGS_env->DeleteFile(fname);
     }
   }
 };
@@ -977,6 +982,7 @@ int main(int argc, char** argv) {
     int n;
     long l;
     char junk;
+    char hdfsname[2048];
     if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
       FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
     } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
@@ -1024,6 +1030,8 @@
     } else if (sscanf(argv[i], "--sync=%d%c", &n, &junk) == 1 &&
                (n == 0 || n == 1)) {
       FLAGS_sync = n;
+    } else if (sscanf(argv[i], "--hdfs=%s", hdfsname) == 1) {
+      FLAGS_env = new leveldb::HdfsEnv(hdfsname);
     } else {
       fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
       exit(1);
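Every direct call to Env::Default() in db_bench is replaced by the new FLAGS_env pointer, so one binary can drive either the posix environment or HDFS. Below is a minimal sketch of the same pattern, assuming the HdfsEnv class introduced later in this patch; the PickEnv helper and the database path are illustrative, not part of the patch:

```cpp
#include <stdio.h>
#include <stdint.h>
#include "leveldb/db.h"
#include "leveldb/env.h"

// Illustrative helper, not part of the patch: choose an Env at startup.
// With --hdfs=..., db_bench instead does: FLAGS_env = new leveldb::HdfsEnv(uri);
static leveldb::Env* PickEnv() {
  return leveldb::Env::Default();  // posix fallback
}

int main() {
  leveldb::Env* env = PickEnv();
  uint64_t start = env->NowMicros();    // same call Stats::Start() now makes

  leveldb::Options options;
  options.create_if_missing = true;
  options.env = env;                    // route all file I/O through env
  leveldb::DB* db;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/env_sketch", &db);
  if (s.ok()) delete db;

  fprintf(stderr, "open took %llu micros\n",
          (unsigned long long)(env->NowMicros() - start));
  return s.ok() ? 0 : 1;
}
```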
diff --git a/fbcode.sh b/fbcode.sh
index 5cc777476..aa6dee4fd 100644
--- a/fbcode.sh
+++ b/fbcode.sh
@@ -1,3 +1,4 @@
+#!/bin/sh
 #
 # Set environment variables so that we can compile leveldb using
 # fbcode settings. It uses the latest g++ compiler and also
@@ -7,8 +8,18 @@ TOOLCHAIN_REV=d28c90311ca14f9f0b2bb720f4e34b285513d4f4
 TOOLCHAIN_EXECUTABLES="/mnt/gvfs/third-party/$TOOLCHAIN_REV/centos5.2-native"
 TOOLCHAIN_LIB_BASE="/mnt/gvfs/third-party/$TOOLCHAIN_REV/gcc-4.6.2-glibc-2.13"
 
+# location of libhdfs libraries
+if test "$USE_HDFS"; then
+  JAVA_HOME="/usr/local/jdk-6u22-64"
+  JINCLUDE="-I$JAVA_HOME/include -I$JAVA_HOME/include/linux"
+  GLIBC_RUNTIME_PATH="/usr/local/fbcode/gcc-4.6.2-glibc-2.13"
+  HDFSLIB=" -Wl,--no-whole-archive hdfs/libhdfs.a -L$JAVA_HOME/jre/lib/amd64 "
+  HDFSLIB+=" -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib "
+  HDFSLIB+=" -ldl -lverify -ljava -ljvm "
+fi
+
 CC="$TOOLCHAIN_EXECUTABLES/gcc/gcc-4.6.2-glibc-2.13/bin/gcc"
-CXX="$TOOLCHAIN_EXECUTABLES/gcc/gcc-4.6.2-glibc-2.13/bin/g++"
+CXX="$TOOLCHAIN_EXECUTABLES/gcc/gcc-4.6.2-glibc-2.13/bin/g++ $JINCLUDE"
 AR=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ar
 RANLIB=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ranlib
 
@@ -17,5 +28,6 @@
 CFLAGS+=" -I $TOOLCHAIN_LIB_BASE/jemalloc/jemalloc-2.2.5/96de4f9/include -DHAVE_JEMALLOC"
 EXEC_LDFLAGS=" -Wl,--whole-archive $TOOLCHAIN_LIB_BASE/jemalloc/jemalloc-2.2.4/96de4f9/lib/libjemalloc.a "
 EXEC_LDFLAGS+="-Wl,--no-whole-archive $TOOLCHAIN_LIB_BASE/libunwind/libunwind-20100810/4bc2c16/lib/libunwind.a"
+EXEC_LDFLAGS+=$HDFSLIB
 
 export CC CXX AR RANLIB CFLAGS EXEC_LDFLAGS
diff --git a/hdfs/README b/hdfs/README
new file mode 100644
index 000000000..dcedb93e1
--- /dev/null
+++ b/hdfs/README
@@ -0,0 +1,26 @@
+This directory contains the hdfs extensions needed to make leveldb store
+files in HDFS.
+
+The hdfs.h file is copied from the Apache Hadoop 1.0 source code.
+It defines the libhdfs library
+(http://hadoop.apache.org/common/docs/r0.20.2/libhdfs.html) to access
+data in HDFS. The libhdfs.a is copied from the Apache Hadoop 1.0 build.
+It implements the API defined in hdfs.h. If your hadoop cluster is running
+a different hadoop release, then install these two files manually from your
+hadoop distribution and then recompile leveldb.
+
+The env_hdfs.h file defines the leveldb objects that are needed to talk to an
+underlying filesystem.
+
+If you want to compile leveldb with hdfs support, please set the following
+environment variables appropriately:
+  USE_HDFS=1
+  JAVA_HOME=/usr/local/jdk-6u22-64
+  LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/
+  make clean all db_bench
+
+To run db_bench,
+  set CLASSPATH to include your hadoop distribution
+  db_bench --hdfs="hdfs://hbaseudbperf001.snc1.facebook.com:9000"
+
+
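As a quick way to verify an HDFS build, the README's steps can be followed by a tiny smoke test. This is a hedged sketch, not part of the patch: the NameNode URI and database path are placeholders, and CLASSPATH must already contain the hadoop jars as described above.

```cpp
#include <stdio.h>
#include <string>
#include "leveldb/db.h"
#include "hdfs/env_hdfs.h"

int main() {
  // Placeholder URI; use your NameNode, e.g. the value passed to --hdfs=...
  leveldb::HdfsEnv* env = new leveldb::HdfsEnv("hdfs://localhost:9000");

  leveldb::Options options;
  options.create_if_missing = true;
  options.env = env;  // leveldb now creates its files inside HDFS

  leveldb::DB* db;
  leveldb::Status s = leveldb::DB::Open(options, "/leveldb_smoke_test", &db);
  if (!s.ok()) {
    fprintf(stderr, "open: %s\n", s.ToString().c_str());
    return 1;
  }
  s = db->Put(leveldb::WriteOptions(), "key", "value");
  std::string value;
  if (s.ok()) s = db->Get(leveldb::ReadOptions(), "key", &value);
  fprintf(stderr, "roundtrip: %s\n",
          s.ok() ? value.c_str() : s.ToString().c_str());
  delete db;
  delete env;
  return s.ok() ? 0 : 1;
}
```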
diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h
new file mode 100644
index 000000000..6ee67dcfa
--- /dev/null
+++ b/hdfs/env_hdfs.h
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Copyright (c) 2012 Facebook. All rights reserved.
+
+#ifndef LEVELDB_HDFS_FILE_H
+#define LEVELDB_HDFS_FILE_H
+
+#include <algorithm>
+#include <stdio.h>
+#include <sys/time.h>
+#include <time.h>
+#include <iostream>
+#include "leveldb/env.h"
+#include "leveldb/status.h"
+
+#ifdef USE_HDFS
+#include "hdfs/hdfs.h"
+
+namespace leveldb {
+
+static const std::string kProto = "hdfs://";
+static const std::string pathsep = "/";
+
+// Thrown during execution when there is an issue with the supplied
+// arguments.
+class HdfsUsageException : public std::exception { };
+
+// A simple exception that indicates something went wrong that is not
+// recoverable. The intention is for the message to be printed (with
+// nothing else) and the process terminate.
+class HdfsFatalException : public std::exception {
+public:
+  explicit HdfsFatalException(const std::string& s) : what_(s) { }
+  virtual ~HdfsFatalException() throw() { }
+  virtual const char* what() const throw() {
+    return what_.c_str();
+  }
+private:
+  const std::string what_;
+};
+
+//
+// The HDFS environment for leveldb. This class overrides all the
+// file/dir access methods and delegates the thread-mgmt methods to the
+// default posix environment.
+//
+class HdfsEnv : public Env {
+
+ public:
+  HdfsEnv(const std::string& fsname) : fsname_(fsname) {
+    posixEnv = Env::Default();
+    fileSys_ = connectToPath(fsname_);
+  }
+
+  virtual ~HdfsEnv() {
+    fprintf(stderr, "Destroying HdfsEnv::Default()\n");
+    hdfsDisconnect(fileSys_);
+  }
+
+  virtual Status NewSequentialFile(const std::string& fname,
+                                   SequentialFile** result);
+
+  virtual Status NewRandomAccessFile(const std::string& fname,
+                                     RandomAccessFile** result);
+
+  virtual Status NewWritableFile(const std::string& fname,
+                                 WritableFile** result);
+
+  virtual bool FileExists(const std::string& fname);
+
+  virtual Status GetChildren(const std::string& path,
+                             std::vector<std::string>* result);
+
+  virtual Status DeleteFile(const std::string& fname);
+
+  virtual Status CreateDir(const std::string& name);
+
+  virtual Status DeleteDir(const std::string& name);
+
+  virtual Status GetFileSize(const std::string& fname, uint64_t* size);
+
+  virtual Status RenameFile(const std::string& src, const std::string& target);
+
+  virtual Status LockFile(const std::string& fname, FileLock** lock);
+
+  virtual Status UnlockFile(FileLock* lock);
+
+  virtual Status NewLogger(const std::string& fname, Logger** result);
+
+  virtual void Schedule(void (*function)(void* arg), void* arg) {
+    posixEnv->Schedule(function, arg);
+  }
+
+  virtual void StartThread(void (*function)(void* arg), void* arg) {
+    posixEnv->StartThread(function, arg);
+  }
+
+  virtual Status GetTestDirectory(std::string* path) {
+    return posixEnv->GetTestDirectory(path);
+  }
+
+  virtual uint64_t NowMicros() {
+    return posixEnv->NowMicros();
+  }
+
+  virtual void SleepForMicroseconds(int micros) {
+    posixEnv->SleepForMicroseconds(micros);
+  }
+
+  static uint64_t gettid() {
+    assert(sizeof(pthread_t) <= sizeof(uint64_t));
+    return (uint64_t)pthread_self();
+  }
+
+ private:
+  std::string fsname_;  // string of the form "hdfs://hostname:port/"
+  hdfsFS fileSys_;      // a single FileSystem object for all files
+  Env*  posixEnv;       // This object is derived from Env, but not from
+                        // posixEnv. We have posixEnv as an encapsulated
+                        // object here so that we can use posix timers,
+                        // posix threads, etc.
+
+  /**
+   * If the URI is specified of the form hdfs://server:port/path,
+   * then connect to the specified cluster
+   * else connect to default.
+   */
+  hdfsFS connectToPath(const std::string& uri) {
+    if (uri.empty()) {
+      return NULL;
+    }
+    if (uri.find(kProto) != 0) {
+      // uri doesn't start with hdfs:// -> use default:0, which is special
+      // to libhdfs.
+      return hdfsConnectNewInstance("default", 0);
+    }
+    const std::string hostport = uri.substr(kProto.length());
+
+    std::vector<std::string> parts;
+    split(hostport, ':', parts);
+    if (parts.size() != 2) {
+      throw HdfsFatalException("Bad uri for hdfs " + uri);
+    }
+    // parts[0] = hosts, parts[1] = port/xxx/yyy
+    std::string host(parts[0]);
+    std::string remaining(parts[1]);
+
+    size_t rem = remaining.find(pathsep);
+    std::string portStr = (rem == std::string::npos ? remaining :
+                           remaining.substr(0, rem));
+
+    tPort port;
+    port = atoi(portStr.c_str());
+    if (port == 0) {
+      throw HdfsFatalException("Bad host-port for hdfs " + uri);
+    }
+    hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);
+    return fs;
+  }
+
+  void split(const std::string &s, char delim,
+             std::vector<std::string> &elems) {
+    elems.clear();
+    size_t prev = 0;
+    size_t pos = s.find(delim);
+    while (pos != std::string::npos) {
+      elems.push_back(s.substr(prev, pos - prev));
+      prev = pos + 1;
+      pos = s.find(delim, prev);
+    }
+    elems.push_back(s.substr(prev));
+  }
+};
+
+}  // namespace leveldb
+
+#else // USE_HDFS
+
+namespace leveldb {
+
+class HdfsEnv : public Env {
+
+ public:
+  HdfsEnv(const std::string& fsname) {
+    fprintf(stderr, "You have not built leveldb with HDFS support\n");
+    fprintf(stderr, "Please see hdfs/README for details\n");
+    throw std::exception();
+  }
+
+  virtual ~HdfsEnv() {
+  }
+
+  virtual Status NewSequentialFile(const std::string& fname,
+                                   SequentialFile** result);
+
+  virtual Status NewRandomAccessFile(const std::string& fname,
+                                     RandomAccessFile** result) {}
+
+  virtual Status NewWritableFile(const std::string& fname,
+                                 WritableFile** result) {}
+
+  virtual bool FileExists(const std::string& fname) {}
+
+  virtual Status GetChildren(const std::string& path,
+                             std::vector<std::string>* result) {}
+
+  virtual Status DeleteFile(const std::string& fname) {}
+
+  virtual Status CreateDir(const std::string& name) {}
+
+  virtual Status DeleteDir(const std::string& name) {}
+
+  virtual Status GetFileSize(const std::string& fname, uint64_t* size) {}
+
+  virtual Status RenameFile(const std::string& src, const std::string& target) {}
+
+  virtual Status LockFile(const std::string& fname, FileLock** lock) {}
+
+  virtual Status UnlockFile(FileLock* lock) {}
+
+  virtual Status NewLogger(const std::string& fname, Logger** result) {}
+
+  virtual void Schedule(void (*function)(void* arg), void* arg) {}
+
+  virtual void StartThread(void (*function)(void* arg), void* arg) {}
+
+  virtual Status GetTestDirectory(std::string* path) {}
+
+  virtual uint64_t NowMicros() {}
+
+  virtual void SleepForMicroseconds(int micros) {}
+};
+
+}  // namespace leveldb
+
+#endif // USE_HDFS
+
+#endif // LEVELDB_HDFS_FILE_H
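The interesting logic in env_hdfs.h is connectToPath(): a URI of the form hdfs://host:port[/path] is split into host and port, while anything else falls back to libhdfs's special ("default", 0) instance. Below is a hypothetical standalone check of that parsing contract; ParseHdfsUri merely mirrors the behavior for illustration and is not part of the patch:

```cpp
#include <assert.h>
#include <stdlib.h>
#include <string>

// Mirrors HdfsEnv::connectToPath's URI handling, for illustration only.
static bool ParseHdfsUri(const std::string& uri,
                         std::string* host, int* port) {
  const std::string proto = "hdfs://";
  if (uri.find(proto) != 0) return false;  // caller falls back to "default"
  std::string rest = uri.substr(proto.size());
  size_t colon = rest.find(':');
  if (colon == std::string::npos) return false;
  *host = rest.substr(0, colon);
  std::string tail = rest.substr(colon + 1);
  // substr(0, npos) keeps the whole tail when there is no trailing path
  *port = atoi(tail.substr(0, tail.find('/')).c_str());
  return *port != 0;
}

int main() {
  std::string host;
  int port = 0;
  assert(ParseHdfsUri("hdfs://nn1:9000/some/path", &host, &port));
  assert(host == "nn1" && port == 9000);
  assert(!ParseHdfsUri("file:///tmp", &host, &port));       // -> default fs
  assert(!ParseHdfsUri("hdfs://nn1:bogus", &host, &port));  // port 0 rejected
  return 0;
}
```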
diff --git a/hdfs/hdfs.h b/hdfs/hdfs.h
new file mode 100644
index 000000000..1b1135651
--- /dev/null
+++ b/hdfs/hdfs.h
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFS_HDFS_H
+#define LIBHDFS_HDFS_H
+
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <stdlib.h>
+#include <time.h>
+#include <errno.h>
+
+#include <jni.h>
+
+#ifndef O_RDONLY
+#define O_RDONLY 1
+#endif
+
+#ifndef O_WRONLY
+#define O_WRONLY 2
+#endif
+
+#ifndef EINTERNAL
+#define EINTERNAL 255
+#endif
+
+
+/** All APIs set errno to meaningful values */
+#ifdef __cplusplus
+extern  "C" {
+#endif
+
+    /**
+     * Some utility decls used in libhdfs.
+     */
+
+    typedef int32_t   tSize; /// size of data for read/write io ops
+    typedef time_t    tTime; /// time type in seconds
+    typedef int64_t   tOffset;/// offset within the file
+    typedef uint16_t  tPort; /// port
+    typedef enum tObjectKind {
+        kObjectKindFile = 'F',
+        kObjectKindDirectory = 'D',
+    } tObjectKind;
+
+
+    /**
+     * The C reflection of org.apache.org.hadoop.FileSystem .
+     */
+    typedef void* hdfsFS;
+
+
+    /**
+     * The C equivalent of org.apache.org.hadoop.FSData(Input|Output)Stream .
+     */
+    enum hdfsStreamType
+    {
+        UNINITIALIZED = 0,
+        INPUT = 1,
+        OUTPUT = 2,
+    };
+
+
+    /**
+     * The 'file-handle' to a file in hdfs.
+     */
+    struct hdfsFile_internal {
+        void* file;
+        enum hdfsStreamType type;
+    };
+    typedef struct hdfsFile_internal* hdfsFile;
+
+
+    /**
+     * hdfsConnectAsUser - Connect to a hdfs file system as a specific user
+     * Connect to the hdfs.
+     * @param host A string containing either a host name, or an ip address
+     * of the namenode of a hdfs cluster. 'host' should be passed as NULL if
+     * you want to connect to local filesystem. 'host' should be passed as
+     * 'default' (and port as 0) to used the 'configured' filesystem
+     * (core-site/core-default.xml).
+     * @param port The port on which the server is listening.
+     * @param user the user name (this is hadoop domain user). Or NULL is equivelant to hhdfsConnect(host, port)
+     * @param groups the groups (these are hadoop domain groups)
+     * @return Returns a handle to the filesystem or NULL on error.
+     */
+    hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const char *groups[], int groups_size );
+
+
+    /**
+     * hdfsConnect - Connect to a hdfs file system.
+     * Connect to the hdfs.
+     * @param host A string containing either a host name, or an ip address
+     * of the namenode of a hdfs cluster. 'host' should be passed as NULL if
+     * you want to connect to local filesystem. 'host' should be passed as
+     * 'default' (and port as 0) to used the 'configured' filesystem
+     * (core-site/core-default.xml).
+     * @param port The port on which the server is listening.
+     * @return Returns a handle to the filesystem or NULL on error.
+     */
+    hdfsFS hdfsConnect(const char* host, tPort port);
+
+
+    /**
+     * This are the same as hdfsConnectAsUser except that every invocation returns a new FileSystem handle.
+     * Applications should call a hdfsDisconnect for every call to hdfsConnectAsUserNewInstance.
+ */ + hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user , const char *groups[], int groups_size ); + hdfsFS hdfsConnectNewInstance(const char* host, tPort port); + hdfsFS hdfsConnectPath(const char* uri); + + /** + * hdfsDisconnect - Disconnect from the hdfs file system. + * Disconnect from hdfs. + * @param fs The configured filesystem handle. + * @return Returns 0 on success, -1 on error. + */ + int hdfsDisconnect(hdfsFS fs); + + + /** + * hdfsOpenFile - Open a hdfs file in given mode. + * @param fs The configured filesystem handle. + * @param path The full path to the file. + * @param flags - an | of bits/fcntl.h file flags - supported flags are O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT), + * O_WRONLY|O_APPEND. Other flags are generally ignored other than (O_RDWR || (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP. + * @param bufferSize Size of buffer for read/write - pass 0 if you want + * to use the default configured values. + * @param replication Block replication - pass 0 if you want to use + * the default configured values. + * @param blocksize Size of block - pass 0 if you want to use the + * default configured values. + * @return Returns the handle to the open file or NULL on error. + */ + hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, + int bufferSize, short replication, tSize blocksize); + + + /** + * hdfsCloseFile - Close an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns 0 on success, -1 on error. + */ + int hdfsCloseFile(hdfsFS fs, hdfsFile file); + + + /** + * hdfsExists - Checks if a given path exsits on the filesystem + * @param fs The configured filesystem handle. + * @param path The path to look for + * @return Returns 0 on exists, 1 on non-exists, -1/-2 on error. + */ + int hdfsExists(hdfsFS fs, const char *path); + + + /** + * hdfsSeek - Seek to given offset in file. + * This works only for files opened in read-only mode. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param desiredPos Offset into the file to seek into. + * @return Returns 0 on success, -1 on error. + */ + int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos); + + + /** + * hdfsTell - Get the current offset in the file, in bytes. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Current offset, -1 on error. + */ + tOffset hdfsTell(hdfsFS fs, hdfsFile file); + + + /** + * hdfsRead - Read data from an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param buffer The buffer to copy read bytes into. + * @param length The length of the buffer. + * @return Returns the number of bytes actually read, possibly less + * than than length;-1 on error. + */ + tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + + + /** + * hdfsPread - Positional read of data from an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param position Position from which to read + * @param buffer The buffer to copy read bytes into. + * @param length The length of the buffer. + * @return Returns the number of bytes actually read, possibly less than + * than length;-1 on error. + */ + tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, + void* buffer, tSize length); + + + /** + * hdfsWrite - Write data into an open file. + * @param fs The configured filesystem handle. 
+ * @param file The file handle. + * @param buffer The data. + * @param length The no. of bytes to write. + * @return Returns the number of bytes written, -1 on error. + */ + tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, + tSize length); + + + /** + * hdfsWrite - Flush the data. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns 0 on success, -1 on error. + */ + int hdfsFlush(hdfsFS fs, hdfsFile file); + + /** + * hdfsSync - Sync the data to persistent store. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns 0 on success, -1 on error. + */ + int hdfsSync(hdfsFS fs, hdfsFile file); + + /** + * hdfsGetNumReplicasInPipeline - get number of remaining replicas in + * pipeline + * @param fs The configured filesystem handle + * @param file the file handle + * @return returns the # of datanodes in the write pipeline; -1 on error + */ + int hdfsGetNumCurrentReplicas(hdfsFS, hdfsFile file); + + /** + * hdfsAvailable - Number of bytes that can be read from this + * input stream without blocking. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns available bytes; -1 on error. + */ + int hdfsAvailable(hdfsFS fs, hdfsFile file); + + + /** + * hdfsCopy - Copy file from one filesystem to another. + * @param srcFS The handle to source filesystem. + * @param src The path of source file. + * @param dstFS The handle to destination filesystem. + * @param dst The path of destination file. + * @return Returns 0 on success, -1 on error. + */ + int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + + /** + * hdfsMove - Move file from one filesystem to another. + * @param srcFS The handle to source filesystem. + * @param src The path of source file. + * @param dstFS The handle to destination filesystem. + * @param dst The path of destination file. + * @return Returns 0 on success, -1 on error. + */ + int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + + /** + * hdfsDelete - Delete file. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @return Returns 0 on success, -1 on error. + */ + int hdfsDelete(hdfsFS fs, const char* path); + + + /** + * hdfsRename - Rename file. + * @param fs The configured filesystem handle. + * @param oldPath The path of the source file. + * @param newPath The path of the destination file. + * @return Returns 0 on success, -1 on error. + */ + int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath); + + + /** + * hdfsGetWorkingDirectory - Get the current working directory for + * the given filesystem. + * @param fs The configured filesystem handle. + * @param buffer The user-buffer to copy path of cwd into. + * @param bufferSize The length of user-buffer. + * @return Returns buffer, NULL on error. + */ + char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize); + + + /** + * hdfsSetWorkingDirectory - Set the working directory. All relative + * paths will be resolved relative to it. + * @param fs The configured filesystem handle. + * @param path The path of the new 'cwd'. + * @return Returns 0 on success, -1 on error. + */ + int hdfsSetWorkingDirectory(hdfsFS fs, const char* path); + + + /** + * hdfsCreateDirectory - Make the given file and all non-existent + * parents into directories. + * @param fs The configured filesystem handle. + * @param path The path of the directory. 
+ * @return Returns 0 on success, -1 on error. + */ + int hdfsCreateDirectory(hdfsFS fs, const char* path); + + + /** + * hdfsSetReplication - Set the replication of the specified + * file to the supplied value + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @return Returns 0 on success, -1 on error. + */ + int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication); + + + /** + * hdfsFileInfo - Information about a file/directory. + */ + typedef struct { + tObjectKind mKind; /* file or directory */ + char *mName; /* the name of the file */ + tTime mLastMod; /* the last modification time for the file in seconds */ + tOffset mSize; /* the size of the file in bytes */ + short mReplication; /* the count of replicas */ + tOffset mBlockSize; /* the block size for the file */ + char *mOwner; /* the owner of the file */ + char *mGroup; /* the group associated with the file */ + short mPermissions; /* the permissions associated with the file */ + tTime mLastAccess; /* the last access time for the file in seconds */ + } hdfsFileInfo; + + + /** + * hdfsListDirectory - Get list of files/directories for a given + * directory-path. hdfsFreeFileInfo should be called to deallocate memory if + * the function returns non-NULL value. + * @param fs The configured filesystem handle. + * @param path The path of the directory. + * @param numEntries Set to the number of files/directories in path. + * @return Returns a dynamically-allocated array of hdfsFileInfo + * objects; NULL if empty or on error. + * on error, numEntries will be -1. + */ + hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, + int *numEntries); + + + /** + * hdfsGetPathInfo - Get information about a path as a (dynamically + * allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be + * called when the pointer is no longer needed. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @return Returns a dynamically-allocated hdfsFileInfo object; + * NULL on error. + */ + hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path); + + + /** + * hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields) + * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo + * objects. + * @param numEntries The size of the array. + */ + void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries); + + + /** + * hdfsGetHosts - Get hostnames where a particular block (determined by + * pos & blocksize) of a file is stored. The last element in the array + * is NULL. Due to replication, a single block could be present on + * multiple hosts. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @param start The start of the block. + * @param length The length of the block. + * @return Returns a dynamically-allocated 2-d array of blocks-hosts; + * NULL on error. + */ + char*** hdfsGetHosts(hdfsFS fs, const char* path, + tOffset start, tOffset length); + + + /** + * hdfsFreeHosts - Free up the structure returned by hdfsGetHosts + * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo + * objects. + * @param numEntries The size of the array. + */ + void hdfsFreeHosts(char ***blockHosts); + + + /** + * hdfsGetDefaultBlockSize - Get the optimum blocksize. + * @param fs The configured filesystem handle. + * @return Returns the blocksize; -1 on error. + */ + tOffset hdfsGetDefaultBlockSize(hdfsFS fs); + + + /** + * hdfsGetCapacity - Return the raw capacity of the filesystem. 
+     * @param fs The configured filesystem handle.
+     * @return Returns the raw-capacity; -1 on error.
+     */
+    tOffset hdfsGetCapacity(hdfsFS fs);
+
+
+    /**
+     * hdfsGetUsed - Return the total raw size of all files in the filesystem.
+     * @param fs The configured filesystem handle.
+     * @return Returns the total-size; -1 on error.
+     */
+    tOffset hdfsGetUsed(hdfsFS fs);
+
+    /**
+     * hdfsChown
+     * @param fs The configured filesystem handle.
+     * @param path the path to the file or directory
+     * @param owner this is a string in Hadoop land. Set to null or "" if only setting group
+     * @param group this is a string in Hadoop land. Set to null or "" if only setting user
+     * @return 0 on success else -1
+     */
+    int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group);
+
+    /**
+     * hdfsChmod
+     * @param fs The configured filesystem handle.
+     * @param path the path to the file or directory
+     * @param mode the bitmask to set it to
+     * @return 0 on success else -1
+     */
+    int hdfsChmod(hdfsFS fs, const char* path, short mode);
+
+    /**
+     * hdfsUtime
+     * @param fs The configured filesystem handle.
+     * @param path the path to the file or directory
+     * @param mtime new modification time or 0 for only set access time in seconds
+     * @param atime new access time or 0 for only set modification time in seconds
+     * @return 0 on success else -1
+     */
+    int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*LIBHDFS_HDFS_H*/
+
+/**
+ * vim: ts=4: sw=4: et
+ */
diff --git a/util/env_hdfs.cc b/util/env_hdfs.cc
new file mode 100644
index 000000000..500e343d1
--- /dev/null
+++ b/util/env_hdfs.cc
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Copyright (c) 2012 Facebook. All rights reserved.
+
+#ifdef USE_HDFS
+#ifndef LEVELDB_HDFS_FILE_C
+#define LEVELDB_HDFS_FILE_C
+
+#include <stdio.h>
+#include <string.h>
+#include <strings.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <time.h>
+#include "leveldb/env.h"
+#include "leveldb/status.h"
+#include "hdfs/hdfs.h"
+#include "hdfs/env_hdfs.h"
+
+//
+// This file defines an HDFS environment for leveldb. It uses the libhdfs
+// api to access HDFS. All HDFS files created by one instance of leveldb
+// will reside on the same HDFS cluster.
+//
+
+namespace leveldb {
+
+namespace {
+
+// Construct a Status that reports an IO error, folding errno into the
+// message alongside the offending path.
+static Status IOError(const std::string& context, int err_number) {
+  return Status::IOError(context, strerror(err_number));
+}
+
+// assume that there is one global logger for now. It is not thread-safe,
+// but need not be because the logger is initialized at db-open time.
+static Logger* mylog = NULL;
+
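+// Note (illustrative): all libhdfs failures below funnel through IOError()
+// above, which pairs the offending path with strerror(errno), producing
+// statuses like "IO error: /db/000005.sst: No such file or directory".
+// Also note that leveldb's Log(Logger*, ...) is a no-op while the logger
+// is NULL, so the verbose "[hdfs]" tracing below stays silent unless mylog
+// is installed (see the commented-out assignment in HdfsEnv::NewLogger at
+// the end of this file).
+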
+// Used for reading a file from HDFS. It implements both sequential-read
+// access methods as well as random read access methods.
+class HdfsReadableFile: virtual public SequentialFile, virtual public RandomAccessFile {
+ private:
+  hdfsFS fileSys_;
+  std::string filename_;
+  hdfsFile hfile_;
+
+ public:
+  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
+    : fileSys_(fileSys), filename_(fname), hfile_(NULL) {
+    Log(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
+        filename_.c_str());
+    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
+    Log(mylog, "[hdfs] HdfsReadableFile opened file %s hfile_=%p\n",
+        filename_.c_str(), hfile_);
+  }
+
+  virtual ~HdfsReadableFile() {
+    Log(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
+        filename_.c_str());
+    hdfsCloseFile(fileSys_, hfile_);
+    Log(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
+        filename_.c_str());
+    hfile_ = NULL;
+  }
+
+  bool isValid() {
+    return hfile_ != NULL;
+  }
+
+  // sequential access, read data at current offset in file
+  virtual Status Read(size_t n, Slice* result, char* scratch) {
+    Status s;
+    Log(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
+        filename_.c_str(), n);
+    tSize bytes_read = hdfsRead(fileSys_, hfile_, scratch, (tSize)n);
+    Log(mylog, "[hdfs] HdfsReadableFile read %s\n", filename_.c_str());
+    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
+    if (bytes_read < 0) {
+      // An error: return a non-ok status
+      s = IOError(filename_, errno);
+    } else if ((size_t)bytes_read < n && !feof()) {
+      // A partial read that did not hit end-of-file: return a non-ok status
+      s = IOError(filename_, errno);
+    }
+    return s;
+  }
+
+  // random access, read data from specified offset in file
+  virtual Status Read(uint64_t offset, size_t n, Slice* result,
+                      char* scratch) const {
+    Status s;
+    Log(mylog, "[hdfs] HdfsReadableFile preading %s\n", filename_.c_str());
+    ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset,
+                                   (void*)scratch, (tSize)n);
+    Log(mylog, "[hdfs] HdfsReadableFile pread %s\n", filename_.c_str());
+    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
+    if (bytes_read < 0) {
+      // An error: return a non-ok status
+      s = IOError(filename_, errno);
+    }
+    return s;
+  }
+
+  virtual Status Skip(uint64_t n) {
+    Log(mylog, "[hdfs] HdfsReadableFile skip %s\n", filename_.c_str());
+    // get current offset from file
+    tOffset current = hdfsTell(fileSys_, hfile_);
+    if (current < 0) {
+      return IOError(filename_, errno);
+    }
+    // seek to new offset in file
+    tOffset newoffset = current + n;
+    int val = hdfsSeek(fileSys_, hfile_, newoffset);
+    if (val < 0) {
+      return IOError(filename_, errno);
+    }
+    return Status::OK();
+  }
+
+ private:
+
+  // returns true if we are at the end of file, false otherwise
+  bool feof() {
+    Log(mylog, "[hdfs] HdfsReadableFile feof %s\n", filename_.c_str());
+    if (hdfsTell(fileSys_, hfile_) == fileSize()) {
+      return true;
+    }
+    return false;
+  }
+
+  // the current size of the file
+  tOffset fileSize() {
+    Log(mylog, "[hdfs] HdfsReadableFile fileSize %s\n", filename_.c_str());
+    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
+    tOffset size = 0L;
+    if (pFileInfo != NULL) {
+      size = pFileInfo->mSize;
+      hdfsFreeFileInfo(pFileInfo, 1);
+    } else {
+      throw leveldb::HdfsFatalException("fileSize on unknown file " +
+                                        filename_);
+    }
+    return size;
+  }
+};
+
+// Appends to an existing file in HDFS.
+class HdfsWritableFile: public WritableFile { + private: + hdfsFS fileSys_; + std::string filename_; + hdfsFile hfile_; + + public: + HdfsWritableFile(hdfsFS fileSys, const std::string& fname) + : fileSys_(fileSys), filename_(fname) , hfile_(NULL) { + Log(mylog, "[hdfs] HdfsWritableFile opening %s\n", filename_.c_str()); + hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0); + Log(mylog, "[hdfs] HdfsWritableFile opened %s\n", filename_.c_str()); + assert(hfile_ != NULL); + } + virtual ~HdfsWritableFile() { + if (hfile_ != NULL) { + Log(mylog, "[hdfs] HdfsWritableFile closing %s\n", filename_.c_str()); + hdfsCloseFile(fileSys_, hfile_); + Log(mylog, "[hdfs] HdfsWritableFile closed %s\n", filename_.c_str()); + hfile_ = NULL; + } + } + + // If the file was successfully created, then this returns true. + // Otherwise returns false. + bool isValid() { + return hfile_ != NULL; + } + + // The name of the file, mostly needed for debug logging. + const std::string& getName() { + return filename_; + } + + virtual Status Append(const Slice& data) { + Log(mylog, "[hdfs] HdfsWritableFile Append %s\n", filename_.c_str()); + const char* src = data.data(); + size_t left = data.size(); + size_t ret = hdfsWrite(fileSys_, hfile_, src, left); + Log(mylog, "[hdfs] HdfsWritableFile Appended %s\n", filename_.c_str()); + if (ret != left) { + return IOError(filename_, errno); + } + return Status::OK(); + } + + virtual Status Flush() { + return Status::OK(); + } + + virtual Status Sync() { + Status s; + Log(mylog, "[hdfs] HdfsWritableFile Sync %s\n", filename_.c_str()); + if (hdfsFlush(fileSys_, hfile_) == -1) { + return IOError(filename_, errno); + } + if (hdfsSync(fileSys_, hfile_) == -1) { + return IOError(filename_, errno); + } + Log(mylog, "[hdfs] HdfsWritableFile Synced %s\n", filename_.c_str()); + return Status::OK(); + } + + // This is used by HdfsLogger to write data to the debug log file + virtual Status Append(const char* src, size_t size) { + if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) { + return IOError(filename_, errno); + } + return Status::OK(); + } + + virtual Status Close() { + Log(mylog, "[hdfs] HdfsWritableFile closing %s\n", filename_.c_str()); + if (hdfsCloseFile(fileSys_, hfile_) != 0) { + return IOError(filename_, errno); + } + Log(mylog, "[hdfs] HdfsWritableFile closed %s\n", filename_.c_str()); + hfile_ = NULL; + return Status::OK(); + } +}; + +// The object that implements the debug logs to reside in HDFS. +class HdfsLogger : public Logger { + private: + HdfsWritableFile* file_; + uint64_t (*gettid_)(); // Return the thread id for the current thread + + public: + HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)()) + : file_(f), gettid_(gettid) { + Log(mylog, "[hdfs] HdfsLogger opened %s\n", + file_->getName().c_str()); + } + + virtual ~HdfsLogger() { + Log(mylog, "[hdfs] HdfsLogger closed %s\n", + file_->getName().c_str()); + delete file_; + if (mylog != NULL && mylog == this) { + mylog = NULL; + } + } + + virtual void Logv(const char* format, va_list ap) { + const uint64_t thread_id = (*gettid_)(); + + // We try twice: the first time with a fixed-size stack allocated buffer, + // and the second time with a much larger dynamically allocated buffer. 
+    char buffer[500];
+    for (int iter = 0; iter < 2; iter++) {
+      char* base;
+      int bufsize;
+      if (iter == 0) {
+        bufsize = sizeof(buffer);
+        base = buffer;
+      } else {
+        bufsize = 30000;
+        base = new char[bufsize];
+      }
+      char* p = base;
+      char* limit = base + bufsize;
+
+      struct timeval now_tv;
+      gettimeofday(&now_tv, NULL);
+      const time_t seconds = now_tv.tv_sec;
+      struct tm t;
+      localtime_r(&seconds, &t);
+      p += snprintf(p, limit - p,
+                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
+                    t.tm_year + 1900,
+                    t.tm_mon + 1,
+                    t.tm_mday,
+                    t.tm_hour,
+                    t.tm_min,
+                    t.tm_sec,
+                    static_cast<int>(now_tv.tv_usec),
+                    static_cast<long long unsigned int>(thread_id));
+
+      // Print the message
+      if (p < limit) {
+        va_list backup_ap;
+        va_copy(backup_ap, ap);
+        p += vsnprintf(p, limit - p, format, backup_ap);
+        va_end(backup_ap);
+      }
+
+      // Truncate to available space if necessary
+      if (p >= limit) {
+        if (iter == 0) {
+          continue;       // Try again with larger buffer
+        } else {
+          p = limit - 1;
+        }
+      }
+
+      // Add newline if necessary
+      if (p == base || p[-1] != '\n') {
+        *p++ = '\n';
+      }
+
+      assert(p <= limit);
+      file_->Append(base, p-base);
+      file_->Flush();
+      if (base != buffer) {
+        delete[] base;
+      }
+      break;
+    }
+  }
+};
+
+}  // namespace
+
+// Finally, the hdfs environment
+
+// open a file for sequential reading
+Status HdfsEnv::NewSequentialFile(const std::string& fname,
+                                  SequentialFile** result) {
+  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
+  if (f == NULL) {
+    *result = NULL;
+    return IOError(fname, errno);
+  }
+  *result = dynamic_cast<SequentialFile*>(f);
+  return Status::OK();
+}
+
+// open a file for random reading
+Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
+                                    RandomAccessFile** result) {
+  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
+  if (f == NULL) {
+    *result = NULL;
+    return IOError(fname, errno);
+  }
+  *result = dynamic_cast<RandomAccessFile*>(f);
+  return Status::OK();
+}
+
+// create a new file for writing
+Status HdfsEnv::NewWritableFile(const std::string& fname,
+                                WritableFile** result) {
+  Status s;
+  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
+  if (f == NULL || !f->isValid()) {
+    *result = NULL;
+    return IOError(fname, errno);
+  }
+  *result = dynamic_cast<WritableFile*>(f);
+  return Status::OK();
+}
+
+bool HdfsEnv::FileExists(const std::string& fname) {
+  int value = hdfsExists(fileSys_, fname.c_str());
+  if (value == 0) {
+    return true;
+  }
+  return false;
+}
+
+Status HdfsEnv::GetChildren(const std::string& path,
+                            std::vector<std::string>* result) {
+  int value = hdfsExists(fileSys_, path.c_str());
+  switch (value) {
+    case 0: {
+      int numEntries = 0;
+      hdfsFileInfo* pHdfsFileInfo = 0;
+      pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
+      if (numEntries >= 0) {
+        for (int i = 0; i < numEntries; i++) {
+          char* pathname = pHdfsFileInfo[i].mName;
+          char* filename = rindex(pathname, '/');
+          if (filename != NULL) {
+            result->push_back(filename+1);
+          }
+        }
+        if (pHdfsFileInfo != NULL) {
+          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
+        }
+      } else {
+        // numEntries < 0 indicates error
+        Log(mylog, "hdfsListDirectory call failed with error\n");
+        throw HdfsFatalException("hdfsListDirectory call failed negative error.\n");
+      }
+      break;
+    }
+    case 1:    // directory does not exist, exit
+      break;
+    default:   // anything else should be an error
+      Log(mylog, "hdfsListDirectory call failed with error\n");
+      throw HdfsFatalException("hdfsListDirectory call failed with error.\n");
+  }
+  return Status::OK();
+}
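+// Note on GetChildren above: hdfsListDirectory reports each entry by its
+// full path name (e.g. "/db/000005.sst" or a fully-qualified
+// "hdfs://nn1:9000/db/000005.sst"), while leveldb expects bare file names.
+// The rindex() call keeps only the component after the last '/':
+//
+//   "hdfs://nn1:9000/db/000005.sst"  ->  "000005.sst"
+//
+// (Values illustrative. An entry whose name contains no '/' at all is
+// skipped rather than returned as-is.)
+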
+Status HdfsEnv::DeleteFile(const std::string& fname) {
+  if (hdfsDelete(fileSys_, fname.c_str()) == 0) {
+    return Status::OK();
+  }
+  return IOError(fname, errno);
+}
+
+Status HdfsEnv::CreateDir(const std::string& name) {
+  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
+    return Status::OK();
+  }
+  return IOError(name, errno);
+}
+
+Status HdfsEnv::DeleteDir(const std::string& name) {
+  return DeleteFile(name);
+}
+
+Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
+  *size = 0L;
+  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
+  if (pFileInfo != NULL) {
+    *size = pFileInfo->mSize;
+    hdfsFreeFileInfo(pFileInfo, 1);
+    return Status::OK();
+  }
+  return IOError(fname, errno);
+}
+
+// The rename is not atomic. HDFS does not allow a renaming if the
+// target already exists. So, we delete the target before attempting the
+// rename.
+Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
+  hdfsDelete(fileSys_, target.c_str());
+  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
+    return Status::OK();
+  }
+  return IOError(src, errno);
+}
+
+Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
+  // there isn't a very good way to atomically check and create
+  // a file via libhdfs
+  *lock = NULL;
+  return Status::OK();
+}
+
+Status HdfsEnv::UnlockFile(FileLock* lock) {
+  return Status::OK();
+}
+
+Status HdfsEnv::NewLogger(const std::string& fname, Logger** result) {
+  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
+  if (f == NULL || !f->isValid()) {
+    *result = NULL;
+    return IOError(fname, errno);
+  }
+  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
+  *result = h;
+  if (mylog == NULL) {
+    // mylog = h; // uncomment this for detailed logging
+  }
+  return Status::OK();
+}
+
+}  // namespace leveldb
+
+#endif // LEVELDB_HDFS_FILE_C
+
+#else // USE_HDFS
+
+// dummy placeholders used when HDFS is not available
+#include "leveldb/env.h"
+#include "hdfs/env_hdfs.h"
+namespace leveldb {
+Status HdfsEnv::NewSequentialFile(const std::string& fname,
+                                  SequentialFile** result) {}
+}
+
+#endif
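As the comment in HdfsEnv::RenameFile notes, HDFS refuses to rename onto an existing name, so the target is deleted first, which makes the rename non-atomic. A hedged sketch of the resulting crash window; DeleteStep/RenameStep stand in for hdfsDelete/hdfsRename and the file names are illustrative:

```cpp
#include <stdio.h>
#include <string>

// Stand-ins for hdfsDelete/hdfsRename, for illustration only.
static bool DeleteStep(const std::string& name) {
  fprintf(stderr, "delete %s\n", name.c_str());
  return true;
}
static bool RenameStep(const std::string& src, const std::string& dst) {
  fprintf(stderr, "rename %s -> %s\n", src.c_str(), dst.c_str());
  return true;
}

int main() {
  // HdfsEnv::RenameFile("CURRENT.tmp", "CURRENT") performs two steps:
  DeleteStep("CURRENT");                  // step 1: the old target is gone
  // <-- a crash here leaves no CURRENT at all; unlike a posix rename(2),
  //     there is no point where both old and new contents are visible
  RenameStep("CURRENT.tmp", "CURRENT");   // step 2: install the new target
  return 0;
}
```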