diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java new file mode 100644 index 0000000000..cda0cf9c9a --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java @@ -0,0 +1,135 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import static org.junit.Assert.*; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundByteHandlerAdapter; +import io.netty.channel.ChannelInputShutdownEvent; +import io.netty.channel.ChannelOption; +import io.netty.channel.socket.SocketChannel; + +import java.net.Socket; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; + +import org.junit.Test; + +public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest { + + @Test(timeout = 30000) + public void testShutdownOutput() throws Throwable { + run(); + } + + public void testShutdownOutput(ServerBootstrap sb) throws Throwable { + TestHandler h = new TestHandler(); + Socket s = new Socket(); + try { + sb.childHandler(h).childOption(ChannelOption.ALLOW_HALF_CLOSURE, true).bind().sync(); + + s.connect(addr, 10000); + s.getOutputStream().write(1); + + assertEquals(1, (int) h.queue.take()); + + assertTrue(h.ch.isOpen()); + assertTrue(h.ch.isActive()); + assertFalse(h.ch.isInputShutdown()); + assertFalse(h.ch.isOutputShutdown()); + + s.shutdownOutput(); + + h.halfClosure.await(); + + assertTrue(h.ch.isOpen()); + assertTrue(h.ch.isActive()); + assertTrue(h.ch.isInputShutdown()); + assertFalse(h.ch.isOutputShutdown()); + assertEquals(1, h.closure.getCount()); + } finally { + s.close(); + } + } + + @Test(timeout = 30000) + public void testShutdownOutputWithoutOption() throws Throwable { + run(); + } + + public void testShutdownOutputWithoutOption(ServerBootstrap sb) throws Throwable { + TestHandler h = new TestHandler(); + Socket s = new Socket(); + try { + sb.childHandler(h).bind().sync(); + + s.connect(addr, 10000); + s.getOutputStream().write(1); + + assertEquals(1, (int) h.queue.take()); + + assertTrue(h.ch.isOpen()); + assertTrue(h.ch.isActive()); + assertFalse(h.ch.isInputShutdown()); + assertFalse(h.ch.isOutputShutdown()); + + s.shutdownOutput(); + + h.closure.await(); + + assertFalse(h.ch.isOpen()); + assertFalse(h.ch.isActive()); + assertTrue(h.ch.isInputShutdown()); + assertTrue(h.ch.isOutputShutdown()); + + assertEquals(1, h.halfClosure.getCount()); + } finally { + s.close(); + } + } + + private static class TestHandler extends ChannelInboundByteHandlerAdapter { + volatile SocketChannel ch; + final BlockingQueue queue = new SynchronousQueue(); + final CountDownLatch halfClosure = new CountDownLatch(1); + final CountDownLatch closure = new CountDownLatch(1); + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ch = (SocketChannel) ctx.channel(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + closure.countDown(); + } + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + queue.offer(in.readByte()); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof ChannelInputShutdownEvent) { + halfClosure.countDown(); + } + } + } +} diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java similarity index 80% rename from testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputTest.java rename to testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java index cc6b08d4ea..0936f6f42e 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java @@ -30,7 +30,7 @@ import java.util.concurrent.SynchronousQueue; import org.junit.Test; -public class SocketShutdownOutputTest extends AbstractClientSocketTest { +public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest { @Test(timeout = 30000) public void testShutdownOutput() throws Throwable { @@ -51,10 +51,20 @@ public class SocketShutdownOutputTest extends AbstractClientSocketTest { ch.write(Unpooled.wrappedBuffer(new byte[] { 1 })).sync(); assertEquals(1, s.getInputStream().read()); + assertTrue(h.ch.isOpen()); + assertTrue(h.ch.isActive()); + assertFalse(h.ch.isInputShutdown()); + assertFalse(h.ch.isOutputShutdown()); + // Make the connection half-closed and ensure read() returns -1. ch.shutdownOutput(); assertEquals(-1, s.getInputStream().read()); + assertTrue(h.ch.isOpen()); + assertTrue(h.ch.isActive()); + assertFalse(h.ch.isInputShutdown()); + assertTrue(h.ch.isOutputShutdown()); + // If half-closed, the peer should be able to write something. s.getOutputStream().write(1); assertEquals(1, (int) h.queue.take()); @@ -68,8 +78,14 @@ public class SocketShutdownOutputTest extends AbstractClientSocketTest { } private static class TestHandler extends ChannelInboundByteHandlerAdapter { + volatile SocketChannel ch; final BlockingQueue queue = new SynchronousQueue(); + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ch = (SocketChannel) ctx.channel(); + } + @Override public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception { queue.offer(in.readByte()); diff --git a/transport/src/main/java/io/netty/channel/ChannelInputShutdownEvent.java b/transport/src/main/java/io/netty/channel/ChannelInputShutdownEvent.java new file mode 100644 index 0000000000..06f213811b --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelInputShutdownEvent.java @@ -0,0 +1,23 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +public final class ChannelInputShutdownEvent { + + public static final ChannelInputShutdownEvent INSTANCE = new ChannelInputShutdownEvent(); + + private ChannelInputShutdownEvent() { } +} diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java index 36d399cf46..e80d9c068c 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOption.java +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -32,6 +32,9 @@ public class ChannelOption extends UniqueName { new ChannelOption("CONNECT_TIMEOUT_MILLIS"); public static final ChannelOption WRITE_SPIN_COUNT = new ChannelOption("WRITE_SPIN_COUNT"); + public static final ChannelOption ALLOW_HALF_CLOSURE = + new ChannelOption("ALLOW_HALF_CLOSURE"); + public static final ChannelOption SO_BROADCAST = new ChannelOption("SO_BROADCAST"); diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java index 87068d53fb..e900b68fee 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java @@ -31,6 +31,7 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { private final Socket socket; + private volatile boolean allowHalfClosure; /** * Creates a new instance. @@ -46,7 +47,8 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig public Map, Object> getOptions() { return getOptions( super.getOptions(), - SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS); + SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS, + ALLOW_HALF_CLOSURE); } @Override @@ -72,6 +74,9 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig if (option == IP_TOS) { return (T) Integer.valueOf(getTrafficClass()); } + if (option == ALLOW_HALF_CLOSURE) { + return (T) Boolean.valueOf(isAllowHalfClosure()); + } return super.getOption(option); } @@ -94,6 +99,8 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig setSoLinger((Integer) value); } else if (option == IP_TOS) { setTrafficClass((Integer) value); + } else if (option == ALLOW_HALF_CLOSURE) { + setAllowHalfClosure((Boolean) value); } else { return super.setOption(option, value); } @@ -236,4 +243,14 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig throw new ChannelException(e); } } + + @Override + public boolean isAllowHalfClosure() { + return allowHalfClosure; + } + + @Override + public void setAllowHalfClosure(boolean allowHalfClosure) { + this.allowHalfClosure = allowHalfClosure; + } } diff --git a/transport/src/main/java/io/netty/channel/socket/SocketChannel.java b/transport/src/main/java/io/netty/channel/socket/SocketChannel.java index 48a48fe013..8892a96f7d 100644 --- a/transport/src/main/java/io/netty/channel/socket/SocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/SocketChannel.java @@ -35,6 +35,13 @@ public interface SocketChannel extends Channel { @Override InetSocketAddress remoteAddress(); + /** + * Returns {@code true} if and only if the remote peer shut down its output so that no more + * data is received from this channel. Note that the semantic of this method is different from + * that of {@link Socket#shutdownInput()} and {@link Socket#isInputShutdown()}. + */ + boolean isInputShutdown(); + /** * @see Socket#isOutputShutdown() */ diff --git a/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java index 29815c0892..b7552705b8 100644 --- a/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java @@ -125,4 +125,18 @@ public interface SocketChannelConfig extends ChannelConfig { * {@link Socket#setPerformancePreferences(int, int, int)}. */ void setPerformancePreferences(int connectionTime, int latency, int bandwidth); + + /** + * Returns {@code true} if and only if the channel should not close itself when its remote + * peer shuts down output to make the connection half-closed. If {@code false}, the connection + * is closed automatically when the remote peer shuts down output. + */ + boolean isAllowHalfClosure(); + + /** + * Sets whether the channel should not close itself when its remote peer shuts down output to + * make the connection half-closed. If {@code true} the connection is not closed when the + * remote peer shuts down output. If {@code false}, the connection is closed automatically. + */ + void setAllowHalfClosure(boolean allowHalfClosure); } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index e41a2c4c30..fe5f9c332d 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -20,6 +20,7 @@ import io.netty.buffer.ChannelBufType; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFlushFutureNotifier; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInputShutdownEvent; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoop; @@ -57,6 +58,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } private final AioSocketChannelConfig config; + private volatile boolean inputShutdown; private volatile boolean outputShutdown; private boolean flushing; @@ -96,6 +98,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne return METADATA; } + @Override + public boolean isInputShutdown() { + return inputShutdown; + } + @Override public boolean isOutputShutdown() { return outputShutdown; @@ -209,6 +216,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void doClose() throws Exception { javaChannel().close(); + outputShutdown = true; } @Override @@ -245,7 +253,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } private void beginRead() { - if (readSuspended.get()) { + if (readSuspended.get() || inputShutdown) { return; } @@ -381,8 +389,16 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne if (read) { pipeline.fireInboundBufferUpdated(); } - if (closed && channel.isOpen()) { - channel.unsafe().close(channel.unsafe().voidFuture()); + + if (closed) { + channel.inputShutdown = true; + if (channel.isOpen()) { + if (channel.config().isAllowHalfClosure()) { + pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + channel.unsafe().close(channel.unsafe().voidFuture()); + } + } } else { // start the next read channel.beginRead(); @@ -446,6 +462,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override public void resumeRead() { if (readSuspended.compareAndSet(true, false)) { + if (inputShutdown) { + return; + } + if (eventLoop().inEventLoop()) { beginRead(); } else { diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java index ba0481c62d..b5d23b1338 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java @@ -34,6 +34,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { private final NetworkChannel channel; + private volatile boolean allowHalfClosure; private volatile long readTimeoutInMillis; private volatile long writeTimeoutInMillis; @@ -53,7 +54,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig return getOptions( super.getOptions(), SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS, - AIO_READ_TIMEOUT, AIO_WRITE_TIMEOUT); + AIO_READ_TIMEOUT, AIO_WRITE_TIMEOUT, ALLOW_HALF_CLOSURE); } @Override @@ -86,6 +87,9 @@ final class AioSocketChannelConfig extends DefaultChannelConfig if (option == AIO_WRITE_TIMEOUT) { return (T) Long.valueOf(getWriteTimeout()); } + if (option == ALLOW_HALF_CLOSURE) { + return (T) Boolean.valueOf(isAllowHalfClosure()); + } return super.getOption(option); } @@ -112,6 +116,8 @@ final class AioSocketChannelConfig extends DefaultChannelConfig setReadTimeout((Long) value); } else if (option == AIO_WRITE_TIMEOUT) { setWriteTimeout((Long) value); + } else if (option == ALLOW_HALF_CLOSURE) { + setAllowHalfClosure((Boolean) value); } else { return super.setOption(option, value); } @@ -296,4 +302,14 @@ final class AioSocketChannelConfig extends DefaultChannelConfig public long getWriteTimeout() { return writeTimeoutInMillis; } + + @Override + public boolean isAllowHalfClosure() { + return allowHalfClosure; + } + + @Override + public void setAllowHalfClosure(boolean allowHalfClosure) { + this.allowHalfClosure = allowHalfClosure; + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java index a7c35e5e7a..d51cc6ce7a 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java @@ -17,6 +17,8 @@ package io.netty.channel.socket.nio; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelInputShutdownEvent; +import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import java.io.IOException; @@ -88,8 +90,16 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel { if (read) { pipeline.fireInboundBufferUpdated(); } - if (closed && isOpen()) { - close(voidFuture()); + if (closed) { + setInputShutdown(); + if (isOpen()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + suspendReadTask.run(); + pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + close(voidFuture()); + } + } } } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 03083c74e5..15dc6c6571 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -40,6 +40,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch; private final int readInterestOp; private volatile SelectionKey selectionKey; + private volatile boolean inputShutdown; final Runnable suspendReadTask = new Runnable() { @Override @@ -54,10 +55,8 @@ public abstract class AbstractNioChannel extends AbstractChannel { public void run() { selectionKey().interestOps(selectionKey().interestOps() | readInterestOp); } - }; - /** * The future of the current connection attempt. If not null, subsequent * connection attempts will fail. @@ -117,6 +116,14 @@ public abstract class AbstractNioChannel extends AbstractChannel { return selectionKey; } + boolean isInputShutdown() { + return inputShutdown; + } + + void setInputShutdown() { + inputShutdown = true; + } + public interface NioUnsafe extends Unsafe { java.nio.channels.Channel ch(); void finishConnect(); @@ -218,6 +225,10 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override public void resumeRead() { + if (inputShutdown) { + return; + } + EventLoop loop = eventLoop(); if (loop.inEventLoop()) { resumeReadTask.run(); @@ -242,7 +253,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { protected Runnable doRegister() throws Exception { NioEventLoop loop = (NioEventLoop) eventLoop(); selectionKey = javaChannel().register( - loop.selector, isActive()? readInterestOp : 0, this); + loop.selector, isActive() && !inputShutdown ? readInterestOp : 0, this); return null; } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index cf6dde8e03..2dccff94e2 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -98,9 +98,14 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty return ch.isOpen() && ch.isConnected(); } + @Override + public boolean isInputShutdown() { + return super.isInputShutdown(); + } + @Override public boolean isOutputShutdown() { - return javaChannel().socket().isOutputShutdown(); + return javaChannel().socket().isOutputShutdown() || !isActive(); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java index 0de6430775..bb45e553c7 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java @@ -17,16 +17,24 @@ package io.netty.channel.socket.oio; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelInputShutdownEvent; +import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import java.io.IOException; abstract class AbstractOioByteChannel extends AbstractOioChannel { + private volatile boolean inputShutdown; + protected AbstractOioByteChannel(Channel parent, Integer id) { super(parent, id); } + boolean isInputShutdown() { + return inputShutdown; + } + @Override protected OioByteUnsafe newUnsafe() { return new OioByteUnsafe(); @@ -37,6 +45,15 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel { public void read() { assert eventLoop().inEventLoop(); + if (inputShutdown) { + try { + Thread.sleep(SO_TIMEOUT); + } catch (InterruptedException e) { + // ignore + } + return; + } + final ChannelPipeline pipeline = pipeline(); final ByteBuf byteBuf = pipeline.inboundByteBuffer(); boolean closed = false; @@ -93,8 +110,15 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel { if (read) { pipeline.fireInboundBufferUpdated(); } - if (closed && isOpen()) { - close(voidFuture()); + if (closed) { + inputShutdown = true; + if (isOpen()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + close(voidFuture()); + } + } } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index a0cf6dd522..aff6189565 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -103,9 +103,14 @@ public class OioSocketChannel extends AbstractOioByteChannel return !socket.isClosed() && socket.isConnected(); } + @Override + public boolean isInputShutdown() { + return super.isInputShutdown(); + } + @Override public boolean isOutputShutdown() { - return socket.isOutputShutdown(); + return socket.isOutputShutdown() || !isActive(); } @Override