Backport the tests in ReentrantChannelTest
- Originally from c149f4bcc0
by @wgallagher
This commit is contained in:
parent
69a36b8bea
commit
faaff9bd86
@ -15,14 +15,18 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.channel;
|
package io.netty.channel;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
import io.netty.channel.LoggingHandler.Event;
|
import io.netty.channel.LoggingHandler.Event;
|
||||||
import io.netty.channel.local.LocalAddress;
|
import io.netty.channel.local.LocalAddress;
|
||||||
|
import io.netty.util.concurrent.Future;
|
||||||
|
import io.netty.util.concurrent.GenericFutureListener;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.nio.channels.ClosedChannelException;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class ReentrantChannelTest extends BaseChannelTest {
|
public class ReentrantChannelTest extends BaseChannelTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -96,4 +100,136 @@ public class ReentrantChannelTest extends BaseChannelTest {
|
|||||||
"WRITABILITY: writable=true\n");
|
"WRITABILITY: writable=true\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWriteFlushPingPong() throws Exception {
|
||||||
|
|
||||||
|
LocalAddress addr = new LocalAddress("testWriteFlushPingPong");
|
||||||
|
|
||||||
|
ServerBootstrap sb = getLocalServerBootstrap();
|
||||||
|
sb.bind(addr).sync().channel();
|
||||||
|
|
||||||
|
Bootstrap cb = getLocalClientBootstrap();
|
||||||
|
|
||||||
|
setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION);
|
||||||
|
|
||||||
|
Channel clientChannel = cb.connect(addr).sync().channel();
|
||||||
|
|
||||||
|
clientChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
|
||||||
|
|
||||||
|
int writeCount;
|
||||||
|
int flushCount;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||||
|
if (writeCount < 5) {
|
||||||
|
writeCount++;
|
||||||
|
ctx.channel().flush();
|
||||||
|
}
|
||||||
|
super.write(ctx, msg, promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
if (flushCount < 5) {
|
||||||
|
flushCount++;
|
||||||
|
ctx.channel().write(createTestBuf(2000));
|
||||||
|
}
|
||||||
|
super.flush(ctx);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
clientChannel.writeAndFlush(createTestBuf(2000));
|
||||||
|
clientChannel.close().sync();
|
||||||
|
|
||||||
|
assertLog(
|
||||||
|
"WRITE\n" +
|
||||||
|
"FLUSH\n" +
|
||||||
|
"WRITE\n" +
|
||||||
|
"FLUSH\n" +
|
||||||
|
"WRITE\n" +
|
||||||
|
"FLUSH\n" +
|
||||||
|
"WRITE\n" +
|
||||||
|
"FLUSH\n" +
|
||||||
|
"WRITE\n" +
|
||||||
|
"FLUSH\n" +
|
||||||
|
"WRITE\n" +
|
||||||
|
"FLUSH\n" +
|
||||||
|
"CLOSE\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCloseInFlush() throws Exception {
|
||||||
|
|
||||||
|
LocalAddress addr = new LocalAddress("testCloseInFlush");
|
||||||
|
|
||||||
|
ServerBootstrap sb = getLocalServerBootstrap();
|
||||||
|
sb.bind(addr).sync().channel();
|
||||||
|
|
||||||
|
Bootstrap cb = getLocalClientBootstrap();
|
||||||
|
|
||||||
|
setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION);
|
||||||
|
|
||||||
|
Channel clientChannel = cb.connect(addr).sync().channel();
|
||||||
|
|
||||||
|
clientChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||||
|
promise.addListener(new GenericFutureListener<Future<? super Void>>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(Future<? super Void> future) throws Exception {
|
||||||
|
ctx.channel().close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
super.write(ctx, msg, promise);
|
||||||
|
ctx.channel().flush();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
clientChannel.write(createTestBuf(2000)).sync();
|
||||||
|
clientChannel.closeFuture().sync();
|
||||||
|
|
||||||
|
assertLog("WRITE\nFLUSH\nCLOSE\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFlushFailure() throws Exception {
|
||||||
|
|
||||||
|
LocalAddress addr = new LocalAddress("testFlushFailure");
|
||||||
|
|
||||||
|
ServerBootstrap sb = getLocalServerBootstrap();
|
||||||
|
sb.bind(addr).sync().channel();
|
||||||
|
|
||||||
|
Bootstrap cb = getLocalClientBootstrap();
|
||||||
|
|
||||||
|
setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION);
|
||||||
|
|
||||||
|
Channel clientChannel = cb.connect(addr).sync().channel();
|
||||||
|
|
||||||
|
clientChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
throw new Exception("intentional failure");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
clientChannel.writeAndFlush(createTestBuf(2000)).sync();
|
||||||
|
fail();
|
||||||
|
} catch (Throwable cce) {
|
||||||
|
// FIXME: shouldn't this contain the "intentional failure" exception?
|
||||||
|
assertEquals(ClosedChannelException.class, cce.getClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
clientChannel.closeFuture().sync();
|
||||||
|
|
||||||
|
assertLog("WRITE\nCLOSE\n");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user