Change semantics of EmbeddedChannel to match other transports more closely. (#9529)

Motiviation:

EmbeddedChannel currently is quite differently in terms of semantics to other Channel implementations. We should better change it to be more closely aligned and so have the testing code be more robust.

Modifications:

- Change EmbeddedEventLoop.inEventLoop() to only return true if we currenlty run pending / scheduled tasks
- Change EmbeddedEventLoop.execute(...) to automatically process pending tasks if not already doing so
- Adjust a few tests for the new semantics (which is closer to other Channel implementations)

Result:

EmbeddedChannel works more like other Channel implementations
This commit is contained in:
Norman Maurer 2019-09-04 12:00:06 +02:00 committed by GitHub
parent 48634f1466
commit 3099bbcc13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 187 additions and 94 deletions

View File

@ -53,6 +53,7 @@ import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
@ -222,10 +223,22 @@ public class Http2FrameCodecTest {
Http2FrameCodec codec = new Http2FrameCodec(enc, dec, new Http2Settings(), false);
EmbeddedChannel em = new EmbeddedChannel(codec);
// We call #consumeBytes on a stream id which has not been seen yet to emulate the case
// where a stream is deregistered which in reality can happen in response to a RST.
assertFalse(codec.consumeBytes(1, 1));
AtomicReference<Http2Exception> errorRef = new AtomicReference<>();
em.eventLoop().execute(() -> {
try {
// We call #consumeBytes on a stream id which has not been seen yet to emulate the case
// where a stream is deregistered which in reality can happen in response to a RST.
assertFalse(codec.consumeBytes(1, 1));
} catch (Http2Exception e) {
errorRef.set(e);
}
});
assertTrue(em.finishAndReleaseAll());
Http2Exception exception = errorRef.get();
if (exception != null) {
throw exception;
}
}
@Test
@ -516,9 +529,23 @@ public class Http2FrameCodecTest {
int initialWindowSizeBefore = localFlow.initialWindowSize();
Http2Stream connectionStream = connection.connectionStream();
int connectionWindowSizeBefore = localFlow.windowSize(connectionStream);
// We only replenish the flow control window after the amount consumed drops below the following threshold.
// We make the threshold very "high" so that window updates will be sent when the delta is relatively small.
((DefaultHttp2LocalFlowController) localFlow).windowUpdateRatio(connectionStream, .999f);
AtomicReference<Http2Exception> errorRef = new AtomicReference<>();
channel.eventLoop().execute(() -> {
try {
// We only replenish the flow control window after the amount consumed drops below the following
// threshold. We make the threshold very "high" so that window updates will be sent when the delta is
// relatively small.
((DefaultHttp2LocalFlowController) localFlow).windowUpdateRatio(connectionStream, .999f);
} catch (Http2Exception e) {
errorRef.set(e);
}
});
Http2Exception exception = errorRef.get();
if (exception != null) {
throw exception;
}
int windowUpdate = 1024;
@ -740,10 +767,21 @@ public class Http2FrameCodecTest {
Http2FrameStream idleStream = frameCodec.newStream();
final Set<Http2FrameStream> activeStreams = new HashSet<>();
frameCodec.forEachActiveStream(stream -> {
activeStreams.add(stream);
return true;
final AtomicReference<Http2Exception> errorRef = new AtomicReference<>();
channel.eventLoop().execute(() -> {
try {
frameCodec.forEachActiveStream(stream -> {
activeStreams.add(stream);
return true;
});
} catch (Http2Exception e) {
errorRef.set(e);
}
});
Http2Exception exception = errorRef.get();
if (exception != null) {
throw exception;
}
assertEquals(2, activeStreams.size());

View File

@ -31,6 +31,7 @@ import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.handler.codec.http2.LastInboundHandler.Consumer;
import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -100,7 +101,6 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
parentChannel.pipeline().addLast(multiplexer);
}
parentChannel.runPendingTasks();
parentChannel.pipeline().fireChannelActive();
parentChannel.writeInbound(Http2CodecUtil.connectionPrefaceBuf());
@ -152,8 +152,6 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
});
assertTrue(childChannel.isActive());
parentChannel.runPendingTasks();
verify(frameWriter).writeFrame(eq(codec.ctx), eq((byte) 99), eqStreamId(childChannel), any(Http2Flags.class),
any(ByteBuf.class), any(ChannelPromise.class));
}
@ -177,7 +175,6 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
};
frameInboundWriter.writeInboundHeaders(streamId, request, 0, endStream);
parentChannel.runPendingTasks();
Http2StreamChannel channel = streamChannelRef.get();
assertEquals(streamId, channel.stream().id());
return channel;
@ -384,8 +381,9 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
}
private Http2StreamChannel newOutboundStream(ChannelHandler handler) {
return new Http2StreamChannelBootstrap(parentChannel).handler(handler)
.open().syncUninterruptibly().getNow();
Future<Http2StreamChannel> future = new Http2StreamChannelBootstrap(parentChannel).handler(handler)
.open();
return future.syncUninterruptibly().getNow();
}
/**
@ -400,7 +398,6 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
assertTrue(childChannel.isActive());
childChannel.close();
parentChannel.runPendingTasks();
assertFalse(childChannel.isOpen());
assertFalse(childChannel.isActive());
@ -458,7 +455,6 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
assertFalse(childChannel.isActive());
childChannel.close();
parentChannel.runPendingTasks();
// The channel was never active so we should not generate a RST frame.
verify(frameWriter, never()).writeRstStream(eqCodecCtx(), eqStreamId(childChannel), anyLong(),
anyChannelPromise());
@ -545,7 +541,6 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
// Close the child channel.
childChannel.close();
parentChannel.runPendingTasks();
// An active outbound stream should emit a RST_STREAM frame.
verify(frameWriter).writeRstStream(eqCodecCtx(), eqStreamId(childChannel),
anyLong(), anyChannelPromise());
@ -829,7 +824,6 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
});
childChannel.pipeline().fireUserEventTriggered(new Object());
parentChannel.runPendingTasks();
// The events should have happened in this order because the inactive and deregistration events
// get deferred as they do in the AbstractChannel.

View File

@ -75,8 +75,11 @@ public class Http2ServerUpgradeCodecTest {
} else {
codec = new Http2ServerUpgradeCodec((Http2FrameCodec) handler, multiplexer);
}
assertTrue(codec.prepareUpgradeResponse(ctx, request, new DefaultHttpHeaders()));
codec.upgradeTo(ctx, request);
channel.eventLoop().execute(() -> {
assertTrue(codec.prepareUpgradeResponse(ctx, request, new DefaultHttpHeaders()));
codec.upgradeTo(ctx, request);
});
// Flush the channel to ensure we write out all buffered data
channel.flush();

View File

@ -414,7 +414,9 @@ public class DefaultPromiseTest {
try {
final AtomicInteger state = new AtomicInteger();
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(2);
final CountDownLatch latch2 = new CountDownLatch(1);
final CountDownLatch latch3 = new CountDownLatch(1);
final Promise<Void> promise = new DefaultPromise<>(executor);
// Add a listener before completion so "lateListener" is used next time we add a listener.
@ -443,15 +445,16 @@ public class DefaultPromiseTest {
assertTrue(state.compareAndSet(2, 3));
latch2.countDown();
}));
latch2.await();
// Simulate a read operation being queued up in the executor.
executor.execute(() -> {
// This is the key, we depend upon the state being set in the next listener.
assertEquals(3, state.get());
latch2.countDown();
latch3.countDown();
});
latch2.await();
latch3.await();
} finally {
executor.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
}

View File

@ -32,13 +32,13 @@ public class FlushConsolidationHandlerTest {
public void testFlushViaScheduledTask() {
final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount, true);
// Flushes should not go through immediately, as they're scheduled as an async task
channel.flush();
assertEquals(0, flushCount.get());
channel.flush();
assertEquals(0, flushCount.get());
// Trigger the execution of the async task
channel.runPendingTasks();
channel.eventLoop().execute(() -> {
// Flushes should not go through immediately, as they're scheduled as an async task
channel.flush();
assertEquals(0, flushCount.get());
channel.flush();
assertEquals(0, flushCount.get());
});
assertEquals(1, flushCount.get());
assertFalse(channel.finish());
}
@ -47,11 +47,14 @@ public class FlushConsolidationHandlerTest {
public void testFlushViaThresholdOutsideOfReadLoop() {
final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount, true);
// After a given threshold, the async task should be bypassed and a flush should be triggered immediately
for (int i = 0; i < EXPLICIT_FLUSH_AFTER_FLUSHES; i++) {
channel.flush();
}
assertEquals(1, flushCount.get());
channel.eventLoop().execute(() -> {
// After a given threshold, the async task should be bypassed and a flush should be triggered immediately
for (int i = 0; i < EXPLICIT_FLUSH_AFTER_FLUSHES; i++) {
channel.flush();
}
assertEquals(1, flushCount.get());
});
assertFalse(channel.finish());
}

View File

@ -119,7 +119,11 @@ public class LoggingHandlerTest {
channel.config().setWriteBufferHighWaterMark(10);
channel.write("hello", channel.newPromise());
verify(appender).doAppend(argThat(new RegexLogMatcher(".+WRITABILITY CHANGED$")));
// This is expected to be called 3 times:
// - Mark the channel unwritable when schedule the write on the EventLoop.
// - Mark writable when dequeue task
// - Mark unwritable when the write is actual be fired through the pipeline and hit the ChannelOutboundBuffer.
verify(appender, times(3)).doAppend(argThat(new RegexLogMatcher(".+WRITABILITY CHANGED$")));
}
@Test

View File

@ -599,11 +599,7 @@ public class EmbeddedChannel extends AbstractChannel {
recordException(e);
}
try {
embeddedEventLoop.runScheduledTasks();
} catch (Exception e) {
recordException(e);
}
runScheduledPendingTasks();
}
/**
@ -619,6 +615,9 @@ public class EmbeddedChannel extends AbstractChannel {
} catch (Exception e) {
recordException(e);
return embeddedEventLoop.nextScheduledTask();
} finally {
// A scheduled task may put something on the taskQueue so lets run it.
embeddedEventLoop.runTasks();
}
}

View File

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
private final Queue<Runnable> tasks = new ArrayDeque<>(2);
private boolean running;
private static EmbeddedChannel cast(Channel channel) {
if (channel instanceof EmbeddedChannel) {
@ -65,28 +66,47 @@ final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements
public void execute(Runnable command) {
requireNonNull(command, "command");
tasks.add(command);
if (!running) {
runTasks();
}
}
void runTasks() {
for (;;) {
Runnable task = tasks.poll();
if (task == null) {
break;
}
boolean wasRunning = running;
try {
for (;;) {
running = true;
Runnable task = tasks.poll();
if (task == null) {
break;
}
task.run();
task.run();
}
} finally {
if (!wasRunning) {
running = false;
}
}
}
long runScheduledTasks() {
long time = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable task = pollScheduledTask(time);
if (task == null) {
return nextScheduledTaskNano();
}
boolean wasRunning = running;
try {
for (;;) {
running = true;
Runnable task = pollScheduledTask(time);
if (task == null) {
return nextScheduledTaskNano();
}
task.run();
task.run();
}
} finally {
if (!wasRunning) {
running = false;
}
}
}
@ -95,7 +115,12 @@ final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements
}
void cancelScheduled() {
cancelScheduledTasks();
running = true;
try {
cancelScheduledTasks();
} finally {
running = false;
}
}
@Override
@ -136,6 +161,6 @@ final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements
@Override
public boolean inEventLoop(Thread thread) {
return true;
return running;
}
}

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import org.junit.Ignore;
import org.junit.Test;
import java.net.SocketAddress;
@ -359,25 +360,27 @@ public class ChannelOutboundBufferTest {
ChannelOutboundBuffer cob = ch.unsafe().outboundBuffer();
// Trigger channelWritabilityChanged() by writing a lot.
ch.write(buffer().writeZero(257));
assertThat(buf.toString(), is("false "));
ch.eventLoop().execute(() -> {
// Trigger channelWritabilityChanged() by writing a lot.
ch.write(buffer().writeZero(257));
assertThat(buf.toString(), is("false "));
// Ensure that setting a user-defined writability flag to false does not trigger channelWritabilityChanged()
cob.setUserDefinedWritability(1, false);
ch.runPendingTasks();
assertThat(buf.toString(), is("false "));
// Ensure that setting a user-defined writability flag to false does not trigger channelWritabilityChanged()
cob.setUserDefinedWritability(1, false);
ch.runPendingTasks();
assertThat(buf.toString(), is("false "));
// Ensure reducing the totalPendingWriteBytes down to zero does not trigger channelWritabilityChanged()
// because of the user-defined writability flag.
ch.flush();
assertThat(cob.totalPendingWriteBytes(), is(0L));
assertThat(buf.toString(), is("false "));
// Ensure reducing the totalPendingWriteBytes down to zero does not trigger channelWritabilityChanged()
// because of the user-defined writability flag.
ch.flush();
assertThat(cob.totalPendingWriteBytes(), is(0L));
assertThat(buf.toString(), is("false "));
// Ensure that setting the user-defined writability flag to true triggers channelWritabilityChanged()
cob.setUserDefinedWritability(1, true);
ch.runPendingTasks();
assertThat(buf.toString(), is("false true "));
// Ensure that setting the user-defined writability flag to true triggers channelWritabilityChanged()
cob.setUserDefinedWritability(1, true);
ch.runPendingTasks();
assertThat(buf.toString(), is("false true "));
});
safeClose(ch);
}

View File

@ -124,8 +124,10 @@ public class PendingWriteQueueTest {
final PendingWriteQueue queue = queueRef.get();
// Trigger channelWritabilityChanged() by adding a message that's larger than the high watermark.
queue.add(msg, channel.newPromise());
channel.eventLoop().execute(() -> {
// Trigger channelWritabilityChanged() by adding a message that's larger than the high watermark.
queue.add(msg, channel.newPromise());
});
channel.finish();
@ -200,11 +202,14 @@ public class PendingWriteQueueTest {
ChannelPromise promise = channel.newPromise();
promise.addListener((ChannelFutureListener) future -> queue.removeAndFailAll(new IllegalStateException()));
queue.add(1L, promise);
ChannelPromise promise2 = channel.newPromise();
queue.add(2L, promise2);
queue.removeAndFailAll(new Exception());
channel.eventLoop().execute(() -> {
queue.add(1L, promise);
queue.add(2L, promise2);
queue.removeAndFailAll(new Exception());
});
assertTrue(promise.isDone());
assertFalse(promise.isSuccess());
assertTrue(promise2.isDone());
@ -227,10 +232,13 @@ public class PendingWriteQueueTest {
ChannelPromise promise = channel.newPromise();
final ChannelPromise promise3 = channel.newPromise();
promise.addListener((ChannelFutureListener) future -> queue.add(3L, promise3));
queue.add(1L, promise);
ChannelPromise promise2 = channel.newPromise();
queue.add(2L, promise2);
queue.removeAndWriteAll();
channel.eventLoop().execute(() -> {
queue.add(1L, promise);
queue.add(2L, promise2);
queue.removeAndWriteAll();
});
assertTrue(promise.isDone());
assertTrue(promise.isSuccess());
@ -257,9 +265,11 @@ public class PendingWriteQueueTest {
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().lastContext());
ChannelPromise promise = channel.newPromise();
queue.add(1L, promise);
queue.add(2L, channel.voidPromise());
queue.removeAndWriteAll();
channel.eventLoop().execute(() -> {
queue.add(1L, promise);
queue.add(2L, channel.voidPromise());
queue.removeAndWriteAll();
});
assertTrue(channel.finish());
assertTrue(promise.isDone());
@ -281,12 +291,17 @@ public class PendingWriteQueueTest {
failOrder.add(1);
queue.add(3L, promise3);
});
queue.add(1L, promise);
ChannelPromise promise2 = channel.newPromise();
promise2.addListener((ChannelFutureListener) future -> failOrder.add(2));
queue.add(2L, promise2);
queue.removeAndFailAll(new Exception());
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
queue.add(1L, promise);
queue.add(2L, promise2);
queue.removeAndFailAll(new Exception());
}
});
assertTrue(promise.isDone());
assertFalse(promise.isSuccess());
assertTrue(promise2.isDone());
@ -306,11 +321,15 @@ public class PendingWriteQueueTest {
ChannelPromise promise = channel.newPromise();
promise.addListener((ChannelFutureListener) future -> queue.removeAndWriteAll());
queue.add(1L, promise);
ChannelPromise promise2 = channel.newPromise();
queue.add(2L, promise2);
queue.removeAndWriteAll();
channel.eventLoop().execute(() -> {
queue.add(1L, promise);
queue.add(2L, promise2);
queue.removeAndWriteAll();
});
channel.flush();
assertTrue(promise.isSuccess());
assertTrue(promise2.isSuccess());
@ -333,8 +352,10 @@ public class PendingWriteQueueTest {
IllegalStateException ex = new IllegalStateException();
ChannelPromise promise = channel.newPromise();
queue.add(1L, promise);
queue.removeAndFailAll(ex);
channel.eventLoop().execute(() -> {
queue.add(1L, promise);
queue.removeAndFailAll(ex);
});
assertSame(ex, promise.cause());
}