/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef THRIFT_ASYNC_TNOTIFICATIONPIPE_H #define THRIFT_ASYNC_TNOTIFICATIONPIPE_H 1 #include "thrift/lib/cpp/async/TDelayedDestruction.h" #include "thrift/lib/cpp/async/TEventBase.h" #include "thrift/lib/cpp/async/TEventHandler.h" #include "thrift/lib/cpp/concurrency/Mutex.h" #include #include #include namespace apache { namespace thrift { namespace async { /** * A simple notification pipe for sending messages to a TEventBase thread. * * TNotificationPipe is a unidirectional pipe for sending small, atomic * messages. * * TNotificationPipe cannot be send messages larger than a fixed size. * TNotificationPipe::kMaxMessageSize defines the maximum message size * supported. If you need to pass larger amounts of data between threads, * consider just passing a pointer to the data over the pipe, and using some * external mechanism to synchronize management of the memory. * * * TNotificationPipe provides two parallel APIs for writing and closing the * pipe: a thread-safe version and a non-thread-safe version. Which version to * use depends on how the caller uses the pipe: * * - If there is only a single writer thread, you can use the non-thread-safe * versions of trySendMessage() and close(). This guarantees close() is * never called by one thread while another thread is attempting to send a * message. * * - If there are multiple writers, but the pipe is never closed by the * writers, you can use the non-thread-safe version of trySendMessage(). * Multiple simultaneous trySendMessage() calls will not interfere with each * other. Since none of the writer threads call close, a call to close() * cannot be running simultaneously with a write attempt. (With this model, * the TNotificationPipe is never closed until it is destroyed. It is up to * the caller to ensure the TNotificationPipe is not destroyed while write * threads still have a pointer or reference to it.) * * In other circumstances (if one thread may call close while another thread is * simultaneously trying to write), the thread-safe versions * trySendMessageSync() and closeSync() must be used. */ class TNotificationPipe : public TDelayedDestruction, private TEventHandler, private TEventBase::LoopCallback { public: /** * A callback interface for receiving notification of messages from the pipe. */ class Callback { public: virtual ~Callback() {} /** * notificationMessage() will be invoked whenever a new * message is available from the pipe. */ virtual void notificationMessage(const void *msg, uint32_t msgSize) = 0; /** * notificationPipeError() will be invoked if an error occurs while reading * from the pipe. Before notificationPipeError() is invoked, the read * callback will automatically be uninstalled and the pipe will be closed. */ virtual void notificationPipeError(const std::exception& ex) = 0; /** * notificationPipeClosed() is invoked in the read thread after the write * end of the pipe is closed. */ virtual void notificationPipeClosed() = 0; }; /** * Helper function to create a new shared_ptr. * * This simply sets the correct destructor to call destroy() instead of * directly deleting the TNotificationPipe. */ static boost::shared_ptr newPipe(TEventBase *base) { return boost::shared_ptr(new TNotificationPipe(base), Destructor()); } /** * Create a new TNotificationPipe. * * @param eventBase The TEventBase to use for receiving read notifications * from this pipe. All read events will be processed in this * TEventBase's thread. trySendMessage() may be called from any thread. */ TNotificationPipe(TEventBase *eventBase); /** * Destroy this TNotificationPipe. * * This method may only be called from the read thread. * * This will automatically close the pipe if it is not already closed. */ virtual void destroy(); /** * Close the pipe. * * This version of close() is not thread-safe. It should only be used if the * caller is sure no other thread is attempting to write a message at the * same time. * * Use closeSync() if other threads may be attempting to send a message * simultaneously. The other threads must use also use the thread-safe * trySendMessageSync() or trySendFrameSync() calls. */ void close(); /** * A thread-safe version of close(). */ void closeSync(); /** * Send a message over the pipe. * * trySendMessage() is best-effort. It will either immediately succeed to * send the message, or it will fail immediately if the pipe reader is too * busy and it's backlog of unread messages is too large. * * trySendMessage() also does not support arbitrarily large messages. * It will also fail immediately if msgSize is larger than (PIPE_BUF - 4). * * If trySendMessage() succeeds, the message is guaranteed to be delivered to * the pipe reader, except in the case where the pipe reader explicitly stops * reading and destroys the pipe before processing all of its messages. * * On failure a TTransportException is thrown. The error code will be * TTransportException::BAD_ARGS if the message is too large, * TTransportException::TIMED_OUT if the message cannot be sent right now * because the pipe is full, or TTransportException::NOT_OPEN if the pipe has * already been closed. * * This method is thread safe with other simultaneous trySendMessage() calls, * but not with close() calls. Use trySendMessageSync() and closeSync() if a * close may occur simultaneously on another thread. */ void trySendMessage(const void *msg, uint32_t msgSize); /** * A thread-safe version of trySendMessage(). * * This may be called simultaneously with closeSync(). */ void trySendMessageSync(const void *msg, uint32_t msgSize); /** * Send a message over the pipe. * * This is identical to trySendMessage(), except that the caller must provide * 4 bytes at the beginning of the message where we can write a frame length. * This allows us to avoid copying the message into a new buffer. * (trySendMessage() always has to make a copy of the message.) * * @param frame A pointer to the frame buffer. trySendFrame() will * overwrite the first 4 bytes of this buffer. When the read callback * receives the message, it will not see these first 4 bytes. * @param frameSize The full size of the frame buffer. This must be at * least 4 bytes long. The actual message size that will be sent is * frameSize - 4. */ void trySendFrame(void *frame, uint32_t frameSize); /** * A thread-safe version of trySendFrame(). * * This may be called simultaneously with closeSync(). */ void trySendFrameSync(void *frame, uint32_t frameSize); /** * Get the number of messages which haven't been processed. */ int64_t getNumNotProcessed() const { return numInputs_ - numOutputs_; } /** * Set the callback to receive read notifications from this pipe. * * This method must be invoked from the pipe's read thread. * * May throw TLibraryException on error. The callback will always be unset * (NULL) after an error. */ void setReadCallback(Callback *callback); /** * Mark the pipe read event handler as an "internal" event handler. * * This causes the notification pipe not to be counted when determining if * the TEventBase has any more active events to wait on. This is intended to * be used only be internal TEventBase code. This API is not guaranteed to * remain stable or portable in the future. * * May throw TLibraryException if it fails to re-register its event handler * with the correct flags. */ void setInternal(bool internal); /** * Get the maximum number of messages that will be read on a single iteration * of the event loop. */ uint32_t getMaxReadAtOnce() const { return maxReadAtOnce_; } /** * Set the maximum number of messages to read each iteration of the event * loop. * * If messages are being received faster than they can be processed, this * helps limit the rate at which they will be read. This can be used to * prevent the notification pipe reader from starving other users of the * event loop. */ void setMaxReadAtOnce(uint32_t numMessages) { maxReadAtOnce_ = numMessages; } /** * The maximum message size that can be sent over a TNotificationPipe. * * This restriction ensures that trySendMessage() can send all messages * atomically. This is (PIPE_BUF - 4) bytes. (On Linux, this is 4092 * bytes.) */ static const uint32_t kMaxMessageSize = PIPE_BUF - 4; /** * The default maximum number of messages that will be read each time around * the event loop. * * This value used for each TNotificationPipe can be changed using the * setMaxReadAtOnce() method. */ static const uint32_t kDefaultMaxReadAtOnce = 10; private: enum ReadAction { kDoNothing, kContinue, kWaitForRead, kRunInNextLoop, }; // Forbidden copy constructor and assignment opererator TNotificationPipe(TNotificationPipe const &); TNotificationPipe& operator=(TNotificationPipe const &); // TEventHandler methods virtual void handlerReady(uint16_t events) THRIFT_NOEXCEPT; // TEventBase::LoopCallback methods virtual void runLoopCallback() THRIFT_NOEXCEPT; void initPipe(); void registerPipeEvent(); void readMessages(ReadAction action); ReadAction performRead(); ReadAction processReadData(uint32_t* messagesProcessed); ReadAction handleError(const char* fmt, ...) __attribute__((format(printf, 2, 3))); void checkMessage(uint32_t msgSize); void writeFrame(const void *frame, uint32_t frameSize); TEventBase *eventBase_; Callback *readCallback_; int readPipe_; int writePipe_; bool internal_; uint32_t maxReadAtOnce_; int64_t numInputs_; int64_t numOutputs_; /** * Mutex for guarding numInputs_ */ concurrency::Mutex numInputsMutex_; /** * A mutex that guards writePipe_. * * This is used by closeSync(), trySendMessageSync(), and trySendFrameSync(), * since trySendMessageSync() and trySendFrameSync() read writePipe_ * and closeSync() resets it to -1. */ concurrency::NoStarveReadWriteMutex writePipeMutex_; /** * A pointer to the end of valid read data in the read buffer. */ uint8_t *readPtr_; /** * An internal read buffer * * This is large enough to contain the maximum possible message plus the * mssage length. */ uint8_t readBuffer_[kMaxMessageSize + 4]; }; }}} // apache::thrift::async #endif // THRIFT_ASYNC_TNOTIFICATIONPIPE_H