netty5/transport/src/main/java/io/netty/channel/AbstractChannel.java

1041 lines
33 KiB
Java
Raw Normal View History

/*
2012-06-04 22:31:44 +02:00
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
2012-06-04 22:31:44 +02:00
* http://www.apache.org/licenses/LICENSE-2.0
*
2009-08-28 09:15:49 +02:00
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
2009-08-28 09:15:49 +02:00
* License for the specific language governing permissions and limitations
* under the License.
*/
2011-12-09 04:38:59 +01:00
package io.netty.channel;
import io.netty.buffer.ByteBuf;
ByteBufAllocator API w/ ByteBuf perf improvements This commit introduces a new API for ByteBuf allocation which fixes issue #643 along with refactoring of ByteBuf for simplicity and better performance. (see #62) A user can configure the ByteBufAllocator of a Channel via ChannelOption.ALLOCATOR or ChannelConfig.get/setAllocator(). The default allocator is currently UnpooledByteBufAllocator.HEAP_BY_DEFAULT. To allocate a buffer, do not use Unpooled anymore. do the following: ctx.alloc().buffer(...); // allocator chooses the buffer type. ctx.alloc().heapBuffer(...); ctx.alloc().directBuffer(...); To deallocate a buffer, use the unsafe free() operation: ((UnsafeByteBuf) buf).free(); The following is the list of the relevant changes: - Add ChannelInboundHandler.freeInboundBuffer() and ChannelOutboundHandler.freeOutboundBuffer() to let a user free the buffer he or she allocated. ChannelHandler adapter classes implement is already, so most users won't need to call free() by themselves. freeIn/OutboundBuffer() methods are invoked when a Channel is closed and deregistered. - All ByteBuf by contract must implement UnsafeByteBuf. To access an unsafe operation: ((UnsafeByteBuf) buf).internalNioBuffer() - Replace WrappedByteBuf and ByteBuf.Unsafe with UnsafeByteBuf to simplify overall class hierarchy and to avoid unnecesary instantiation of Unsafe instances on an unsafe operation. - Remove buffer reference counting which is confusing - Instantiate SwappedByteBuf lazily to avoid instantiation cost - Rename ChannelFutureFactory to ChannelPropertyAccess and move common methods between Channel and ChannelHandlerContext there. Also made it package-private to hide it from a user. - Remove unused unsafe operations such as newBuffer() - Add DetectionUtil.canFreeDirectBuffer() so that an allocator decides which buffer type to use safely
2012-11-15 22:04:37 +01:00
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.MessageBuf;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.internal.DetectionUtil;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
2008-09-02 09:13:20 +02:00
* A skeletal {@link Channel} implementation.
*/
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
static final ConcurrentMap<Integer, Channel> allChannels = new ConcurrentHashMap<Integer, Channel>();
private static final Random random = new Random();
/**
* Generates a negative unique integer ID. This method generates only
* negative integers to avoid conflicts with user-specified IDs where only
* non-negative integers are allowed.
*/
private static Integer allocateId(Channel channel) {
int idVal = random.nextInt();
if (idVal > 0) {
idVal = -idVal;
} else if (idVal == 0) {
idVal = -1;
}
Integer id;
for (;;) {
id = Integer.valueOf(idVal);
// Loop until a unique ID is acquired.
// It should be found in one loop practically.
if (allChannels.putIfAbsent(id, channel) == null) {
// Successfully acquired.
return id;
} else {
// Taken by other channel at almost the same moment.
idVal --;
if (idVal >= 0) {
idVal = -1;
}
}
}
}
private final Channel parent;
private final Integer id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
private final VoidChannelPromise voidPromise = new VoidChannelPromise(this);
private final CloseFuture closeFuture = new CloseFuture(this);
protected final ChannelFlushPromiseNotifier flushFutureNotifier = new ChannelFlushPromiseNotifier();
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;
private ClosedChannelException closedChannelException;
private boolean inFlushNow;
private boolean flushNowPending;
/** Cache for the string representation of this channel */
private boolean strValActive;
private String strVal;
private AbstractUnsafe.FlushTask flushTaskInProgress;
2008-09-02 09:13:20 +02:00
/**
* Creates a new instance.
*
* @param id
* the unique non-negative integer ID of this channel.
* Specify {@code null} to auto-generate a unique negative integer
* ID.
2008-09-02 09:13:20 +02:00
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent, Integer id) {
if (id == null) {
id = allocateId(this);
} else {
if (id.intValue() < 0) {
throw new IllegalArgumentException("id: " + id + " (expected: >= 0)");
}
if (allChannels.putIfAbsent(id, this) != null) {
throw new IllegalArgumentException("duplicate ID: " + id);
}
}
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
allChannels.remove(id());
}
});
}
@Override
public final Integer id() {
return id;
}
@Override
public Channel parent() {
return parent;
}
@Override
public ChannelPipeline pipeline() {
return pipeline;
}
ByteBufAllocator API w/ ByteBuf perf improvements This commit introduces a new API for ByteBuf allocation which fixes issue #643 along with refactoring of ByteBuf for simplicity and better performance. (see #62) A user can configure the ByteBufAllocator of a Channel via ChannelOption.ALLOCATOR or ChannelConfig.get/setAllocator(). The default allocator is currently UnpooledByteBufAllocator.HEAP_BY_DEFAULT. To allocate a buffer, do not use Unpooled anymore. do the following: ctx.alloc().buffer(...); // allocator chooses the buffer type. ctx.alloc().heapBuffer(...); ctx.alloc().directBuffer(...); To deallocate a buffer, use the unsafe free() operation: ((UnsafeByteBuf) buf).free(); The following is the list of the relevant changes: - Add ChannelInboundHandler.freeInboundBuffer() and ChannelOutboundHandler.freeOutboundBuffer() to let a user free the buffer he or she allocated. ChannelHandler adapter classes implement is already, so most users won't need to call free() by themselves. freeIn/OutboundBuffer() methods are invoked when a Channel is closed and deregistered. - All ByteBuf by contract must implement UnsafeByteBuf. To access an unsafe operation: ((UnsafeByteBuf) buf).internalNioBuffer() - Replace WrappedByteBuf and ByteBuf.Unsafe with UnsafeByteBuf to simplify overall class hierarchy and to avoid unnecesary instantiation of Unsafe instances on an unsafe operation. - Remove buffer reference counting which is confusing - Instantiate SwappedByteBuf lazily to avoid instantiation cost - Rename ChannelFutureFactory to ChannelPropertyAccess and move common methods between Channel and ChannelHandlerContext there. Also made it package-private to hide it from a user. - Remove unused unsafe operations such as newBuffer() - Add DetectionUtil.canFreeDirectBuffer() so that an allocator decides which buffer type to use safely
2012-11-15 22:04:37 +01:00
@Override
public ByteBufAllocator alloc() {
return config().getAllocator();
}
@Override
public EventLoop eventLoop() {
2012-06-02 03:34:19 +02:00
EventLoop eventLoop = this.eventLoop;
if (eventLoop == null) {
throw new IllegalStateException("channel not registered to an event loop");
}
return eventLoop;
}
@Override
public SocketAddress localAddress() {
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
this.localAddress = localAddress = unsafe().localAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
protected void invalidateLocalAddress() {
localAddress = null;
}
@Override
public SocketAddress remoteAddress() {
SocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
try {
this.remoteAddress = remoteAddress = unsafe().remoteAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return remoteAddress;
}
/**
* Reset the stored remoteAddress
*/
protected void invalidateRemoteAddress() {
remoteAddress = null;
}
@Override
public boolean isRegistered() {
return registered;
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return pipeline.disconnect();
}
@Override
public ChannelFuture close() {
return pipeline.close();
}
@Override
public ChannelFuture deregister() {
return pipeline.deregister();
}
@Override
public ChannelFuture flush() {
return pipeline.flush();
}
@Override
public ChannelFuture write(Object message) {
return pipeline.write(message);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, localAddress, promise);
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
return pipeline.disconnect(promise);
}
@Override
public ChannelFuture close(ChannelPromise promise) {
return pipeline.close(promise);
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return pipeline.deregister(promise);
}
@Override
public ByteBuf outboundByteBuffer() {
return pipeline.outboundByteBuffer();
}
@Override
2012-11-30 14:49:51 +01:00
@SuppressWarnings("unchecked")
public <T> MessageBuf<T> outboundMessageBuffer() {
return (MessageBuf<T>) pipeline.outboundMessageBuffer();
}
Read only when requested (read-on-demand) This pull request introduces a new operation called read() that replaces the existing inbound traffic control method. EventLoop now performs socket reads only when the read() operation has been issued. Once the requested read() operation is actually performed, EventLoop triggers an inboundBufferSuspended event that tells the handlers that the requested read() operation has been performed and the inbound traffic has been suspended again. A handler can decide to continue reading or not. Unlike other outbound operations, read() does not use ChannelFuture at all to avoid GC cost. If there's a good reason to create a new future per read at the GC cost, I'll change this. This pull request consequently removes the readable property in ChannelHandlerContext, which means how the traffic control works changed significantly. This pull request also adds a new configuration property ChannelOption.AUTO_READ whose default value is true. If true, Netty will call ctx.read() for you. If you need a close control over when read() is called, you can set it to false. Another interesting fact is that non-terminal handlers do not really need to call read() at all. Only the last inbound handler will have to call it, and that's just enough. Actually, you don't even need to call it at the last handler in most cases because of the ChannelOption.AUTO_READ mentioned above. There's no serious backward compatibility issue. If the compiler complains your handler does not implement the read() method, add the following: public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } Note that this pull request certainly makes bounded inbound buffer support very easy, but itself does not add the bounded inbound buffer support.
2012-12-30 13:53:59 +01:00
@Override
public void read() {
pipeline.read();
}
@Override
public ChannelFuture flush(ChannelPromise promise) {
return pipeline.flush(promise);
}
@Override
public ChannelFuture write(Object message, ChannelPromise promise) {
return pipeline.write(message, promise);
}
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(this);
}
@Override
public ChannelFuture newSucceededFuture() {
return succeededFuture;
}
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return new FailedChannelFuture(this, cause);
}
@Override
public ChannelFuture closeFuture() {
return closeFuture;
}
@Override
public Unsafe unsafe() {
return unsafe;
}
@Override
public ChannelFuture sendFile(FileRegion region) {
return pipeline.sendFile(region);
}
@Override
public ChannelFuture sendFile(FileRegion region, ChannelPromise promise) {
return pipeline.sendFile(region, promise);
}
/**
* Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
*/
protected abstract AbstractUnsafe newUnsafe();
/**
* Returns the ID of this channel.
*/
@Override
public final int hashCode() {
return id;
}
/**
* Returns {@code true} if and only if the specified object is identical
* with this channel (i.e: {@code this == o}).
*/
@Override
public final boolean equals(Object o) {
return this == o;
}
/**
* Compares the {@linkplain #id() ID} of the two channels.
*/
@Override
public final int compareTo(Channel o) {
return id().compareTo(o.id());
}
2008-09-02 09:13:20 +02:00
/**
* Returns the {@link String} representation of this channel. The returned
* string contains the {@linkplain #id() ID}, {@linkplain #localAddress() local address},
* and {@linkplain #remoteAddress() remote address} of this channel for
2008-09-02 09:13:20 +02:00
* easier identification.
*/
@Override
public String toString() {
boolean active = isActive();
if (strValActive == active && strVal != null) {
return strVal;
}
2012-05-11 03:47:45 +02:00
SocketAddress remoteAddr = remoteAddress();
SocketAddress localAddr = localAddress();
if (remoteAddr != null) {
SocketAddress srcAddr;
SocketAddress dstAddr;
if (parent == null) {
srcAddr = localAddr;
dstAddr = remoteAddr;
} else {
2012-05-11 03:47:45 +02:00
srcAddr = remoteAddr;
dstAddr = localAddr;
}
2012-05-11 03:47:45 +02:00
strVal = String.format("[id: 0x%08x, %s %s %s]", id, srcAddr, active? "=>" : ":>", dstAddr);
} else if (localAddr != null) {
strVal = String.format("[id: 0x%08x, %s]", id, localAddr);
} else {
strVal = String.format("[id: 0x%08x]", id);
}
strValActive = active;
return strVal;
}
2009-02-16 15:32:40 +01:00
/**
* {@link Unsafe} implementation which sub-classes must extend and use.
*/
protected abstract class AbstractUnsafe implements Unsafe {
private final class FlushTask {
final FileRegion region;
final ChannelPromise promise;
FlushTask next;
FlushTask(FileRegion region, ChannelPromise promise) {
this.region = region;
this.promise = promise;
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
flushTaskInProgress = next;
if (next != null) {
try {
FileRegion region = next.region;
if (region == null) {
// no region present means the next flush task was to directly flush
// the outbound buffer
flushNotifierAndFlush(next.promise);
} else {
// flush the region now
doFlushFileRegion(region, next.promise);
}
} catch (Throwable cause) {
next.promise.setFailure(cause);
}
} else {
// notify the flush futures
flushFutureNotifier.notifyFlushFutures();
}
}
});
}
}
Read only when requested (read-on-demand) This pull request introduces a new operation called read() that replaces the existing inbound traffic control method. EventLoop now performs socket reads only when the read() operation has been issued. Once the requested read() operation is actually performed, EventLoop triggers an inboundBufferSuspended event that tells the handlers that the requested read() operation has been performed and the inbound traffic has been suspended again. A handler can decide to continue reading or not. Unlike other outbound operations, read() does not use ChannelFuture at all to avoid GC cost. If there's a good reason to create a new future per read at the GC cost, I'll change this. This pull request consequently removes the readable property in ChannelHandlerContext, which means how the traffic control works changed significantly. This pull request also adds a new configuration property ChannelOption.AUTO_READ whose default value is true. If true, Netty will call ctx.read() for you. If you need a close control over when read() is called, you can set it to false. Another interesting fact is that non-terminal handlers do not really need to call read() at all. Only the last inbound handler will have to call it, and that's just enough. Actually, you don't even need to call it at the last handler in most cases because of the ChannelOption.AUTO_READ mentioned above. There's no serious backward compatibility issue. If the compiler complains your handler does not implement the read() method, add the following: public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } Note that this pull request certainly makes bounded inbound buffer support very easy, but itself does not add the bounded inbound buffer support.
2012-12-30 13:53:59 +01:00
private final Runnable beginReadTask = new Runnable() {
@Override
public void run() {
beginRead();
}
};
private final Runnable flushLaterTask = new Runnable() {
@Override
public void run() {
flushNowPending = false;
flush(voidFuture());
}
};
@Override
public final void sendFile(final FileRegion region, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
if (outboundBufSize() > 0) {
flushNotifier(newPromise()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) throws Exception {
sendFile0(region, promise);
}
});
} else {
// nothing pending try to send the fileRegion now!
sendFile0(region, promise);
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
sendFile(region, promise);
}
});
}
}
private void sendFile0(FileRegion region, ChannelPromise promise) {
if (flushTaskInProgress == null) {
flushTaskInProgress = new FlushTask(region, promise);
try {
// the first FileRegion to flush so trigger it now!
doFlushFileRegion(region, promise);
} catch (Throwable cause) {
region.close();
promise.setFailure(cause);
}
return;
}
FlushTask task = flushTaskInProgress;
for (;;) {
FlushTask next = task.next;
if (next == null) {
break;
}
task = next;
}
// there is something that needs to get flushed first so add it as next in the chain
task.next = new FlushTask(region, promise);
}
@Override
public final ChannelHandlerContext directOutboundContext() {
return pipeline.head;
}
@Override
public final ChannelPromise voidFuture() {
return voidPromise;
}
@Override
public final SocketAddress localAddress() {
return localAddress0();
}
@Override
public final SocketAddress remoteAddress() {
return remoteAddress0();
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
throw new IllegalStateException("registered to an event loop already");
}
if (!isCompatible(eventLoop)) {
throw new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName());
}
AbstractChannel.this.eventLoop = eventLoop;
assert eventLoop().inEventLoop();
if (!ensureOpen(promise)) {
return;
}
// check if the eventLoop which was given is currently in the eventloop.
// if that is the case we are safe to call register, if not we need to
// schedule the execution as otherwise we may say some race-conditions.
//
// See https://github.com/netty/netty/issues/654
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
private void register0(ChannelPromise promise) {
try {
Runnable postRegisterTask = doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (postRegisterTask != null) {
postRegisterTask.run();
}
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
try {
doClose();
} catch (Throwable t2) {
logger.warn("Failed to close a channel", t2);
}
promise.setFailure(t);
closeFuture.setClosed();
}
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
if (!ensureOpen(promise)) {
return;
}
try {
boolean wasActive = isActive();
// See: https://github.com/netty/netty/issues/576
if (!DetectionUtil.isWindows() && !DetectionUtil.isRoot() &&
Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
doBind(localAddress);
promise.setSuccess();
if (!wasActive && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
bind(localAddress, promise);
}
});
}
}
@Override
public final void disconnect(final ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
try {
boolean wasActive = isActive();
doDisconnect();
promise.setSuccess();
if (wasActive && !isActive()) {
pipeline.fireChannelInactive();
}
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
disconnect(promise);
}
});
}
}
@Override
public final void close(final ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
boolean wasActive = isActive();
if (closeFuture.setClosed()) {
try {
doClose();
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
if (closedChannelException != null) {
closedChannelException = new ClosedChannelException();
}
flushFutureNotifier.notifyFlushFutures(closedChannelException);
if (wasActive && !isActive()) {
pipeline.fireChannelInactive();
}
deregister(voidFuture());
} else {
// Closed already.
promise.setSuccess();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
close(promise);
}
});
}
}
@Override
public final void closeForcibly() {
try {
doClose();
} catch (Exception e) {
logger.warn("Failed to close a channel.", e);
}
}
@Override
public final void deregister(final ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
if (!registered) {
promise.setSuccess();
return;
}
try {
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (registered) {
registered = false;
promise.setSuccess();
pipeline.fireChannelUnregistered();
} else {
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered.
promise.setSuccess();
}
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
deregister(promise);
}
});
}
}
Read only when requested (read-on-demand) This pull request introduces a new operation called read() that replaces the existing inbound traffic control method. EventLoop now performs socket reads only when the read() operation has been issued. Once the requested read() operation is actually performed, EventLoop triggers an inboundBufferSuspended event that tells the handlers that the requested read() operation has been performed and the inbound traffic has been suspended again. A handler can decide to continue reading or not. Unlike other outbound operations, read() does not use ChannelFuture at all to avoid GC cost. If there's a good reason to create a new future per read at the GC cost, I'll change this. This pull request consequently removes the readable property in ChannelHandlerContext, which means how the traffic control works changed significantly. This pull request also adds a new configuration property ChannelOption.AUTO_READ whose default value is true. If true, Netty will call ctx.read() for you. If you need a close control over when read() is called, you can set it to false. Another interesting fact is that non-terminal handlers do not really need to call read() at all. Only the last inbound handler will have to call it, and that's just enough. Actually, you don't even need to call it at the last handler in most cases because of the ChannelOption.AUTO_READ mentioned above. There's no serious backward compatibility issue. If the compiler complains your handler does not implement the read() method, add the following: public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } Note that this pull request certainly makes bounded inbound buffer support very easy, but itself does not add the bounded inbound buffer support.
2012-12-30 13:53:59 +01:00
@Override
public void beginRead() {
if (eventLoop().inEventLoop()) {
try {
doBeginRead();
} catch (Exception e) {
pipeline().fireExceptionCaught(e);
close(unsafe().voidFuture());
}
} else {
eventLoop().execute(beginReadTask);
}
}
@Override
public void flush(final ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
if (flushTaskInProgress != null) {
FlushTask task = flushTaskInProgress;
// loop over the tasks to find the last one
for (;;) {
FlushTask t = task.next;
if (t == null) {
break;
}
task = t.next;
}
task.next = new FlushTask(null, promise);
return;
}
flushNotifierAndFlush(promise);
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
flush(promise);
}
});
}
}
private void flushNotifierAndFlush(ChannelPromise promise) {
flushNotifier(promise);
flush0();
}
private int outboundBufSize() {
final int bufSize;
final ChannelHandlerContext ctx = directOutboundContext();
if (ctx.hasOutboundByteBuffer()) {
bufSize = ctx.outboundByteBuffer().readableBytes();
} else {
bufSize = ctx.outboundMessageBuffer().size();
}
return bufSize;
}
private ChannelFuture flushNotifier(ChannelPromise promise) {
// Append flush future to the notification list.
if (promise != voidPromise) {
flushFutureNotifier.addFlushFuture(promise, outboundBufSize());
}
return promise;
}
private void flush0() {
if (!inFlushNow) { // Avoid re-entrance
try {
if (!isFlushPending()) {
flushNow();
} else {
// Event loop will call flushNow() later by itself.
}
} catch (Throwable t) {
flushFutureNotifier.notifyFlushFutures(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (!isActive()) {
close(unsafe().voidFuture());
}
}
} else {
if (!flushNowPending) {
flushNowPending = true;
eventLoop().execute(flushLaterTask);
}
}
}
@Override
public final void flushNow() {
if (inFlushNow || flushTaskInProgress != null) {
return;
}
inFlushNow = true;
ChannelHandlerContext ctx = directOutboundContext();
Throwable cause = null;
try {
if (ctx.hasOutboundByteBuffer()) {
ByteBuf out = ctx.outboundByteBuffer();
int oldSize = out.readableBytes();
try {
doFlushByteBuffer(out);
} catch (Throwable t) {
cause = t;
} finally {
final int newSize = out.readableBytes();
final int writtenBytes = oldSize - newSize;
if (writtenBytes > 0) {
flushFutureNotifier.increaseWriteCounter(writtenBytes);
if (newSize == 0) {
out.discardReadBytes();
}
}
}
} else {
MessageBuf<Object> out = ctx.outboundMessageBuffer();
int oldSize = out.size();
try {
doFlushMessageBuffer(out);
} catch (Throwable t) {
cause = t;
} finally {
flushFutureNotifier.increaseWriteCounter(oldSize - out.size());
}
}
if (cause == null) {
flushFutureNotifier.notifyFlushFutures();
} else {
flushFutureNotifier.notifyFlushFutures(cause);
if (cause instanceof IOException) {
close(voidFuture());
}
}
} finally {
inFlushNow = false;
}
}
protected final boolean ensureOpen(ChannelPromise promise) {
if (isOpen()) {
return true;
}
Exception e = new ClosedChannelException();
promise.setFailure(e);
return false;
}
protected final void closeIfClosed() {
if (isOpen()) {
return;
}
close(voidFuture());
}
}
/**
* Return {@code true} if the given {@link EventLoop} is compatible with this instance.
*/
protected abstract boolean isCompatible(EventLoop loop);
/**
* Returns the {@link SocketAddress} which is bound locally.
*/
protected abstract SocketAddress localAddress0();
/**
* Return the {@link SocketAddress} which the {@link Channel} is connected to.
*/
protected abstract SocketAddress remoteAddress0();
/**
* Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
* You can return a {@link Runnable} which will be run as post-task of the registration process.
*
* Sub-classes may override this method as it will just return {@code null}
*/
protected Runnable doRegister() throws Exception {
return null;
}
/**
* Bind the {@link Channel} to the {@link SocketAddress}
*/
protected abstract void doBind(SocketAddress localAddress) throws Exception;
/**
* Disconnect this {@link Channel} from its remote peer
*/
protected abstract void doDisconnect() throws Exception;
/**
* Will be called before the actual close operation will be performed. Sub-classes may override this as the default
* is to do nothing.
*/
protected void doPreClose() throws Exception {
// NOOP by default
}
/**
* Close the {@link Channel}
*/
protected abstract void doClose() throws Exception;
/**
* Deregister the {@link Channel} from its {@link EventLoop}.
*
* Sub-classes may override this method
*/
protected void doDeregister() throws Exception {
// NOOP
}
Read only when requested (read-on-demand) This pull request introduces a new operation called read() that replaces the existing inbound traffic control method. EventLoop now performs socket reads only when the read() operation has been issued. Once the requested read() operation is actually performed, EventLoop triggers an inboundBufferSuspended event that tells the handlers that the requested read() operation has been performed and the inbound traffic has been suspended again. A handler can decide to continue reading or not. Unlike other outbound operations, read() does not use ChannelFuture at all to avoid GC cost. If there's a good reason to create a new future per read at the GC cost, I'll change this. This pull request consequently removes the readable property in ChannelHandlerContext, which means how the traffic control works changed significantly. This pull request also adds a new configuration property ChannelOption.AUTO_READ whose default value is true. If true, Netty will call ctx.read() for you. If you need a close control over when read() is called, you can set it to false. Another interesting fact is that non-terminal handlers do not really need to call read() at all. Only the last inbound handler will have to call it, and that's just enough. Actually, you don't even need to call it at the last handler in most cases because of the ChannelOption.AUTO_READ mentioned above. There's no serious backward compatibility issue. If the compiler complains your handler does not implement the read() method, add the following: public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } Note that this pull request certainly makes bounded inbound buffer support very easy, but itself does not add the bounded inbound buffer support.
2012-12-30 13:53:59 +01:00
/**
* Schedule a read operation.
*/
protected abstract void doBeginRead() throws Exception;
/**
* Flush the content of the given {@link ByteBuf} to the remote peer.
*
* Sub-classes may override this as this implementation will just thrown an {@link UnsupportedOperationException}
*/
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
throw new UnsupportedOperationException();
}
/**
* Flush the content of the given {@link MessageBuf} to the remote peer.
*
* Sub-classes may override this as this implementation will just thrown an {@link UnsupportedOperationException}
*/
protected void doFlushMessageBuffer(MessageBuf<Object> buf) throws Exception {
throw new UnsupportedOperationException();
}
/**
* Flush the content of the given {@link FileRegion} to the remote peer.
*
* Sub-classes may override this as this implementation will just thrown an {@link UnsupportedOperationException}
*/
protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception {
throw new UnsupportedOperationException();
}
protected static void checkEOF(FileRegion region, long writtenBytes) throws IOException {
if (writtenBytes < region.count()) {
throw new EOFException("Expected to be able to write "
+ region.count() + " bytes, but only wrote "
+ writtenBytes);
}
}
/**
* Return {@code true} if a flush to the {@link Channel} is currently pending.
*/
protected abstract boolean isFlushPending();
private final class CloseFuture extends DefaultChannelPromise implements ChannelFuture.Unsafe {
CloseFuture(AbstractChannel ch) {
super(ch);
}
@Override
public void setSuccess() {
throw new IllegalStateException();
}
@Override
public void setFailure(Throwable cause) {
throw new IllegalStateException();
}
@Override
public boolean trySuccess() {
throw new IllegalStateException();
}
@Override
public boolean tryFailure(Throwable cause) {
throw new IllegalStateException();
}
boolean setClosed() {
try {
doPreClose();
} catch (Exception e) {
logger.warn("doPreClose() raised an exception.", e);
}
return super.trySuccess();
}
}
}