fix writability callback

This commit is contained in:
bgallagher 2013-08-21 10:12:58 -04:00 committed by Norman Maurer
parent 217b8e255c
commit fb619f2394
5 changed files with 383 additions and 37 deletions

View File

@ -120,7 +120,7 @@ public final class ChannelOutboundBuffer {
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size);
incrementPendingOutboundBytes(size, true);
}
private void addCapacity() {
@ -155,7 +155,7 @@ public final class ChannelOutboundBuffer {
* Increment the pending bytes which will be written at some point.
* This method is thread-safe!
*/
void incrementPendingOutboundBytes(int size) {
void incrementPendingOutboundBytes(int size, boolean fireEvent) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
// recycled while process this method.
Channel channel = this.channel;
@ -174,7 +174,9 @@ public final class ChannelOutboundBuffer {
if (newWriteBufferSize > highWaterMark) {
if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) {
channel.pipeline().fireChannelWritabilityChanged();
if (fireEvent) {
channel.pipeline().fireChannelWritabilityChanged();
}
}
}
}
@ -183,7 +185,7 @@ public final class ChannelOutboundBuffer {
* Decrement the pending bytes which will be written at some point.
* This method is thread-safe!
*/
void decrementPendingOutboundBytes(int size) {
void decrementPendingOutboundBytes(int size, boolean fireEvent) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
// recycled while process this method.
Channel channel = this.channel;
@ -202,7 +204,9 @@ public final class ChannelOutboundBuffer {
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
channel.pipeline().fireChannelWritabilityChanged();
if (fireEvent) {
channel.pipeline().fireChannelWritabilityChanged();
}
}
}
}
@ -259,7 +263,7 @@ public final class ChannelOutboundBuffer {
safeRelease(msg);
promise.trySuccess();
decrementPendingOutboundBytes(size);
decrementPendingOutboundBytes(size, true);
return true;
}
@ -285,7 +289,7 @@ public final class ChannelOutboundBuffer {
safeRelease(msg);
safeFail(promise, cause);
decrementPendingOutboundBytes(size);
decrementPendingOutboundBytes(size, true);
return true;
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel;
import static io.netty.channel.DefaultChannelPipeline.logger;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.Recycler;
@ -24,8 +25,6 @@ import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
import static io.netty.channel.DefaultChannelPipeline.*;
final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
volatile DefaultChannelHandlerContext next;
@ -48,9 +47,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private Runnable invokeFlushTask;
private Runnable invokeChannelWritableStateChangedTask;
@SuppressWarnings("unchecked")
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutorGroup group, String name,
ChannelHandler handler) {
if (name == null) {
throw new NullPointerException("name");
@ -628,15 +626,10 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (msg == null) {
throw new NullPointerException("msg");
}
validatePromise(promise, true);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
} else {
submitWriteTask(next, executor, msg, false, promise);
}
write(msg, false, promise);
return promise;
}
@ -684,31 +677,34 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (msg == null) {
throw new NullPointerException("msg");
}
validatePromise(promise, true);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
next.invokeFlush();
} else {
submitWriteTask(next, executor, msg, true, promise);
}
write(msg, true, promise);
return promise;
}
private void submitWriteTask(DefaultChannelHandlerContext next, EventExecutor executor,
Object msg, boolean flush, ChannelPromise promise) {
final int size = channel.estimatorHandle().size(msg);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
private void write(Object msg, boolean flush, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
if (flush) {
next.invokeFlush();
}
} else {
int size = channel.estimatorHandle().size(msg);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size, false);
}
}
executor.execute(WriteTask.newInstance(next, msg, size, flush, promise));
}
executor.execute(WriteTask.newInstance(next, msg, size, flush, promise));
}
@Override
@ -884,7 +880,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
buffer.decrementPendingOutboundBytes(size, false);
}
}
ctx.invokeWrite(msg, promise);

View File

@ -0,0 +1,80 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import static org.junit.Assert.assertEquals;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
class BaseChannelTest {
private final LoggingHandler loggingHandler;
BaseChannelTest() {
this.loggingHandler = new LoggingHandler();
}
ServerBootstrap getLocalServerBootstrap() {
EventLoopGroup serverGroup = new LocalEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverGroup);
sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
}
});
return sb;
}
Bootstrap getLocalClientBootstrap() {
EventLoopGroup clientGroup = new LocalEventLoopGroup();
Bootstrap cb = new Bootstrap();
cb.channel(LocalChannel.class);
cb.group(clientGroup);
cb.handler(this.loggingHandler);
return cb;
}
static ByteBuf createTestBuf(int len) {
ByteBuf buf = Unpooled.buffer(len, len);
buf.setIndex(0, len);
return buf;
}
void assertLog(String expected) {
String actual = this.loggingHandler.getLog();
assertEquals(expected, actual);
}
void clearLog() {
this.loggingHandler.clear();
}
void setInterest(LoggingHandler.Event... events) {
this.loggingHandler.setInterest(events);
}
}

View File

@ -0,0 +1,172 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.EnumSet;
final class LoggingHandler implements ChannelInboundHandler, ChannelOutboundHandler {
static enum Event { WRITE, FLUSH, BIND, CONNECT, DISCONNECT, CLOSE, DEREGISTER, READ, WRITABILITY,
HANDLER_ADDED, HANDLER_REMOVED, EXCEPTION, READ_COMPLETE, REGISTERED, UNREGISTERED, ACTIVE, INACTIVE,
USER };
private StringBuilder log = new StringBuilder();
private final EnumSet<LoggingHandler.Event> interest = EnumSet.allOf(LoggingHandler.Event.class);
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log(Event.WRITE);
ctx.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
log(Event.FLUSH);
ctx.flush();
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
log(Event.BIND, "localAddress=" + localAddress);
ctx.bind(localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
log(Event.CONNECT, "remoteAddress=" + remoteAddress + " localAddress=" + localAddress);
ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
log(Event.DISCONNECT);
ctx.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
log(Event.CLOSE);
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
log(Event.DEREGISTER);
ctx.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
log(Event.READ);
ctx.read();
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
log(Event.WRITABILITY, "writable=" + ctx.channel().isWritable());
ctx.fireChannelWritabilityChanged();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log(Event.HANDLER_ADDED);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log(Event.HANDLER_REMOVED);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log(Event.EXCEPTION, cause.toString());
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log(Event.REGISTERED);
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log(Event.UNREGISTERED);
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log(Event.ACTIVE);
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log(Event.INACTIVE);
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log(Event.READ);
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log(Event.READ_COMPLETE);
ctx.fireChannelReadComplete();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
log(Event.USER, evt.toString());
ctx.fireUserEventTriggered(evt);
}
String getLog() {
return this.log.toString();
}
void clear() {
this.log = new StringBuilder();
}
void setInterest(LoggingHandler.Event... events) {
this.interest.clear();
Collections.addAll(this.interest, events);
}
private void log(LoggingHandler.Event e) {
log(e, null);
}
private void log(LoggingHandler.Event e, String msg) {
if (this.interest.contains(e)) {
this.log.append(e);
if (msg != null) {
this.log.append(": ").append(msg);
}
this.log.append('\n');
}
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import static org.junit.Assert.assertTrue;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.LoggingHandler.Event;
import io.netty.channel.local.LocalAddress;
import org.junit.Test;
public class ReentrantChannelTest extends BaseChannelTest {
@Test
public void testWritabilityChanged() throws Exception {
LocalAddress addr = new LocalAddress("testWritabilityChanged");
ServerBootstrap sb = getLocalServerBootstrap();
sb.bind(addr).sync().channel();
Bootstrap cb = getLocalClientBootstrap();
setInterest(Event.WRITE, Event.FLUSH, Event.WRITABILITY);
Channel clientChannel = cb.connect(addr).sync().channel();
clientChannel.config().setWriteBufferLowWaterMark(512);
clientChannel.config().setWriteBufferHighWaterMark(1024);
ChannelFuture future = clientChannel.write(createTestBuf(2000));
clientChannel.flush();
future.sync();
clientChannel.close().sync();
assertLog(
"WRITE\n" +
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITABILITY: writable=true\n");
}
@Test
public void testFlushInWritabilityChanged() throws Exception {
LocalAddress addr = new LocalAddress("testFlushInWritabilityChanged");
ServerBootstrap sb = getLocalServerBootstrap();
sb.bind(addr).sync().channel();
Bootstrap cb = getLocalClientBootstrap();
setInterest(Event.WRITE, Event.FLUSH, Event.WRITABILITY);
Channel clientChannel = cb.connect(addr).sync().channel();
clientChannel.config().setWriteBufferLowWaterMark(512);
clientChannel.config().setWriteBufferHighWaterMark(1024);
clientChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (!ctx.channel().isWritable()) {
ctx.channel().flush();
}
ctx.fireChannelWritabilityChanged();
}
});
assertTrue(clientChannel.isWritable());
clientChannel.write(createTestBuf(2000)).sync();
assertTrue(clientChannel.isWritable());
assertLog(
"WRITE\n" +
"WRITABILITY: writable=false\n" +
"FLUSH\n" +
"WRITABILITY: writable=true\n");
}
}