Include the original Exception that caused the Channel to be closed in the ClosedChannelException (#8863)

Motivation:

To make it easier to understand why a Channel was closed previously and so why the operation failed with a ClosedChannelException we should include the original Exception.

Modifications:

- Store the original exception that lead to the closed Channel and include it in the ClosedChannelException that is used to fail the operation.
- Add unit test

Result:

Fixes https://github.com/netty/netty/issues/8862.
This commit is contained in:
Norman Maurer 2019-02-15 13:13:17 -08:00
parent d036b24a4b
commit 68d8bd9a95
3 changed files with 155 additions and 19 deletions

View File

@ -46,14 +46,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
new ExtendedClosedChannelException(null), AbstractUnsafe.class, "ensureOpen(...)");
private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
new ExtendedClosedChannelException(null), AbstractUnsafe.class, "write(...)");
private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ExtendedClosedChannelException(null), AbstractUnsafe.class, "flush0()");
private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");
@ -70,6 +70,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final EventLoop eventLoop;
private volatile boolean registered;
private boolean closeInitiated;
private Throwable initialCloseCause;
/** Cache for the string representation of this channel */
private boolean strValActive;
@ -817,7 +818,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
safeSetFailure(promise, newWriteException(initialCloseCause));
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
@ -876,7 +877,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
outboundBuffer.failFlushed(newFlush0Exception(initialCloseCause), false);
}
} finally {
inFlush0 = false;
@ -896,12 +897,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
initialCloseCause = t;
close(voidPromise(), t, newFlush0Exception(t), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
initialCloseCause = t;
close(voidPromise(), t2, newFlush0Exception(t), false);
}
}
} finally {
@ -909,6 +912,30 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private ClosedChannelException newWriteException(Throwable cause) {
if (cause == null) {
return WRITE_CLOSED_CHANNEL_EXCEPTION;
}
return ThrowableUtil.unknownStackTrace(
new ExtendedClosedChannelException(cause), AbstractUnsafe.class, "write(...)");
}
private ClosedChannelException newFlush0Exception(Throwable cause) {
if (cause == null) {
return FLUSH0_CLOSED_CHANNEL_EXCEPTION;
}
return ThrowableUtil.unknownStackTrace(
new ExtendedClosedChannelException(cause), AbstractUnsafe.class, "flush0()");
}
private ClosedChannelException newEnsureOpenException(Throwable cause) {
if (cause == null) {
return ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION;
}
return ThrowableUtil.unknownStackTrace(
new ExtendedClosedChannelException(cause), AbstractUnsafe.class, "ensureOpen(...)");
}
@Override
public final ChannelPromise voidPromise() {
assertEventLoop();
@ -921,7 +948,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return true;
}
safeSetFailure(promise, new ClosedChannelException());
safeSetFailure(promise, newEnsureOpenException(initialCloseCause));
return false;
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.nio.channels.ClosedChannelException;
final class ExtendedClosedChannelException extends ClosedChannelException {
ExtendedClosedChannelException(Throwable cause) {
if (cause != null) {
initCause(cause);
}
}
@Override
public Throwable fillInStackTrace() {
return this;
}
}

View File

@ -15,8 +15,12 @@
*/
package io.netty.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import io.netty.util.NetUtil;
import org.junit.Test;
import static org.junit.Assert.*;
@ -80,6 +84,77 @@ public class AbstractChannelTest {
assertTrue(channelId instanceof DefaultChannelId);
}
@Test
public void testClosedChannelExceptionCarryIOException() throws Exception {
final IOException ioException = new IOException();
final EventLoop eventLoop = mock(EventLoop.class);
// This allows us to have a single-threaded test
when(eventLoop.inEventLoop()).thenReturn(true);
when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class));
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArgument(0)).run();
return null;
}).when(eventLoop).execute(any(Runnable.class));
final Channel channel = new TestChannel(eventLoop) {
private boolean open = true;
private boolean active;
@Override
protected AbstractUnsafe newUnsafe() {
return new AbstractUnsafe() {
@Override
public void connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
active = true;
promise.setSuccess();
}
};
}
@Override
protected void doClose() {
active = false;
open = false;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw ioException;
}
@Override
public boolean isOpen() {
return open;
}
@Override
public boolean isActive() {
return active;
}
};
try {
registerChannel(channel);
channel.connect(new InetSocketAddress(NetUtil.LOCALHOST, 8888)).sync();
assertSame(ioException, channel.writeAndFlush("").await().cause());
assertClosedChannelException(channel.writeAndFlush(""), ioException);
assertClosedChannelException(channel.write(""), ioException);
assertClosedChannelException(channel.bind(new InetSocketAddress(NetUtil.LOCALHOST, 8888)), ioException);
} finally {
channel.close();
}
}
private static void assertClosedChannelException(ChannelFuture future, IOException expected)
throws InterruptedException {
Throwable cause = future.await().cause();
assertTrue(cause instanceof ClosedChannelException);
assertSame(expected, cause.getCause());
}
private static void registerChannel(Channel channel) throws Exception {
DefaultChannelPromise future = new DefaultChannelPromise(channel);
channel.register(future);
@ -88,11 +163,8 @@ public class AbstractChannelTest {
private static class TestChannel extends AbstractChannel {
private static final ChannelMetadata TEST_METADATA = new ChannelMetadata(false);
private class TestUnsafe extends AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { }
}
private final ChannelConfig config = new DefaultChannelConfig(this);
TestChannel(EventLoop eventLoop) {
super(null, eventLoop);
@ -100,7 +172,7 @@ public class AbstractChannelTest {
@Override
public ChannelConfig config() {
return new DefaultChannelConfig(this);
return config;
}
@Override
@ -120,7 +192,12 @@ public class AbstractChannelTest {
@Override
protected AbstractUnsafe newUnsafe() {
return new TestUnsafe();
return new AbstractUnsafe() {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
}
};
}
@Override
@ -134,16 +211,16 @@ public class AbstractChannelTest {
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception { }
protected void doBind(SocketAddress localAddress) { }
@Override
protected void doDisconnect() throws Exception { }
protected void doDisconnect() { }
@Override
protected void doClose() throws Exception { }
protected void doClose() { }
@Override
protected void doBeginRead() throws Exception { }
protected void doBeginRead() { }
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { }