From 2b340df452d5ec282b7cefd15f887f31604b9425 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Thu, 28 Apr 2016 10:56:49 -0700 Subject: [PATCH] DuplexChannel to support shutdownInput Motivation: The DuplexChannel is currently incomplete and only supports shutting down the output side of a channel. This interface should also support shutting down the input side of the channel. Modifications: - Add shutdownInput and shutdown methods to the DuplexChannel interface - Remove state in NIO and OIO for tracking input being shutdown independent of the underlying transport's socket type. Tracking the state independently may lead to inconsistent state. Result: DuplexChannel supports shutting down the input side of the channel Fixes https://github.com/netty/netty/issues/5175 --- .../epoll/AbstractEpollStreamChannel.java | 91 ++++++++++++- .../EpollSocketShutdownOutputByPeerTest.java | 29 +++++ .../EpollSocketShutdownOutputBySelfTest.java | 29 +++++ .../io/netty/channel/rxtx/RxtxChannel.java | 20 ++- .../udt/nio/NioUdtByteConnectorChannel.java | 9 +- .../channel/nio/AbstractNioByteChannel.java | 10 +- .../netty/channel/nio/AbstractNioChannel.java | 19 --- .../nio/AbstractNioMessageChannel.java | 11 +- .../channel/oio/AbstractOioByteChannel.java | 33 ++--- .../netty/channel/socket/DuplexChannel.java | 34 ++++- .../channel/socket/nio/NioSocketChannel.java | 119 ++++++++++++++++- .../channel/socket/oio/OioSocketChannel.java | 120 +++++++++++++++--- 12 files changed, 445 insertions(+), 79 deletions(-) create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketShutdownOutputByPeerTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketShutdownOutputBySelfTest.java diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index bc22b37370..8cdbba542e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -530,7 +530,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); } - protected void shutdownOutput0(final ChannelPromise promise) { + private void shutdownOutput0(final ChannelPromise promise) { try { fd().shutdown(false, true); promise.setSuccess(); @@ -539,14 +539,37 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } } + private void shutdownInput0(final ChannelPromise promise) { + try { + fd().shutdown(true, false); + promise.setSuccess(); + } catch (Throwable cause) { + promise.setFailure(cause); + } + } + + private void shutdown0(final ChannelPromise promise) { + try { + fd().shutdown(true, true); + promise.setSuccess(); + } catch (Throwable cause) { + promise.setFailure(cause); + } + } + + @Override + public boolean isOutputShutdown() { + return fd().isOutputShutdown(); + } + @Override public boolean isInputShutdown() { return fd().isInputShutdown(); } @Override - public boolean isOutputShutdown() { - return fd().isOutputShutdown(); + public boolean isShutdown() { + return fd().isShutdown(); } @Override @@ -580,6 +603,68 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im return promise; } + @Override + public ChannelFuture shutdownInput() { + return shutdownInput(newPromise()); + } + + @Override + public ChannelFuture shutdownInput(final ChannelPromise promise) { + Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose(); + if (closeExecutor != null) { + closeExecutor.execute(new OneTimeTask() { + @Override + public void run() { + shutdownInput0(promise); + } + }); + } else { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdownInput0(promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + shutdownInput0(promise); + } + }); + } + } + return promise; + } + + @Override + public ChannelFuture shutdown() { + return shutdown(newPromise()); + } + + @Override + public ChannelFuture shutdown(final ChannelPromise promise) { + Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose(); + if (closeExecutor != null) { + closeExecutor.execute(new OneTimeTask() { + @Override + public void run() { + shutdown0(promise); + } + }); + } else { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdown0(promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + shutdown0(promise); + } + }); + } + } + return promise; + } + @Override protected void doClose() throws Exception { try { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketShutdownOutputByPeerTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketShutdownOutputByPeerTest.java new file mode 100644 index 0000000000..5a7bdc1b93 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketShutdownOutputByPeerTest.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketShutdownOutputByPeerTest; + +import java.util.List; + +public class EpollSocketShutdownOutputByPeerTest extends SocketShutdownOutputByPeerTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.serverSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketShutdownOutputBySelfTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketShutdownOutputBySelfTest.java new file mode 100644 index 0000000000..3ad80e472b --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketShutdownOutputBySelfTest.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketShutdownOutputBySelfTest; + +import java.util.List; + +public class EpollSocketShutdownOutputBySelfTest extends SocketShutdownOutputBySelfTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.clientSocket(); + } +} diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java index 07628a944a..7ff98e06ef 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java @@ -18,6 +18,7 @@ package io.netty.channel.rxtx; import gnu.io.CommPort; import gnu.io.CommPortIdentifier; import gnu.io.SerialPort; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; import io.netty.channel.oio.OioByteStreamChannel; import io.netty.util.internal.OneTimeTask; @@ -25,7 +26,14 @@ import io.netty.util.internal.OneTimeTask; import java.net.SocketAddress; import java.util.concurrent.TimeUnit; -import static io.netty.channel.rxtx.RxtxChannelOption.*; +import static io.netty.channel.rxtx.RxtxChannelOption.BAUD_RATE; +import static io.netty.channel.rxtx.RxtxChannelOption.DATA_BITS; +import static io.netty.channel.rxtx.RxtxChannelOption.DTR; +import static io.netty.channel.rxtx.RxtxChannelOption.PARITY_BIT; +import static io.netty.channel.rxtx.RxtxChannelOption.READ_TIMEOUT; +import static io.netty.channel.rxtx.RxtxChannelOption.RTS; +import static io.netty.channel.rxtx.RxtxChannelOption.STOP_BITS; +import static io.netty.channel.rxtx.RxtxChannelOption.WAIT_TIME; /** * A channel to a serial device using the RXTX library. @@ -129,6 +137,16 @@ public class RxtxChannel extends OioByteStreamChannel { } } + @Override + protected boolean isInputShutdown() { + return !open; + } + + @Override + protected ChannelFuture shutdownInput() { + return newFailedFuture(new UnsupportedOperationException("shutdownInput")); + } + private final class RxtxUnsafe extends AbstractUnsafe { @Override public void connect( diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java index 20adae249b..62dbb9ffd7 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteConnectorChannel.java @@ -17,10 +17,10 @@ package io.netty.channel.udt.nio; import com.barchart.udt.TypeUDT; import com.barchart.udt.nio.SocketChannelUDT; - import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; import io.netty.channel.FileRegion; import io.netty.channel.RecvByteBufAllocator; @@ -34,7 +34,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.InetSocketAddress; import java.net.SocketAddress; -import static java.nio.channels.SelectionKey.*; +import static java.nio.channels.SelectionKey.OP_CONNECT; /** * Byte Channel Connector for UDT Streams. @@ -149,6 +149,11 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel implement return byteBuf.readBytes(javaChannel(), expectedWrittenBytes); } + @Override + protected ChannelFuture shutdownInput() { + return newFailedFuture(new UnsupportedOperationException("shutdownInput")); + } + @Override protected long doWriteFileRegion(FileRegion region) throws Exception { throw new UnsupportedOperationException(); diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 69f99b1575..cc59b84d8a 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; @@ -52,6 +53,11 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { super(parent, ch, SelectionKey.OP_READ); } + /** + * Shutdown the input side of the channel. + */ + protected abstract ChannelFuture shutdownInput(); + @Override protected AbstractNioUnsafe newUnsafe() { return new NioByteUnsafe(); @@ -60,10 +66,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { protected class NioByteUnsafe extends AbstractNioUnsafe { private void closeOnRead(ChannelPipeline pipeline) { - SelectionKey key = selectionKey(); - setInputShutdown(); if (isOpen()) { if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + shutdownInput(); + SelectionKey key = selectionKey(); key.interestOps(key.interestOps() & ~readInterestOp); pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 582e6ae6fa..8de66aba18 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -60,7 +60,6 @@ public abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch; protected final int readInterestOp; volatile SelectionKey selectionKey; - private volatile boolean inputShutdown; boolean readPending; private final Runnable clearReadPendingRunnable = new Runnable() { @Override @@ -197,20 +196,6 @@ public abstract class AbstractNioChannel extends AbstractChannel { ((AbstractNioUnsafe) unsafe()).removeReadOp(); } - /** - * Return {@code true} if the input of this {@link Channel} is shutdown - */ - protected boolean isInputShutdown() { - return inputShutdown; - } - - /** - * Shutdown the input of this {@link Channel}. - */ - void setInputShutdown() { - inputShutdown = true; - } - /** * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel} */ @@ -422,10 +407,6 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called - if (inputShutdown) { - return; - } - final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 1d606e075b..1c7f92aa2b 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -33,6 +33,7 @@ import java.util.List; * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages. */ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { + boolean inputShutdown; /** * @see {@link AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)} @@ -46,6 +47,14 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { return new NioMessageUnsafe(); } + @Override + protected void doBeginRead() throws Exception { + if (inputShutdown) { + return; + } + super.doBeginRead(); + } + private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List readBuf = new ArrayList(); @@ -98,7 +107,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { } if (closed) { - setInputShutdown(); + inputShutdown = true; if (isOpen()) { close(voidPromise()); } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index 8c10cf2b20..15da4099b6 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; @@ -40,8 +41,6 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + StringUtil.simpleClassName(FileRegion.class) + ')'; - private volatile boolean inputShutdown; - /** * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel) */ @@ -49,39 +48,27 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { super(parent); } - protected boolean isInputShutdown() { - return inputShutdown; - } - @Override public ChannelMetadata metadata() { return METADATA; } /** - * Check if the input was shutdown and if so return {@code true}. The default implementation sleeps also for - * {@link #SO_TIMEOUT} milliseconds to simulate some blocking. + * Determine if the input side of this channel is shutdown. + * @return {@code true} if the input side of this channel is shutdown. */ - protected boolean checkInputShutdown() { - if (inputShutdown) { - try { - Thread.sleep(SO_TIMEOUT); - } catch (InterruptedException e) { - // ignore - } - return true; - } - return false; - } + protected abstract boolean isInputShutdown(); - void setInputShutdown() { - inputShutdown = true; - } + /** + * Shutdown the input side of this channel. + * @return A channel future that will complete when the shutdown is complete. + */ + protected abstract ChannelFuture shutdownInput(); private void closeOnRead(ChannelPipeline pipeline) { - setInputShutdown(); if (isOpen()) { if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + shutdownInput(); pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { unsafe().close(unsafe().voidPromise()); diff --git a/transport/src/main/java/io/netty/channel/socket/DuplexChannel.java b/transport/src/main/java/io/netty/channel/socket/DuplexChannel.java index d34ec36bff..4770d22511 100644 --- a/transport/src/main/java/io/netty/channel/socket/DuplexChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/DuplexChannel.java @@ -32,6 +32,18 @@ public interface DuplexChannel extends Channel { */ boolean isInputShutdown(); + /** + * @see Socket#shutdownInput() + */ + ChannelFuture shutdownInput(); + + /** + * Will shutdown the input and notify {@link ChannelPromise}. + * + * @see Socket#shutdownInput() + */ + ChannelFuture shutdownInput(ChannelPromise promise); + /** * @see Socket#isOutputShutdown() */ @@ -43,9 +55,27 @@ public interface DuplexChannel extends Channel { ChannelFuture shutdownOutput(); /** - * @see Socket#shutdownOutput() + * Will shutdown the output and notify {@link ChannelPromise}. * - * Will notify the given {@link ChannelPromise} + * @see Socket#shutdownOutput() */ ChannelFuture shutdownOutput(ChannelPromise promise); + + /** + * Determine if both the input and output of this channel have been shutdown. + */ + boolean isShutdown(); + + /** + * Will shutdown the input and output sides of this channel. + * @return will be completed when both shutdown operations complete. + */ + ChannelFuture shutdown(); + + /** + * Will shutdown the input and output sides of this channel. + * @param promise will be completed when both shutdown operations complete. + * @return will be completed when both shutdown operations complete. + */ + ChannelFuture shutdown(ChannelPromise promise); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 86253ab851..dc1cbe1cae 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -31,6 +31,8 @@ import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannelConfig; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.internal.OneTimeTask; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; @@ -46,7 +48,7 @@ import java.util.concurrent.Executor; * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation. */ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { - + private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class); private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); @@ -124,9 +126,20 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty return ch.isOpen() && ch.isConnected(); } + @Override + public boolean isOutputShutdown() { + return javaChannel().socket().isOutputShutdown() || !isActive(); + } + @Override public boolean isInputShutdown() { - return super.isInputShutdown(); + return javaChannel().socket().isInputShutdown() || !isActive(); + } + + @Override + public boolean isShutdown() { + Socket socket = javaChannel().socket(); + return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive(); } @Override @@ -139,11 +152,6 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty return (InetSocketAddress) super.remoteAddress(); } - @Override - public boolean isOutputShutdown() { - return javaChannel().socket().isOutputShutdown() || !isActive(); - } - @Override public ChannelFuture shutdownOutput() { return shutdownOutput(newPromise()); @@ -175,6 +183,68 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty return promise; } + @Override + public ChannelFuture shutdownInput() { + return shutdownInput(newPromise()); + } + + @Override + public ChannelFuture shutdownInput(final ChannelPromise promise) { + Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose(); + if (closeExecutor != null) { + closeExecutor.execute(new OneTimeTask() { + @Override + public void run() { + shutdownInput0(promise); + } + }); + } else { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdownInput0(promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + shutdownInput0(promise); + } + }); + } + } + return promise; + } + + @Override + public ChannelFuture shutdown() { + return shutdown(newPromise()); + } + + @Override + public ChannelFuture shutdown(final ChannelPromise promise) { + Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose(); + if (closeExecutor != null) { + closeExecutor.execute(new OneTimeTask() { + @Override + public void run() { + shutdown0(promise); + } + }); + } else { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdown0(promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + shutdown0(promise); + } + }); + } + } + return promise; + } + private void shutdownOutput0(final ChannelPromise promise) { try { javaChannel().socket().shutdownOutput(); @@ -184,6 +254,41 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty } } + private void shutdownInput0(final ChannelPromise promise) { + try { + javaChannel().socket().shutdownInput(); + promise.setSuccess(); + } catch (Throwable t) { + promise.setFailure(t); + } + } + + private void shutdown0(final ChannelPromise promise) { + Socket socket = javaChannel().socket(); + Throwable cause = null; + try { + socket.shutdownOutput(); + } catch (Throwable t) { + cause = t; + } + try { + socket.shutdownInput(); + } catch (Throwable t) { + if (cause == null) { + promise.setFailure(t); + } else { + logger.debug("Exception suppressed because a previous exception occurred.", t); + promise.setFailure(cause); + } + return; + } + if (cause == null) { + promise.setSuccess(); + } else { + promise.setFailure(cause); + } + } + @Override protected SocketAddress localAddress0() { return javaChannel().socket().getLocalSocketAddress(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index 3b34e213ec..3e6cef632e 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -38,11 +38,9 @@ import java.net.SocketTimeoutException; /** * A {@link SocketChannel} which is using Old-Blocking-IO */ -public class OioSocketChannel extends OioByteStreamChannel - implements SocketChannel { +public class OioSocketChannel extends OioByteStreamChannel implements SocketChannel { - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(OioSocketChannel.class); + private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioSocketChannel.class); private final Socket socket; private final OioSocketChannelConfig config; @@ -115,21 +113,36 @@ public class OioSocketChannel extends OioByteStreamChannel return !socket.isClosed() && socket.isConnected(); } - @Override - public boolean isInputShutdown() { - return super.isInputShutdown(); - } - @Override public boolean isOutputShutdown() { return socket.isOutputShutdown() || !isActive(); } + @Override + public boolean isInputShutdown() { + return socket.isInputShutdown() || !isActive(); + } + + @Override + public boolean isShutdown() { + return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive(); + } + @Override public ChannelFuture shutdownOutput() { return shutdownOutput(newPromise()); } + @Override + public ChannelFuture shutdownInput() { + return shutdownInput(newPromise()); + } + + @Override + public ChannelFuture shutdown() { + return shutdown(newPromise()); + } + @Override protected int doReadBytes(ByteBuf buf) throws Exception { if (socket.isClosed()) { @@ -143,24 +156,94 @@ public class OioSocketChannel extends OioByteStreamChannel } @Override - public ChannelFuture shutdownOutput(final ChannelPromise future) { + public ChannelFuture shutdownOutput(final ChannelPromise promise) { EventLoop loop = eventLoop(); if (loop.inEventLoop()) { - try { - socket.shutdownOutput(); - future.setSuccess(); - } catch (Throwable t) { - future.setFailure(t); - } + shutdownOutput0(promise); } else { loop.execute(new OneTimeTask() { @Override public void run() { - shutdownOutput(future); + shutdownOutput0(promise); } }); } - return future; + return promise; + } + + private void shutdownOutput0(ChannelPromise promise) { + try { + socket.shutdownOutput(); + promise.setSuccess(); + } catch (Throwable t) { + promise.setFailure(t); + } + } + + @Override + public ChannelFuture shutdownInput(final ChannelPromise promise) { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdownInput0(promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + shutdownInput0(promise); + } + }); + } + return promise; + } + + private void shutdownInput0(ChannelPromise promise) { + try { + socket.shutdownInput(); + promise.setSuccess(); + } catch (Throwable t) { + promise.setFailure(t); + } + } + + @Override + public ChannelFuture shutdown(final ChannelPromise promise) { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdown0(promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + shutdown0(promise); + } + }); + } + return promise; + } + + private void shutdown0(ChannelPromise promise) { + Throwable cause = null; + try { + socket.shutdownOutput(); + } catch (Throwable t) { + cause = t; + } + try { + socket.shutdownInput(); + } catch (Throwable t) { + if (cause == null) { + promise.setFailure(t); + } else { + logger.debug("Exception suppressed because a previous exception occurred.", t); + promise.setFailure(cause); + } + return; + } + if (cause == null) { + promise.setSuccess(); + } else { + promise.setFailure(cause); + } } @Override @@ -221,7 +304,6 @@ public class OioSocketChannel extends OioByteStreamChannel socket.close(); } - @Override protected boolean checkInputShutdown() { if (isInputShutdown()) { try {