80c663882a
Summary: First draft. Unit tests pass. Test Plan: unit tests attached Reviewers: heyongqiang Reviewed By: heyongqiang Differential Revision: https://reviews.facebook.net/D3969
350 lines
10 KiB
C++
350 lines
10 KiB
C++
/*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
|
|
#ifndef _THRIFT_TRANSPORT_TMEMPAGEDTRANSPORT_HPP_
|
|
#define _THRIFT_TRANSPORT_TMEMPAGEDTRANSPORT_HPP_ 1
|
|
|
|
#include "thrift/lib/cpp/transport/TMemPagedTransport.h"
|
|
#include <cstring>
|
|
|
|
namespace apache { namespace thrift { namespace transport {
|
|
|
|
/**
|
|
* Reset buffer for read
|
|
*/
|
|
inline void TMemPagedTransport::resetForRead() {
|
|
currentReadPage_ = headPage_;
|
|
currentReadOffset_ = 0;
|
|
}
|
|
|
|
/**
|
|
* Reset buffer for write
|
|
*/
|
|
inline void TMemPagedTransport::resetForWrite(bool releaseAllPages) {
|
|
FixedSizeMemoryPage* discardPage = NULL;
|
|
if (releaseAllPages) { // release all pages
|
|
discardPage = headPage_;
|
|
headPage_ = NULL;
|
|
} else { // make sure head page left or created
|
|
if (!headPage_) { // allocate page if any
|
|
headPage_ = factory_->getPage();
|
|
} else {
|
|
discardPage = headPage_->next_;
|
|
headPage_->next_ = NULL;
|
|
}
|
|
}
|
|
while (discardPage) {
|
|
FixedSizeMemoryPage* pageForDeletion = discardPage;
|
|
discardPage = discardPage->next_;
|
|
factory_->returnPage(pageForDeletion);
|
|
}
|
|
|
|
// reset
|
|
currentReadPage_ = currentWritePage_ = headPage_;
|
|
currentReadOffset_ = currentWriteOffset_ = 0;
|
|
}
|
|
|
|
/**
|
|
* Returns number of bytes already read from buffer
|
|
*/
|
|
inline uint32_t TMemPagedTransport::getReadBytes() const {
|
|
uint32_t ret = 0;
|
|
FixedSizeMemoryPage* page = headPage_;
|
|
while (page) {
|
|
if (page == currentReadPage_) {
|
|
ret += currentReadOffset_;
|
|
break;
|
|
} else {
|
|
ret += pageSize_;
|
|
}
|
|
page = page->next_;
|
|
}
|
|
// return result
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* Returns current bytes written in buffer
|
|
*/
|
|
inline uint32_t TMemPagedTransport::getWrittenBytes() const {
|
|
uint32_t ret = 0;
|
|
FixedSizeMemoryPage* page = headPage_;
|
|
while (page) {
|
|
if (page == currentWritePage_) {
|
|
ret += currentWriteOffset_;
|
|
break;
|
|
} else {
|
|
ret += pageSize_;
|
|
}
|
|
page = page->next_;
|
|
}
|
|
// return result
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* Implementations of the base functions
|
|
*/
|
|
inline uint32_t TMemPagedTransport::read(uint8_t* buf, uint32_t len) {
|
|
if (!buf) {
|
|
throw TTransportException(TTransportException::BAD_ARGS);
|
|
}
|
|
return get(buf, len);
|
|
}
|
|
|
|
inline void TMemPagedTransport::write(const uint8_t* buf, uint32_t len) {
|
|
if (!buf) {
|
|
throw TTransportException(TTransportException::BAD_ARGS);
|
|
}
|
|
set(buf, len);
|
|
}
|
|
|
|
/**
|
|
* interface extension, not part of the TTrasport
|
|
* similar to consume but for the write
|
|
*/
|
|
inline void TMemPagedTransport::skip(uint32_t len) {
|
|
set(NULL, len);
|
|
}
|
|
|
|
inline const uint8_t* TMemPagedTransport::borrow(uint8_t* buf, uint32_t* len) {
|
|
// - possible scenarios
|
|
// - buf is NULL
|
|
// -- available bytes for current page >= len - (return internal pointer)
|
|
// -- available bytes for current page < len - return NULL, set len 0
|
|
// - buf is not NULL
|
|
// -- available bytes for current page >= len - (return internal pointer)
|
|
// -- available bytes for current page < len - copy to the external buffer
|
|
FixedSizeMemoryPage* page = currentReadPage_;
|
|
uint32_t readOffset = currentReadOffset_;
|
|
if (!page) { // no memory available
|
|
*len = 0;
|
|
return NULL;
|
|
}
|
|
|
|
uint32_t capacity = *len; // requested capacity
|
|
uint32_t available = 0;
|
|
if (page == currentWritePage_) {
|
|
available = currentWriteOffset_ - readOffset;
|
|
} else {
|
|
available = pageSize_ - readOffset;
|
|
}
|
|
|
|
if (available >= capacity) { // enough available bytes
|
|
*len = available; // assign available bytes
|
|
return page->buffer_ + readOffset; // return readable memory pointer
|
|
}
|
|
// not enough available bytes on the current page
|
|
if (!buf) { // buffer is not provided
|
|
*len = 0;
|
|
return NULL;
|
|
}
|
|
// buffer is provided, make memory copy from current page if any
|
|
available = std::min(available, capacity);
|
|
if (available) {
|
|
std::memcpy(buf + *len - capacity,
|
|
page->buffer_ + readOffset,
|
|
available);
|
|
// adjust capacity
|
|
capacity -= available;
|
|
// adjust offset
|
|
readOffset += available;
|
|
}
|
|
// copy the rest if available
|
|
while (capacity) { // read until all buffer filled
|
|
if (page == currentWritePage_ &&
|
|
readOffset == currentWriteOffset_) {
|
|
*len = 0;
|
|
return NULL; // no more available bytes for read
|
|
} else {
|
|
// if read and write on the same page
|
|
if (page == currentWritePage_) {
|
|
available = currentWriteOffset_ - readOffset;
|
|
} else if (!(available = pageSize_ - readOffset)) {
|
|
// move to next page
|
|
page = page->next_;
|
|
assert(page);
|
|
readOffset = 0;
|
|
continue;
|
|
}
|
|
}
|
|
// how many bytes we can read
|
|
available = std::min(capacity, available);
|
|
// memory copy
|
|
std::memcpy(buf + *len - capacity,
|
|
page->buffer_ + readOffset,
|
|
available);
|
|
// adjust capacity
|
|
capacity -= available;
|
|
// adjust page read offset
|
|
readOffset += available;
|
|
// move to the next page if end is reached
|
|
// and more pages for read available
|
|
if (readOffset == pageSize_ &&
|
|
page != currentWritePage_) {
|
|
page = page->next_;
|
|
assert(page);
|
|
// reset offset
|
|
readOffset = 0;
|
|
}
|
|
}
|
|
// return result, leave len as it is
|
|
return buf;
|
|
}
|
|
|
|
/**
|
|
* interface extension, not part of the TTrasport
|
|
* make sure there are some bytes for write at the same page available
|
|
*/
|
|
inline uint8_t* TMemPagedTransport::reserve(uint32_t* len, bool throwOnError) {
|
|
if (!currentWritePage_ || pageSize_ == currentWriteOffset_) {
|
|
FixedSizeMemoryPage* newPage = factory_->getPage(throwOnError);
|
|
if (!newPage) { // no memory
|
|
return NULL;
|
|
}
|
|
if (currentWritePage_) {
|
|
currentWritePage_->next_ = newPage;
|
|
} else {
|
|
headPage_ = currentReadPage_ = newPage;
|
|
}
|
|
currentWritePage_ = newPage;
|
|
// reset write offset
|
|
currentWriteOffset_ = 0;
|
|
}
|
|
if (len) { // it can be a call just to reserve a new page
|
|
*len = pageSize_ - currentWriteOffset_;
|
|
}
|
|
|
|
return currentWritePage_->buffer_ + currentWriteOffset_;
|
|
}
|
|
|
|
inline void TMemPagedTransport::consume(uint32_t len) {
|
|
get(NULL, len);
|
|
}
|
|
|
|
inline uint32_t TMemPagedTransport::readAll(uint8_t* buf, uint32_t len) {
|
|
// try to read requested bytes
|
|
uint32_t res = read(buf, len);
|
|
if (res != len) { // not all bytes can be read
|
|
// interface description requires throwing exception
|
|
throw TTransportException(TTransportException::END_OF_FILE);
|
|
}
|
|
return res;
|
|
}
|
|
|
|
|
|
/**
|
|
* helper function
|
|
* write or skip bytes
|
|
*/
|
|
inline void TMemPagedTransport::set(const uint8_t* buf, uint32_t len) {
|
|
uint32_t capacity = len; // requested capacity
|
|
// prepare for rollback in case of out of memory
|
|
FixedSizeMemoryPage* saveWritePage = currentWritePage_;
|
|
uint32_t saveWriteOffset = currentWriteOffset_;
|
|
while (capacity) { // write until all buffer bytes get sent
|
|
uint32_t available = 0;
|
|
if (!reserve(&available, false)) { // reserve page if any
|
|
// rollback
|
|
currentWriteOffset_ = saveWriteOffset;
|
|
if ((currentWritePage_ = saveWritePage) != NULL) { // there were pages
|
|
// remove extra allocated pages if any
|
|
// even we remove previously existed pages it won't hurt
|
|
FixedSizeMemoryPage* pageForDelete = saveWritePage->next_;
|
|
currentWritePage_->next_ = NULL; // cut the tail
|
|
while (pageForDelete) {
|
|
saveWritePage = pageForDelete;
|
|
pageForDelete = pageForDelete->next_;
|
|
factory_->returnPage(saveWritePage);
|
|
}
|
|
} else {
|
|
// remove all pages
|
|
resetForWrite(true);
|
|
}
|
|
// throw
|
|
throw TTransportException(TTransportException::INTERNAL_ERROR);
|
|
}
|
|
// how many bytes we can write
|
|
available = std::min(capacity, available);
|
|
if (buf) {
|
|
// memory copy if any
|
|
std::memcpy(currentWritePage_->buffer_ + currentWriteOffset_,
|
|
buf + len - capacity,
|
|
available);
|
|
}
|
|
// adjust capacity
|
|
capacity -= available;
|
|
// adjust page write iterator
|
|
currentWriteOffset_ += available;
|
|
} // while
|
|
}
|
|
|
|
/**
|
|
* Implementations of the base functions
|
|
*/
|
|
inline uint32_t TMemPagedTransport::get(uint8_t* buf, uint32_t len) {
|
|
uint32_t capacity = len; // requested memory
|
|
uint32_t available = 0;
|
|
if (!currentReadPage_) {
|
|
return 0; // nothing to read, did not reset or write bytes
|
|
}
|
|
while (capacity) { // read until all buffer filled
|
|
if (currentReadPage_ == currentWritePage_) {
|
|
// if read and write on the same page
|
|
if (!(available = currentWriteOffset_ - currentReadOffset_)) {
|
|
break; // no more available bytes for read
|
|
}
|
|
} else if (!(available = pageSize_ - currentReadOffset_)) {
|
|
// not the last page
|
|
// move to next page
|
|
currentReadPage_ = currentReadPage_->next_;
|
|
assert(currentReadPage_);
|
|
currentReadOffset_ = 0;
|
|
available = pageSize_ - currentReadOffset_;
|
|
}
|
|
// how many bytes we can read
|
|
available = std::min(capacity, available);
|
|
// memory copy if any
|
|
if (buf) {
|
|
std::memcpy(buf + len - capacity,
|
|
currentReadPage_->buffer_ + currentReadOffset_,
|
|
available);
|
|
}
|
|
// adjust capacity
|
|
capacity -= available;
|
|
// adjust page read iterator
|
|
currentReadOffset_ += available;
|
|
// move to the next page if end is reached
|
|
// and more pages for read available
|
|
if (currentReadOffset_ == pageSize_ &&
|
|
currentReadPage_ != currentWritePage_) {
|
|
currentReadPage_ = currentReadPage_->next_;
|
|
assert(currentReadPage_);
|
|
// reset offset
|
|
currentReadOffset_ = 0;
|
|
}
|
|
}
|
|
// return result
|
|
return len - capacity;
|
|
}
|
|
|
|
}}} // apache::thrift::transport
|
|
|
|
#endif // #ifndef _THRIFT_TRANSPORT_TMEMPAGEDTRANSPORT_HPP_
|