205 lines
5.7 KiB
C
205 lines
5.7 KiB
C
|
/*
|
||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||
|
* or more contributor license agreements. See the NOTICE file
|
||
|
* distributed with this work for additional information
|
||
|
* regarding copyright ownership. The ASF licenses this file
|
||
|
* to you under the Apache License, Version 2.0 (the
|
||
|
* "License"); you may not use this file except in compliance
|
||
|
* with the License. You may obtain a copy of the License at
|
||
|
*
|
||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||
|
*
|
||
|
* Unless required by applicable law or agreed to in writing,
|
||
|
* software distributed under the License is distributed on an
|
||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||
|
* KIND, either express or implied. See the License for the
|
||
|
* specific language governing permissions and limitations
|
||
|
* under the License.
|
||
|
*/
|
||
|
#ifndef THRIFT_ASYNC_THTTPASYNCCHANNEL_H_
|
||
|
#define THRIFT_ASYNC_THTTPASYNCCHANNEL_H_ 1
|
||
|
|
||
|
#include "thrift/lib/cpp/async/TStreamAsyncChannel.h"
|
||
|
#include "thrift/lib/cpp/util/THttpParser.h"
|
||
|
|
||
|
namespace apache { namespace thrift { namespace async {
|
||
|
|
||
|
class THttpAsyncChannel;
|
||
|
|
||
|
namespace detail {
|
||
|
|
||
|
/**
|
||
|
* Encapsulation of one outstanding write request on a THttpAsyncChannel.
|
||
|
*/
|
||
|
class THttpACWriteRequest :
|
||
|
public TAsyncChannelWriteRequestBase<THttpACWriteRequest> {
|
||
|
public:
|
||
|
typedef std::tr1::function<void()> VoidCallback;
|
||
|
|
||
|
THttpACWriteRequest(const VoidCallback& callback,
|
||
|
const VoidCallback& errorCallback,
|
||
|
transport::TMemoryBuffer* message,
|
||
|
TAsyncEventChannel* channel);
|
||
|
|
||
|
void write(TAsyncTransport* transport,
|
||
|
TAsyncTransport::WriteCallback* callback) THRIFT_NOEXCEPT;
|
||
|
|
||
|
void writeSuccess() THRIFT_NOEXCEPT;
|
||
|
void writeError(size_t bytesWritten,
|
||
|
const transport::TTransportException& ex) THRIFT_NOEXCEPT;
|
||
|
|
||
|
private:
|
||
|
char lengthBuf_[64];
|
||
|
THttpAsyncChannel* channel_;
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Read state for THttpAsyncChannel
|
||
|
*/
|
||
|
class THttpACReadState {
|
||
|
public:
|
||
|
typedef std::tr1::function<void()> VoidCallback;
|
||
|
|
||
|
THttpACReadState() {
|
||
|
}
|
||
|
|
||
|
// Methods required by TStreamAsyncChannel
|
||
|
|
||
|
void setCallbackBuffer(transport::TMemoryBuffer* buffer) {
|
||
|
parser_->setDataBuffer(buffer);
|
||
|
}
|
||
|
void unsetCallbackBuffer() {
|
||
|
parser_->unsetDataBuffer();
|
||
|
}
|
||
|
|
||
|
bool hasReadAheadData() {
|
||
|
return parser_->hasReadAheadData();
|
||
|
}
|
||
|
bool hasPartialMessage() {
|
||
|
return parser_->hasPartialMessage();
|
||
|
}
|
||
|
|
||
|
void getReadBuffer(void** bufReturn, size_t* lenReturn);
|
||
|
bool readDataAvailable(size_t len);
|
||
|
|
||
|
// Other methods specific to THttpAsyncChannel
|
||
|
void setParser(boost::shared_ptr<apache::thrift::util::THttpParser> parser) {
|
||
|
parser_ = parser;
|
||
|
}
|
||
|
private:
|
||
|
boost::shared_ptr<apache::thrift::util::THttpParser> parser_;
|
||
|
};
|
||
|
|
||
|
} // namespace detail
|
||
|
|
||
|
/**
|
||
|
* THttpAsyncChannel
|
||
|
*
|
||
|
* This is a TAsyncChannel implementation that reads and writes messages
|
||
|
* encapuated in HTTP.
|
||
|
*
|
||
|
* Its messages are compatible with THttpTransport.
|
||
|
*/
|
||
|
class THttpAsyncChannel :
|
||
|
public TStreamAsyncChannel<detail::THttpACWriteRequest,
|
||
|
detail::THttpACReadState> {
|
||
|
private:
|
||
|
typedef TStreamAsyncChannel<detail::THttpACWriteRequest,
|
||
|
detail::THttpACReadState> Parent;
|
||
|
boost::shared_ptr<apache::thrift::util::THttpParser> parser_;
|
||
|
|
||
|
public:
|
||
|
explicit THttpAsyncChannel(
|
||
|
const boost::shared_ptr<TAsyncTransport>& transport)
|
||
|
: Parent(transport) {
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Helper function to create a shared_ptr<THttpAsyncChannel>.
|
||
|
*
|
||
|
* This passes in the correct destructor object, since THttpAsyncChannel's
|
||
|
* destructor is protected and cannot be invoked directly.
|
||
|
*/
|
||
|
static boost::shared_ptr<THttpAsyncChannel> newChannel(
|
||
|
const boost::shared_ptr<TAsyncTransport>& transport) {
|
||
|
return boost::shared_ptr<THttpAsyncChannel>(
|
||
|
new THttpAsyncChannel(transport), Destructor());
|
||
|
}
|
||
|
|
||
|
/// size in bytes beyond which we'll reject a given http size.
|
||
|
void setMaxHttpSize(uint32_t size) {
|
||
|
parser_->setMaxSize(size);
|
||
|
}
|
||
|
|
||
|
uint32_t getMaxHttpSize() const {
|
||
|
return parser_->getMaxSize();
|
||
|
}
|
||
|
|
||
|
void setParser(boost::shared_ptr<apache::thrift::util::THttpParser> parser) {
|
||
|
parser_ = parser;
|
||
|
readState_.setParser(parser);
|
||
|
}
|
||
|
|
||
|
boost::shared_ptr<apache::thrift::util::THttpParser> getParser() const {
|
||
|
return parser_;
|
||
|
}
|
||
|
|
||
|
int constructHeader(iovec* ops,
|
||
|
int opsLen,
|
||
|
int contentLength,
|
||
|
char* contentLengthBuf) {
|
||
|
return parser_->constructHeader(ops,
|
||
|
opsLen,
|
||
|
contentLength,
|
||
|
contentLengthBuf);
|
||
|
}
|
||
|
|
||
|
protected:
|
||
|
/**
|
||
|
* Protected destructor.
|
||
|
*
|
||
|
* Users of THttpAsyncChannel must never delete it directly. Instead,
|
||
|
* invoke destroy().
|
||
|
*/
|
||
|
virtual ~THttpAsyncChannel() { }
|
||
|
};
|
||
|
|
||
|
class THttpAsyncChannelFactory : public TStreamAsyncChannelFactory {
|
||
|
public:
|
||
|
THttpAsyncChannelFactory()
|
||
|
: maxHttpSize_(0x7fffffff)
|
||
|
, recvTimeout_(0)
|
||
|
, sendTimeout_(0) {}
|
||
|
|
||
|
void setMaxHttpSize(uint32_t bytes) {
|
||
|
maxHttpSize_ = bytes;
|
||
|
}
|
||
|
|
||
|
void setRecvTimeout(uint32_t milliseconds) {
|
||
|
recvTimeout_ = milliseconds;
|
||
|
}
|
||
|
|
||
|
void setSendTimeout(uint32_t milliseconds) {
|
||
|
sendTimeout_ = milliseconds;
|
||
|
}
|
||
|
|
||
|
virtual boost::shared_ptr<TAsyncEventChannel> newChannel(
|
||
|
const boost::shared_ptr<TAsyncTransport>& transport) {
|
||
|
boost::shared_ptr<THttpAsyncChannel> channel(
|
||
|
THttpAsyncChannel::newChannel(transport));
|
||
|
transport->setSendTimeout(sendTimeout_);
|
||
|
channel->setMaxHttpSize(maxHttpSize_);
|
||
|
channel->setRecvTimeout(recvTimeout_);
|
||
|
return channel;
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
uint32_t maxHttpSize_;
|
||
|
uint32_t recvTimeout_;
|
||
|
uint32_t sendTimeout_;
|
||
|
};
|
||
|
|
||
|
}}} // apache::thrift::async
|
||
|
|
||
|
#endif // THRIFT_ASYNC_THTTPASYNCCHANNEL_H_
|