Trigger channelWritabilityChanged() later to avoid reentrance

Related: #3212

Motivation:

When SslHandler and ChunkedWriteHandler exists in a pipeline together,
it is possible that ChunkedWriteHandler.channelWritabilityChanged()
invokes SslHandler.flush() and vice versa. Because they can feed each
other (i.e. ChunkedWriteHandler.channelWritabilityChanged() ->
SslHandler.flush() -> ChunkedWriteHandler.channelWritabilityChanged() ->
..), they can fall into an inconsistent state due to reentrance (e.g.
bad MAC record at the remote peer due to incorrect ordering.)

Modifications:

- Trigger channelWritabilityChanged() using EventLoop.execute() when
  there's a chance where channelWritabilityChanged() can cause a
  reentrance issue
- Fix test failures caused by the modification

Result:

Fix the handler reentrance issues related with a
channelWritabilityChanged() event
This commit is contained in:
Trustin Lee 2014-12-10 18:36:53 +09:00
parent 3c2b5c96fd
commit b8abe2d92e
4 changed files with 131 additions and 34 deletions

View File

@ -84,6 +84,8 @@ public final class ChannelOutboundBuffer {
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;
private volatile Runnable fireChannelWritabilityChangedTask;
static {
AtomicIntegerFieldUpdater<ChannelOutboundBuffer> unwritableUpdater =
PlatformDependent.newAtomicIntegerFieldUpdater(ChannelOutboundBuffer.class, "unwritable");
@ -124,7 +126,7 @@ public final class ChannelOutboundBuffer {
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size);
incrementPendingOutboundBytes(size, false);
}
/**
@ -147,7 +149,7 @@ public final class ChannelOutboundBuffer {
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending);
decrementPendingOutboundBytes(pending, false);
}
entry = entry.next;
} while (entry != null);
@ -162,13 +164,17 @@ public final class ChannelOutboundBuffer {
* This method is thread-safe!
*/
void incrementPendingOutboundBytes(long size) {
incrementPendingOutboundBytes(size, true);
}
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
setUnwritable();
setUnwritable(invokeLater);
}
}
@ -177,13 +183,17 @@ public final class ChannelOutboundBuffer {
* This method is thread-safe!
*/
void decrementPendingOutboundBytes(long size) {
decrementPendingOutboundBytes(size, true);
}
private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (newWriteBufferSize == 0 || newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark()) {
setWritable();
setWritable(invokeLater);
}
}
@ -247,7 +257,7 @@ public final class ChannelOutboundBuffer {
// only release message, notify and decrement if it was not canceled before.
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size);
decrementPendingOutboundBytes(size, false);
}
// recycle the entry
@ -278,7 +288,7 @@ public final class ChannelOutboundBuffer {
ReferenceCountUtil.safeRelease(msg);
safeFail(promise, cause);
decrementPendingOutboundBytes(size);
decrementPendingOutboundBytes(size, false);
}
// recycle the entry
@ -476,7 +486,7 @@ public final class ChannelOutboundBuffer {
final int newValue = oldValue & mask;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
channel.pipeline().fireChannelWritabilityChanged();
fireChannelWritabilityChanged(true);
}
break;
}
@ -490,7 +500,7 @@ public final class ChannelOutboundBuffer {
final int newValue = oldValue | mask;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
channel.pipeline().fireChannelWritabilityChanged();
fireChannelWritabilityChanged(true);
}
break;
}
@ -504,32 +514,50 @@ public final class ChannelOutboundBuffer {
return 1 << index;
}
private void setWritable() {
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
channel.pipeline().fireChannelWritabilityChanged();
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
private void setUnwritable() {
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
channel.pipeline().fireChannelWritabilityChanged();
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = channel.pipeline();
if (invokeLater) {
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
pipeline.fireChannelWritabilityChanged();
}
};
}
channel.eventLoop().execute(task);
} else {
pipeline.fireChannelWritabilityChanged();
}
}
/**
* Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
*/

View File

@ -16,7 +16,6 @@
package io.netty.channel;
import static org.junit.Assert.assertEquals;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
@ -25,12 +24,14 @@ import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import static org.junit.Assert.*;
class BaseChannelTest {
private final LoggingHandler loggingHandler;
BaseChannelTest() {
this.loggingHandler = new LoggingHandler();
loggingHandler = new LoggingHandler();
}
ServerBootstrap getLocalServerBootstrap() {
@ -53,7 +54,7 @@ class BaseChannelTest {
cb.channel(LocalChannel.class);
cb.group(clientGroup);
cb.handler(this.loggingHandler);
cb.handler(loggingHandler);
return cb;
}
@ -64,17 +65,26 @@ class BaseChannelTest {
return buf;
}
void assertLog(String expected) {
String actual = this.loggingHandler.getLog();
assertEquals(expected, actual);
void assertLog(String firstExpected, String... otherExpected) {
String actual = loggingHandler.getLog();
if (firstExpected.equals(actual)) {
return;
}
for (String e: otherExpected) {
if (e.equals(actual)) {
return;
}
}
// Let the comparison fail with the first expectation.
assertEquals(firstExpected, actual);
}
void clearLog() {
this.loggingHandler.clear();
loggingHandler.clear();
}
void setInterest(LoggingHandler.Event... events) {
this.loggingHandler.setInterest(events);
loggingHandler.setInterest(events);
}
}

View File

@ -261,10 +261,12 @@ public class ChannelOutboundBufferTest {
// Ensure that setting a user-defined writability flag to false affects channel.isWritable();
cob.setUserDefinedWritability(1, false);
ch.runPendingTasks();
assertThat(buf.toString(), is("false "));
// Ensure that setting a user-defined writability flag to true affects channel.isWritable();
cob.setUserDefinedWritability(1, true);
ch.runPendingTasks();
assertThat(buf.toString(), is("false true "));
safeClose(ch);
@ -288,19 +290,23 @@ public class ChannelOutboundBufferTest {
// Ensure that setting a user-defined writability flag to false affects channel.isWritable()
cob.setUserDefinedWritability(1, false);
ch.runPendingTasks();
assertThat(buf.toString(), is("false "));
// Ensure that setting another user-defined writability flag to false does not trigger
// channelWritabilityChanged.
cob.setUserDefinedWritability(2, false);
ch.runPendingTasks();
assertThat(buf.toString(), is("false "));
// Ensure that setting only one user-defined writability flag to true does not affect channel.isWritable()
cob.setUserDefinedWritability(1, true);
ch.runPendingTasks();
assertThat(buf.toString(), is("false "));
// Ensure that setting all user-defined writability flags to true affects channel.isWritable()
cob.setUserDefinedWritability(2, true);
ch.runPendingTasks();
assertThat(buf.toString(), is("false true "));
safeClose(ch);
@ -328,6 +334,7 @@ public class ChannelOutboundBufferTest {
// Ensure that setting a user-defined writability flag to false does not trigger channelWritabilityChanged()
cob.setUserDefinedWritability(1, false);
ch.runPendingTasks();
assertThat(buf.toString(), is("false "));
// Ensure reducing the totalPendingWriteBytes down to zero does not trigger channelWritabilityChannged()
@ -338,6 +345,7 @@ public class ChannelOutboundBufferTest {
// Ensure that setting the user-defined writability flag to true triggers channelWritabilityChanged()
cob.setUserDefinedWritability(1, true);
ch.runPendingTasks();
assertThat(buf.toString(), is("false true "));
safeClose(ch);

View File

@ -45,21 +45,62 @@ public class ReentrantChannelTest extends BaseChannelTest {
clientChannel.config().setWriteBufferLowWaterMark(512);
clientChannel.config().setWriteBufferHighWaterMark(1024);
// What is supposed to happen from this point:
//
// 1. Because this write attempt has been made from a non-I/O thread,
// ChannelOutboundBuffer.pendingWriteBytes will be increased before
// write() event is really evaluated.
// -> channelWritabilityChanged() will be triggered,
// because the Channel became unwritable.
//
// 2. The write() event is handled by the pipeline in an I/O thread.
// -> write() will be triggered.
//
// 3. Once the write() event is handled, ChannelOutboundBuffer.pendingWriteBytes
// will be decreased.
// -> channelWritabilityChanged() will be triggered,
// because the Channel became writable again.
//
// 4. The message is added to the ChannelOutboundBuffer and thus
// pendingWriteBytes will be increased again.
// -> channelWritabilityChanged() will be triggered.
//
// 5. The flush() event causes the write request in theChannelOutboundBuffer
// to be removed.
// -> flush() and channelWritabilityChanged() will be triggered.
//
// Note that the channelWritabilityChanged() in the step 4 can occur between
// the flush() and the channelWritabilityChanged() in the stap 5, because
// the flush() is invoked from a non-I/O thread while the other are from
// an I/O thread.
ChannelFuture future = clientChannel.write(createTestBuf(2000));
clientChannel.flush();
future.sync();
clientChannel.close().sync();
assertLog(
"WRITABILITY: writable=false\n" +
"WRITABILITY: writable=true\n" +
"WRITE\n" +
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITABILITY: writable=true\n");
// Case 1:
"WRITABILITY: writable=false\n" +
"WRITE\n" +
"WRITABILITY: writable=false\n" +
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITABILITY: writable=true\n",
// Case 2:
"WRITABILITY: writable=false\n" +
"WRITE\n" +
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITABILITY: writable=true\n" +
"WRITABILITY: writable=true\n");
}
/**
* Similar to {@link #testWritabilityChanged()} with slight variation.
*/
@Test
public void testFlushInWritabilityChanged() throws Exception {
@ -87,17 +128,27 @@ public class ReentrantChannelTest extends BaseChannelTest {
});
assertTrue(clientChannel.isWritable());
clientChannel.write(createTestBuf(2000)).sync();
clientChannel.close().sync();
assertLog(
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITABILITY: writable=true\n" +
"WRITE\n" +
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITABILITY: writable=true\n");
// Case 1:
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITE\n" +
"WRITABILITY: writable=false\n" +
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITABILITY: writable=true\n",
// Case 2:
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITE\n" +
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITABILITY: writable=true\n" +
"WRITABILITY: writable=true\n");
}
@Test