[#107] Add support for closing either input or output part of a channel

- Add ChannelOption.ALLOW_HALF_CLOSURE
  - If true, ChannelInputShutdownEvent is fired via userEventTriggered()
    when the remote peer shuts down its output, and the connection is 
    not closed until a user calls close() explicitly.
  - If false, the connection is closed immediately as it did before.
- Add SocketChannel.isInputShutdown()
- Add & improve test cases related with half-closed sockets
This commit is contained in:
Trustin Lee 2012-08-29 21:49:39 +09:00
parent bfdc28bd67
commit d03de0f3ca
14 changed files with 321 additions and 15 deletions

View File

@ -0,0 +1,135 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelInputShutdownEvent;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import org.junit.Test;
public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest {
@Test(timeout = 30000)
public void testShutdownOutput() throws Throwable {
run();
}
public void testShutdownOutput(ServerBootstrap sb) throws Throwable {
TestHandler h = new TestHandler();
Socket s = new Socket();
try {
sb.childHandler(h).childOption(ChannelOption.ALLOW_HALF_CLOSURE, true).bind().sync();
s.connect(addr, 10000);
s.getOutputStream().write(1);
assertEquals(1, (int) h.queue.take());
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertFalse(h.ch.isInputShutdown());
assertFalse(h.ch.isOutputShutdown());
s.shutdownOutput();
h.halfClosure.await();
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertTrue(h.ch.isInputShutdown());
assertFalse(h.ch.isOutputShutdown());
assertEquals(1, h.closure.getCount());
} finally {
s.close();
}
}
@Test(timeout = 30000)
public void testShutdownOutputWithoutOption() throws Throwable {
run();
}
public void testShutdownOutputWithoutOption(ServerBootstrap sb) throws Throwable {
TestHandler h = new TestHandler();
Socket s = new Socket();
try {
sb.childHandler(h).bind().sync();
s.connect(addr, 10000);
s.getOutputStream().write(1);
assertEquals(1, (int) h.queue.take());
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertFalse(h.ch.isInputShutdown());
assertFalse(h.ch.isOutputShutdown());
s.shutdownOutput();
h.closure.await();
assertFalse(h.ch.isOpen());
assertFalse(h.ch.isActive());
assertTrue(h.ch.isInputShutdown());
assertTrue(h.ch.isOutputShutdown());
assertEquals(1, h.halfClosure.getCount());
} finally {
s.close();
}
}
private static class TestHandler extends ChannelInboundByteHandlerAdapter {
volatile SocketChannel ch;
final BlockingQueue<Byte> queue = new SynchronousQueue<Byte>();
final CountDownLatch halfClosure = new CountDownLatch(1);
final CountDownLatch closure = new CountDownLatch(1);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ch = (SocketChannel) ctx.channel();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
closure.countDown();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
queue.offer(in.readByte());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ChannelInputShutdownEvent) {
halfClosure.countDown();
}
}
}
}

View File

@ -30,7 +30,7 @@ import java.util.concurrent.SynchronousQueue;
import org.junit.Test;
public class SocketShutdownOutputTest extends AbstractClientSocketTest {
public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest {
@Test(timeout = 30000)
public void testShutdownOutput() throws Throwable {
@ -51,10 +51,20 @@ public class SocketShutdownOutputTest extends AbstractClientSocketTest {
ch.write(Unpooled.wrappedBuffer(new byte[] { 1 })).sync();
assertEquals(1, s.getInputStream().read());
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertFalse(h.ch.isInputShutdown());
assertFalse(h.ch.isOutputShutdown());
// Make the connection half-closed and ensure read() returns -1.
ch.shutdownOutput();
assertEquals(-1, s.getInputStream().read());
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertFalse(h.ch.isInputShutdown());
assertTrue(h.ch.isOutputShutdown());
// If half-closed, the peer should be able to write something.
s.getOutputStream().write(1);
assertEquals(1, (int) h.queue.take());
@ -68,8 +78,14 @@ public class SocketShutdownOutputTest extends AbstractClientSocketTest {
}
private static class TestHandler extends ChannelInboundByteHandlerAdapter {
volatile SocketChannel ch;
final BlockingQueue<Byte> queue = new SynchronousQueue<Byte>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ch = (SocketChannel) ctx.channel();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
queue.offer(in.readByte());

View File

@ -0,0 +1,23 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
public final class ChannelInputShutdownEvent {
public static final ChannelInputShutdownEvent INSTANCE = new ChannelInputShutdownEvent();
private ChannelInputShutdownEvent() { }
}

View File

@ -32,6 +32,9 @@ public class ChannelOption<T> extends UniqueName {
new ChannelOption<Integer>("CONNECT_TIMEOUT_MILLIS");
public static final ChannelOption<Integer> WRITE_SPIN_COUNT =
new ChannelOption<Integer>("WRITE_SPIN_COUNT");
public static final ChannelOption<Boolean> ALLOW_HALF_CLOSURE =
new ChannelOption<Boolean>("ALLOW_HALF_CLOSURE");
public static final ChannelOption<Boolean> SO_BROADCAST =
new ChannelOption<Boolean>("SO_BROADCAST");

View File

@ -31,6 +31,7 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig {
private final Socket socket;
private volatile boolean allowHalfClosure;
/**
* Creates a new instance.
@ -46,7 +47,8 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(
super.getOptions(),
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS);
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS,
ALLOW_HALF_CLOSURE);
}
@Override
@ -72,6 +74,9 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
if (option == IP_TOS) {
return (T) Integer.valueOf(getTrafficClass());
}
if (option == ALLOW_HALF_CLOSURE) {
return (T) Boolean.valueOf(isAllowHalfClosure());
}
return super.getOption(option);
}
@ -94,6 +99,8 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
setSoLinger((Integer) value);
} else if (option == IP_TOS) {
setTrafficClass((Integer) value);
} else if (option == ALLOW_HALF_CLOSURE) {
setAllowHalfClosure((Boolean) value);
} else {
return super.setOption(option, value);
}
@ -236,4 +243,14 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
throw new ChannelException(e);
}
}
@Override
public boolean isAllowHalfClosure() {
return allowHalfClosure;
}
@Override
public void setAllowHalfClosure(boolean allowHalfClosure) {
this.allowHalfClosure = allowHalfClosure;
}
}

View File

@ -35,6 +35,13 @@ public interface SocketChannel extends Channel {
@Override
InetSocketAddress remoteAddress();
/**
* Returns {@code true} if and only if the remote peer shut down its output so that no more
* data is received from this channel. Note that the semantic of this method is different from
* that of {@link Socket#shutdownInput()} and {@link Socket#isInputShutdown()}.
*/
boolean isInputShutdown();
/**
* @see Socket#isOutputShutdown()
*/

View File

@ -125,4 +125,18 @@ public interface SocketChannelConfig extends ChannelConfig {
* {@link Socket#setPerformancePreferences(int, int, int)}.
*/
void setPerformancePreferences(int connectionTime, int latency, int bandwidth);
/**
* Returns {@code true} if and only if the channel should not close itself when its remote
* peer shuts down output to make the connection half-closed. If {@code false}, the connection
* is closed automatically when the remote peer shuts down output.
*/
boolean isAllowHalfClosure();
/**
* Sets whether the channel should not close itself when its remote peer shuts down output to
* make the connection half-closed. If {@code true} the connection is not closed when the
* remote peer shuts down output. If {@code false}, the connection is closed automatically.
*/
void setAllowHalfClosure(boolean allowHalfClosure);
}

View File

@ -20,6 +20,7 @@ import io.netty.buffer.ChannelBufType;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFlushFutureNotifier;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInputShutdownEvent;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
@ -57,6 +58,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
private final AioSocketChannelConfig config;
private volatile boolean inputShutdown;
private volatile boolean outputShutdown;
private boolean flushing;
@ -96,6 +98,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
return METADATA;
}
@Override
public boolean isInputShutdown() {
return inputShutdown;
}
@Override
public boolean isOutputShutdown() {
return outputShutdown;
@ -209,6 +216,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void doClose() throws Exception {
javaChannel().close();
outputShutdown = true;
}
@Override
@ -245,7 +253,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
private void beginRead() {
if (readSuspended.get()) {
if (readSuspended.get() || inputShutdown) {
return;
}
@ -381,8 +389,16 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && channel.isOpen()) {
if (closed) {
channel.inputShutdown = true;
if (channel.isOpen()) {
if (channel.config().isAllowHalfClosure()) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
channel.unsafe().close(channel.unsafe().voidFuture());
}
}
} else {
// start the next read
channel.beginRead();
@ -446,6 +462,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
public void resumeRead() {
if (readSuspended.compareAndSet(true, false)) {
if (inputShutdown) {
return;
}
if (eventLoop().inEventLoop()) {
beginRead();
} else {

View File

@ -34,6 +34,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig {
private final NetworkChannel channel;
private volatile boolean allowHalfClosure;
private volatile long readTimeoutInMillis;
private volatile long writeTimeoutInMillis;
@ -53,7 +54,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
return getOptions(
super.getOptions(),
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS,
AIO_READ_TIMEOUT, AIO_WRITE_TIMEOUT);
AIO_READ_TIMEOUT, AIO_WRITE_TIMEOUT, ALLOW_HALF_CLOSURE);
}
@Override
@ -86,6 +87,9 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
if (option == AIO_WRITE_TIMEOUT) {
return (T) Long.valueOf(getWriteTimeout());
}
if (option == ALLOW_HALF_CLOSURE) {
return (T) Boolean.valueOf(isAllowHalfClosure());
}
return super.getOption(option);
}
@ -112,6 +116,8 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
setReadTimeout((Long) value);
} else if (option == AIO_WRITE_TIMEOUT) {
setWriteTimeout((Long) value);
} else if (option == ALLOW_HALF_CLOSURE) {
setAllowHalfClosure((Boolean) value);
} else {
return super.setOption(option, value);
}
@ -296,4 +302,14 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
public long getWriteTimeout() {
return writeTimeoutInMillis;
}
@Override
public boolean isAllowHalfClosure() {
return allowHalfClosure;
}
@Override
public void setAllowHalfClosure(boolean allowHalfClosure) {
this.allowHalfClosure = allowHalfClosure;
}
}

View File

@ -17,6 +17,8 @@ package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInputShutdownEvent;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import java.io.IOException;
@ -88,12 +90,20 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
if (closed) {
setInputShutdown();
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
suspendReadTask.run();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidFuture());
}
}
}
}
}
}
@Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {

View File

@ -40,6 +40,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
private final SelectableChannel ch;
private final int readInterestOp;
private volatile SelectionKey selectionKey;
private volatile boolean inputShutdown;
final Runnable suspendReadTask = new Runnable() {
@Override
@ -54,10 +55,8 @@ public abstract class AbstractNioChannel extends AbstractChannel {
public void run() {
selectionKey().interestOps(selectionKey().interestOps() | readInterestOp);
}
};
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
@ -117,6 +116,14 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return selectionKey;
}
boolean isInputShutdown() {
return inputShutdown;
}
void setInputShutdown() {
inputShutdown = true;
}
public interface NioUnsafe extends Unsafe {
java.nio.channels.Channel ch();
void finishConnect();
@ -218,6 +225,10 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override
public void resumeRead() {
if (inputShutdown) {
return;
}
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
resumeReadTask.run();
@ -242,7 +253,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
protected Runnable doRegister() throws Exception {
NioEventLoop loop = (NioEventLoop) eventLoop();
selectionKey = javaChannel().register(
loop.selector, isActive()? readInterestOp : 0, this);
loop.selector, isActive() && !inputShutdown ? readInterestOp : 0, this);
return null;
}

View File

@ -98,9 +98,14 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
return ch.isOpen() && ch.isConnected();
}
@Override
public boolean isInputShutdown() {
return super.isInputShutdown();
}
@Override
public boolean isOutputShutdown() {
return javaChannel().socket().isOutputShutdown();
return javaChannel().socket().isOutputShutdown() || !isActive();
}
@Override

View File

@ -17,16 +17,24 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInputShutdownEvent;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import java.io.IOException;
abstract class AbstractOioByteChannel extends AbstractOioChannel {
private volatile boolean inputShutdown;
protected AbstractOioByteChannel(Channel parent, Integer id) {
super(parent, id);
}
boolean isInputShutdown() {
return inputShutdown;
}
@Override
protected OioByteUnsafe newUnsafe() {
return new OioByteUnsafe();
@ -37,6 +45,15 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel {
public void read() {
assert eventLoop().inEventLoop();
if (inputShutdown) {
try {
Thread.sleep(SO_TIMEOUT);
} catch (InterruptedException e) {
// ignore
}
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
boolean closed = false;
@ -93,12 +110,19 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
if (closed) {
inputShutdown = true;
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidFuture());
}
}
}
}
}
}
@Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {

View File

@ -103,9 +103,14 @@ public class OioSocketChannel extends AbstractOioByteChannel
return !socket.isClosed() && socket.isConnected();
}
@Override
public boolean isInputShutdown() {
return super.isInputShutdown();
}
@Override
public boolean isOutputShutdown() {
return socket.isOutputShutdown();
return socket.isOutputShutdown() || !isActive();
}
@Override