The problem with the old way was that we always set the OP_WRITE when the buffer could not be written
until the write-spin-count was reached. This means that in some cases the channel was still be writable
but we just was not able to write out the data quick enough. For this cases we should better break out the
write loop and schedule a write to be picked up later in the EventLoop, when other tasks was executed.
The OP_WRITE will only be set if a write actual returned 0 which means there is no more room for writing data
and this we need to wait for the os to notify us.
Beside this it also helps to reduce CPU usage as nioBufferCount() is quite expensive when used on CompositeByteBuf which are
nested and contains a lot of components
This ChannelOption allows to tell the DatagramChannel implementation to be active as soon as they are registrated to their EventLoop. This can be used to make it possible to write to a not bound DatagramChannel.
The ChannelOption is marked as @deprecated as I'm looking for a better solution in master which breaks default behaviour with 4.0 branch.
This move less common method patterns to extra methods and so make the nioBuffers() method with most common pattern (backed by one ByteBuffer) small enough for inlining.
This is needed because of otherwise the JDK itself will do an extra ByteBuffer copy with it's own pool implementation. Even worth it will be done
multiple times if the ByteBuffer is always only partial written. With this change the copy is done inside of netty using it's own allocator and
only be done one time in all cases.
Introduce a new interface called MessageSizeEstimator. This can be specific per Channel (via ChannelConfig). The MessageSizeEstimator will be used to estimate for a message that should be written. The default implementation handles ByteBuf, ByteBufHolder and FileRegion. A user is free to plug-in his/her own implementation for different behaviour.
This fixes#1664 and revert also the original commit which was meant to fix it 3b1881b523 . The problem with the original commit was that it could delay handlerRemove(..) calls and so mess up the order or forward bytes to late.
- Remove unnecessary ascending traversal of pipeline in DefaultChannelHandlerContext.freeInbound()
- Move DefaultChannelHandlerContext.teardownAll() to DefaultChannelPipeline
- Previously, failUnflushed() did not run when inFail is true, which made unflushed writes are not released on reentrance. This has been fixed by this commit.
- Also, AbstractUnsafe.outboundBuffer is set to null as early as possible to remove the chance of any write attempts made after the closure.
- Fix a bug in DefaultProgressivePromise.tryProgress() where the notification is dropped
- Fix a bug in AbstractChannel.calculateMessageSize() where FileRegion is not counted
- HttpStaticFileServer example now uses zero copy file transfer if possible.
- Merge MessageList into ChannelOutboundBuffer
- Make ChannelOutboundBuffer a queue-like data structure so that it is nearly impossible to leak a message
- Make ChannelOutboundBuffer public so that AbstractChannel can expose it to its subclasses.
- TODO: Re-enable gathering write in NioSocketChannel
This is often useful if you for example use a ChannelGroup to hold all connected Channels and want to broadcast a message too all of them
except one Channel.
- write() now accepts a ChannelPromise and returns ChannelFuture as most
users expected. It makes the user's life much easier because it is
now much easier to get notified when a specific message has been
written.
- flush() does not create a ChannelPromise nor returns ChannelFuture.
It is now similar to what read() looks like.
DefaultChannelHandlerContext does not trigger exceptionCaught() immediately when ChannelOutboundHandler.write() raises an exception. It just records the exception until flush() is triggered. On invokeFlush(), if there's any exception recorded, DefaultChannelHandlerContext will fail the promise without calling ChannelOutboundHandler.flush(). If more than one exception were raised, only the first exception is used as the cause of the failure and the others will be logged at warn level.
- Remove channelReadSuspended because it's actually same with messageReceivedLast
- Rename messageReceived to channelRead
- Rename messageReceivedLast to channelReadComplete
We renamed messageReceivedLast to channelReadComplete because it
reflects what it really is for. Also, we renamed messageReceived to
channelRead for consistency in method names.
I must admit MesageList was pain in the ass. Instead of forcing a
handler always loop over the list of messages, this commit splits
messageReceived(ctx, list) into two event handlers:
- messageReceived(ctx, msg)
- mmessageReceivedLast(ctx)
When Netty reads one or more messages, messageReceived(ctx, msg) event
is triggered for each message. Once the current read operation is
finished, messageReceivedLast() is triggered to tell the handler that
the last messageReceived() was the last message in the current batch.
Similarly, for outbound, write(ctx, list) has been split into two:
- write(ctx, msg)
- flush(ctx, promise)
Instead of writing a list of message with a promise, a user is now
supposed to call write(msg) multiple times and then call flush() to
actually flush the buffered messages.
Please note that write() doesn't have a promise with it. You must call
flush() to get notified on completion. (or you can use writeAndFlush())
Other changes:
- Because MessageList is completely hidden, codec framework uses
List<Object> instead of MessageList as an output parameter.
- Fixes#1528
It's not really easy to provide a general-purpose abstraction for fast-yet-safe iteration. Instead of making forEachByte() less optimal, let's make it do what it does really well, and allow a user to implement potentially unsafe-yet-fast loop using unsafe operations.
* The problem with the release(..) calls here was that it would have called release on an unsupported message and then throw an exception. This exception will trigger ChannelOutboundBuffer.fail(..), which will also try to release the message again.
* Also use the same exception type for unsupported messages as in other channel impls.
- MessageList.array() should give better performance + concise code
- MessageList.add(T[], int, int) iterated over the source array 3 times at worst case. This commit reduces that to 1 time.
- Related: #1378
- They now accept only one argument.
- A user who wants to use a buffer for more complex use cases, he or she can always access the buffer directly via memoryAddress() and array()
- Fixes#1426
- We already allow a user instantiate an EventLoopGroup with the default number of threads via the default constructor, so I think it's OK although it's not always optimal.
- Fixes#1486
- Decreased the default from 16 to 1 because unnecessary extra read on req-res protocols results in lower throughput due to extra syscalls.
- No need to have fine-grained lookup table because the buffer pool has
much more coarse capacities available
- No need to use a loop to normalize a buffer capacity
- SimpleChannelInboundHandler now has a constructor parameter to let a
user decide to enable automatic message release. (the default is to
enable), which makes ChannelInboundConsumingHandler of less value.
This reverts commit a1a86b9de4 because the
semantic of ctx.isRemoved() is confusing to a user - why is
ctx.isRemoved() false when handlerRemoved() is invoked? A better
solution would be check if the connection is inactive and mark the
promise as failure before attempting to write anything.
- Related issue: #1432
- Add Future.isCancellable()
- Add Promise.setUncancellable() which is meant to be used for the party that runs the task uncancellable once started
- Implement Future.isCancelled() and Promise.cancel(boolean) properly
The AIO transport was added in the past as we hoped it would have better latency as the NIO transport. But in reality this was never the case.
So there is no reason to use the AIO transport at all. It just put more burden on us as we need to also support it and fix bugs.
Because of this we dedicided to remove it for now. It will stay in the master_with_aio_transport branch so we can pick it up later again if it is ever needed.
The API changes made so far turned out to increase the memory footprint
and consumption while our intention was actually decreasing them.
Memory consumption issue:
When there are many connections which does not exchange data frequently,
the old Netty 4 API spent a lot more memory than 3 because it always
allocates per-handler buffer for each connection unless otherwise
explicitly stated by a user. In a usual real world load, a client
doesn't always send requests without pausing, so the idea of having a
buffer whose life cycle if bound to the life cycle of a connection
didn't work as expected.
Memory footprint issue:
The old Netty 4 API decreased overall memory footprint by a great deal
in many cases. It was mainly because the old Netty 4 API did not
allocate a new buffer and event object for each read. Instead, it
created a new buffer for each handler in a pipeline. This works pretty
well as long as the number of handlers in a pipeline is only a few.
However, for a highly modular application with many handlers which
handles connections which lasts for relatively short period, it actually
makes the memory footprint issue much worse.
Changes:
All in all, this is about retaining all the good changes we made in 4 so
far such as better thread model and going back to the way how we dealt
with message events in 3.
To fix the memory consumption/footprint issue mentioned above, we made a
hard decision to break the backward compatibility again with the
following changes:
- Remove MessageBuf
- Merge Buf into ByteBuf
- Merge ChannelInboundByte/MessageHandler and ChannelStateHandler into ChannelInboundHandler
- Similar changes were made to the adapter classes
- Merge ChannelOutboundByte/MessageHandler and ChannelOperationHandler into ChannelOutboundHandler
- Similar changes were made to the adapter classes
- Introduce MessageList which is similar to `MessageEvent` in Netty 3
- Replace inboundBufferUpdated(ctx) with messageReceived(ctx, MessageList)
- Replace flush(ctx, promise) with write(ctx, MessageList, promise)
- Remove ByteToByteEncoder/Decoder/Codec
- Replaced by MessageToByteEncoder<ByteBuf>, ByteToMessageDecoder<ByteBuf>, and ByteMessageCodec<ByteBuf>
- Merge EmbeddedByteChannel and EmbeddedMessageChannel into EmbeddedChannel
- Add SimpleChannelInboundHandler which is sometimes more useful than
ChannelInboundHandlerAdapter
- Bring back Channel.isWritable() from Netty 3
- Add ChannelInboundHandler.channelWritabilityChanges() event
- Add RecvByteBufAllocator configuration property
- Similar to ReceiveBufferSizePredictor in Netty 3
- Some existing configuration properties such as
DatagramChannelConfig.receivePacketSize is gone now.
- Remove suspend/resumeIntermediaryDeallocation() in ByteBuf
This change would have been impossible without @normanmaurer's help. He
fixed, ported, and improved many parts of the changes.
- Use the local transport in a correct way (i.e. no need to trigger channelActive et al by ourselves)
- Use Promise/Future instead of CountDownLatch where they simplifies
- Fixes#1366: No elegant way to free non-in/outbound buffers held by a handler
- handlerRemoved() is now also invoked when a channel is deregistered, as well as when a handler is removed from a pipeline.
- A little bit of clean-up for readability
- Fix a bug in forwardBufferContentAndRemove() where the handler buffers are not freed (mainly because we were relying on channel.isRegistered() to determine if the handler has been removed from inside the handler.
- ChunkedWriteHandler.handlerRemoved() is unnecessary anymore because ChannelPipeline now always forwards the content of the buffer.
This is done by stop accept() new sockets for 1 seconds
Beside this this commit also makes sure accept() exceptions of OioServerSocketChannel trigger
the fireExceptionCaught(...). The same is true fo the AioServerSocketChannel.
- Fixes#1282 (not perfectly, but to the extent it's possible with the current API)
- Add AddressedEnvelope and DefaultAddressedEnvelope
- Make DatagramPacket extend DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>
- Rename ByteBufHolder.data() to content() so that a message can implement both AddressedEnvelope and ByteBufHolder (DatagramPacket does) without introducing two getter methods for the content
- Datagram channel implementations now understand ByteBuf and ByteBufHolder as a message with unspecified remote address.
shutdownGracefully() provides two optional parameters that give more
control over when an executor has to be shut down.
- Related issue: #1307
- Add shutdownGracefully(..) and isShuttingDown()
- Deprecate shutdown() / shutdownNow()
- Replace lastAccessTime with lastExecutionTime and update it after task
execution for accurate quiet period check
- runAllTasks() and runShutdownTasks() update it automatically.
- Add updateLastExecutionTime() so that subclasses can update it
- Add a constructor parameter that tells not to add an unncessary wakeup
task in execute() if addTask() wakes up the executor thread
automatically. Previously, execute() always called wakeup() after
addTask(), which often caused an extra dummy task in the task queue.
- Use shutdownGracefully() wherever possible / Deprecation javadoc
- Reduce the running time of SingleThreadEventLoopTest from 40s to 15s
using custom graceful shutdown parameters
- Other changes made along with this commit:
- takeTask() does not throw InterruptedException anymore.
- Returns null on interruption or wakeup
- Make sure runShutdownTasks() return true even if an exception was
raised while running the shutdown tasks
- Remove unnecessary isShutdown() checks
- Consistent use of SingleThreadEventExecutor.nanoTime()
Replace isWakeupOverridden with a constructor parameter
- Fixes#1308
freeInboundBuffer() and freeOutboundBuffer() were introduced in the early days of the new API when we did not have reference counting mechanism in the buffer. A user did not want Netty to free the handler buffers had to override these methods.
However, now that we have reference counting mechanism built into the buffer, a user who wants to retain the buffers beyond handler's life cycle can simply return the buffer whose reference count is greater than 1 in newInbound/OutboundBuffer().
- Added a test case that reproduces the problem in ReplayingDecoderTest
- Call newHandler.handlerAdded() *before* oldHandler.handlerRemoved() to ensure newHandlerAdded() is called before forwarding the buffer content of the old handler in replace0().
- Fixes#1292
- Replace DefaultChannelPipeline.inbound/outboundShutdown flag with per-context flags
- Update the flags in free() / freeInbound() / freeOutbound() for clarity
- Replace ugly 'prev != null' check with explicit event scheduling
- Fix an incorrect flag operation in freeHandlerBuffersAfterRemoval()
- Fix a bug in AbstractEmbeddedChannel.doRegister where it makes pending tasks immediately, where the pending tasks actually triggers inbound events
- Remove unnecessary suppression of inboundBufferUpdated() event in DefaultChannelPipeline, which potentially hides an event ordering bug. Unfortunately, I don't remember why I added it in cca35454d2.
This change also introduce a few other changes which was needed:
* ChannelHandler.beforeAdd(...) and ChannelHandler.beforeRemove(...) were removed
* ChannelHandler.afterAdd(...) -> handlerAdded(...)
* ChannelHandler.afterRemoved(...) -> handlerRemoved(...)
* SslHandler.handshake() -> SslHandler.hanshakeFuture() as the handshake is triggered automatically after
the Channel becomes active
- Now works without the transport package
- Renamed TransferFuture to ProgressiveFuture and ChannelProgressiveFuture / same for promises
- ProgressiveFutureListener now extends GenericProgressiveFutureListener and GenericFutureListener (add/removeTransferListener*() were removed)
- Renamed DefaultEventListeners to DefaultFutureListeners and only accept GenericFutureListeners
- Various clean-up
This commit splits bridge into two parts. One is NextBridgeFeeder,
which provides ByteBuf and MessageBuf that are local to the context
whose next*Buffer() has been invoked on. The other is a thread-safe
queue that stores the data fed by NextBridgeFeeder.feed().
By splitting the bridge into the two parts, the data pushed by a handler
is not lost anymore when the next handler who provided the next buffer
is removed from the pipeline.
- Fixes#1272
- Fixes#1229
- Primarily written by @normanmaurer and revised by @trustin
This commit removes the notion of unfolding from the codec framework
completely. Unfolding was introduced in Netty 3.x to work around the
shortcoming of the codec framework where encode() and decode() did not
allow generating multiple messages.
Such a shortcoming can be fixed by changing the signature of encode()
and decode() instead of introducing an obscure workaround like
unfolding. Therefore, we changed the signature of them in 4.0.
The change is simple, but backward-incompatible. encode() and decode()
do not return anything. Instead, the codec framework will pass a
MessageBuf<Object> so encode() and decode() can add the generated
messages into the MessageBuf.
- Count the number of select() calls made to wait until reaching at the expected dead line, and rebuild selectors if too many select() calls were made.
- Similar to @normanmaurer's fix in that this commit also makes Bootstrap.init(Channel) asynchronous, but it is simpler and less invasive.
- Also made sure a connection attempt failure in the local transport does not trigger an exceptionCaught event
- The offending test case is annotated with `@Ignore`
- Also fixed a bug where channel initialization failure swallows the original cause of initialization failure
- Add ChannelHandlerUtil and move the core logic of ChannelInbound/OutboundMessageHandler to ChannelHandlerUtil
- Add ChannelHandlerUtil.SingleInbound/OutboundMessageHandler and make ChannelInbound/OutboundMessageHandlerAdapter implement them. This is a backward incompatible change because it forces all handler methods to be public (was protected previously)
- Fixes: #1119
- Merge waiters and fluchCheckpoint into a single field
- This limits the number of waiter threads to 2^24 - 1, which is still very large. Can you imagine an application with 16 million threads?
- Rename inbound/outboundBufferFreed to inbound/OutboundShutdown which makes more sense
- Move DefaultChannelHandlerContext.isInbound/OutboundBufferFreed() to DefaultChannelPipeline
- Fix a problem where invokeFreeInbound/OutboundBuffer() sets inbound/outboundShutdown too early (this was the direct cause of #1064)
- Remove the volatile modifier - DCHC.prev/next are volatile and that's just enough
- Rename ChannelHandlerAdapter to ChannelDuplexHandler
- Add ChannelHandlerAdapter that implements only ChannelHandler
- Rename CombinedChannelHandler to CombinedChannelDuplexHandler and
improve runtime validation
- Remove ChannelInbound/OutboundHandlerAdapter which are not useful
- Make ChannelOutboundByteHandlerAdapter similar to
ChannelInboundByteHandlerAdapter
- Make the tail and head handler of DefaultChannelPipeline accept both
bytes and messages. ChannelHandlerContext.hasNext*() were removed
because they always return true now.
- Removed various unnecessary null checks.
- Correct method/field names:
inboundBufferSuspended -> channelReadSuspended
- Move common methods from ByteBuf to Buf
- Rename ensureWritableBytes() to ensureWritable()
- Rename readable() to isReadable()
- Rename writable() to isWritable()
- Add isReadable(int) and isWritable(int)
- Add AbstractMessageBuf
- Rewrite DefaultMessageBuf and QueueBackedMessageBuf
- based on Josh Bloch's public domain ArrayDeque impl
- Additional fix for: #970
- Use LinkedHashMap again to save memory consumption
- ServerBootstrap now makes a copy of child parameters so that modifying ServerBootstrap after bind() does not affect the already-bound servers. This also makes child channel initialization potentially faster due to reduced garbage iterator.
This will safe as an example 2gb mem when have 10 DefaultHandlerContext instances per connection and the connection count is 1000000.
Also kind of related to [#920]
- Fixes#997
- Replace duplicate() with clone()
- Add copy constructor for simplicity
- Can now clone invalid/incomplete bootstrap
- Upgrade to netty-build-14 to disable SuperClone checkstyle module
- Finalize class hierarchy so no subclasses are introduced
This changes the behavior of the ChannelPipeline.remove(..) and ChannelPipeline.replace(..) methods in that way
that after invocation it is not possible anymore to access any data in the inbound or outbound buffer. This is
because it empty it now to prevent side-effects. If a user want to preserve the content and forward it to the
next handler in the pipeline it is adviced to use one of the new methods which where introduced.
- ChannelPipeline.removeAndForward(..)
- ChannelPipeline.replaceAndForward(..)
Changed options and attrs from LinkedHashMap to ConcurrentHashMap to
avoid a possible ConcurrentModificationException if a thread was
adding/removing options/attrs while another was calling Bootstrap.init()
or Bootstrap.duplicate()
- Also add a new abstract class called StreamOioByteChannel which can be used by OIO channel implementation which are Stream based as a starting point.
This pull request cleans up our pipeline implementation by moving most
inter-context traversal code to DefaultChannelHandlerContext.
Previously, outbound traversal was done in DefaultChannelPipeline while
inbound traversal was done in DefaultChannelHandlerContext.
Also, to address the memory inefficiency issue raised in #920, all
runnables are lazily instantiated.
This pull request adds two new handler methods: discardInboundReadBytes(ctx) and discardOutboundReadBytes(ctx) to ChannelInboundByteHandler and ChannelOutboundByteHandler respectively. They are called between every inboundBufferUpdated() and flush() respectively. Their default implementation is to call discardSomeReadBytes() on their buffers and a user can override this behavior easily. For example, ReplayingDecoder.discardInboundReadBytes() looks like the following:
@Override
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
ByteBuf in = ctx.inboundByteBuffer();
final int oldReaderIndex = in.readerIndex();
super.discardInboundReadBytes(ctx);
final int newReaderIndex = in.readerIndex();
checkpoint -= oldReaderIndex - newReaderIndex;
}
If a handler, which has its own buffer index variable, extends ReplayingDecoder or ByteToMessageDecoder, the handler can also override discardInboundReadBytes() and adjust its index variable accordingly.
use single static initialization of available metrics monitor registries
* This changes the original implementation to work in a similar way to
how slf4j selects and loads an implementation.
* Uses a single static instance so intialization is done only once.
* Doesn't throw IllegalStateException if multiple implementations are
found on the classpath. It instead selects and uses the first
implementation returned by iterator()
* Class left as an iterable to keep the API the same
add yammer metrics to examples to allow them to publish metrics
publish the number of threads used in an EventLoopGroup see issue #718
* seems like the better place to put this because it sets the default
thread count if the MultithreadEventLoopGroup uses super(0,...)
* It also happens to be the common parent class amongst all the
MultiThreadedEventLoopGroup implementations
* Count is reported for
io.netty.channel.{*,.local,.socket.aio,.socket.nio}
fix cosmetic issues pointed out in pull request and updated notice.txt
see https://github.com/netty/netty/pull/780
count # of channels registered in single threaded event loop
measure how many times Selector.select return before SELECT_TIME
This pull request introduces a new operation called read() that replaces the existing inbound traffic control method. EventLoop now performs socket reads only when the read() operation has been issued. Once the requested read() operation is actually performed, EventLoop triggers an inboundBufferSuspended event that tells the handlers that the requested read() operation has been performed and the inbound traffic has been suspended again. A handler can decide to continue reading or not.
Unlike other outbound operations, read() does not use ChannelFuture at all to avoid GC cost. If there's a good reason to create a new future per read at the GC cost, I'll change this.
This pull request consequently removes the readable property in ChannelHandlerContext, which means how the traffic control works changed significantly.
This pull request also adds a new configuration property ChannelOption.AUTO_READ whose default value is true. If true, Netty will call ctx.read() for you. If you need a close control over when read() is called, you can set it to false.
Another interesting fact is that non-terminal handlers do not really need to call read() at all. Only the last inbound handler will have to call it, and that's just enough. Actually, you don't even need to call it at the last handler in most cases because of the ChannelOption.AUTO_READ mentioned above.
There's no serious backward compatibility issue. If the compiler complains your handler does not implement the read() method, add the following:
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
Note that this pull request certainly makes bounded inbound buffer support very easy, but itself does not add the bounded inbound buffer support.
- Rename capacity variables to reqCapacity or normCapacity to distinguish if its the request capacity or the normalized capacity
- Do not reallocate on ByteBuf.capacity(int) if reallocation is unnecessary; just update the index range.
- Revert the workaround in DefaultChannelHandlerContext
- Also fixed a incorrect port of SpdySessionHandler
- Previously, it closed the connection too early when sending a GOAWAY frame
- After this fix, SpdySessionHandlerTest now passes again without the previous fix
- Fixes#831
This commit ensures the following events are never triggered as a direct
invocation if they are triggered via ChannelPipeline.fire*():
- channelInactive
- channelUnregistered
- exceptionCaught
This commit also fixes the following issues surfaced by this fix:
- Embedded channel implementations run scheduled tasks too early
- SpdySessionHandlerTest tries to generate inbound data even after the
channel is closed.
- AioSocketChannel enters into an infinite loop on I/O error.
- Fixes#826
Unsafe.isFreed(), free(), suspend/resumeIntermediaryAllocations() are not that dangerous. internalNioBuffer() and internalNioBuffers() are dangerous but it seems like nobody is using it even inside Netty. Removing those two methods also removes the necessity to keep Unsafe interface at all.
This commit also introduce a new interface which is called AioSocketChannelConfig to expose AIO only config options with the right visibility.
Also it change the ChannelConfig.setAllocator(..) to return the ChannelConfig to be more consistent with the other methods.
This pull request introduces the new default ByteBufAllocator implementation based on jemalloc, with a some differences:
* Minimum possible buffer capacity is 16 (jemalloc: 2)
* Uses binary heap with random branching (jemalloc: red-black tree)
* No thread-local cache yet (jemalloc has thread-local cache)
* Default page size is 8 KiB (jemalloc: 4 KiB)
* Default chunk size is 16 MiB (jemalloc: 2 MiB)
* Cannot allocate a buffer bigger than the chunk size (jemalloc: possible) because we don't have control over memory layout in Java. A user can work around this issue by creating a composite buffer, but it's not always a feasible option. Although 16 MiB is a pretty big default, a user's handler might need to deal with the bounded buffers when the user wants to deal with a large message.
Also, to ensure the new allocator performs good enough, I wrote a microbenchmark for it and made it a dedicated Maven module. It uses Google's Caliper framework to run and publish the test result (example)
Miscellaneous changes:
* Made some ByteBuf implementations public so that those who implements a new allocator can make use of them.
* Added ByteBufAllocator.compositeBuffer() and its variants.
* ByteBufAllocator.ioBuffer() creates a buffer with 0 capacity.
testConcurrentMessageBufferAccess() assumes the outbound/inbound byte buffers are unbounded. Because PooledByteBuf is bounded, the test did not pass.
The fix makes an assumption that ctx.flush() or fireInboundBufferUpdated() will make the next buffer consumed immediately, which is not the case in the real world. Under network congestion, a user will see IndexOutOfBoundsException if the user's handler implementation writes boundlessly into inbound/outbound buffers.
* UnsafeByteBuf is gone. I added ByteBuf.unsafe() back.
* To avoid extra instantiation, all ByteBuf implementations implement the ByteBuf.Unsafe interface.
* To hide this implementation detail, all ByteBuf implementations are package-private.
* AbstractByteBuf and SwappedByteBuf are public and they do not implement ByteBuf.Unsafe because they don't need to.
* unwrap() is not an unsafe operation anymore.
* ChannelBuf also has unsafe() and Unsafe. ByteBuf.Unsafe extends ChannelBuf.unsafe(). ChannelBuf.unsafe() provides free() operation so that a user does not need to down-cast the buffer in freeInbound/OutboundBuffer().
To perform writes in AioSocketChannel, we get a ByteBuffer view of the
outbound buffer and specify it as a parameter when we call
AsynchronousSocketChannel.write().
In most cases, the write() operation is finished immediately. However,
sometimes, it is scheduled for later execution. In such a case, there's
a chance for a user's handler to append more data to the outbound
buffer.
When more data is appended to the outbound buffer, the outbound buffer
can expand its capacity by itself. Changing the capacity of a buffer is
basically made of the following steps:
1. Allocate a larger new internal memory region.
2. Copy the current content of the buffer to the new memory region.
3. Rewire the buffer so that it refers to the new region.
4. Deallocate the old memory region.
Because the old memory region is deallocated at the step 4, the write
operation scheduled later will access the deallocated region, leading
all sort of data corruption or even segfaults.
To prevent this situation, I added suspendIntermediaryDeallocations()
and resumeIntermediaryDeallocations() to UnsafeByteBuf.
AioSocketChannel.doFlushByteBuf() now calls suspendIntermediaryDealloc()
to defer the deallocation of the old memory regions until the completion
handler is notified.
An AssertionError is triggered by a ByteBuf when beginRead() attempts to
access the buffer which has been freed already. This commit ensures the
buffer is not freed before performing an I/O operation.
To determine if the buffer has been freed, UnsafeByteBuf.isFreed() has
been added.
After some debugging, I found that JDK AIO implementation often performs
I/O immediately from the caller thread if the caller thread is the I/O
thread, and notifies the completion handler also immediately. This
commit handles such a case correctly during reads and writes.
Additionally, this commit also changes SingleThreadEventExecutor to let
it handle unexpected exceptions such as AssertionError in a robus
manner.
When a Netty application shuts down, a user often sees a REE
(RejectedExecutionException).
A REE is raised due to various reasons we don't have control over, such
as:
- A client connects to a server while the server is shutting down.
- An event is triggered for a closed Channel while its event loop is
also shutting down. Some of them are:
- channelDeregistered (triggered after a channel is closed)
- freeIn/OutboundBuffer (triggered after channelDeregistered)
- userEventTriggered (triggered anytime)
To address this issue, a new method called confirmShutdown() has been
added to SingleThreadEventExecutor. After a user calls shutdown(),
confirmShutdown() runs any remaining tasks in the task queue and ensures
no events are triggered for last 2 seconds. If any task are added to
the task queue before 2 seconds passes, confirmShutdown() prevents the
event loop from terminating by returning false.
Now that SingleThreadEventExecutor needs to accept tasks even after
shutdown(), its execute() method only rejects the task after the event
loop is terminated (i.e. isTerminated() returns true.) Except that,
there's no change in semantics.
SingleThreadEventExecutor also checks if its subclass called
confirmShutdown() in its run() implementation, so that Netty developers
can make sure they shut down their event loop impementation correctly.
It also fixes a bug in AioSocketChannel, revealed by delayed shutdown,
where an inboundBufferUpdated() event is triggered on a closed Channel
with deallocated buffers.
Caveats:
Because SingleThreadEventExecutor.takeTask() does not have a notion of
timeout, confirmShutdown() adds a dummy task (WAKEUP_TASK) to wake up
takeTask() immediately and instead sleeps hard-coded 100ms. I'll
address this issue later by modifying takeTask() times out dynamically.
Miscellaneous changes:
SingleThreadEventExecutor.wakeup() now has the default implementation.
Instead of interrupting the current thread, it simply adds a dummy task
(WAKEUP_TASK) to the task queue, which is more elegant and efficient.
NioEventLoop is the only implementation that overrides it. All other
implementations' wakeup()s were removed thanks to this change.
This commit introduces a new API for ByteBuf allocation which fixes
issue #643 along with refactoring of ByteBuf for simplicity and better
performance. (see #62)
A user can configure the ByteBufAllocator of a Channel via
ChannelOption.ALLOCATOR or ChannelConfig.get/setAllocator(). The
default allocator is currently UnpooledByteBufAllocator.HEAP_BY_DEFAULT.
To allocate a buffer, do not use Unpooled anymore. do the following:
ctx.alloc().buffer(...); // allocator chooses the buffer type.
ctx.alloc().heapBuffer(...);
ctx.alloc().directBuffer(...);
To deallocate a buffer, use the unsafe free() operation:
((UnsafeByteBuf) buf).free();
The following is the list of the relevant changes:
- Add ChannelInboundHandler.freeInboundBuffer() and
ChannelOutboundHandler.freeOutboundBuffer() to let a user free the
buffer he or she allocated. ChannelHandler adapter classes implement
is already, so most users won't need to call free() by themselves.
freeIn/OutboundBuffer() methods are invoked when a Channel is closed
and deregistered.
- All ByteBuf by contract must implement UnsafeByteBuf. To access an
unsafe operation: ((UnsafeByteBuf) buf).internalNioBuffer()
- Replace WrappedByteBuf and ByteBuf.Unsafe with UnsafeByteBuf to
simplify overall class hierarchy and to avoid unnecesary instantiation
of Unsafe instances on an unsafe operation.
- Remove buffer reference counting which is confusing
- Instantiate SwappedByteBuf lazily to avoid instantiation cost
- Rename ChannelFutureFactory to ChannelPropertyAccess and move common
methods between Channel and ChannelHandlerContext there. Also made it
package-private to hide it from a user.
- Remove unused unsafe operations such as newBuffer()
- Add DetectionUtil.canFreeDirectBuffer() so that an allocator decides
which buffer type to use safely
- Add Bootstrap.attr() and ServerBootstrap.attr()/childAttr() so that a
user can initialize the attribute map from the beginning.
- Replace newBootstrap() with duplicate()
- Ensure the event loop threads are never terminated before all tasks
submitted by JDK are executed
- Close all open connections before terminating an event loop