Limit future notification stack depth / Robost writeCounter management

- Also ported the discard example while testing this commit
This commit is contained in:
Trustin Lee 2012-05-28 05:05:49 -07:00
parent a2698e65fb
commit e48281471b
12 changed files with 258 additions and 240 deletions

View File

@ -15,15 +15,12 @@
*/
package io.netty.example.discard;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.channel.ChannelBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Keeps sending random data to the specified address.
@ -40,28 +37,28 @@ public class DiscardClient {
this.firstMessageSize = firstMessageSize;
}
public void run() {
// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool()));
public void run() throws Exception {
ChannelBootstrap b = new ChannelBootstrap();
try {
b.eventLoop(new NioEventLoop())
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.initializer(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardClientHandler(firstMessageSize));
}
});
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new DiscardClientHandler(firstMessageSize));
}
});
// Make the connection attempt.
ChannelFuture f = b.connect().sync();
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is closed.
f.channel().closeFuture().sync();
// Wait until the connection is closed or the connection attempt fails.
future.channel().getCloseFuture().awaitUninterruptibly();
// Shut down thread pools to exit.
bootstrap.releaseExternalResources();
} finally {
b.shutdown();
}
}
public static void main(String[] args) throws Exception {

View File

@ -15,31 +15,26 @@
*/
package io.netty.example.discard;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundStreamHandlerAdapter;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.WriteCompletionEvent;
/**
* Handles a client-side channel.
*/
public class DiscardClientHandler extends SimpleChannelUpstreamHandler {
public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter {
private static final Logger logger = Logger.getLogger(
DiscardClientHandler.class.getName());
private long transferredBytes;
private final byte[] content;
private ChannelInboundHandlerContext<Byte> ctx;
private ChannelBuffer out;
public DiscardClientHandler(int messageSize) {
if (messageSize <= 0) {
@ -49,70 +44,55 @@ public class DiscardClientHandler extends SimpleChannelUpstreamHandler {
content = new byte[messageSize];
}
public long getTransferredBytes() {
return transferredBytes;
}
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
if (((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {
logger.info(e.toString());
}
}
// Let SimpleChannelHandler call actual event handler methods below.
super.handleUpstream(ctx, e);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
public void channelActive(ChannelInboundHandlerContext<Byte> ctx)
throws Exception {
this.ctx = ctx;
out = ctx.out().byteBuffer();
// Send the initial messages.
generateTraffic(e);
generateTraffic();
}
@Override
public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
// Keep sending messages whenever the current socket buffer has room.
generateTraffic(e);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx)
throws Exception {
// Server is supposed to send nothing. Therefore, do nothing.
}
@Override
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) {
transferredBytes += e.getWrittenAmount();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
public void exceptionCaught(ChannelInboundHandlerContext<Byte> ctx,
Throwable cause) throws Exception {
// Close the connection when an exception is raised.
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.cause());
e.channel().close();
cause);
ctx.close();
}
private void generateTraffic(ChannelStateEvent e) {
// Keep generating traffic until the channel is unwritable.
// A channel becomes unwritable when its internal buffer is full.
// If you keep writing messages ignoring this property,
// you will end up with an OutOfMemoryError.
Channel channel = e.channel();
while (channel.isWritable()) {
ChannelBuffer m = nextMessage();
if (m == null) {
break;
}
channel.write(m);
long counter;
private void generateTraffic() {
// Fill the outbound buffer up to 64KiB
while (out.readableBytes() < 65536) {
out.writeBytes(content);
}
// Flush the outbound buffer to the socket.
// Once flushed, generate the same amount of traffic again.
ctx.flush().addListener(GENERATE_TRAFFIC);
}
private ChannelBuffer nextMessage() {
return ChannelBuffers.wrappedBuffer(content);
}
private final ChannelFutureListener GENERATE_TRAFFIC = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
out.clear();
generateTraffic();
}
}
};
}

View File

@ -15,14 +15,12 @@
*/
package io.netty.example.discard;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ServerChannelBootstrap;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
@ -35,21 +33,29 @@ public class DiscardServer {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool()));
public void run() throws Exception {
ServerChannelBootstrap b = new ServerChannelBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childInitializer(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
});
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new DiscardServerHandler());
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind().sync();
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
b.shutdown();
}
}
public static void main(String[] args) throws Exception {

View File

@ -15,54 +15,37 @@
*/
package io.netty.example.discard;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundStreamHandlerAdapter;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends SimpleChannelUpstreamHandler {
public class DiscardServerHandler extends ChannelInboundStreamHandlerAdapter {
private static final Logger logger = Logger.getLogger(
DiscardServerHandler.class.getName());
private long transferredBytes;
public long getTransferredBytes() {
return transferredBytes;
}
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
// Let SimpleChannelHandler call actual event handler methods below.
super.handleUpstream(ctx, e);
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx)
throws Exception {
// Discard the received data silently.
ctx.in().byteBuffer().clear();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Discard received data silently by doing nothing.
transferredBytes += ((ChannelBuffer) e.getMessage()).readableBytes();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
public void exceptionCaught(ChannelInboundHandlerContext<Byte> ctx,
Throwable cause) throws Exception {
// Close the connection when an exception is raised.
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.cause());
e.channel().close();
cause);
ctx.close();
}
}

View File

@ -82,7 +82,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private ClosedChannelException closedChannelException;
private final Deque<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
protected long writeCounter;
private long writeCounter;
private boolean inFlushNow;
private boolean flushNowPending;
/** Cache for the string representation of this channel */
private boolean strValActive;
@ -362,6 +364,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract class AbstractUnsafe implements Unsafe {
private final Runnable flushLaterTask = new FlushLater();
@Override
public ChannelBufferHolder<Object> out() {
return firstOut();
@ -554,19 +558,26 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
// Attempt/perform outbound I/O if:
// - the channel is inactive - flush0() will fail the futures.
// - the event loop has no plan to call flushForcibly().
try {
if (!isActive() || !isFlushPending()) {
doFlush(out());
if (!inFlushNow) {
try {
if (!isActive() || !isFlushPending()) {
flushNow();
}
} catch (Throwable t) {
notifyFlushFutures(t);
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (!isActive()) {
close(unsafe().voidFuture());
}
}
} catch (Throwable t) {
notifyFlushFutures(t);
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (!isActive()) {
close(unsafe().voidFuture());
} else {
if (!flushNowPending) {
flushNowPending = true;
eventLoop().execute(flushLaterTask);
}
}
} else {
@ -581,18 +592,38 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public void flushNow() {
if (inFlushNow) {
return;
}
inFlushNow = true;
try {
doFlush(out());
} catch (Throwable t) {
notifyFlushFutures(t);
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
Throwable cause = null;
ChannelBufferHolder<Object> out = out();
int oldSize = out.size();
try {
doFlush(out);
} catch (Throwable t) {
cause = t;
} finally {
writeCounter += oldSize - out.size();
}
} finally {
if (cause == null) {
notifyFlushFutures();
} else {
notifyFlushFutures(cause);
pipeline().fireExceptionCaught(cause);
if (cause instanceof IOException) {
close(voidFuture());
}
}
if (!isActive()) {
close(unsafe().voidFuture());
}
} finally {
inFlushNow = false;
}
}
@ -615,6 +646,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private class FlushLater implements Runnable {
@Override
public void run() {
flushNowPending = false;
unsafe().flush(voidFuture);
}
}
protected abstract boolean isCompatible(EventLoop loop);
protected abstract ChannelBufferHolder<Object> firstOut();
@ -631,38 +670,46 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract boolean isFlushPending();
protected void notifyFlushFutures() {
private void notifyFlushFutures() {
if (flushCheckpoints.isEmpty()) {
return;
}
final long flushedAmount = AbstractChannel.this.writeCounter;
final long writeCounter = AbstractChannel.this.writeCounter;
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
FlushCheckpoint cp = flushCheckpoints.peek();
if (cp == null) {
// Reset the counter if there's nothing in the notification list.
AbstractChannel.this.writeCounter = 0;
break;
}
if (cp.flushCheckpoint() > flushedAmount) {
if (cp.flushCheckpoint() > writeCounter) {
if (writeCounter > 0 && flushCheckpoints.size() == 1) {
AbstractChannel.this.writeCounter = 0;
cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter);
}
break;
}
flushCheckpoints.remove();
cp.future().setSuccess();
}
// Avoid overflow
if (flushCheckpoints.isEmpty()) {
// Reset the counter if there's nothing in the notification list.
AbstractChannel.this.writeCounter = 0;
} else if (flushedAmount >= 0x1000000000000000L) {
// Otherwise, reset the counter only when the counter grew pretty large
final long newWriteCounter = AbstractChannel.this.writeCounter;
if (newWriteCounter >= 0x1000000000000000L) {
// Reset the counter only when the counter grew pretty large
// so that we can reduce the cost of updating all entries in the notification list.
AbstractChannel.this.writeCounter = 0;
for (FlushCheckpoint cp: flushCheckpoints) {
cp.flushCheckpoint(cp.flushCheckpoint() - flushedAmount);
cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter);
}
}
}
protected void notifyFlushFutures(Throwable cause) {
private void notifyFlushFutures(Throwable cause) {
notifyFlushFutures();
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {

View File

@ -15,9 +15,6 @@
*/
package io.netty.channel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.util.concurrent.TimeUnit;
/**
@ -26,9 +23,6 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class CompleteChannelFuture implements ChannelFuture {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(CompleteChannelFuture.class);
private final Channel channel;
/**
@ -45,31 +39,13 @@ public abstract class CompleteChannelFuture implements ChannelFuture {
@Override
public ChannelFuture addListener(final ChannelFutureListener listener) {
if (channel().eventLoop().inEventLoop()) {
notifyListener(listener);
} else {
channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
notifyListener(listener);
}
});
if (listener == null) {
throw new NullPointerException("listener");
}
DefaultChannelFuture.notifyListener(this, listener);
return this;
}
private void notifyListener(ChannelFutureListener listener) {
try {
listener.operationComplete(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by " +
ChannelFutureListener.class.getSimpleName() + ".", t);
}
}
}
@Override
public ChannelFuture removeListener(ChannelFutureListener listener) {
// NOOP

View File

@ -38,6 +38,14 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultChannelFuture.class);
private static final int MAX_LISTENER_STACK_DEPTH = 8;
private static final ThreadLocal<Integer> LISTENER_STACK_DEPTH = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
private static final Throwable CANCELLED = new Throwable();
private static volatile boolean useDeadLockChecker = true;
@ -154,17 +162,7 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu
}
if (notifyNow) {
if (channel().eventLoop().inEventLoop()) {
notifyListener(listener);
} else {
channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
notifyListener(listener);
}
});
}
notifyListener(this, listener);
}
return this;
@ -433,12 +431,12 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu
}
if (channel().eventLoop().inEventLoop()) {
notifyListener(firstListener);
notifyListener0(this, firstListener);
firstListener = null;
if (otherListeners != null) {
for (ChannelFutureListener l: otherListeners) {
notifyListener(l);
notifyListener0(this, l);
}
otherListeners = null;
}
@ -450,10 +448,10 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu
channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
notifyListener(firstListener);
notifyListener0(DefaultChannelFuture.this, firstListener);
if (otherListeners != null) {
for (ChannelFutureListener l: otherListeners) {
notifyListener(l);
notifyListener0(DefaultChannelFuture.this, l);
}
}
}
@ -461,9 +459,33 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu
}
}
private void notifyListener(ChannelFutureListener l) {
static void notifyListener(final ChannelFuture f, final ChannelFutureListener l) {
EventLoop loop = f.channel().eventLoop();
if (loop.inEventLoop()) {
final Integer stackDepth = LISTENER_STACK_DEPTH.get();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
LISTENER_STACK_DEPTH.set(stackDepth + 1);
try {
notifyListener0(f, l);
} finally {
LISTENER_STACK_DEPTH.set(stackDepth);
}
return;
}
}
loop.execute(new Runnable() {
@Override
public void run() {
notifyListener(f, l);
}
});
}
private static void notifyListener0(ChannelFuture f, ChannelFutureListener l) {
try {
l.operationComplete(this);
l.operationComplete(f);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(

View File

@ -109,46 +109,65 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
protected Runnable pollTask() {
assert inEventLoop();
Runnable task = taskQueue.poll();
if (task == null) {
if (fetchScheduledTasks()) {
task = taskQueue.poll();
}
if (task != null) {
return task;
}
return task;
if (fetchScheduledTasks()) {
task = taskQueue.poll();
return task;
}
return null;
}
protected Runnable takeTask() throws InterruptedException {
assert inEventLoop();
for (;;) {
Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS);
if (task != null) {
return task;
}
fetchScheduledTasks();
task = taskQueue.poll();
if (task != null) {
return task;
}
}
}
protected Runnable peekTask() {
assert inEventLoop();
Runnable task = taskQueue.peek();
if (task == null) {
if (fetchScheduledTasks()) {
task = taskQueue.peek();
}
if (task != null) {
return task;
}
return task;
if (fetchScheduledTasks()) {
task = taskQueue.peek();
return task;
}
return null;
}
protected boolean hasTasks() {
assert inEventLoop();
boolean empty = taskQueue.isEmpty();
if (empty) {
if (fetchScheduledTasks()) {
empty = taskQueue.isEmpty();
}
if (!empty) {
return true;
}
return !empty;
if (fetchScheduledTasks()) {
return !taskQueue.isEmpty();
}
return false;
}
protected void addTask(Runnable task) {

View File

@ -82,9 +82,7 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
for (int i = writeSpinCount; i >= 0; i --) {
int localFlushedAmount = doWriteMessages(buf, i == 0);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
wrote = true;
notifyFlushFutures();
break;
}
}

View File

@ -89,8 +89,6 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel {
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf, i == 0);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
break;
}
if (!buf.readable()) {

View File

@ -70,11 +70,7 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel {
private void flushMessageBuf(Queue<Object> buf) throws Exception {
while (!buf.isEmpty()) {
int localFlushedAmount = doWriteMessages(buf);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
}
doWriteMessages(buf);
}
}

View File

@ -72,11 +72,7 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel {
private void flushByteBuf(ChannelBuffer buf) throws Exception {
while (buf.readable()) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
}
doWriteBytes(buf);
}
buf.clear();
}