Fix assertion error when closing / shutdown native channel and SO_LINGER is set.

Motivation:

When SO_LINGER is used we run doClose() on the GlobalEventExecutor by default so we need to ensure we schedule all code that needs to be run on the EventLoop on the EventLoop in doClose. Beside this there are also threading issues when calling shutdownOutput(...)

Modifications:

- Schedule removal from EventLoop to the EventLoop
- Correctly handle shutdownOutput and shutdown in respect with threading-model
- Add unit tests

Result:

Fixes [#7159].
This commit is contained in:
Norman Maurer 2017-09-16 17:20:01 -07:00
parent ff592c8d5c
commit aa8bdb5d6b
21 changed files with 387 additions and 418 deletions

View File

@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.WriteBufferWaterMark;
@ -193,6 +194,52 @@ public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest {
} }
} }
@Test(timeout = 30000)
public void testShutdownOutputSoLingerNoAssertError() throws Throwable {
run();
}
public void testShutdownOutputSoLingerNoAssertError(Bootstrap cb) throws Throwable {
testShutdownSoLingerNoAssertError0(cb, true);
}
@Test(timeout = 30000)
public void testShutdownSoLingerNoAssertError() throws Throwable {
run();
}
public void testShutdownSoLingerNoAssertError(Bootstrap cb) throws Throwable {
testShutdownSoLingerNoAssertError0(cb, false);
}
private void testShutdownSoLingerNoAssertError0(Bootstrap cb, boolean output) throws Throwable {
ServerSocket ss = new ServerSocket();
Socket s = null;
ChannelFuture cf = null;
try {
ss.bind(newSocketAddress());
cf = cb.option(ChannelOption.SO_LINGER, 1).handler(new ChannelInboundHandlerAdapter())
.connect(ss.getLocalSocketAddress()).sync();
s = ss.accept();
cf.sync();
if (output) {
((SocketChannel) cf.channel()).shutdownOutput().sync();
} else {
((SocketChannel) cf.channel()).shutdown().sync();
}
} finally {
if (s != null) {
s.close();
}
if (cf != null) {
cf.channel().close();
}
ss.close();
}
}
private static void checkThrowable(Throwable cause) throws Throwable { private static void checkThrowable(Throwable cause) throws Throwable {
// Depending on OIO / NIO both are ok // Depending on OIO / NIO both are ok
if (!(cause instanceof ClosedChannelException) && !(cause instanceof SocketException)) { if (!(cause instanceof ClosedChannelException) && !(cause instanceof SocketException)) {

View File

@ -171,7 +171,25 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
} }
if (isRegistered()) { if (isRegistered()) {
// Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
// if SO_LINGER is used.
//
// See https://github.com/netty/netty/issues/7159
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
doDeregister(); doDeregister();
} else {
loop.execute(new Runnable() {
@Override
public void run() {
try {
doDeregister();
} catch (Throwable cause) {
pipeline().fireExceptionCaught(cause);
}
}
});
}
} }
} finally { } finally {
socket.close(); socket.close();

View File

@ -23,7 +23,6 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.util.internal.UnstableApi;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -73,16 +72,6 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
try {
super.doShutdownOutput(cause);
} finally {
close();
}
}
abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception; abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {

View File

@ -545,29 +545,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
@UnstableApi @UnstableApi
@Override @Override
protected final void doShutdownOutput(Throwable cause) throws Exception { protected final void doShutdownOutput() throws Exception {
try {
// The native socket implementation may throw a NotYetConnected exception when we attempt to shut it down.
// However NIO doesn't propagate an exception in the same situation (write failure), and we just want to
// update the socket state to flag that it has been shutdown. So don't use a voidPromise but instead create
// a new promise and ignore the results.
shutdownOutput0(newPromise());
} finally {
super.doShutdownOutput(cause);
}
}
private void shutdownOutput0(final ChannelPromise promise) {
try {
try {
socket.shutdown(false, true); socket.shutdown(false, true);
} finally {
((AbstractUnsafe) unsafe()).shutdownOutput();
}
promise.setSuccess();
} catch (Throwable cause) {
promise.setFailure(cause);
}
} }
private void shutdownInput0(final ChannelPromise promise) { private void shutdownInput0(final ChannelPromise promise) {
@ -579,19 +558,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
} }
} }
private void shutdown0(final ChannelPromise promise) {
try {
try {
socket.shutdown(true, true);
} finally {
((AbstractUnsafe) unsafe()).shutdownOutput();
}
promise.setSuccess();
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
@Override @Override
public boolean isOutputShutdown() { public boolean isOutputShutdown() {
return socket.isOutputShutdown(); return socket.isOutputShutdown();
@ -614,27 +580,18 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
@Override @Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) { public ChannelFuture shutdownOutput(final ChannelPromise promise) {
Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} else {
EventLoop loop = eventLoop(); EventLoop loop = eventLoop();
if (loop.inEventLoop()) { if (loop.inEventLoop()) {
shutdownOutput0(promise); ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
} else { } else {
loop.execute(new Runnable() { loop.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
shutdownOutput0(promise); ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
} }
}); });
} }
}
return promise; return promise;
} }
@ -676,30 +633,52 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
@Override @Override
public ChannelFuture shutdown(final ChannelPromise promise) { public ChannelFuture shutdown(final ChannelPromise promise) {
Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose(); ChannelFuture shutdownOutputFuture = shutdownOutput();
if (closeExecutor != null) { if (shutdownOutputFuture.isDone()) {
closeExecutor.execute(new Runnable() { shutdownOutputDone(shutdownOutputFuture, promise);
} else {
shutdownOutputFuture.addListener(new ChannelFutureListener() {
@Override @Override
public void run() { public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
shutdown0(promise); shutdownOutputDone(shutdownOutputFuture, promise);
} }
}); });
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdown0(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdown0(promise);
}
});
}
} }
return promise; return promise;
} }
private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
ChannelFuture shutdownInputFuture = shutdownInput();
if (shutdownInputFuture.isDone()) {
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
} else {
shutdownInputFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
}
});
}
}
private static void shutdownDone(ChannelFuture shutdownOutputFuture,
ChannelFuture shutdownInputFuture,
ChannelPromise promise) {
Throwable shutdownOutputCause = shutdownOutputFuture.cause();
Throwable shutdownInputCause = shutdownInputFuture.cause();
if (shutdownOutputCause != null) {
if (shutdownInputCause != null) {
logger.debug("Exception suppressed because a previous exception occurred.",
shutdownInputCause);
}
promise.setFailure(shutdownOutputCause);
} else if (shutdownInputCause != null) {
promise.setFailure(shutdownInputCause);
} else {
promise.setSuccess();
}
}
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
try { try {

View File

@ -31,7 +31,6 @@ import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.IovArray; import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.UnixChannelUtil; import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
@ -423,16 +422,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return false; return false;
} }
@UnstableApi
@Override
protected void doShutdownOutput(Throwable cause) throws Exception {
// UDP sockets are not connected. A write failure may just be temporary or disconnect was called.
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (channelOutboundBuffer != null) {
channelOutboundBuffer.remove(cause);
}
}
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
super.doClose(); super.doClose();

View File

@ -17,6 +17,7 @@ package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -98,4 +99,22 @@ public class EpollSocketChannelTest {
Assert.assertTrue(info.rcvSpace() >= 0); Assert.assertTrue(info.rcvSpace() >= 0);
Assert.assertTrue(info.totalRetrans() >= 0); Assert.assertTrue(info.totalRetrans() >= 0);
} }
// See https://github.com/netty/netty/issues/7159
@Test
public void testSoLingerNoAssertError() throws Exception {
EventLoopGroup group = new EpollEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
EpollSocketChannel ch = (EpollSocketChannel) bootstrap.group(group)
.channel(EpollSocketChannel.class)
.option(ChannelOption.SO_LINGER, 10)
.handler(new ChannelInboundHandlerAdapter())
.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
ch.close().syncUninterruptibly();
} finally {
group.shutdownGracefully();
}
}
} }

View File

@ -137,9 +137,28 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
try { try {
if (isRegistered()) { if (isRegistered()) {
// The FD will be closed, which should take care of deleting any associated events from kqueue, but // The FD will be closed, which should take care of deleting any associated events from kqueue, but
// since we rely upon jniSelfRef to be consistent we make sure that we clear this reference out for all] // since we rely upon jniSelfRef to be consistent we make sure that we clear this reference out for
// events which are pending in kqueue to avoid referencing a deleted pointer at a later time. // all events which are pending in kqueue to avoid referencing a deleted pointer at a later time.
// Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
// if SO_LINGER is used.
//
// See https://github.com/netty/netty/issues/7159
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
doDeregister(); doDeregister();
} else {
loop.execute(new Runnable() {
@Override
public void run() {
try {
doDeregister();
} catch (Throwable cause) {
pipeline().fireExceptionCaught(cause);
}
}
});
}
} }
} finally { } finally {
socket.close(); socket.close();

View File

@ -20,7 +20,6 @@ import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
@ -70,16 +69,6 @@ public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
try {
super.doShutdownOutput(cause);
} finally {
close();
}
}
abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception; abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
@Override @Override

View File

@ -20,6 +20,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
@ -34,6 +35,8 @@ import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -43,6 +46,7 @@ import java.util.concurrent.Executor;
@UnstableApi @UnstableApi
public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel { public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final String EXPECTED_TYPES = private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
@ -372,51 +376,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
@UnstableApi @UnstableApi
@Override @Override
protected final void doShutdownOutput(Throwable cause) throws Exception { protected final void doShutdownOutput() throws Exception {
try {
// The native socket implementation may throw a NotYetConnected exception when we attempt to shut it down.
// However NIO doesn't propagate an exception in the same situation (write failure), and we just want to
// update the socket state to flag that it has been shutdown. So don't use a voidPromise but instead create
// a new promise and ignore the results.
shutdownOutput0(newPromise());
} finally {
super.doShutdownOutput(cause);
}
}
private void shutdownOutput0(final ChannelPromise promise) {
try {
try {
socket.shutdown(false, true); socket.shutdown(false, true);
} finally {
((AbstractUnsafe) unsafe()).shutdownOutput();
}
promise.setSuccess();
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
private void shutdownInput0(final ChannelPromise promise) {
try {
socket.shutdown(true, false);
promise.setSuccess();
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
private void shutdown0(final ChannelPromise promise) {
try {
try {
socket.shutdown(true, true);
} finally {
((AbstractUnsafe) unsafe()).shutdownOutput();
}
promise.setSuccess();
} catch (Throwable cause) {
promise.setFailure(cause);
}
} }
@Override @Override
@ -441,27 +402,17 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
@Override @Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) { public ChannelFuture shutdownOutput(final ChannelPromise promise) {
Executor closeExecutor = ((KQueueStreamUnsafe) unsafe()).prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} else {
EventLoop loop = eventLoop(); EventLoop loop = eventLoop();
if (loop.inEventLoop()) { if (loop.inEventLoop()) {
shutdownOutput0(promise); ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
} else { } else {
loop.execute(new Runnable() { loop.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
shutdownOutput0(promise); ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
} }
}); });
} }
}
return promise; return promise;
} }
@ -472,15 +423,6 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
@Override @Override
public ChannelFuture shutdownInput(final ChannelPromise promise) { public ChannelFuture shutdownInput(final ChannelPromise promise) {
Executor closeExecutor = ((KQueueStreamUnsafe) unsafe()).prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
shutdownInput0(promise);
}
});
} else {
EventLoop loop = eventLoop(); EventLoop loop = eventLoop();
if (loop.inEventLoop()) { if (loop.inEventLoop()) {
shutdownInput0(promise); shutdownInput0(promise);
@ -492,10 +434,19 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
} }
}); });
} }
}
return promise; return promise;
} }
private void shutdownInput0(ChannelPromise promise) {
try {
socket.shutdown(true, false);
} catch (Throwable cause) {
promise.setFailure(cause);
return;
}
promise.setSuccess();
}
@Override @Override
public ChannelFuture shutdown() { public ChannelFuture shutdown() {
return shutdown(newPromise()); return shutdown(newPromise());
@ -503,30 +454,52 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
@Override @Override
public ChannelFuture shutdown(final ChannelPromise promise) { public ChannelFuture shutdown(final ChannelPromise promise) {
Executor closeExecutor = ((KQueueStreamUnsafe) unsafe()).prepareToClose(); ChannelFuture shutdownOutputFuture = shutdownOutput();
if (closeExecutor != null) { if (shutdownOutputFuture.isDone()) {
closeExecutor.execute(new Runnable() { shutdownOutputDone(shutdownOutputFuture, promise);
} else {
shutdownOutputFuture.addListener(new ChannelFutureListener() {
@Override @Override
public void run() { public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
shutdown0(promise); shutdownOutputDone(shutdownOutputFuture, promise);
} }
}); });
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdown0(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdown0(promise);
}
});
}
} }
return promise; return promise;
} }
private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
ChannelFuture shutdownInputFuture = shutdownInput();
if (shutdownInputFuture.isDone()) {
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
} else {
shutdownInputFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
}
});
}
}
private static void shutdownDone(ChannelFuture shutdownOutputFuture,
ChannelFuture shutdownInputFuture,
ChannelPromise promise) {
Throwable shutdownOutputCause = shutdownOutputFuture.cause();
Throwable shutdownInputCause = shutdownInputFuture.cause();
if (shutdownOutputCause != null) {
if (shutdownInputCause != null) {
logger.debug("Exception suppressed because a previous exception occurred.",
shutdownInputCause);
}
promise.setFailure(shutdownOutputCause);
} else if (shutdownInputCause != null) {
promise.setFailure(shutdownInputCause);
} else {
promise.setSuccess();
}
}
class KQueueStreamUnsafe extends AbstractKQueueUnsafe { class KQueueStreamUnsafe extends AbstractKQueueUnsafe {
// Overridden here just to be able to access this method from AbstractKQueueStreamChannel // Overridden here just to be able to access this method from AbstractKQueueStreamChannel
@Override @Override

View File

@ -397,16 +397,6 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
return false; return false;
} }
@UnstableApi
@Override
protected void doShutdownOutput(Throwable cause) throws Exception {
// UDP sockets are not connected. A write failure may just be temporary or {@link #doDisconnect()} was called.
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (channelOutboundBuffer != null) {
channelOutboundBuffer.remove(cause);
}
}
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
super.doClose(); super.doClose();

View File

@ -15,10 +15,16 @@
*/ */
package io.netty.channel.kqueue; package io.netty.channel.kqueue;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.net.InetSocketAddress;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
public class KQueueChannelConfigTest { public class KQueueChannelConfigTest {
@ -52,4 +58,22 @@ public class KQueueChannelConfigTest {
// expected // expected
} }
} }
// See https://github.com/netty/netty/issues/7159
@Test
public void testSoLingerNoAssertError() throws Exception {
EventLoopGroup group = new KQueueEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
KQueueSocketChannel ch = (KQueueSocketChannel) bootstrap.group(group)
.channel(KQueueSocketChannel.class)
.option(ChannelOption.SO_LINGER, 10)
.handler(new ChannelInboundHandlerAdapter())
.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
ch.close().syncUninterruptibly();
} finally {
group.shutdownGracefully();
}
}
} }

View File

@ -616,8 +616,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
* For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes. * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
*/ */
@UnstableApi @UnstableApi
public final void shutdownOutput() { public final void shutdownOutput(final ChannelPromise promise) {
shutdownOutput(null); assertEventLoop();
shutdownOutput(promise, null);
} }
/** /**
@ -625,15 +626,60 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
* For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes. * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
* @param cause The cause which may provide rational for the shutdown. * @param cause The cause which may provide rational for the shutdown.
*/ */
final void shutdownOutput(Throwable cause) { private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
if (!promise.setUncancellable()) {
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) { if (outboundBuffer == null) {
promise.setFailure(CLOSE_CLOSED_CHANNEL_EXCEPTION);
return; return;
} }
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
ChannelOutputShutdownException e = new ChannelOutputShutdownException("Channel output shutdown", cause);
outboundBuffer.failFlushed(e, false); final Throwable shutdownCause = cause == null ?
outboundBuffer.close(e, true); new ChannelOutputShutdownException("Channel output shutdown") :
new ChannelOutputShutdownException("Channel output shutdown", cause);
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the shutdown.
doShutdownOutput();
promise.setSuccess();
} catch (Throwable err) {
promise.setFailure(err);
} finally {
// Dispatch to the EventLoop
eventLoop().execute(new Runnable() {
@Override
public void run() {
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
});
}
}
});
} else {
try {
// Execute the shutdown.
doShutdownOutput();
promise.setSuccess();
} catch (Throwable err) {
promise.setFailure(err);
} finally {
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
}
}
private void closeOutboundBufferForShutdown(
ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
buffer.failFlushed(cause, false);
buffer.close(cause, true);
pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE); pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
} }
@ -899,7 +945,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else { } else {
try { try {
doShutdownOutput(t); shutdownOutput(voidPromise(), t);
} catch (Throwable t2) { } catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} }
@ -1039,11 +1085,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
/** /**
* Called when conditions justify shutting down the output portion of the channel. This may happen if a write * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
* operation throws an exception. * operation throws an exception.
* @param cause The cause for the shutdown.
*/ */
@UnstableApi @UnstableApi
protected void doShutdownOutput(Throwable cause) throws Exception { protected void doShutdownOutput() throws Exception {
((AbstractUnsafe) unsafe).shutdownOutput(cause); doClose();
} }
/** /**

View File

@ -15,8 +15,6 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.util.internal.UnstableApi;
import java.net.SocketAddress; import java.net.SocketAddress;
/** /**
@ -60,16 +58,6 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
try {
super.doShutdownOutput(cause);
} finally {
close();
}
}
@Override @Override
protected AbstractUnsafe newUnsafe() { protected AbstractUnsafe newUnsafe() {
return new DefaultServerUnsafe(); return new DefaultServerUnsafe();

View File

@ -696,13 +696,6 @@ public class EmbeddedChannel extends AbstractChannel {
state = State.CLOSED; state = State.CLOSED;
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
super.doShutdownOutput(cause);
close();
}
@Override @Override
protected void doBeginRead() throws Exception { protected void doBeginRead() throws Exception {
// NOOP // NOOP

View File

@ -32,7 +32,6 @@ import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -292,13 +291,6 @@ public class LocalChannel extends AbstractChannel {
} }
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
super.doShutdownOutput(cause);
close();
}
private void tryClose(boolean isActive) { private void tryClose(boolean isActive) {
if (isActive) { if (isActive) {
unsafe().close(unsafe().voidPromise()); unsafe().close(unsafe().voidPromise());

View File

@ -21,7 +21,6 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.util.internal.UnstableApi;
import java.io.IOException; import java.io.IOException;
import java.net.PortUnreachableException; import java.net.PortUnreachableException;
@ -155,7 +154,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
} }
break; break;
} }
} catch (IOException e) { } catch (Exception e) {
if (continueOnWriteError()) { if (continueOnWriteError()) {
in.remove(e); in.remove(e);
} else { } else {
@ -180,13 +179,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
!(this instanceof ServerChannel); !(this instanceof ServerChannel);
} }
@UnstableApi
@Override
protected void doShutdownOutput(Throwable cause) throws Exception {
super.doShutdownOutput(cause);
close();
}
/** /**
* Read messages into the given array and return the amount which was read. * Read messages into the given array and return the amount which was read.
*/ */

View File

@ -19,7 +19,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.internal.UnstableApi;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -104,13 +103,6 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
} }
} }
@UnstableApi
@Override
protected void doShutdownOutput(Throwable cause) throws Exception {
super.doShutdownOutput(cause);
close();
}
/** /**
* Read messages into the given array and return the amount which was read. * Read messages into the given array and return the amount which was read.
*/ */

View File

@ -235,16 +235,6 @@ public final class NioDatagramChannel
javaChannel().close(); javaChannel().close();
} }
@UnstableApi
@Override
protected void doShutdownOutput(Throwable cause) throws Exception {
// UDP sockets are not connected. A write failure may just be temporary or doDisconnect() was called.
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (channelOutboundBuffer != null) {
channelOutboundBuffer.remove(cause);
}
}
@Override @Override
protected int doReadMessages(List<Object> buf) throws Exception { protected int doReadMessages(List<Object> buf) throws Exception {
DatagramChannel ch = javaChannel(); DatagramChannel ch = javaChannel();

View File

@ -150,17 +150,11 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
@UnstableApi @UnstableApi
@Override @Override
protected final void doShutdownOutput(final Throwable cause) throws Exception { protected final void doShutdownOutput() throws Exception {
ChannelFuture future = shutdownOutput(); if (PlatformDependent.javaVersion() >= 7) {
if (future.isDone()) { javaChannel().shutdownOutput();
super.doShutdownOutput(cause);
} else { } else {
future.addListener(new ChannelFutureListener() { javaChannel().socket().shutdownOutput();
@Override
public void operationComplete(ChannelFuture future) throws Exception {
NioSocketChannel.super.doShutdownOutput(cause);
}
});
} }
} }
@ -171,27 +165,17 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
@Override @Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) { public ChannelFuture shutdownOutput(final ChannelPromise promise) {
Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose(); final EventLoop loop = eventLoop();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) { if (loop.inEventLoop()) {
shutdownOutput0(promise); ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
} else { } else {
loop.execute(new Runnable() { loop.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
shutdownOutput0(promise); ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
} }
}); });
} }
}
return promise; return promise;
} }
@ -207,15 +191,6 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
@Override @Override
public ChannelFuture shutdownInput(final ChannelPromise promise) { public ChannelFuture shutdownInput(final ChannelPromise promise) {
Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
shutdownInput0(promise);
}
});
} else {
EventLoop loop = eventLoop(); EventLoop loop = eventLoop();
if (loop.inEventLoop()) { if (loop.inEventLoop()) {
shutdownInput0(promise); shutdownInput0(promise);
@ -227,7 +202,6 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
} }
}); });
} }
}
return promise; return promise;
} }
@ -238,51 +212,51 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
@Override @Override
public ChannelFuture shutdown(final ChannelPromise promise) { public ChannelFuture shutdown(final ChannelPromise promise) {
Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose(); ChannelFuture shutdownOutputFuture = shutdownOutput();
if (closeExecutor != null) { if (shutdownOutputFuture.isDone()) {
closeExecutor.execute(new Runnable() { shutdownOutputDone(shutdownOutputFuture, promise);
} else {
shutdownOutputFuture.addListener(new ChannelFutureListener() {
@Override @Override
public void run() { public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
shutdown0(promise); shutdownOutputDone(shutdownOutputFuture, promise);
} }
}); });
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdown0(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdown0(promise);
}
});
}
} }
return promise; return promise;
} }
private void shutdownOutput0(final ChannelPromise promise) { private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
try { ChannelFuture shutdownInputFuture = shutdownInput();
shutdownOutput0(); if (shutdownInputFuture.isDone()) {
promise.setSuccess(); shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
} catch (Throwable t) {
promise.setFailure(t);
}
}
private void shutdownOutput0() throws Exception {
try {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().shutdownOutput();
} else { } else {
javaChannel().socket().shutdownOutput(); shutdownInputFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
} }
} finally { });
((AbstractUnsafe) unsafe()).shutdownOutput();
} }
} }
private static void shutdownDone(ChannelFuture shutdownOutputFuture,
ChannelFuture shutdownInputFuture,
ChannelPromise promise) {
Throwable shutdownOutputCause = shutdownOutputFuture.cause();
Throwable shutdownInputCause = shutdownInputFuture.cause();
if (shutdownOutputCause != null) {
if (shutdownInputCause != null) {
logger.debug("Exception suppressed because a previous exception occurred.",
shutdownInputCause);
}
promise.setFailure(shutdownOutputCause);
} else if (shutdownInputCause != null) {
promise.setFailure(shutdownInputCause);
} else {
promise.setSuccess();
}
}
private void shutdownInput0(final ChannelPromise promise) { private void shutdownInput0(final ChannelPromise promise) {
try { try {
shutdownInput0(); shutdownInput0();
@ -300,31 +274,6 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
} }
} }
private void shutdown0(final ChannelPromise promise) {
Throwable cause = null;
try {
shutdownOutput0();
} catch (Throwable t) {
cause = t;
}
try {
shutdownInput0();
} catch (Throwable t) {
if (cause == null) {
promise.setFailure(t);
} else {
logger.debug("Exception suppressed because a previous exception occurred.", t);
promise.setFailure(cause);
}
return;
}
if (cause == null) {
promise.setSuccess();
} else {
promise.setFailure(cause);
}
}
@Override @Override
protected SocketAddress localAddress0() { protected SocketAddress localAddress0() {
return javaChannel().socket().getLocalSocketAddress(); return javaChannel().socket().getLocalSocketAddress();

View File

@ -32,7 +32,6 @@ import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -199,16 +198,6 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
socket.disconnect(); socket.disconnect();
} }
@UnstableApi
@Override
protected final void doShutdownOutput(Throwable cause) throws Exception {
// UDP sockets are not connected. A write failure may just be temporary or {@link #doDisconnect()} was called.
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (channelOutboundBuffer != null) {
channelOutboundBuffer.remove(cause);
}
}
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
socket.close(); socket.close();
@ -293,7 +282,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
} }
socket.send(tmpPacket); socket.send(tmpPacket);
in.remove(); in.remove();
} catch (IOException e) { } catch (Exception e) {
// Continue on write error as a DatagramChannel can write to multiple remote peers // Continue on write error as a DatagramChannel can write to multiple remote peers
// //
// See https://github.com/netty/netty/issues/2665 // See https://github.com/netty/netty/issues/2665

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException; import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
@ -131,9 +132,8 @@ public class OioSocketChannel extends OioByteStreamChannel implements SocketChan
@UnstableApi @UnstableApi
@Override @Override
protected final void doShutdownOutput(final Throwable cause) throws Exception { protected final void doShutdownOutput() throws Exception {
shutdownOutput0(voidPromise()); shutdownOutput0();
super.doShutdownOutput(cause);
} }
@Override @Override
@ -189,11 +189,7 @@ public class OioSocketChannel extends OioByteStreamChannel implements SocketChan
} }
private void shutdownOutput0() throws IOException { private void shutdownOutput0() throws IOException {
try {
socket.shutdownOutput(); socket.shutdownOutput();
} finally {
((AbstractUnsafe) unsafe()).shutdownOutput();
}
} }
@Override @Override
@ -223,42 +219,49 @@ public class OioSocketChannel extends OioByteStreamChannel implements SocketChan
@Override @Override
public ChannelFuture shutdown(final ChannelPromise promise) { public ChannelFuture shutdown(final ChannelPromise promise) {
EventLoop loop = eventLoop(); ChannelFuture shutdownOutputFuture = shutdownOutput();
if (loop.inEventLoop()) { if (shutdownOutputFuture.isDone()) {
shutdown0(promise); shutdownOutputDone(shutdownOutputFuture, promise);
} else { } else {
loop.execute(new Runnable() { shutdownOutputFuture.addListener(new ChannelFutureListener() {
@Override @Override
public void run() { public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
shutdown0(promise); shutdownOutputDone(shutdownOutputFuture, promise);
} }
}); });
} }
return promise; return promise;
} }
private void shutdown0(ChannelPromise promise) { private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
Throwable cause = null; ChannelFuture shutdownInputFuture = shutdownInput();
try { if (shutdownInputFuture.isDone()) {
shutdownOutput0(); shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
} catch (Throwable t) {
cause = t;
}
try {
socket.shutdownInput();
} catch (Throwable t) {
if (cause == null) {
promise.setFailure(t);
} else { } else {
logger.debug("Exception suppressed because a previous exception occurred.", t); shutdownInputFuture.addListener(new ChannelFutureListener() {
promise.setFailure(cause); @Override
public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
} }
return; });
} }
if (cause == null) { }
private static void shutdownDone(ChannelFuture shutdownOutputFuture,
ChannelFuture shutdownInputFuture,
ChannelPromise promise) {
Throwable shutdownOutputCause = shutdownOutputFuture.cause();
Throwable shutdownInputCause = shutdownInputFuture.cause();
if (shutdownOutputCause != null) {
if (shutdownInputCause != null) {
logger.debug("Exception suppressed because a previous exception occurred.",
shutdownInputCause);
}
promise.setFailure(shutdownOutputCause);
} else if (shutdownInputCause != null) {
promise.setFailure(shutdownInputCause);
} else {
promise.setSuccess(); promise.setSuccess();
} else {
promise.setFailure(cause);
} }
} }