AutoClose behavior may infinite loop

Motivation:
If AutoClose is false and there is a IoException then AbstractChannel will not close the channel but instead just fail flushed element in the ChannelOutboundBuffer. AbstractChannel also notifies of writability changes, which may lead to an infinite loop if the peer has closed its read side of the socket because we will keep accepting more data but continuously fail because the peer isn't accepting writes.

Modifications:
- If the transport throws on a write we should acknowledge that the output side of the channel has been shutdown and cleanup. If the channel can't accept more data because it is full, and still healthy it is not expected to throw. However if the channel is not healthy it will throw and is not expected to accept any more writes. In this case we should shutdown the output for Channels that support this feature and otherwise just close.
- Connection-less protocols like UDP can remain the same because the channel may disconnected temporarily.
- Make sure AbstractUnsafe#shutdownOutput is called because the shutdown on the socket may throw an exception.

Result:
More correct handling of write failure when AutoClose is false.
This commit is contained in:
Scott Mitchell 2017-08-24 19:46:46 -07:00
parent b967805f32
commit 89ecb4b4a4
18 changed files with 432 additions and 24 deletions

View File

@ -28,15 +28,19 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete; import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.ChannelOutputShutdownEvent;
import io.netty.channel.socket.DuplexChannel; import io.netty.channel.socket.DuplexChannel;
import io.netty.util.UncheckedBooleanSupplier; import io.netty.util.UncheckedBooleanSupplier;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class SocketHalfClosedTest extends AbstractSocketTest { public class SocketHalfClosedTest extends AbstractSocketTest {
@ -50,7 +54,7 @@ public class SocketHalfClosedTest extends AbstractSocketTest {
testAllDataReadAfterHalfClosure(false, sb, cb); testAllDataReadAfterHalfClosure(false, sb, cb);
} }
public void testAllDataReadAfterHalfClosure(final boolean autoRead, private static void testAllDataReadAfterHalfClosure(final boolean autoRead,
ServerBootstrap sb, Bootstrap cb) throws Throwable { ServerBootstrap sb, Bootstrap cb) throws Throwable {
final int totalServerBytesWritten = 1024 * 16; final int totalServerBytesWritten = 1024 * 16;
final int numReadsPerReadLoop = 2; final int numReadsPerReadLoop = 2;
@ -150,6 +154,200 @@ public class SocketHalfClosedTest extends AbstractSocketTest {
} }
} }
@Test
public void testAutoCloseFalseDoesShutdownOutput() throws Throwable {
run();
}
public void testAutoCloseFalseDoesShutdownOutput(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testAutoCloseFalseDoesShutdownOutput(false, false, sb, cb);
testAutoCloseFalseDoesShutdownOutput(false, true, sb, cb);
testAutoCloseFalseDoesShutdownOutput(true, false, sb, cb);
testAutoCloseFalseDoesShutdownOutput(true, true, sb, cb);
}
private static void testAutoCloseFalseDoesShutdownOutput(boolean allowHalfClosed,
final boolean clientIsLeader,
ServerBootstrap sb,
Bootstrap cb) throws InterruptedException {
final int expectedBytes = 100;
final CountDownLatch serverReadExpectedLatch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(1);
final AtomicReference<Throwable> causeRef = new AtomicReference<Throwable>();
Channel serverChannel = null;
Channel clientChannel = null;
try {
cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
.option(ChannelOption.AUTO_CLOSE, false)
.option(ChannelOption.SO_LINGER, 0);
sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
.childOption(ChannelOption.AUTO_CLOSE, false)
.childOption(ChannelOption.SO_LINGER, 0);
final SimpleChannelInboundHandler<ByteBuf> leaderHandler = new AutoCloseFalseLeader(expectedBytes,
serverReadExpectedLatch, doneLatch, causeRef);
final SimpleChannelInboundHandler<ByteBuf> followerHandler = new AutoCloseFalseFollower(expectedBytes,
serverReadExpectedLatch, doneLatch, causeRef);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(clientIsLeader ? followerHandler :leaderHandler);
}
});
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(clientIsLeader ? leaderHandler : followerHandler);
}
});
serverChannel = sb.bind().sync().channel();
clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
doneLatch.await();
assertNull(causeRef.get());
} finally {
if (clientChannel != null) {
clientChannel.close().sync();
}
if (serverChannel != null) {
serverChannel.close().sync();
}
}
}
private static final class AutoCloseFalseFollower extends SimpleChannelInboundHandler<ByteBuf> {
private final int expectedBytes;
private final CountDownLatch followerCloseLatch;
private final CountDownLatch doneLatch;
private final AtomicReference<Throwable> causeRef;
private int bytesRead;
AutoCloseFalseFollower(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
AtomicReference<Throwable> causeRef) {
this.expectedBytes = expectedBytes;
this.followerCloseLatch = followerCloseLatch;
this.doneLatch = doneLatch;
this.causeRef = causeRef;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
checkPrematureClose();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
checkPrematureClose();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
bytesRead += msg.readableBytes();
if (bytesRead >= expectedBytes) {
// We write a reply and immediately close our end of the socket.
ByteBuf buf = ctx.alloc().buffer(expectedBytes);
buf.writerIndex(buf.writerIndex() + expectedBytes);
ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
future.channel().close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
followerCloseLatch.countDown();
}
});
}
});
}
}
private void checkPrematureClose() {
if (bytesRead < expectedBytes) {
causeRef.set(new IllegalStateException("follower premature close"));
doneLatch.countDown();
}
}
}
private static final class AutoCloseFalseLeader extends SimpleChannelInboundHandler<ByteBuf> {
private final int expectedBytes;
private final CountDownLatch followerCloseLatch;
private final CountDownLatch doneLatch;
private final AtomicReference<Throwable> causeRef;
private int bytesRead;
private boolean seenOutputShutdown;
AutoCloseFalseLeader(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
AtomicReference<Throwable> causeRef) {
this.expectedBytes = expectedBytes;
this.followerCloseLatch = followerCloseLatch;
this.doneLatch = doneLatch;
this.causeRef = causeRef;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().buffer(expectedBytes);
buf.writerIndex(buf.writerIndex() + expectedBytes);
ctx.writeAndFlush(buf.retainedDuplicate());
// We wait here to ensure that we write before we have a chance to process the outbound
// shutdown event.
followerCloseLatch.await();
// This write should fail, but we should still be allowed to read the peer's data
ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.cause() == null) {
causeRef.set(new IllegalStateException("second write should have failed!"));
doneLatch.countDown();
}
}
});
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
bytesRead += msg.readableBytes();
if (bytesRead >= expectedBytes) {
if (!seenOutputShutdown) {
causeRef.set(new IllegalStateException(
ChannelOutputShutdownEvent.class.getSimpleName() + " event was not seen"));
}
doneLatch.countDown();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof ChannelOutputShutdownEvent) {
seenOutputShutdown = true;
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
checkPrematureClose();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
checkPrematureClose();
}
private void checkPrematureClose() {
if (bytesRead < expectedBytes || !seenOutputShutdown) {
causeRef.set(new IllegalStateException("leader premature close"));
doneLatch.countDown();
}
}
}
@Test @Test
public void testAllDataReadClosure() throws Throwable { public void testAllDataReadClosure() throws Throwable {
run(); run();
@ -162,7 +360,7 @@ public class SocketHalfClosedTest extends AbstractSocketTest {
testAllDataReadClosure(false, true, sb, cb); testAllDataReadClosure(false, true, sb, cb);
} }
public void testAllDataReadClosure(final boolean autoRead, final boolean allowHalfClosed, private static void testAllDataReadClosure(final boolean autoRead, final boolean allowHalfClosed,
ServerBootstrap sb, Bootstrap cb) throws Throwable { ServerBootstrap sb, Bootstrap cb) throws Throwable {
final int totalServerBytesWritten = 1024 * 16; final int totalServerBytesWritten = 1024 * 16;
final int numReadsPerReadLoop = 2; final int numReadsPerReadLoop = 2;

View File

@ -23,6 +23,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.util.internal.UnstableApi;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -72,6 +73,16 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
try {
super.doShutdownOutput(cause);
} finally {
close();
}
}
abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception; abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {

View File

@ -37,6 +37,7 @@ import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -542,10 +543,27 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
try {
// The native socket implementation may throw a NotYetConnected exception when we attempt to shut it down.
// However NIO doesn't propagate an exception in the same situation (write failure), and we just want to
// update the socket state to flag that it has been shutdown. So don't use a voidPromise but instead create
// a new promise and ignore the results.
shutdownOutput0(newPromise());
} finally {
super.doShutdownOutput(cause);
}
}
private void shutdownOutput0(final ChannelPromise promise) { private void shutdownOutput0(final ChannelPromise promise) {
try {
try { try {
socket.shutdown(false, true); socket.shutdown(false, true);
} finally {
((AbstractUnsafe) unsafe()).shutdownOutput(); ((AbstractUnsafe) unsafe()).shutdownOutput();
}
promise.setSuccess(); promise.setSuccess();
} catch (Throwable cause) { } catch (Throwable cause) {
promise.setFailure(cause); promise.setFailure(cause);
@ -562,9 +580,12 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
} }
private void shutdown0(final ChannelPromise promise) { private void shutdown0(final ChannelPromise promise) {
try {
try { try {
socket.shutdown(true, true); socket.shutdown(true, true);
} finally {
((AbstractUnsafe) unsafe()).shutdownOutput(); ((AbstractUnsafe) unsafe()).shutdownOutput();
}
promise.setSuccess(); promise.setSuccess();
} catch (Throwable cause) { } catch (Throwable cause) {
promise.setFailure(cause); promise.setFailure(cause);

View File

@ -31,6 +31,7 @@ import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.IovArray; import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.UnixChannelUtil; import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
@ -39,8 +40,6 @@ import java.net.NetworkInterface;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import static io.netty.channel.epoll.LinuxSocket.newSocketDgram; import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
@ -424,6 +423,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return false; return false;
} }
@UnstableApi
@Override
protected void doShutdownOutput(Throwable cause) throws Exception {
// UDP sockets are not connected. A write failure may just be temporary or disconnect was called.
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (channelOutboundBuffer != null) {
channelOutboundBuffer.remove(cause);
}
}
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
super.doClose(); super.doClose();

View File

@ -70,6 +70,16 @@ public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
try {
super.doShutdownOutput(cause);
} finally {
close();
}
}
abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception; abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
@Override @Override

View File

@ -370,10 +370,27 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
try {
// The native socket implementation may throw a NotYetConnected exception when we attempt to shut it down.
// However NIO doesn't propagate an exception in the same situation (write failure), and we just want to
// update the socket state to flag that it has been shutdown. So don't use a voidPromise but instead create
// a new promise and ignore the results.
shutdownOutput0(newPromise());
} finally {
super.doShutdownOutput(cause);
}
}
private void shutdownOutput0(final ChannelPromise promise) { private void shutdownOutput0(final ChannelPromise promise) {
try {
try { try {
socket.shutdown(false, true); socket.shutdown(false, true);
} finally {
((AbstractUnsafe) unsafe()).shutdownOutput(); ((AbstractUnsafe) unsafe()).shutdownOutput();
}
promise.setSuccess(); promise.setSuccess();
} catch (Throwable cause) { } catch (Throwable cause) {
promise.setFailure(cause); promise.setFailure(cause);
@ -390,9 +407,12 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
} }
private void shutdown0(final ChannelPromise promise) { private void shutdown0(final ChannelPromise promise) {
try {
try { try {
socket.shutdown(true, true); socket.shutdown(true, true);
} finally {
((AbstractUnsafe) unsafe()).shutdownOutput(); ((AbstractUnsafe) unsafe()).shutdownOutput();
}
promise.setSuccess(); promise.setSuccess();
} catch (Throwable cause) { } catch (Throwable cause) {
promise.setFailure(cause); promise.setFailure(cause);

View File

@ -397,6 +397,16 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
return false; return false;
} }
@UnstableApi
@Override
protected void doShutdownOutput(Throwable cause) throws Exception {
// UDP sockets are not connected. A write failure may just be temporary or {@link #doDisconnect()} was called.
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (channelOutboundBuffer != null) {
channelOutboundBuffer.remove(cause);
}
}
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
super.doClose(); super.doClose();

View File

@ -611,14 +611,27 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false); close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
} }
/**
* Shutdown the output portion of the corresponding {@link Channel}.
* For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
*/
@UnstableApi @UnstableApi
public void shutdownOutput() { public final void shutdownOutput() {
shutdownOutput(null);
}
/**
* Shutdown the output portion of the corresponding {@link Channel}.
* For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
* @param cause The cause which may provide rational for the shutdown.
*/
final void shutdownOutput(Throwable cause) {
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) { if (outboundBuffer == null) {
return; return;
} }
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
ChannelOutputShutdownException e = new ChannelOutputShutdownException("Channel output explicitly shutdown"); ChannelOutputShutdownException e = new ChannelOutputShutdownException("Channel output shutdown", cause);
outboundBuffer.failFlushed(e, false); outboundBuffer.failFlushed(e, false);
outboundBuffer.close(e, true); outboundBuffer.close(e, true);
pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE); pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
@ -885,7 +898,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/ */
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else { } else {
outboundBuffer.failFlushed(t, true); try {
doShutdownOutput(t);
} catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} }
} finally { } finally {
inFlush0 = false; inFlush0 = false;
@ -1019,6 +1036,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/ */
protected abstract void doClose() throws Exception; protected abstract void doClose() throws Exception;
/**
* Called when conditions justify shutting down the output portion of the channel. This may happen if a write
* operation throws an exception.
* @param cause The cause for the shutdown.
*/
@UnstableApi
protected void doShutdownOutput(Throwable cause) throws Exception {
((AbstractUnsafe) unsafe).shutdownOutput(cause);
}
/** /**
* Deregister the {@link Channel} from its {@link EventLoop}. * Deregister the {@link Channel} from its {@link EventLoop}.
* *

View File

@ -15,6 +15,8 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.util.internal.UnstableApi;
import java.net.SocketAddress; import java.net.SocketAddress;
/** /**
@ -58,6 +60,16 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
try {
super.doShutdownOutput(cause);
} finally {
close();
}
}
@Override @Override
protected AbstractUnsafe newUnsafe() { protected AbstractUnsafe newUnsafe() {
return new DefaultServerUnsafe(); return new DefaultServerUnsafe();

View File

@ -40,6 +40,7 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.RecyclableArrayList; import io.netty.util.internal.RecyclableArrayList;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -695,6 +696,13 @@ public class EmbeddedChannel extends AbstractChannel {
state = State.CLOSED; state = State.CLOSED;
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
super.doShutdownOutput(cause);
close();
}
@Override @Override
protected void doBeginRead() throws Exception { protected void doBeginRead() throws Exception {
// NOOP // NOOP

View File

@ -32,6 +32,7 @@ import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -291,6 +292,13 @@ public class LocalChannel extends AbstractChannel {
} }
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
super.doShutdownOutput(cause);
close();
}
private void tryClose(boolean isActive) { private void tryClose(boolean isActive) {
if (isActive) { if (isActive) {
unsafe().close(unsafe().voidPromise()); unsafe().close(unsafe().voidPromise());

View File

@ -21,6 +21,7 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.util.internal.UnstableApi;
import java.io.IOException; import java.io.IOException;
import java.net.PortUnreachableException; import java.net.PortUnreachableException;
@ -179,6 +180,13 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
!(this instanceof ServerChannel); !(this instanceof ServerChannel);
} }
@UnstableApi
@Override
protected void doShutdownOutput(Throwable cause) throws Exception {
super.doShutdownOutput(cause);
close();
}
/** /**
* Read messages into the given array and return the amount which was read. * Read messages into the given array and return the amount which was read.
*/ */

View File

@ -19,6 +19,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.internal.UnstableApi;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -103,6 +104,13 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
} }
} }
@UnstableApi
@Override
protected void doShutdownOutput(Throwable cause) throws Exception {
super.doShutdownOutput(cause);
close();
}
/** /**
* Read messages into the given array and return the amount which was read. * Read messages into the given array and return the amount which was read.
*/ */

View File

@ -29,4 +29,8 @@ public final class ChannelOutputShutdownException extends IOException {
public ChannelOutputShutdownException(String msg) { public ChannelOutputShutdownException(String msg) {
super(msg); super(msg);
} }
public ChannelOutputShutdownException(String msg, Throwable cause) {
super(msg, cause);
}
} }

View File

@ -33,6 +33,7 @@ import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.util.internal.SocketUtils; import io.netty.util.internal.SocketUtils;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
@ -234,6 +235,16 @@ public final class NioDatagramChannel
javaChannel().close(); javaChannel().close();
} }
@UnstableApi
@Override
protected void doShutdownOutput(Throwable cause) throws Exception {
// UDP sockets are not connected. A write failure may just be temporary or doDisconnect() was called.
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (channelOutboundBuffer != null) {
channelOutboundBuffer.remove(cause);
}
}
@Override @Override
protected int doReadMessages(List<Object> buf) throws Exception { protected int doReadMessages(List<Object> buf) throws Exception {
DatagramChannel ch = javaChannel(); DatagramChannel ch = javaChannel();

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
@ -31,6 +32,7 @@ import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -146,6 +148,22 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
return (InetSocketAddress) super.remoteAddress(); return (InetSocketAddress) super.remoteAddress();
} }
@UnstableApi
@Override
protected final void doShutdownOutput(final Throwable cause) throws Exception {
ChannelFuture future = shutdownOutput();
if (future.isDone()) {
super.doShutdownOutput(cause);
} else {
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
NioSocketChannel.super.doShutdownOutput(cause);
}
});
}
}
@Override @Override
public ChannelFuture shutdownOutput() { public ChannelFuture shutdownOutput() {
return shutdownOutput(newPromise()); return shutdownOutput(newPromise());
@ -254,13 +272,16 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
} }
private void shutdownOutput0() throws Exception { private void shutdownOutput0() throws Exception {
try {
if (PlatformDependent.javaVersion() >= 7) { if (PlatformDependent.javaVersion() >= 7) {
javaChannel().shutdownOutput(); javaChannel().shutdownOutput();
} else { } else {
javaChannel().socket().shutdownOutput(); javaChannel().socket().shutdownOutput();
} }
} finally {
((AbstractUnsafe) unsafe()).shutdownOutput(); ((AbstractUnsafe) unsafe()).shutdownOutput();
} }
}
private void shutdownInput0(final ChannelPromise promise) { private void shutdownInput0(final ChannelPromise promise) {
try { try {

View File

@ -32,6 +32,7 @@ import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -198,6 +199,16 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
socket.disconnect(); socket.disconnect();
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
// UDP sockets are not connected. A write failure may just be temporary or {@link #doDisconnect()} was called.
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (channelOutboundBuffer != null) {
channelOutboundBuffer.remove(cause);
}
}
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
socket.close(); socket.close();

View File

@ -26,6 +26,7 @@ import io.netty.channel.oio.OioByteStreamChannel;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.util.internal.SocketUtils; import io.netty.util.internal.SocketUtils;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -128,6 +129,13 @@ public class OioSocketChannel extends OioByteStreamChannel implements SocketChan
return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive(); return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive();
} }
@UnstableApi
@Override
protected final void doShutdownOutput(final Throwable cause) throws Exception {
shutdownOutput0(voidPromise());
super.doShutdownOutput(cause);
}
@Override @Override
public ChannelFuture shutdownOutput() { public ChannelFuture shutdownOutput() {
return shutdownOutput(newPromise()); return shutdownOutput(newPromise());
@ -181,9 +189,12 @@ public class OioSocketChannel extends OioByteStreamChannel implements SocketChan
} }
private void shutdownOutput0() throws IOException { private void shutdownOutput0() throws IOException {
try {
socket.shutdownOutput(); socket.shutdownOutput();
} finally {
((AbstractUnsafe) unsafe()).shutdownOutput(); ((AbstractUnsafe) unsafe()).shutdownOutput();
} }
}
@Override @Override
public ChannelFuture shutdownInput(final ChannelPromise promise) { public ChannelFuture shutdownInput(final ChannelPromise promise) {