This pull request provides a framework for exchanging a very large
stream between handlers, typically between a decoder and an inbound
handler (or between a handler that writes a message and an encoder that
encodes that message).
For example, an HTTP decoder, previously, generates multiple
micro-messages to decode an HTTP message (i.e. HttpRequest +
HttpChunks). With the streaming API, The HTTP decoder can simply
generate a single HTTP message whose content is a Stream. And then the
inbound handler can consume the Stream via the buffer you created when
you begin to read the stream. If you create a buffer whose capacity is
bounded, you can handle a very large stream without allocating a lot of
memory. If you just want to wait until the whole content is ready, you
can also do that with an unbounded buffer.
The streaming API also supports a limited form of communication between
a producer (i.e. decoder) and a consumer. A producer can abort the
stream if the stream is not valid anymore. A consumer can choose to
reject or discard the stream, where rejection is for unrecoverable
failure and discard is for recoverable failure.
P.S. Special thanks to @jpinner for the initial input.
- Make PoolSubpage a linked list node in the pool
- Now that a subpage is added to and removed from the pool correctly, allocating a subpage from the pool became vastly simpler.
- Rename directbyDefault to preferDirect
- Add a system property 'io.netty.prederDirect' to allow a user from changing the preference on launch-time
- Merge UnpooledByteBufAllocator.DEFAULT_BY_* to DEFAULT
- Rename ChannelHandlerAdapter to ChannelDuplexHandler
- Add ChannelHandlerAdapter that implements only ChannelHandler
- Rename CombinedChannelHandler to CombinedChannelDuplexHandler and
improve runtime validation
- Remove ChannelInbound/OutboundHandlerAdapter which are not useful
- Make ChannelOutboundByteHandlerAdapter similar to
ChannelInboundByteHandlerAdapter
- Make the tail and head handler of DefaultChannelPipeline accept both
bytes and messages. ChannelHandlerContext.hasNext*() were removed
because they always return true now.
- Removed various unnecessary null checks.
- Correct method/field names:
inboundBufferSuspended -> channelReadSuspended
- Move common methods from ByteBuf to Buf
- Rename ensureWritableBytes() to ensureWritable()
- Rename readable() to isReadable()
- Rename writable() to isWritable()
- Add isReadable(int) and isWritable(int)
- Add AbstractMessageBuf
- Rewrite DefaultMessageBuf and QueueBackedMessageBuf
- based on Josh Bloch's public domain ArrayDeque impl
- Rename message types for clarity
- HttpMessage -> FullHttpMessage
- HttpHeader -> HttpMessage
- HttpRequest -> FullHttpRequest
- HttpResponse -> FulllHttpResponse
- HttpRequestHeader -> HttpRequest
- HttpResponseHeader -> HttpResponse
- HttpContent now extends ByteBufHolder; no more content() method
- Make HttpHeaders abstract, make its header access methods public, and
add DefaultHttpHeaders
- Header accessor methods in HttpMessage and LastHttpContent are
replaced with HttpMessage.headers() and
LastHttpContent.trailingHeaders(). Both methods return HttpHeaders.
- Remove setters wherever possible and remove 'get' prefix
- Instead of calling setContent(), a user can either specify the content
when constructing a message or write content into the buffer.
(e.g. m.content().writeBytes(...))
- Overall cleanup & fixes
Now that we are going to use buffer pooling by default, it is obvious
that a user will forget to call .free() and report memory leak. In this
case, we should have a tool to determine if it is a bug in our allocator
implementation or in the user's code.
This pull request adds a system property flag called
'io.netty.resourceLeakDetection'. If set, when a user forgets to call
.free(), the ResourceLeakDetector will detect it and log a message with
detailed stack trace to tell where the leaked buffer has been allocated.
Because obtaining stack trace is an expensive operation, I used sampling
technique. Allocation is recorded only for every 113th allocation. I
chose 113 because it's a prime number.
In production, a user might not want to enable this option due to
potential performance impact. If a user does not specify the
'-Dio.netty.resourceLeakDetection' option leak detection is disabled.
Even if the leak detection is enabled, the overhead should be less than
5% because only ~1% of allocations are monitored.
I also replaced SharedResourceMisuseDetector with ResourceLeakDetector.
- Add PooledUnsafeDirectByteBuf, a variant of PooledDirectByteBuf, which
accesses its underlying direct ByteBuffer using sun.misc.Unsafe.
- To decouple Netty from sun.misc.*, sun.misc.Unsafe is accessed via
PlatformDependent.
- This change solely introduces about 8+% improvement in direct memory
access according to the tests conducted as described in #918
- Rename capacity variables to reqCapacity or normCapacity to distinguish if its the request capacity or the normalized capacity
- Do not reallocate on ByteBuf.capacity(int) if reallocation is unnecessary; just update the index range.
- Revert the workaround in DefaultChannelHandlerContext
- Fixes#826
Unsafe.isFreed(), free(), suspend/resumeIntermediaryAllocations() are not that dangerous. internalNioBuffer() and internalNioBuffers() are dangerous but it seems like nobody is using it even inside Netty. Removing those two methods also removes the necessity to keep Unsafe interface at all.
This pull request introduces the new default ByteBufAllocator implementation based on jemalloc, with a some differences:
* Minimum possible buffer capacity is 16 (jemalloc: 2)
* Uses binary heap with random branching (jemalloc: red-black tree)
* No thread-local cache yet (jemalloc has thread-local cache)
* Default page size is 8 KiB (jemalloc: 4 KiB)
* Default chunk size is 16 MiB (jemalloc: 2 MiB)
* Cannot allocate a buffer bigger than the chunk size (jemalloc: possible) because we don't have control over memory layout in Java. A user can work around this issue by creating a composite buffer, but it's not always a feasible option. Although 16 MiB is a pretty big default, a user's handler might need to deal with the bounded buffers when the user wants to deal with a large message.
Also, to ensure the new allocator performs good enough, I wrote a microbenchmark for it and made it a dedicated Maven module. It uses Google's Caliper framework to run and publish the test result (example)
Miscellaneous changes:
* Made some ByteBuf implementations public so that those who implements a new allocator can make use of them.
* Added ByteBufAllocator.compositeBuffer() and its variants.
* ByteBufAllocator.ioBuffer() creates a buffer with 0 capacity.
* UnsafeByteBuf is gone. I added ByteBuf.unsafe() back.
* To avoid extra instantiation, all ByteBuf implementations implement the ByteBuf.Unsafe interface.
* To hide this implementation detail, all ByteBuf implementations are package-private.
* AbstractByteBuf and SwappedByteBuf are public and they do not implement ByteBuf.Unsafe because they don't need to.
* unwrap() is not an unsafe operation anymore.
* ChannelBuf also has unsafe() and Unsafe. ByteBuf.Unsafe extends ChannelBuf.unsafe(). ChannelBuf.unsafe() provides free() operation so that a user does not need to down-cast the buffer in freeInbound/OutboundBuffer().
To perform writes in AioSocketChannel, we get a ByteBuffer view of the
outbound buffer and specify it as a parameter when we call
AsynchronousSocketChannel.write().
In most cases, the write() operation is finished immediately. However,
sometimes, it is scheduled for later execution. In such a case, there's
a chance for a user's handler to append more data to the outbound
buffer.
When more data is appended to the outbound buffer, the outbound buffer
can expand its capacity by itself. Changing the capacity of a buffer is
basically made of the following steps:
1. Allocate a larger new internal memory region.
2. Copy the current content of the buffer to the new memory region.
3. Rewire the buffer so that it refers to the new region.
4. Deallocate the old memory region.
Because the old memory region is deallocated at the step 4, the write
operation scheduled later will access the deallocated region, leading
all sort of data corruption or even segfaults.
To prevent this situation, I added suspendIntermediaryDeallocations()
and resumeIntermediaryDeallocations() to UnsafeByteBuf.
AioSocketChannel.doFlushByteBuf() now calls suspendIntermediaryDealloc()
to defer the deallocation of the old memory regions until the completion
handler is notified.
An AssertionError is triggered by a ByteBuf when beginRead() attempts to
access the buffer which has been freed already. This commit ensures the
buffer is not freed before performing an I/O operation.
To determine if the buffer has been freed, UnsafeByteBuf.isFreed() has
been added.
This commit introduces a new API for ByteBuf allocation which fixes
issue #643 along with refactoring of ByteBuf for simplicity and better
performance. (see #62)
A user can configure the ByteBufAllocator of a Channel via
ChannelOption.ALLOCATOR or ChannelConfig.get/setAllocator(). The
default allocator is currently UnpooledByteBufAllocator.HEAP_BY_DEFAULT.
To allocate a buffer, do not use Unpooled anymore. do the following:
ctx.alloc().buffer(...); // allocator chooses the buffer type.
ctx.alloc().heapBuffer(...);
ctx.alloc().directBuffer(...);
To deallocate a buffer, use the unsafe free() operation:
((UnsafeByteBuf) buf).free();
The following is the list of the relevant changes:
- Add ChannelInboundHandler.freeInboundBuffer() and
ChannelOutboundHandler.freeOutboundBuffer() to let a user free the
buffer he or she allocated. ChannelHandler adapter classes implement
is already, so most users won't need to call free() by themselves.
freeIn/OutboundBuffer() methods are invoked when a Channel is closed
and deregistered.
- All ByteBuf by contract must implement UnsafeByteBuf. To access an
unsafe operation: ((UnsafeByteBuf) buf).internalNioBuffer()
- Replace WrappedByteBuf and ByteBuf.Unsafe with UnsafeByteBuf to
simplify overall class hierarchy and to avoid unnecesary instantiation
of Unsafe instances on an unsafe operation.
- Remove buffer reference counting which is confusing
- Instantiate SwappedByteBuf lazily to avoid instantiation cost
- Rename ChannelFutureFactory to ChannelPropertyAccess and move common
methods between Channel and ChannelHandlerContext there. Also made it
package-private to hide it from a user.
- Remove unused unsafe operations such as newBuffer()
- Add DetectionUtil.canFreeDirectBuffer() so that an allocator decides
which buffer type to use safely
o Add ByteBuf.hasNioBuffers() method
o Promote CompositeByteBuf.nioBuffers() methods to ByteBuf
o Use ByteBuf.nioBuffers() methods from AioSocketChannel
- Replace ByteBufferBackedByteBuf with DirectByteBuf
- Make DirectByteBuf and HeapByteBuf dynamic
- Remove DynamicByteBuf
- Replace Unpooled.dynamicBuffer() with Unpooled.buffer() and
directBuffer()
- Remove ByteBufFactory (will be replaced with ByteBufPool later)
- Add ByteBuf.Unsafe (might change in the future)