netty5/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java

461 lines
16 KiB
Java
Raw Normal View History

/*
2012-06-04 22:31:44 +02:00
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
2012-06-04 22:31:44 +02:00
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Abstract base class for {@link Channel} implementations which use a Selector based approach.
*/
public abstract class AbstractNioChannel extends AbstractChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AbstractNioChannel.class);
private final SelectableChannel ch;
Read only when requested (read-on-demand) This pull request introduces a new operation called read() that replaces the existing inbound traffic control method. EventLoop now performs socket reads only when the read() operation has been issued. Once the requested read() operation is actually performed, EventLoop triggers an inboundBufferSuspended event that tells the handlers that the requested read() operation has been performed and the inbound traffic has been suspended again. A handler can decide to continue reading or not. Unlike other outbound operations, read() does not use ChannelFuture at all to avoid GC cost. If there's a good reason to create a new future per read at the GC cost, I'll change this. This pull request consequently removes the readable property in ChannelHandlerContext, which means how the traffic control works changed significantly. This pull request also adds a new configuration property ChannelOption.AUTO_READ whose default value is true. If true, Netty will call ctx.read() for you. If you need a close control over when read() is called, you can set it to false. Another interesting fact is that non-terminal handlers do not really need to call read() at all. Only the last inbound handler will have to call it, and that's just enough. Actually, you don't even need to call it at the last handler in most cases because of the ChannelOption.AUTO_READ mentioned above. There's no serious backward compatibility issue. If the compiler complains your handler does not implement the read() method, add the following: public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } Note that this pull request certainly makes bounded inbound buffer support very easy, but itself does not add the bounded inbound buffer support.
2012-12-30 13:53:59 +01:00
protected final int readInterestOp;
volatile SelectionKey selectionKey;
private volatile boolean inputShutdown;
private volatile boolean readPending;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
* @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
*/
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
@Override
public boolean isOpen() {
return ch.isOpen();
}
@Override
public NioUnsafe unsafe() {
return (NioUnsafe) super.unsafe();
}
protected SelectableChannel javaChannel() {
return ch;
}
@Override
public NioEventLoop eventLoop() {
return (NioEventLoop) super.eventLoop();
}
/**
* Return the current {@link SelectionKey}
*/
protected SelectionKey selectionKey() {
assert selectionKey != null;
return selectionKey;
}
protected boolean isReadPending() {
return readPending;
}
protected void setReadPending(boolean readPending) {
this.readPending = readPending;
}
/**
* Return {@code true} if the input of this {@link Channel} is shutdown
*/
protected boolean isInputShutdown() {
return inputShutdown;
}
/**
* Shutdown the input of this {@link Channel}.
*/
void setInputShutdown() {
inputShutdown = true;
}
/**
* Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
*/
public interface NioUnsafe extends Unsafe {
/**
* Return underlying {@link SelectableChannel}
*/
SelectableChannel ch();
/**
* Finish connect
*/
void finishConnect();
/**
* Read from underlying {@link SelectableChannel}
*/
void read();
void forceFlush();
}
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
protected final void removeReadOp() {
SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}
@Override
public final SelectableChannel ch() {
return javaChannel();
}
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
promise.tryFailure(t);
closeIfClosed();
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(cause);
closeIfClosed();
}
@Override
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
fulfillConnectPromise(connectPromise, t);
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
@Override
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (isFlushPending()) {
return;
}
super.flush0();
}
@Override
public final void forceFlush() {
// directly call super.flush0() to force a flush now
super.flush0();
}
2013-08-09 13:30:04 +02:00
private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof NioEventLoop;
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
@Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
Read only when requested (read-on-demand) This pull request introduces a new operation called read() that replaces the existing inbound traffic control method. EventLoop now performs socket reads only when the read() operation has been issued. Once the requested read() operation is actually performed, EventLoop triggers an inboundBufferSuspended event that tells the handlers that the requested read() operation has been performed and the inbound traffic has been suspended again. A handler can decide to continue reading or not. Unlike other outbound operations, read() does not use ChannelFuture at all to avoid GC cost. If there's a good reason to create a new future per read at the GC cost, I'll change this. This pull request consequently removes the readable property in ChannelHandlerContext, which means how the traffic control works changed significantly. This pull request also adds a new configuration property ChannelOption.AUTO_READ whose default value is true. If true, Netty will call ctx.read() for you. If you need a close control over when read() is called, you can set it to false. Another interesting fact is that non-terminal handlers do not really need to call read() at all. Only the last inbound handler will have to call it, and that's just enough. Actually, you don't even need to call it at the last handler in most cases because of the ChannelOption.AUTO_READ mentioned above. There's no serious backward compatibility issue. If the compiler complains your handler does not implement the read() method, add the following: public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } Note that this pull request certainly makes bounded inbound buffer support very easy, but itself does not add the bounded inbound buffer support.
2012-12-30 13:53:59 +01:00
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
Read only when requested (read-on-demand) This pull request introduces a new operation called read() that replaces the existing inbound traffic control method. EventLoop now performs socket reads only when the read() operation has been issued. Once the requested read() operation is actually performed, EventLoop triggers an inboundBufferSuspended event that tells the handlers that the requested read() operation has been performed and the inbound traffic has been suspended again. A handler can decide to continue reading or not. Unlike other outbound operations, read() does not use ChannelFuture at all to avoid GC cost. If there's a good reason to create a new future per read at the GC cost, I'll change this. This pull request consequently removes the readable property in ChannelHandlerContext, which means how the traffic control works changed significantly. This pull request also adds a new configuration property ChannelOption.AUTO_READ whose default value is true. If true, Netty will call ctx.read() for you. If you need a close control over when read() is called, you can set it to false. Another interesting fact is that non-terminal handlers do not really need to call read() at all. Only the last inbound handler will have to call it, and that's just enough. Actually, you don't even need to call it at the last handler in most cases because of the ChannelOption.AUTO_READ mentioned above. There's no serious backward compatibility issue. If the compiler complains your handler does not implement the read() method, add the following: public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } Note that this pull request certainly makes bounded inbound buffer support very easy, but itself does not add the bounded inbound buffer support.
2012-12-30 13:53:59 +01:00
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
Read only when requested (read-on-demand) This pull request introduces a new operation called read() that replaces the existing inbound traffic control method. EventLoop now performs socket reads only when the read() operation has been issued. Once the requested read() operation is actually performed, EventLoop triggers an inboundBufferSuspended event that tells the handlers that the requested read() operation has been performed and the inbound traffic has been suspended again. A handler can decide to continue reading or not. Unlike other outbound operations, read() does not use ChannelFuture at all to avoid GC cost. If there's a good reason to create a new future per read at the GC cost, I'll change this. This pull request consequently removes the readable property in ChannelHandlerContext, which means how the traffic control works changed significantly. This pull request also adds a new configuration property ChannelOption.AUTO_READ whose default value is true. If true, Netty will call ctx.read() for you. If you need a close control over when read() is called, you can set it to false. Another interesting fact is that non-terminal handlers do not really need to call read() at all. Only the last inbound handler will have to call it, and that's just enough. Actually, you don't even need to call it at the last handler in most cases because of the ChannelOption.AUTO_READ mentioned above. There's no serious backward compatibility issue. If the compiler complains your handler does not implement the read() method, add the following: public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } Note that this pull request certainly makes bounded inbound buffer support very easy, but itself does not add the bounded inbound buffer support.
2012-12-30 13:53:59 +01:00
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
Read only when requested (read-on-demand) This pull request introduces a new operation called read() that replaces the existing inbound traffic control method. EventLoop now performs socket reads only when the read() operation has been issued. Once the requested read() operation is actually performed, EventLoop triggers an inboundBufferSuspended event that tells the handlers that the requested read() operation has been performed and the inbound traffic has been suspended again. A handler can decide to continue reading or not. Unlike other outbound operations, read() does not use ChannelFuture at all to avoid GC cost. If there's a good reason to create a new future per read at the GC cost, I'll change this. This pull request consequently removes the readable property in ChannelHandlerContext, which means how the traffic control works changed significantly. This pull request also adds a new configuration property ChannelOption.AUTO_READ whose default value is true. If true, Netty will call ctx.read() for you. If you need a close control over when read() is called, you can set it to false. Another interesting fact is that non-terminal handlers do not really need to call read() at all. Only the last inbound handler will have to call it, and that's just enough. Actually, you don't even need to call it at the last handler in most cases because of the ChannelOption.AUTO_READ mentioned above. There's no serious backward compatibility issue. If the compiler complains your handler does not implement the read() method, add the following: public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } Note that this pull request certainly makes bounded inbound buffer support very easy, but itself does not add the bounded inbound buffer support.
2012-12-30 13:53:59 +01:00
}
}
/**
2013-04-08 21:23:53 +02:00
* Connect to the remote peer
*/
protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
/**
* Finish the connect
*/
protected abstract void doFinishConnect() throws Exception;
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
* Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high,
* but just returns the original {@link ByteBuf}..
*/
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(buf);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
return buf;
}
/**
* Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
* The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
* this method. Note that this method does not create an off-heap copy if the allocation / deallocation cost is
* too high, but just returns the original {@link ByteBuf}..
*/
protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(holder);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
if (holder != buf) {
// Ensure to call holder.release() to give the holder a chance to release other resources than its content.
buf.retain();
ReferenceCountUtil.safeRelease(holder);
}
return buf;
}
}