2013-08-21 16:12:58 +02:00
|
|
|
/*
|
|
|
|
* Copyright 2012 The Netty Project
|
|
|
|
*
|
|
|
|
* The Netty Project licenses this file to you under the Apache License,
|
|
|
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
|
|
* with the License. You may obtain a copy of the License at:
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
* License for the specific language governing permissions and limitations
|
|
|
|
* under the License.
|
|
|
|
*/
|
|
|
|
package io.netty.channel;
|
|
|
|
|
|
|
|
import io.netty.bootstrap.Bootstrap;
|
|
|
|
import io.netty.bootstrap.ServerBootstrap;
|
|
|
|
import io.netty.channel.LoggingHandler.Event;
|
|
|
|
import io.netty.channel.local.LocalAddress;
|
2019-01-28 16:50:15 +01:00
|
|
|
|
2013-08-21 16:12:58 +02:00
|
|
|
import org.junit.Test;
|
|
|
|
|
2014-02-13 22:55:39 +01:00
|
|
|
import java.nio.channels.ClosedChannelException;
|
|
|
|
|
|
|
|
import static org.junit.Assert.*;
|
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
public class ReentrantChannelTest extends BaseChannelTest {
|
2013-08-21 16:12:58 +02:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
@Test
|
|
|
|
public void testWritabilityChanged() throws Exception {
|
2013-08-21 16:12:58 +02:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
LocalAddress addr = new LocalAddress("testWritabilityChanged");
|
2013-08-21 16:12:58 +02:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
ServerBootstrap sb = getLocalServerBootstrap();
|
|
|
|
sb.bind(addr).sync().channel();
|
2013-08-21 16:12:58 +02:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
Bootstrap cb = getLocalClientBootstrap();
|
2013-08-21 16:12:58 +02:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
setInterest(Event.WRITE, Event.FLUSH, Event.WRITABILITY);
|
2013-08-21 16:12:58 +02:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
Channel clientChannel = cb.connect(addr).sync().channel();
|
|
|
|
clientChannel.config().setWriteBufferLowWaterMark(512);
|
|
|
|
clientChannel.config().setWriteBufferHighWaterMark(1024);
|
2014-12-10 10:36:53 +01:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
// What is supposed to happen from this point:
|
|
|
|
//
|
|
|
|
// 1. Because this write attempt has been made from a non-I/O thread,
|
|
|
|
// ChannelOutboundBuffer.pendingWriteBytes will be increased before
|
|
|
|
// write() event is really evaluated.
|
|
|
|
// -> channelWritabilityChanged() will be triggered,
|
|
|
|
// because the Channel became unwritable.
|
|
|
|
//
|
|
|
|
// 2. The write() event is handled by the pipeline in an I/O thread.
|
|
|
|
// -> write() will be triggered.
|
|
|
|
//
|
|
|
|
// 3. Once the write() event is handled, ChannelOutboundBuffer.pendingWriteBytes
|
|
|
|
// will be decreased.
|
|
|
|
// -> channelWritabilityChanged() will be triggered,
|
|
|
|
// because the Channel became writable again.
|
|
|
|
//
|
|
|
|
// 4. The message is added to the ChannelOutboundBuffer and thus
|
|
|
|
// pendingWriteBytes will be increased again.
|
|
|
|
// -> channelWritabilityChanged() will be triggered.
|
|
|
|
//
|
|
|
|
// 5. The flush() event causes the write request in theChannelOutboundBuffer
|
|
|
|
// to be removed.
|
|
|
|
// -> flush() and channelWritabilityChanged() will be triggered.
|
|
|
|
//
|
|
|
|
// Note that the channelWritabilityChanged() in the step 4 can occur between
|
2017-04-19 22:37:03 +02:00
|
|
|
// the flush() and the channelWritabilityChanged() in the step 5, because
|
2016-04-08 23:59:42 +02:00
|
|
|
// the flush() is invoked from a non-I/O thread while the other are from
|
|
|
|
// an I/O thread.
|
2014-12-10 10:36:53 +01:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
ChannelFuture future = clientChannel.write(createTestBuf(2000));
|
2013-08-21 16:12:58 +02:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
clientChannel.flush();
|
|
|
|
future.sync();
|
2013-08-21 16:12:58 +02:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
clientChannel.close().sync();
|
2013-08-21 16:12:58 +02:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
assertLog(
|
|
|
|
// Case 1:
|
|
|
|
"WRITABILITY: writable=false\n" +
|
|
|
|
"WRITE\n" +
|
|
|
|
"WRITABILITY: writable=false\n" +
|
|
|
|
"WRITABILITY: writable=false\n" +
|
|
|
|
"FLUSH\n" +
|
|
|
|
"WRITABILITY: writable=true\n",
|
|
|
|
// Case 2:
|
|
|
|
"WRITABILITY: writable=false\n" +
|
|
|
|
"WRITE\n" +
|
|
|
|
"WRITABILITY: writable=false\n" +
|
|
|
|
"FLUSH\n" +
|
|
|
|
"WRITABILITY: writable=true\n" +
|
|
|
|
"WRITABILITY: writable=true\n");
|
2016-04-01 11:45:43 +02:00
|
|
|
}
|
2013-08-21 16:12:58 +02:00
|
|
|
|
2016-04-08 23:59:42 +02:00
|
|
|
/**
|
|
|
|
* Similar to {@link #testWritabilityChanged()} with slight variation.
|
|
|
|
*/
|
2016-04-01 11:45:43 +02:00
|
|
|
@Test
|
2016-04-08 23:59:42 +02:00
|
|
|
public void testFlushInWritabilityChanged() throws Exception {
|
2013-08-21 16:12:58 +02:00
|
|
|
|
2016-04-01 11:45:43 +02:00
|
|
|
LocalAddress addr = new LocalAddress("testFlushInWritabilityChanged");
|
2016-04-08 23:59:42 +02:00
|
|
|
|
2013-08-21 16:12:58 +02:00
|
|
|
ServerBootstrap sb = getLocalServerBootstrap();
|
2016-04-08 23:59:42 +02:00
|
|
|
sb.bind(addr).sync().channel();
|
2013-08-21 16:12:58 +02:00
|
|
|
|
|
|
|
Bootstrap cb = getLocalClientBootstrap();
|
|
|
|
|
|
|
|
setInterest(Event.WRITE, Event.FLUSH, Event.WRITABILITY);
|
|
|
|
|
|
|
|
Channel clientChannel = cb.connect(addr).sync().channel();
|
|
|
|
clientChannel.config().setWriteBufferLowWaterMark(512);
|
|
|
|
clientChannel.config().setWriteBufferHighWaterMark(1024);
|
|
|
|
|
2019-03-28 10:28:27 +01:00
|
|
|
clientChannel.pipeline().addLast(new ChannelHandler() {
|
2016-04-08 23:59:42 +02:00
|
|
|
@Override
|
|
|
|
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
|
|
|
|
if (!ctx.channel().isWritable()) {
|
|
|
|
ctx.channel().flush();
|
|
|
|
}
|
|
|
|
ctx.fireChannelWritabilityChanged();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
2013-08-21 16:12:58 +02:00
|
|
|
assertTrue(clientChannel.isWritable());
|
2016-04-08 23:59:42 +02:00
|
|
|
|
|
|
|
clientChannel.write(createTestBuf(2000)).sync();
|
2013-08-23 15:38:30 +02:00
|
|
|
clientChannel.close().sync();
|
2013-08-21 16:12:58 +02:00
|
|
|
|
|
|
|
assertLog(
|
2016-04-08 23:59:42 +02:00
|
|
|
// Case 1:
|
|
|
|
"WRITABILITY: writable=false\n" +
|
|
|
|
"FLUSH\n" +
|
2016-04-01 11:45:43 +02:00
|
|
|
"WRITE\n" +
|
2014-12-10 10:36:53 +01:00
|
|
|
"WRITABILITY: writable=false\n" +
|
2016-04-08 23:59:42 +02:00
|
|
|
"WRITABILITY: writable=false\n" +
|
|
|
|
"FLUSH\n" +
|
|
|
|
"WRITABILITY: writable=true\n",
|
|
|
|
// Case 2:
|
|
|
|
"WRITABILITY: writable=false\n" +
|
|
|
|
"FLUSH\n" +
|
2014-12-10 10:36:53 +01:00
|
|
|
"WRITE\n" +
|
|
|
|
"WRITABILITY: writable=false\n" +
|
|
|
|
"FLUSH\n" +
|
2016-04-08 23:59:42 +02:00
|
|
|
"WRITABILITY: writable=true\n" +
|
|
|
|
"WRITABILITY: writable=true\n");
|
2013-08-21 16:12:58 +02:00
|
|
|
}
|
|
|
|
|
2014-02-13 22:55:39 +01:00
|
|
|
@Test
|
|
|
|
public void testWriteFlushPingPong() throws Exception {
|
|
|
|
|
|
|
|
LocalAddress addr = new LocalAddress("testWriteFlushPingPong");
|
|
|
|
|
|
|
|
ServerBootstrap sb = getLocalServerBootstrap();
|
2016-04-08 23:59:42 +02:00
|
|
|
sb.bind(addr).sync().channel();
|
2014-02-13 22:55:39 +01:00
|
|
|
|
|
|
|
Bootstrap cb = getLocalClientBootstrap();
|
|
|
|
|
|
|
|
setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION);
|
|
|
|
|
|
|
|
Channel clientChannel = cb.connect(addr).sync().channel();
|
|
|
|
|
2019-03-28 10:28:27 +01:00
|
|
|
clientChannel.pipeline().addLast(new ChannelHandler() {
|
2014-02-13 22:55:39 +01:00
|
|
|
|
|
|
|
int writeCount;
|
|
|
|
int flushCount;
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
|
|
|
if (writeCount < 5) {
|
|
|
|
writeCount++;
|
|
|
|
ctx.channel().flush();
|
|
|
|
}
|
2019-03-13 09:46:10 +01:00
|
|
|
ctx.write(msg, promise);
|
2014-02-13 22:55:39 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void flush(ChannelHandlerContext ctx) throws Exception {
|
|
|
|
if (flushCount < 5) {
|
|
|
|
flushCount++;
|
|
|
|
ctx.channel().write(createTestBuf(2000));
|
|
|
|
}
|
2019-03-13 09:46:10 +01:00
|
|
|
ctx.flush();
|
2014-02-13 22:55:39 +01:00
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
clientChannel.writeAndFlush(createTestBuf(2000));
|
|
|
|
clientChannel.close().sync();
|
|
|
|
|
|
|
|
assertLog(
|
|
|
|
"WRITE\n" +
|
|
|
|
"FLUSH\n" +
|
|
|
|
"WRITE\n" +
|
|
|
|
"FLUSH\n" +
|
|
|
|
"WRITE\n" +
|
|
|
|
"FLUSH\n" +
|
|
|
|
"WRITE\n" +
|
|
|
|
"FLUSH\n" +
|
|
|
|
"WRITE\n" +
|
|
|
|
"FLUSH\n" +
|
|
|
|
"WRITE\n" +
|
|
|
|
"FLUSH\n" +
|
|
|
|
"CLOSE\n");
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
|
|
|
public void testCloseInFlush() throws Exception {
|
|
|
|
|
|
|
|
LocalAddress addr = new LocalAddress("testCloseInFlush");
|
|
|
|
|
|
|
|
ServerBootstrap sb = getLocalServerBootstrap();
|
2016-04-08 23:59:42 +02:00
|
|
|
sb.bind(addr).sync().channel();
|
2014-02-13 22:55:39 +01:00
|
|
|
|
|
|
|
Bootstrap cb = getLocalClientBootstrap();
|
|
|
|
|
|
|
|
setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION);
|
|
|
|
|
|
|
|
Channel clientChannel = cb.connect(addr).sync().channel();
|
|
|
|
|
2019-03-28 10:28:27 +01:00
|
|
|
clientChannel.pipeline().addLast(new ChannelHandler() {
|
2014-02-13 22:55:39 +01:00
|
|
|
|
|
|
|
@Override
|
|
|
|
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
2019-01-25 10:51:05 +01:00
|
|
|
promise.addListener(future -> ctx.channel().close());
|
2019-03-13 09:46:10 +01:00
|
|
|
ctx.write(msg, promise);
|
2014-02-13 22:55:39 +01:00
|
|
|
ctx.channel().flush();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
clientChannel.write(createTestBuf(2000)).sync();
|
|
|
|
clientChannel.closeFuture().sync();
|
|
|
|
|
|
|
|
assertLog("WRITE\nFLUSH\nCLOSE\n");
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
|
|
|
public void testFlushFailure() throws Exception {
|
|
|
|
|
|
|
|
LocalAddress addr = new LocalAddress("testFlushFailure");
|
|
|
|
|
|
|
|
ServerBootstrap sb = getLocalServerBootstrap();
|
2016-04-08 23:59:42 +02:00
|
|
|
sb.bind(addr).sync().channel();
|
2014-02-13 22:55:39 +01:00
|
|
|
|
|
|
|
Bootstrap cb = getLocalClientBootstrap();
|
|
|
|
|
|
|
|
setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION);
|
|
|
|
|
|
|
|
Channel clientChannel = cb.connect(addr).sync().channel();
|
|
|
|
|
2019-03-28 10:28:27 +01:00
|
|
|
clientChannel.pipeline().addLast(new ChannelHandler() {
|
2014-02-13 22:55:39 +01:00
|
|
|
|
|
|
|
@Override
|
|
|
|
public void flush(ChannelHandlerContext ctx) throws Exception {
|
|
|
|
throw new Exception("intentional failure");
|
|
|
|
}
|
|
|
|
|
2019-03-28 10:28:27 +01:00
|
|
|
}, new ChannelHandler() {
|
2014-02-13 22:55:39 +01:00
|
|
|
@Override
|
2019-01-31 20:29:17 +01:00
|
|
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
2014-02-13 22:55:39 +01:00
|
|
|
ctx.close();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
try {
|
|
|
|
clientChannel.writeAndFlush(createTestBuf(2000)).sync();
|
|
|
|
fail();
|
|
|
|
} catch (Throwable cce) {
|
|
|
|
// FIXME: shouldn't this contain the "intentional failure" exception?
|
|
|
|
assertEquals(ClosedChannelException.class, cce.getClass());
|
|
|
|
}
|
|
|
|
|
|
|
|
clientChannel.closeFuture().sync();
|
|
|
|
|
|
|
|
assertLog("WRITE\nCLOSE\n");
|
|
|
|
}
|
2013-08-21 16:12:58 +02:00
|
|
|
}
|