Read only when requested (read-on-demand)

This pull request introduces a new operation called read() that replaces the existing inbound traffic control method. EventLoop now performs socket reads only when the read() operation has been issued. Once the requested read() operation is actually performed, EventLoop triggers an inboundBufferSuspended event that tells the handlers that the requested read() operation has been performed and the inbound traffic has been suspended again. A handler can decide to continue reading or not.

Unlike other outbound operations, read() does not use ChannelFuture at all to avoid GC cost. If there's a good reason to create a new future per read at the GC cost, I'll change this.

This pull request consequently removes the readable property in ChannelHandlerContext, which means how the traffic control works changed significantly.

This pull request also adds a new configuration property ChannelOption.AUTO_READ whose default value is true. If true, Netty will call ctx.read() for you. If you need a close control over when read() is called, you can set it to false.

Another interesting fact is that non-terminal handlers do not really need to call read() at all. Only the last inbound handler will have to call it, and that's just enough. Actually, you don't even need to call it at the last handler in most cases because of the ChannelOption.AUTO_READ mentioned above.

There's no serious backward compatibility issue. If the compiler complains your handler does not implement the read() method, add the following:

public void read(ChannelHandlerContext ctx) throws Exception {
    ctx.read();
}

Note that this pull request certainly makes bounded inbound buffer support very easy, but itself does not add the bounded inbound buffer support.
This commit is contained in:
Trustin Lee 2012-12-30 21:53:59 +09:00
parent 926a20f105
commit 0909878581
69 changed files with 722 additions and 607 deletions

View File

@ -16,11 +16,10 @@
package io.netty.handler.codec.marshalling;
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;
import java.io.IOException;
import org.jboss.marshalling.ByteOutput;
/**
* {@link ByteOutput} implementation which writes the data to a {@link ByteBuf}
*
@ -39,7 +38,7 @@ class ChannelBufferByteOutput implements ByteOutput {
@Override
public void close() throws IOException {
// Nothing todo
// Nothing to do
}
@Override

View File

@ -15,10 +15,10 @@
*/
package io.netty.handler.codec.marshalling;
import java.io.IOException;
import org.jboss.marshalling.ByteInput;
import java.io.IOException;
/**
* {@link ByteInput} implementation which wraps another {@link ByteInput} and throws a {@link TooBigObjectException}
* if the read limit was reached.
@ -42,7 +42,7 @@ class LimitingByteInput implements ByteInput {
@Override
public void close() throws IOException {
// Nothing todo
// Nothing to do
}
@Override

View File

@ -15,10 +15,6 @@
*/
package io.netty.example.http.snoop;
import static io.netty.handler.codec.http.HttpHeaders.*;
import static io.netty.handler.codec.http.HttpHeaders.Names.*;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
@ -44,6 +40,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static io.netty.handler.codec.http.HttpHeaders.Names.*;
import static io.netty.handler.codec.http.HttpHeaders.*;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;
public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<Object> {
private HttpRequest request;

View File

@ -19,7 +19,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
public class LocalEchoClientHandler extends ChannelInboundMessageHandlerAdapter<String> {
@Override
public void messageReceived(ChannelHandlerContext ctx, String msg) {
// Print as received

View File

@ -230,6 +230,11 @@ public class LoggingHandler extends ChannelHandlerAdapter {
super.deregister(ctx, future);
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {

View File

@ -400,6 +400,11 @@ public class SslHandler
closeOutboundAndChannel(ctx, future, false);
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void flush(final ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
final ByteBuf in = ctx.outboundByteBuffer();

View File

@ -139,6 +139,11 @@ public class ChunkedWriteHandler
}
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
queue.add(future);

View File

@ -261,6 +261,11 @@ public class IdleStateHandler extends ChannelStateHandlerAdapter implements Chan
ctx.fireInboundBufferUpdated();
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void flush(final ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
future.addListener(new ChannelFutureListener() {

View File

@ -77,6 +77,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte
*/
protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
private static final AttributeKey<Boolean> READ_SUSPENDED = new AttributeKey<Boolean>("readSuspended");
private static final AttributeKey<Runnable> REOPEN_TASK = new AttributeKey<Runnable>("reopenTask");
private static final AttributeKey<Runnable> BUFFER_UPDATE_TASK = new AttributeKey<Runnable>("bufferUpdateTask");
@ -188,7 +189,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte
/**
* Class to implement setReadable at fix time
*/
private static final class ReopenReadTimerTask implements Runnable {
private static final class ReopenReadTimerTask implements Runnable {
final ChannelHandlerContext ctx;
ReopenReadTimerTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
@ -196,9 +197,8 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte
@Override
public void run() {
if (ctx.channel().isActive()) {
ctx.readable(true);
}
ctx.attr(READ_SUSPENDED).set(false);
ctx.read();
}
}
@ -259,8 +259,8 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to
// try to limit the traffic
if (ctx.isReadable()) {
ctx.readable(false);
if (!ctx.attr(READ_SUSPENDED).get()) {
ctx.attr(READ_SUSPENDED).set(true);
// Create a Runnable to reactive the read if needed. If one was create before it will just be
// reused to limit object creation
@ -294,6 +294,13 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte
ctx.fireInboundBufferUpdated();
}
@Override
public void read(ChannelHandlerContext ctx) {
if (!ctx.attr(READ_SUSPENDED).get()) {
ctx.read();
}
}
@Override
public void flush(final ChannelHandlerContext ctx, final ChannelFuture future) throws Exception {
long curtime = System.currentTimeMillis();

View File

@ -22,7 +22,6 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelOption;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@ -31,6 +30,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.*;
public class ServerSocketSuspendTest extends AbstractServerSocketTest {
private static final int NUM_CHANNELS = 10;
@ -46,10 +47,10 @@ public class ServerSocketSuspendTest extends AbstractServerSocketTest {
AcceptedChannelCounter counter = new AcceptedChannelCounter(NUM_CHANNELS);
sb.option(ChannelOption.SO_BACKLOG, 1);
sb.option(ChannelOption.AUTO_READ, false);
sb.childHandler(counter);
Channel sc = sb.bind().sync().channel();
sc.pipeline().firstContext().readable(false);
List<Socket> sockets = new ArrayList<Socket>();
@ -61,11 +62,13 @@ public class ServerSocketSuspendTest extends AbstractServerSocketTest {
sockets.add(s);
}
sc.pipeline().firstContext().readable(true);
sc.config().setAutoRead(true);
sc.read();
counter.latch.await();
long endTime = System.nanoTime();
Assert.assertTrue(endTime - startTime > TIMEOUT);
assertTrue(endTime - startTime > TIMEOUT);
} finally {
for (Socket s: sockets) {
s.close();
@ -83,7 +86,7 @@ public class ServerSocketSuspendTest extends AbstractServerSocketTest {
}
long endTime = System.nanoTime();
Assert.assertTrue(endTime - startTime < TIMEOUT);
assertTrue(endTime - startTime < TIMEOUT);
} finally {
for (Socket s: sockets) {
s.close();

View File

@ -15,7 +15,6 @@
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
@ -23,12 +22,13 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import org.junit.Test;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import static org.junit.Assert.*;
public class SocketEchoTest extends AbstractSocketTest {

View File

@ -32,8 +32,7 @@ import java.io.IOException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
public class SocketFileRegionTest extends AbstractSocketTest {

View File

@ -15,7 +15,6 @@
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
@ -26,12 +25,13 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import org.junit.Test;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import static org.junit.Assert.*;
public class SocketFixedLengthEchoTest extends AbstractSocketTest {

View File

@ -15,14 +15,14 @@
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.SocketChannel;
import org.junit.Test;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
@ -30,7 +30,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import static org.junit.Assert.*;
public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest {

View File

@ -15,7 +15,6 @@
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
@ -30,13 +29,14 @@ import io.netty.handler.codec.spdy.SpdyConstants;
import io.netty.handler.codec.spdy.SpdyFrameDecoder;
import io.netty.handler.codec.spdy.SpdyFrameEncoder;
import io.netty.util.NetUtil;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import static org.junit.Assert.*;
public class SocketSpdyEchoTest extends AbstractSocketTest {

View File

@ -225,8 +225,8 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap> {
return Unpooled.messageBuffer();
}
@SuppressWarnings("unchecked")
@Override
@SuppressWarnings("unchecked")
public void inboundBufferUpdated(ChannelHandlerContext ctx) {
MessageBuf<Channel> in = ctx.inboundMessageBuffer();
for (;;) {

View File

@ -288,6 +288,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return (MessageBuf<T>) pipeline.outboundMessageBuffer();
}
@Override
public void read() {
pipeline.read();
}
@Override
public ChannelFuture flush(ChannelFuture future) {
return pipeline.flush(future);
@ -439,6 +444,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private final Runnable beginReadTask = new Runnable() {
@Override
public void run() {
beginRead();
}
};
private final Runnable flushLaterTask = new Runnable() {
@Override
public void run() {
@ -725,6 +737,20 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
@Override
public void beginRead() {
if (eventLoop().inEventLoop()) {
try {
doBeginRead();
} catch (Exception e) {
pipeline().fireExceptionCaught(e);
close(unsafe().voidFuture());
}
} else {
eventLoop().execute(beginReadTask);
}
}
@Override
public void flush(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
@ -931,6 +957,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
// NOOP
}
/**
* Schedule a read operation.
*/
protected abstract void doBeginRead() throws Exception;
/**
* Flush the content of the given {@link ByteBuf} to the remote peer.
*

View File

@ -79,8 +79,12 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
return false;
}
protected abstract class AbstractServerUnsafe extends AbstractUnsafe {
@Override
protected AbstractUnsafe newUnsafe() {
return new DefaultServerUnsafe();
}
private final class DefaultServerUnsafe extends AbstractUnsafe {
@Override
public void flush(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {

View File

@ -18,6 +18,7 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.AttributeMap;
@ -155,7 +156,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
* {@code null} if this channel is not connected.
* If this channel is not connected but it can receive messages
* from arbitrary remote addresses (e.g. {@link DatagramChannel},
* use {@link io.netty.channel.socket.DatagramPacket#remoteAddress()} to determine
* use {@link DatagramPacket#remoteAddress()} to determine
* the origination of the received message as this method will
* return {@code null}.
*/
@ -245,6 +246,12 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
*/
void deregister(ChannelFuture future);
/**
* Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
* {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing.
*/
void beginRead();
/**
* Flush out all data that was buffered in the buffer of the {@link #directOutboundContext()} and was not
* flushed out yet. After that is done the {@link ChannelFuture} will get notified
@ -256,18 +263,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
*/
void flushNow();
/**
* Suspend reads from the underlying transport, which basicly has the effect of no new data that will
* get dispatched.
*/
void suspendRead();
/**
* Resume reads from the underlying transport. If {@link #suspendRead()} was not called before, this
* has no effect.
*/
void resumeRead();
/**
* Send a {@link FileRegion} to the remote peer and notify the {@link ChannelFuture} once it completes
* or an error was detected. Once the {@link FileRegion} was transfered or an error was thrown it will

View File

@ -149,4 +149,16 @@ public interface ChannelConfig {
* to allocate buffers.
*/
ChannelConfig setAllocator(ByteBufAllocator allocator);
/**
* Returns {@code true} if and only if {@link ChannelHandlerContext#read()} will be invoked automatically so that
* a user application doesn't need to call it at all. The default value is {@code true}.
*/
boolean isAutoRead();
/**
* Sets if {@link ChannelHandlerContext#read()} will be invoked automatically so that a user application doesn't
* need to call it at all. The default value is {@code true}.
*/
ChannelConfig setAutoRead(boolean autoRead);
}

View File

@ -86,6 +86,11 @@ public abstract class ChannelHandlerAdapter extends ChannelStateHandlerAdapter i
ctx.deregister(future);
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
/**
* Calls {@link ChannelHandlerContext#flush(ChannelFuture)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.

View File

@ -329,19 +329,4 @@ public interface ChannelHandlerContext
* {@link UnsupportedOperationException} is thrown.
*/
MessageBuf<Object> nextOutboundMessageBuffer();
/**
* Return {@code true} if the {@link ChannelHandlerContext} was marked as readable. This basically means
* that once its not readable anymore no new data will be read from the transport and passed down the
* {@link ChannelPipeline}.
*
* Only if all {@link ChannelHandlerContext}'s {@link #isReadable()} return {@code true}, the data is
* passed again down the {@link ChannelPipeline}.
*/
boolean isReadable();
/**
* Mark the {@link ChannelHandlerContext} as readable or suspend it. See {@link #isReadable()}
*/
void readable(boolean readable);
}

View File

@ -83,4 +83,10 @@ public interface ChannelInboundInvoker {
* {@link Channel}.
*/
void fireInboundBufferUpdated();
/**
* Triggers an {@link ChannelStateHandler#inboundBufferSuspended(ChannelHandlerContext) inboundBufferSuspended}
* event to the next {@link ChannelStateHandler} in the {@link ChannelPipeline}.
*/
void fireInboundBufferSuspended();
}

View File

@ -71,6 +71,11 @@ public interface ChannelOperationHandler extends ChannelHandler {
*/
void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
/**
* Intercepts {@link ChannelHandlerContext#read()}.
*/
void read(ChannelHandlerContext ctx);
/**
* Called once a flush operation is made and so the outbound data should be written.
*

View File

@ -135,6 +135,11 @@ public abstract class ChannelOperationHandlerAdapter implements ChannelOperation
ctx.deregister(future);
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
/**
* Calls {@link ChannelHandlerContext#flush(ChannelFuture)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.

View File

@ -44,6 +44,8 @@ public class ChannelOption<T> extends UniqueName {
new ChannelOption<Integer>("WRITE_SPIN_COUNT");
public static final ChannelOption<Boolean> ALLOW_HALF_CLOSURE =
new ChannelOption<Boolean>("ALLOW_HALF_CLOSURE");
public static final ChannelOption<Boolean> AUTO_READ =
new ChannelOption<Boolean>("AUTO_READ");
public static final ChannelOption<Boolean> SO_BROADCAST =
new ChannelOption<Boolean>("SO_BROADCAST");

View File

@ -140,6 +140,15 @@ public interface ChannelOutboundInvoker {
*/
ChannelFuture deregister(ChannelFuture future);
/**
* Reads data from the {@link Channel} into the first inbound buffer, triggers an
* {@link ChannelStateHandler#inboundBufferUpdated(ChannelHandlerContext) inboundBufferUpdated} event if data was
* read, and triggers an
* {@link ChannelStateHandler#inboundBufferSuspended(ChannelHandlerContext) inboundBufferSuspended} event so the
* handler can decide to continue reading. If there's a pending read operation already, this method does nothing.
*/
void read();
/**
* Flush all pending data which belongs to this ChannelOutboundInvoker and notify the {@link ChannelFuture}
* once the operation completes, either because the operation was successful or because of an error.

View File

@ -49,4 +49,10 @@ public interface ChannelStateHandler extends ChannelHandler {
* to wait for more data and consume it later.
*/
void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception;
/**
* Invoked when a {@link ChannelHandlerContext#read()} is finished and the inbound buffer of this handler will not
* be updated until another {@link ChannelHandlerContext#read()} request is issued.
*/
void inboundBufferSuspended(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -153,4 +153,9 @@ public class ChannelStateHandlerAdapter implements ChannelStateHandler {
}
ctx.fireInboundBufferUpdated();
}
@Override
public void inboundBufferSuspended(ChannelHandlerContext ctx) throws Exception {
ctx.fireInboundBufferSuspended();
}
}

View File

@ -204,6 +204,11 @@ public class CombinedChannelHandler extends ChannelStateHandlerAdapter implement
out.deregister(ctx, future);
}
@Override
public void read(ChannelHandlerContext ctx) {
out.read(ctx);
}
@Override
public void flush(
ChannelHandlerContext ctx, ChannelFuture future) throws Exception {

View File

@ -36,10 +36,11 @@ public class DefaultChannelConfig implements ChannelConfig {
private volatile ByteBufAllocator allocator = DEFAULT_ALLOCATOR;
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
private volatile int writeSpinCount = 16;
private volatile boolean autoRead = true;
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(null, CONNECT_TIMEOUT_MILLIS, WRITE_SPIN_COUNT, ALLOCATOR);
return getOptions(null, CONNECT_TIMEOUT_MILLIS, WRITE_SPIN_COUNT, ALLOCATOR, AUTO_READ);
}
protected Map<ChannelOption<?>, Object> getOptions(
@ -70,8 +71,8 @@ public class DefaultChannelConfig implements ChannelConfig {
return setAllOptions;
}
@SuppressWarnings("unchecked")
@Override
@SuppressWarnings("unchecked")
public <T> T getOption(ChannelOption<T> option) {
if (option == null) {
throw new NullPointerException("option");
@ -86,6 +87,9 @@ public class DefaultChannelConfig implements ChannelConfig {
if (option == ALLOCATOR) {
return (T) getAllocator();
}
if (option == AUTO_READ) {
return (T) Boolean.valueOf(isAutoRead());
}
return null;
}
@ -100,6 +104,8 @@ public class DefaultChannelConfig implements ChannelConfig {
setWriteSpinCount((Integer) value);
} else if (option == ALLOCATOR) {
setAllocator((ByteBufAllocator) value);
} else if (option == AUTO_READ) {
setAutoRead((Boolean) value);
} else {
return false;
}
@ -157,4 +163,15 @@ public class DefaultChannelConfig implements ChannelConfig {
this.allocator = allocator;
return this;
}
@Override
public boolean isAutoRead() {
return autoRead;
}
@Override
public ChannelConfig setAutoRead(boolean autoRead) {
this.autoRead = autoRead;
return this;
}
}

View File

@ -31,7 +31,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.channel.DefaultChannelPipeline.*;
@ -40,10 +39,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private static final EnumSet<ChannelHandlerType> EMPTY_TYPE = EnumSet.noneOf(ChannelHandlerType.class);
static final int DIR_INBOUND = 1;
static final int DIR_OUTBOUND = 2;
private static final int FLAG_NEEDS_LAZY_INIT = 4;
private static final int FLAG_STATE_HANDLER = 1;
static final int FLAG_OPERATION_HANDLER = 2;
static final int FLAG_INBOUND_HANDLER = 4;
private static final int FLAG_OUTBOUND_HANDLER = 8;
private static final int FLAG_NEEDS_LAZY_INIT = 16;
volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev;
@ -54,7 +54,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private final Set<ChannelHandlerType> type;
private final ChannelHandler handler;
final int flags;
final AtomicBoolean readable = new AtomicBoolean(true);
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
@ -143,7 +142,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void run() {
DefaultChannelHandlerContext next = nextContext(
DefaultChannelHandlerContext.this.next, DIR_INBOUND);
DefaultChannelHandlerContext.this.next, FLAG_STATE_HANDLER);
if (next != null) {
next.fillBridge();
EventExecutor executor = next.executor();
@ -155,6 +154,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
};
private final Runnable fireInboundBufferSuspendedTask = new Runnable() {
@Override
public void run() {
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
try {
((ChannelStateHandler) ctx.handler).inboundBufferSuspended(ctx);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
}
}
};
private final Runnable freeInboundBufferTask = new Runnable() {
@Override
public void run() {
@ -176,12 +186,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
DefaultChannelHandlerContext nextCtx = nextContext(ctx.next, DIR_INBOUND);
DefaultChannelHandlerContext nextCtx = nextContext(ctx.next, FLAG_STATE_HANDLER);
if (nextCtx != null) {
nextCtx.callFreeInboundBuffer();
} else {
// Freed all inbound buffers. Free all outbound buffers in a reverse order.
pipeline.firstContext(DIR_OUTBOUND).callFreeOutboundBuffer();
pipeline.lastContext(FLAG_OPERATION_HANDLER).callFreeOutboundBuffer();
}
}
};
@ -206,13 +216,20 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
DefaultChannelHandlerContext nextCtx = nextContext(ctx.prev, DIR_OUTBOUND);
DefaultChannelHandlerContext nextCtx = prevContext(ctx.prev, FLAG_OPERATION_HANDLER);
if (nextCtx != null) {
nextCtx.callFreeOutboundBuffer();
}
}
};
final Runnable read0Task = new Runnable() {
@Override
public void run() {
pipeline.read0(DefaultChannelHandlerContext.this);
}
};
@SuppressWarnings("unchecked")
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutorGroup group,
@ -232,16 +249,18 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
EnumSet<ChannelHandlerType> type = EMPTY_TYPE.clone();
if (handler instanceof ChannelStateHandler) {
type.add(ChannelHandlerType.STATE);
flags |= DIR_INBOUND;
flags |= FLAG_STATE_HANDLER;
if (handler instanceof ChannelInboundHandler) {
type.add(ChannelHandlerType.INBOUND);
flags |= FLAG_INBOUND_HANDLER;
}
}
if (handler instanceof ChannelOperationHandler) {
type.add(ChannelHandlerType.OPERATION);
flags |= DIR_OUTBOUND;
flags |= FLAG_OPERATION_HANDLER;
if (handler instanceof ChannelOutboundHandler) {
type.add(ChannelHandlerType.OUTBOUND);
flags |= FLAG_OUTBOUND_HANDLER;
}
}
this.type = Collections.unmodifiableSet(type);
@ -267,7 +286,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
executor = null;
}
if (type.contains(ChannelHandlerType.INBOUND)) {
if ((flags & FLAG_INBOUND_HANDLER) != 0) {
Buf buf;
try {
buf = ((ChannelInboundHandler) handler).newInboundBuffer(this);
@ -299,7 +318,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
inMsgBridge = null;
}
if (type.contains(ChannelHandlerType.OUTBOUND)) {
if ((flags & FLAG_OUTBOUND_HANDLER) != 0) {
if (prev == null) {
// Special case: if pref == null, it means this context for HeadHandler.
// HeadHandler is an outbound handler instantiated by the constructor of DefaultChannelPipeline.
@ -930,7 +949,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireChannelRegistered() {
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
@ -943,7 +962,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireChannelUnregistered() {
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop() && prev != null) {
@ -956,7 +975,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireChannelActive() {
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
@ -969,7 +988,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void fireChannelInactive() {
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop() && prev != null) {
@ -1062,6 +1081,19 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
@Override
public void fireInboundBufferSuspended() {
DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER);
if (next != null) {
EventExecutor executor = next.executor();
if (executor.inEventLoop() && prev != null) {
next.fireInboundBufferSuspendedTask.run();
} else {
executor.execute(next.fireInboundBufferSuspendedTask);
}
}
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return bind(localAddress, newFuture());
@ -1104,7 +1136,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) {
return pipeline.bind(nextContext(prev, DIR_OUTBOUND), localAddress, future);
return pipeline.bind(prevContext(prev, FLAG_OPERATION_HANDLER), localAddress, future);
}
@Override
@ -1114,29 +1146,34 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
return pipeline.connect(nextContext(prev, DIR_OUTBOUND), remoteAddress, localAddress, future);
return pipeline.connect(prevContext(prev, FLAG_OPERATION_HANDLER), remoteAddress, localAddress, future);
}
@Override
public ChannelFuture disconnect(ChannelFuture future) {
return pipeline.disconnect(nextContext(prev, DIR_OUTBOUND), future);
return pipeline.disconnect(prevContext(prev, FLAG_OPERATION_HANDLER), future);
}
@Override
public ChannelFuture close(ChannelFuture future) {
return pipeline.close(nextContext(prev, DIR_OUTBOUND), future);
return pipeline.close(prevContext(prev, FLAG_OPERATION_HANDLER), future);
}
@Override
public ChannelFuture deregister(ChannelFuture future) {
return pipeline.deregister(nextContext(prev, DIR_OUTBOUND), future);
return pipeline.deregister(prevContext(prev, FLAG_OPERATION_HANDLER), future);
}
@Override
public void read() {
pipeline.read(prevContext(prev, FLAG_OPERATION_HANDLER));
}
@Override
public ChannelFuture flush(final ChannelFuture future) {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
DefaultChannelHandlerContext prev = nextContext(this.prev, DIR_OUTBOUND);
DefaultChannelHandlerContext prev = prevContext(this.prev, FLAG_OPERATION_HANDLER);
prev.fillBridge();
pipeline.flush(prev, future);
} else {
@ -1269,23 +1306,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
@Override
public boolean isReadable() {
return readable.get();
}
@Override
public void readable(boolean readable) {
pipeline.readable(this, readable);
}
@Override
public ChannelFuture sendFile(FileRegion region) {
return pipeline.sendFile(nextContext(prev, DIR_OUTBOUND), region, newFuture());
return pipeline.sendFile(prevContext(prev, FLAG_OPERATION_HANDLER), region, newFuture());
}
@Override
public ChannelFuture sendFile(FileRegion region, ChannelFuture future) {
return pipeline.sendFile(nextContext(prev, DIR_OUTBOUND), region, future);
return pipeline.sendFile(prevContext(prev, FLAG_OPERATION_HANDLER), region, future);
}
}

View File

@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.channel.DefaultChannelHandlerContext.*;
@ -56,7 +55,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final Map<EventExecutorGroup, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutorGroup, EventExecutor>();
private final AtomicInteger suspendRead = new AtomicInteger();
public DefaultChannelPipeline(Channel channel) {
if (channel == null) {
@ -102,14 +100,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// in order to avoid deadlock
newCtx.executeOnEventLoop(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addFirst0(name, nextCtx, newCtx);
}
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addFirst0(name, nextCtx, newCtx);
}
});
}
});
return this;
}
@ -257,14 +255,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// in order to avoid deadlock
newCtx.executeOnEventLoop(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addAfter0(name, ctx, newCtx);
}
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addAfter0(name, ctx, newCtx);
}
});
}
});
return this;
}
@ -423,9 +421,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
name2ctx.remove(ctx.name());
callAfterRemove(ctx);
// make sure the it's set back to readable
ctx.readable(true);
}
@Override
@ -455,13 +450,13 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// in order to avoid deadlock
oldTail.executeOnEventLoop(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
removeLast0(oldTail);
}
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
removeLast0(oldTail);
}
});
}
});
return oldTail.handler();
}
@ -474,9 +469,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
name2ctx.remove(oldTail.name());
callBeforeRemove(oldTail);
// make sure the it's set back to readable
oldTail.readable(true);
}
@Override
@ -586,10 +578,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
boolean removed = false;
try {
callAfterRemove(ctx);
// clear readable suspend if necessary
ctx.readable(true);
removed = true;
} catch (ChannelPipelineException e) {
removeException = e;
@ -955,6 +943,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public void fireChannelActive() {
firedChannelActive = true;
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
if (fireInboundBufferUpdatedOnActivation) {
fireInboundBufferUpdatedOnActivation = false;
head.fireInboundBufferUpdated();
@ -989,6 +982,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
head.fireInboundBufferUpdated();
}
@Override
public void fireInboundBufferSuspended() {
head.fireInboundBufferSuspended();
if (channel.config().isAutoRead()) {
channel.read();
}
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return bind(localAddress, channel.newFuture());
@ -1031,7 +1032,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) {
return bind(firstContext(DIR_OUTBOUND), localAddress, future);
return bind(lastContext(FLAG_OPERATION_HANDLER), localAddress, future);
}
ChannelFuture bind(
@ -1066,7 +1067,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
return connect(firstContext(DIR_OUTBOUND), remoteAddress, localAddress, future);
return connect(lastContext(FLAG_OPERATION_HANDLER), remoteAddress, localAddress, future);
}
ChannelFuture connect(
@ -1098,7 +1099,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture disconnect(ChannelFuture future) {
return disconnect(firstContext(DIR_OUTBOUND), future);
return disconnect(lastContext(FLAG_OPERATION_HANDLER), future);
}
ChannelFuture disconnect(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
@ -1130,7 +1131,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture close(ChannelFuture future) {
return close(firstContext(DIR_OUTBOUND), future);
return close(lastContext(FLAG_OPERATION_HANDLER), future);
}
ChannelFuture close(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
@ -1156,7 +1157,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture deregister(final ChannelFuture future) {
return deregister(firstContext(DIR_OUTBOUND), future);
return deregister(lastContext(FLAG_OPERATION_HANDLER), future);
}
ChannelFuture deregister(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
@ -1187,7 +1188,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture sendFile(FileRegion region, ChannelFuture future) {
return sendFile(firstContext(DIR_OUTBOUND), region, future);
return sendFile(lastContext(FLAG_OPERATION_HANDLER), region, future);
}
ChannelFuture sendFile(final DefaultChannelHandlerContext ctx, final FileRegion region,
@ -1216,9 +1217,32 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return future;
}
@Override
public void read() {
read(lastContext(FLAG_OPERATION_HANDLER));
}
void read(final DefaultChannelHandlerContext ctx) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
read0(ctx);
} else {
executor.execute(ctx.read0Task);
}
}
void read0(DefaultChannelHandlerContext ctx) {
try {
((ChannelOperationHandler) ctx.handler()).read(ctx);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelFuture flush(ChannelFuture future) {
return flush(firstContext(DIR_OUTBOUND), future);
return flush(lastContext(FLAG_OPERATION_HANDLER), future);
}
ChannelFuture flush(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
@ -1230,7 +1254,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
executor.execute(new Runnable() {
@Override
public void run() {
flush(ctx, future);
flush0(ctx, future);
}
});
}
@ -1356,36 +1380,35 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
}
DefaultChannelHandlerContext firstContext(int direction) {
assert direction == DIR_INBOUND || direction == DIR_OUTBOUND;
if (direction == DIR_INBOUND) {
return nextContext(head.next, DIR_INBOUND);
} else { // DIR_OUTBOUND
return nextContext(tail, DIR_OUTBOUND);
}
DefaultChannelHandlerContext lastContext(int flag) {
return prevContext(tail, flag);
}
static DefaultChannelHandlerContext nextContext(
DefaultChannelHandlerContext ctx, int direction) {
assert direction == DIR_INBOUND || direction == DIR_OUTBOUND;
static DefaultChannelHandlerContext nextContext(DefaultChannelHandlerContext ctx, int flag) {
if (ctx == null) {
return null;
}
DefaultChannelHandlerContext realCtx = ctx;
if (direction == DIR_INBOUND) {
while ((realCtx.flags & DIR_INBOUND) == 0) {
realCtx = realCtx.next;
if (realCtx == null) {
return null;
}
while ((realCtx.flags & flag) == 0) {
realCtx = realCtx.next;
if (realCtx == null) {
return null;
}
} else { // DIR_OUTBOUND
while ((realCtx.flags & DIR_OUTBOUND) == 0) {
realCtx = realCtx.prev;
if (realCtx == null) {
return null;
}
}
return realCtx;
}
static DefaultChannelHandlerContext prevContext(DefaultChannelHandlerContext ctx, int flag) {
if (ctx == null) {
return null;
}
DefaultChannelHandlerContext realCtx = ctx;
while ((realCtx.flags & flag) == 0) {
realCtx = realCtx.prev;
if (realCtx == null) {
return null;
}
}
return realCtx;
@ -1460,20 +1483,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
}
void readable(DefaultChannelHandlerContext ctx, boolean readable) {
if (ctx.readable.compareAndSet(!readable, readable)) {
if (!readable) {
if (suspendRead.incrementAndGet() == 1) {
unsafe.suspendRead();
}
} else {
if (suspendRead.decrementAndGet() == 0) {
unsafe.resumeRead();
}
}
}
}
private final class HeadHandler implements ChannelOutboundHandler {
@Override
public Buf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
@ -1542,6 +1551,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
unsafe.deregister(future);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
unsafe.flush(future);

View File

@ -233,6 +233,11 @@ public abstract class AbstractEmbeddedChannel<O> extends AbstractChannel {
// NOOP
}
@Override
protected void doBeginRead() throws Exception {
// NOOP
}
@Override
protected AbstractUnsafe newUnsafe() {
return new DefaultUnsafe();
@ -317,16 +322,6 @@ public abstract class AbstractEmbeddedChannel<O> extends AbstractChannel {
SocketAddress localAddress, ChannelFuture future) {
future.setSuccess();
}
@Override
public void suspendRead() {
// NOOP
}
@Override
public void resumeRead() {
// NOOP
}
}
private final class LastInboundMessageHandler extends ChannelInboundHandlerAdapter {

View File

@ -24,7 +24,7 @@ import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
/**
* Embedded {@@link Channel} which operates on messages which can be of any time.
* Embedded {@link Channel} which operates on messages which can be of any time.
*/
public class EmbeddedMessageChannel extends AbstractEmbeddedChannel<Object> {
@ -47,6 +47,7 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel<Object> {
return pipeline().inboundMessageBuffer();
}
@Override
@SuppressWarnings("unchecked")
public MessageBuf<Object> lastOutboundBuffer() {
return (MessageBuf<Object>) lastOutboundBuffer;

View File

@ -56,6 +56,7 @@ public class LocalChannel extends AbstractChannel {
private volatile LocalAddress localAddress;
private volatile LocalAddress remoteAddress;
private volatile ChannelFuture connectFuture;
private volatile boolean readInProgress;
public LocalChannel() {
this(null);
@ -207,6 +208,23 @@ public class LocalChannel extends AbstractChannel {
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
}
@Override
protected void doBeginRead() throws Exception {
if (readInProgress) {
return;
}
ChannelPipeline pipeline = pipeline();
MessageBuf<Object> buf = pipeline.inboundMessageBuffer();
if (buf.isEmpty()) {
readInProgress = true;
return;
}
pipeline.fireInboundBufferUpdated();
pipeline.fireInboundBufferSuspended();
}
@Override
protected void doFlushMessageBuffer(MessageBuf<Object> buf) throws Exception {
if (state < 2) {
@ -222,7 +240,7 @@ public class LocalChannel extends AbstractChannel {
if (peerLoop == eventLoop()) {
buf.drainTo(peerPipeline.inboundMessageBuffer());
peerPipeline.fireInboundBufferUpdated();
finishPeerRead(peer, peerPipeline);
} else {
final Object[] msgs = buf.toArray();
buf.clear();
@ -231,12 +249,20 @@ public class LocalChannel extends AbstractChannel {
public void run() {
MessageBuf<Object> buf = peerPipeline.inboundMessageBuffer();
Collections.addAll(buf, msgs);
peerPipeline.fireInboundBufferUpdated();
finishPeerRead(peer, peerPipeline);
}
});
}
}
private static void finishPeerRead(LocalChannel peer, ChannelPipeline peerPipeline) {
if (peer.readInProgress) {
peer.readInProgress = false;
peerPipeline.fireInboundBufferUpdated();
peerPipeline.fireInboundBufferSuspended();
}
}
@Override
protected boolean isFlushPending() {
return false;
@ -305,15 +331,5 @@ public class LocalChannel extends AbstractChannel {
});
}
}
@Override
public void suspendRead() {
// TODO: Implement me
}
@Override
public void resumeRead() {
// TODO: Implement me
}
}
}

View File

@ -15,8 +15,10 @@
*/
package io.netty.channel.local;
import io.netty.buffer.MessageBuf;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel;
@ -40,6 +42,7 @@ public class LocalServerChannel extends AbstractServerChannel {
private volatile int state; // 0 - open, 1 - active, 2 - closed
private volatile LocalAddress localAddress;
private volatile boolean acceptInProgress;
/**
* Creates a new instance
@ -127,6 +130,23 @@ public class LocalServerChannel extends AbstractServerChannel {
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
}
@Override
protected void doBeginRead() throws Exception {
if (acceptInProgress) {
return;
}
ChannelPipeline pipeline = pipeline();
MessageBuf<Object> buf = pipeline.inboundMessageBuffer();
if (buf.isEmpty()) {
acceptInProgress = true;
return;
}
pipeline.fireInboundBufferUpdated();
pipeline.fireInboundBufferSuspended();
}
LocalChannel serve(final LocalChannel peer) {
LocalChannel child = new LocalChannel(this, peer);
serve0(child);
@ -135,8 +155,13 @@ public class LocalServerChannel extends AbstractServerChannel {
private void serve0(final LocalChannel child) {
if (eventLoop().inEventLoop()) {
pipeline().inboundMessageBuffer().add(child);
pipeline().fireInboundBufferUpdated();
final ChannelPipeline pipeline = pipeline();
pipeline.inboundMessageBuffer().add(child);
if (acceptInProgress) {
acceptInProgress = false;
pipeline.fireInboundBufferUpdated();
pipeline.fireInboundBufferSuspended();
}
} else {
eventLoop().execute(new Runnable() {
@Override
@ -146,22 +171,4 @@ public class LocalServerChannel extends AbstractServerChannel {
});
}
}
@Override
protected AbstractUnsafe newUnsafe() {
return new LocalServerUnsafe();
}
private final class LocalServerUnsafe extends AbstractServerUnsafe {
@Override
public void suspendRead() {
// TODO: Implement me
}
@Override
public void resumeRead() {
// TODO: Implement me
}
}
}

View File

@ -163,4 +163,7 @@ public interface DatagramChannelConfig extends ChannelConfig {
@Override
DatagramChannelConfig setAllocator(ByteBufAllocator allocator);
@Override
DatagramChannelConfig setAutoRead(boolean autoRead);
}

View File

@ -376,4 +376,9 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
public DatagramChannelConfig setAllocator(ByteBufAllocator allocator) {
return (DatagramChannelConfig) super.setAllocator(allocator);
}
@Override
public DatagramChannelConfig setAutoRead(boolean autoRead) {
return (DatagramChannelConfig) super.setAutoRead(autoRead);
}
}

View File

@ -172,4 +172,9 @@ public class DefaultSctpChannelConfig extends DefaultChannelConfig implements Sc
public SctpChannelConfig setAllocator(ByteBufAllocator allocator) {
return (SctpChannelConfig) super.setAllocator(allocator);
}
@Override
public SctpChannelConfig setAutoRead(boolean autoRead) {
return (SctpChannelConfig) super.setAutoRead(autoRead);
}
}

View File

@ -26,9 +26,7 @@ import io.netty.util.NetUtil;
import java.io.IOException;
import java.util.Map;
import static com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_INIT_MAXSTREAMS;
import static com.sun.nio.sctp.SctpStandardSocketOptions.SO_RCVBUF;
import static com.sun.nio.sctp.SctpStandardSocketOptions.SO_SNDBUF;
import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
/**
* The default {@link SctpServerChannelConfig} implementation for SCTP.
@ -169,4 +167,9 @@ public class DefaultSctpServerChannelConfig extends DefaultChannelConfig impleme
public SctpServerChannelConfig setAllocator(ByteBufAllocator allocator) {
return (SctpServerChannelConfig) super.setAllocator(allocator);
}
@Override
public SctpServerChannelConfig setAutoRead(boolean autoRead) {
return (SctpServerChannelConfig) super.setAutoRead(autoRead);
}
}

View File

@ -15,8 +15,6 @@
*/
package io.netty.channel.socket;
import static io.netty.channel.ChannelOption.*;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
@ -27,6 +25,8 @@ import java.net.ServerSocket;
import java.net.SocketException;
import java.util.Map;
import static io.netty.channel.ChannelOption.*;
/**
* The default {@link ServerSocketChannelConfig} implementation.
*/
@ -156,4 +156,9 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
return (ServerSocketChannelConfig) super.setAllocator(allocator);
}
@Override
public ServerSocketChannelConfig setAutoRead(boolean autoRead) {
return (ServerSocketChannelConfig) super.setAutoRead(autoRead);
}
}

View File

@ -15,8 +15,6 @@
*/
package io.netty.channel.socket;
import static io.netty.channel.ChannelOption.*;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
@ -26,6 +24,8 @@ import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import static io.netty.channel.ChannelOption.*;
/**
* The default {@link SocketChannelConfig} implementation.
*/
@ -280,4 +280,9 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
public SocketChannelConfig setAllocator(ByteBufAllocator allocator) {
return (SocketChannelConfig) super.setAllocator(allocator);
}
@Override
public SocketChannelConfig setAutoRead(boolean autoRead) {
return (SocketChannelConfig) super.setAutoRead(autoRead);
}
}

View File

@ -18,7 +18,7 @@ package io.netty.channel.socket;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import static com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams;
import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
/**
* A {@link ChannelConfig} for a {@link SctpChannel}.
@ -101,4 +101,7 @@ public interface SctpChannelConfig extends ChannelConfig {
@Override
SctpChannelConfig setAllocator(ByteBufAllocator allocator);
@Override
SctpChannelConfig setAutoRead(boolean autoRead);
}

View File

@ -18,7 +18,7 @@ package io.netty.channel.socket;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import static com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams;
import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
/**
* A {@link ChannelConfig} for a {@link SctpServerChannelConfig}.
@ -100,4 +100,7 @@ public interface SctpServerChannelConfig extends ChannelConfig {
@Override
SctpServerChannelConfig setAllocator(ByteBufAllocator allocator);
@Override
SctpServerChannelConfig setAutoRead(boolean autoRead);
}

View File

@ -90,4 +90,7 @@ public interface ServerSocketChannelConfig extends ChannelConfig {
@Override
ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator);
@Override
ServerSocketChannelConfig setAutoRead(boolean autoRead);
}

View File

@ -153,4 +153,7 @@ public interface SocketChannelConfig extends ChannelConfig {
@Override
SocketChannelConfig setAllocator(ByteBufAllocator allocator);
@Override
SocketChannelConfig setAutoRead(boolean autoRead);
}

View File

@ -94,7 +94,12 @@ abstract class AbstractAioChannel extends AbstractChannel {
return loop instanceof AioEventLoop;
}
protected abstract class AbstractAioUnsafe extends AbstractUnsafe {
@Override
protected AbstractUnsafe newUnsafe() {
return new DefaultAioUnsafe();
}
protected final class DefaultAioUnsafe extends AbstractUnsafe {
@Override
public void connect(final SocketAddress remoteAddress,
@ -144,13 +149,13 @@ abstract class AbstractAioChannel extends AbstractChannel {
}
}
protected final void connectFailed(Throwable t) {
protected void connectFailed(Throwable t) {
connectFuture.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
protected final void connectSuccess() {
protected void connectSuccess() {
assert eventLoop().inEventLoop();
assert connectFuture != null;
try {

View File

@ -31,7 +31,6 @@ import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* {@link ServerSocketChannel} implementation which uses NIO2.
@ -47,16 +46,8 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
InternalLoggerFactory.getInstance(AioServerSocketChannel.class);
private final AioServerSocketChannelConfig config;
private boolean acceptInProgress;
private boolean closed;
private final AtomicBoolean readSuspended = new AtomicBoolean();
private final Runnable acceptTask = new Runnable() {
@Override
public void run() {
doAccept();
}
};
private static AsynchronousServerSocketChannel newSocket(AsynchronousChannelGroup group) {
try {
@ -122,13 +113,15 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
protected void doBind(SocketAddress localAddress) throws Exception {
AsynchronousServerSocketChannel ch = javaChannel();
ch.bind(localAddress, config.getBacklog());
doAccept();
}
private void doAccept() {
if (readSuspended.get()) {
@Override
protected void doBeginRead() {
if (acceptInProgress) {
return;
}
acceptInProgress = true;
javaChannel().accept(this, ACCEPT_HANDLER);
}
@ -172,17 +165,17 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
@Override
protected void completed0(AsynchronousSocketChannel ch, AioServerSocketChannel channel) {
// register again this handler to accept new connections
channel.doAccept();
channel.acceptInProgress = false;
// create the socket add it to the buffer and fire the event
channel.pipeline().inboundMessageBuffer().add(
new AioSocketChannel(channel, null, ch));
channel.pipeline().fireInboundBufferUpdated();
channel.pipeline().fireInboundBufferSuspended();
}
@Override
protected void failed0(Throwable t, AioServerSocketChannel channel) {
channel.acceptInProgress = false;
boolean asyncClosed = false;
if (t instanceof AsynchronousCloseException) {
asyncClosed = true;
@ -200,28 +193,4 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
public ServerSocketChannelConfig config() {
return config;
}
@Override
protected AbstractUnsafe newUnsafe() {
return new AioServerSocketUnsafe();
}
private final class AioServerSocketUnsafe extends AbstractAioUnsafe {
@Override
public void suspendRead() {
readSuspended.set(true);
}
@Override
public void resumeRead() {
if (readSuspended.compareAndSet(true, false)) {
if (eventLoop().inEventLoop()) {
doAccept();
} else {
eventLoop().execute(acceptTask);
}
}
}
}
}

View File

@ -15,8 +15,6 @@
*/
package io.netty.channel.socket.aio;
import static io.netty.channel.ChannelOption.*;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
@ -32,6 +30,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.channel.ChannelOption.*;
/**
* The Async {@link ServerSocketChannelConfig} implementation.
*/
@ -213,4 +213,9 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
return (ServerSocketChannelConfig) super.setAllocator(allocator);
}
@Override
public ServerSocketChannelConfig setAutoRead(boolean autoRead) {
return (ServerSocketChannelConfig) super.setAutoRead(autoRead);
}
}

View File

@ -20,11 +20,11 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFlushFutureNotifier;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
@ -38,7 +38,6 @@ import java.nio.channels.CompletionHandler;
import java.nio.channels.InterruptedByTimeoutException;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -68,19 +67,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
private volatile boolean inputShutdown;
private volatile boolean outputShutdown;
private boolean asyncWriteInProgress;
private boolean readInProgress;
private boolean writeInProgress;
private boolean inDoFlushByteBuffer;
private boolean asyncReadInProgress;
private boolean inBeginRead;
private final AtomicBoolean readSuspended = new AtomicBoolean();
private final Runnable readTask = new Runnable() {
@Override
public void run() {
beginRead();
}
};
/**
* Create a new instance which has not yet attached an {@link AsynchronousSocketChannel}. The
@ -207,16 +196,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
config.assign(javaChannel());
}
if (remoteAddress() == null) {
return null;
}
return new Runnable() {
@Override
public void run() {
beginRead();
}
};
return null;
}
private static void expandReadBuffer(ByteBuf byteBuf) {
@ -267,7 +247,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
if (inDoFlushByteBuffer || asyncWriteInProgress) {
if (inDoFlushByteBuffer || writeInProgress) {
return;
}
@ -284,7 +264,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
// discardReadBytes() later, modifying the readerIndex and the writerIndex unexpectedly.
buf.discardReadBytes();
asyncWriteInProgress = true;
writeInProgress = true;
if (buf.nioBufferCount() == 1) {
javaChannel().write(
buf.nioBuffer(), config.getWriteTimeout(), TimeUnit.MILLISECONDS, this, WRITE_HANDLER);
@ -300,7 +280,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
}
if (asyncWriteInProgress) {
if (writeInProgress) {
// JDK decided to write data (or notify handler) later.
buf.suspendIntermediaryDeallocations();
break;
@ -328,53 +308,35 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
region.transferTo(new WritableByteChannelAdapter(region, future), 0);
}
private void beginRead() {
if (inBeginRead || asyncReadInProgress || readSuspended.get()) {
@Override
protected void doBeginRead() {
if (readInProgress || inputShutdown) {
return;
}
inBeginRead = true;
ByteBuf byteBuf = pipeline().inboundByteBuffer();
if (!byteBuf.readable()) {
byteBuf.discardReadBytes();
}
try {
for (;;) {
ByteBuf byteBuf = pipeline().inboundByteBuffer();
if (inputShutdown) {
break;
}
expandReadBuffer(byteBuf);
if (!byteBuf.readable()) {
byteBuf.discardReadBytes();
}
expandReadBuffer(byteBuf);
asyncReadInProgress = true;
if (byteBuf.nioBufferCount() == 1) {
// Get a ByteBuffer view on the ByteBuf
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(
buffer, config.getReadTimeout(), TimeUnit.MILLISECONDS, this, READ_HANDLER);
} else {
ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes());
if (buffers.length == 1) {
javaChannel().read(
buffers[0], config.getReadTimeout(), TimeUnit.MILLISECONDS, this, READ_HANDLER);
} else {
javaChannel().read(
buffers, 0, buffers.length, config.getReadTimeout(), TimeUnit.MILLISECONDS,
this, SCATTERING_READ_HANDLER);
}
}
if (asyncReadInProgress) {
// JDK decided to read data (or notify handler) later.
break;
}
// The read operation has been finished immediately - schedule another read operation.
readInProgress = true;
if (byteBuf.nioBufferCount() == 1) {
// Get a ByteBuffer view on the ByteBuf
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(
buffer, config.getReadTimeout(), TimeUnit.MILLISECONDS, this, READ_HANDLER);
} else {
ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes());
if (buffers.length == 1) {
javaChannel().read(
buffers[0], config.getReadTimeout(), TimeUnit.MILLISECONDS, this, READ_HANDLER);
} else {
javaChannel().read(
buffers, 0, buffers.length, config.getReadTimeout(), TimeUnit.MILLISECONDS,
this, SCATTERING_READ_HANDLER);
}
} finally {
inBeginRead = false;
}
}
@ -382,7 +344,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void completed0(T result, AioSocketChannel channel) {
channel.asyncWriteInProgress = false;
channel.writeInProgress = false;
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
buf.resumeIntermediaryDeallocations();
@ -419,7 +381,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void failed0(Throwable cause, AioSocketChannel channel) {
channel.asyncWriteInProgress = false;
channel.writeInProgress = false;
channel.flushFutureNotifier.notifyFlushFutures(cause);
// Check if the exception was raised because of an InterruptedByTimeoutException which means that the
@ -444,7 +406,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void completed0(T result, AioSocketChannel channel) {
channel.asyncReadInProgress = false;
channel.readInProgress = false;
if (channel.inputShutdown) {
// Channel has been closed during read. Because the inbound buffer has been deallocated already,
@ -457,6 +419,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
boolean closed = false;
boolean read = false;
boolean firedInboundBufferSuspended = false;
try {
int localReadAmount = result.intValue();
if (localReadAmount > 0) {
@ -477,11 +440,16 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
pipeline.fireInboundBufferUpdated();
}
firedInboundBufferSuspended = true;
pipeline.fireInboundBufferSuspended();
pipeline.fireExceptionCaught(t);
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (!firedInboundBufferSuspended) {
pipeline.fireInboundBufferSuspended();
}
// Double check because fireInboundBufferUpdated() might have triggered the closure by a user handler.
if (closed || !channel.isOpen()) {
@ -493,16 +461,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
channel.unsafe().close(channel.unsafe().voidFuture());
}
}
} else {
// Schedule another read operation.
channel.beginRead();
}
}
}
@Override
protected void failed0(Throwable t, AioSocketChannel channel) {
channel.asyncReadInProgress = false;
channel.readInProgress = false;
if (t instanceof ClosedChannelException) {
channel.inputShutdown = true;
return;
@ -516,9 +481,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (t instanceof IOException || t instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.unsafe().voidFuture());
} else {
// Schedule another read operation.
channel.beginRead();
}
}
}
@ -527,14 +489,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void completed0(Void result, AioSocketChannel channel) {
((AbstractAioUnsafe) channel.unsafe()).connectSuccess();
((DefaultAioUnsafe) channel.unsafe()).connectSuccess();
channel.pipeline().fireChannelActive();
channel.beginRead();
}
@Override
protected void failed0(Throwable exc, AioSocketChannel channel) {
((AbstractAioUnsafe) channel.unsafe()).connectFailed(exc);
((DefaultAioUnsafe) channel.unsafe()).connectFailed(exc);
}
}
@ -543,34 +504,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
return config;
}
@Override
protected AbstractUnsafe newUnsafe() {
return new AioSocketChannelAsyncUnsafe();
}
private final class AioSocketChannelAsyncUnsafe extends AbstractAioUnsafe {
@Override
public void suspendRead() {
readSuspended.set(true);
}
@Override
public void resumeRead() {
if (readSuspended.compareAndSet(true, false)) {
if (inputShutdown) {
return;
}
if (eventLoop().inEventLoop()) {
beginRead();
} else {
eventLoop().execute(readTask);
}
}
}
}
private final class WritableByteChannelAdapter implements WritableByteChannel {
private final FileRegion region;
private final ChannelFuture future;

View File

@ -17,6 +17,7 @@ package io.netty.channel.socket.aio;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.socket.SocketChannelConfig;
import java.nio.channels.InterruptedByTimeoutException;
@ -61,6 +62,9 @@ public interface AioSocketChannelConfig extends SocketChannelConfig {
@Override
AioSocketChannelConfig setAllocator(ByteBufAllocator allocator);
@Override
AioSocketChannelConfig setAutoRead(boolean autoRead);
/**
* Return the read timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
* Once such an exception was detected it will get propagated to the handlers first. After that the channel

View File

@ -336,4 +336,9 @@ final class DefaultAioSocketChannelConfig extends DefaultChannelConfig
public AioSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
return (AioSocketChannelConfig) super.setAllocator(allocator);
}
@Override
public AioSocketChannelConfig setAutoRead(boolean autoRead) {
return (AioSocketChannelConfig) super.setAutoRead(autoRead);
}
}

View File

@ -18,10 +18,10 @@ package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
@ -55,11 +55,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public void read() {
assert eventLoop().inEventLoop();
final SelectionKey key = selectionKey();
key.interestOps(key.interestOps() & ~readInterestOp);
final ChannelPipeline pipeline = pipeline();
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
boolean closed = false;
boolean read = false;
boolean firedInboundBufferSuspended = false;
try {
expandReadBuffer(byteBuf);
loop: for (;;) {
@ -96,6 +99,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
read = false;
pipeline.fireInboundBufferUpdated();
}
firedInboundBufferSuspended = true;
pipeline.fireInboundBufferSuspended();
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
@ -104,11 +111,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (!firedInboundBufferSuspended) {
pipeline.fireInboundBufferSuspended();
}
if (closed) {
setInputShutdown();
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
suspendReadTask.run();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidFuture());

View File

@ -43,25 +43,11 @@ public abstract class AbstractNioChannel extends AbstractChannel {
InternalLoggerFactory.getInstance(AbstractNioChannel.class);
private final SelectableChannel ch;
private final int readInterestOp;
protected final int readInterestOp;
private volatile SelectionKey selectionKey;
private volatile boolean inputShutdown;
final Queue<NioTask<SelectableChannel>> writableTasks = new ConcurrentLinkedQueue<NioTask<SelectableChannel>>();
final Runnable suspendReadTask = new Runnable() {
@Override
public void run() {
selectionKey().interestOps(selectionKey().interestOps() & ~readInterestOp);
}
};
final Runnable resumeReadTask = new Runnable() {
@Override
public void run() {
selectionKey().interestOps(selectionKey().interestOps() | readInterestOp);
}
};
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
@ -249,30 +235,6 @@ public abstract class AbstractNioChannel extends AbstractChannel {
connectFuture = null;
}
}
@Override
public void suspendRead() {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
suspendReadTask.run();
} else {
loop.execute(suspendReadTask);
}
}
@Override
public void resumeRead() {
if (inputShutdown) {
return;
}
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
resumeReadTask.run();
} else {
loop.execute(resumeReadTask);
}
}
}
@Override
@ -288,9 +250,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected Runnable doRegister() throws Exception {
NioEventLoop loop = eventLoop();
selectionKey = javaChannel().register(
loop.selector, isActive() && !inputShutdown ? readInterestOp : 0, this);
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return null;
}
@ -299,6 +259,25 @@ public abstract class AbstractNioChannel extends AbstractChannel {
eventLoop().cancel(selectionKey());
}
@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) != 0) {
return;
}
this.selectionKey.interestOps(interestOps | readInterestOp);
}
/**
* Conect to the remote peer
*/

View File

@ -21,6 +21,7 @@ import io.netty.channel.ChannelPipeline;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
/**
* {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
@ -44,11 +45,14 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
@Override
public void read() {
assert eventLoop().inEventLoop();
final SelectionKey key = selectionKey();
key.interestOps(key.interestOps() & ~readInterestOp);
final ChannelPipeline pipeline = pipeline();
final MessageBuf<Object> msgBuf = pipeline.inboundMessageBuffer();
boolean closed = false;
boolean read = false;
boolean firedInboundBufferSuspended = false;
try {
for (;;) {
int localReadAmount = doReadMessages(msgBuf);
@ -66,6 +70,10 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
read = false;
pipeline.fireInboundBufferUpdated();
}
firedInboundBufferSuspended = true;
pipeline.fireInboundBufferSuspended();
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
@ -74,6 +82,9 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (!firedInboundBufferSuspended) {
pipeline.fireInboundBufferSuspended();
}
if (closed && isOpen()) {
close(voidFuture());
}

View File

@ -118,8 +118,6 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress, config.getBacklog());
SelectionKey selectionKey = selectionKey();
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT);
}
@Override

View File

@ -90,8 +90,6 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
SelectionKey selectionKey = selectionKey();
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT);
}
@Override

View File

@ -213,10 +213,6 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
@Override
protected int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
// FIXME: This is not as efficient as Netty 3's SendBufferPool if heap buffer is used
// because of potentially unwanted repetitive memory copy in case of
// a slow connection or a large output buffer that triggers OP_WRITE.
final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes);
final SelectionKey key = selectionKey();

View File

@ -17,9 +17,9 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import java.io.IOException;
@ -42,88 +42,85 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel {
}
@Override
protected AbstractOioUnsafe newUnsafe() {
return new OioByteUnsafe();
}
private final class OioByteUnsafe extends AbstractOioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();
if (inputShutdown) {
try {
Thread.sleep(SO_TIMEOUT);
} catch (InterruptedException e) {
// ignore
}
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
boolean closed = false;
boolean read = false;
protected void doRead() {
if (inputShutdown) {
try {
for (;;) {
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
Thread.sleep(SO_TIMEOUT);
} catch (InterruptedException e) {
// ignore
}
return;
}
final int available = available();
if (available <= 0) {
break;
}
final ChannelPipeline pipeline = pipeline();
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
boolean closed = false;
boolean read = false;
boolean firedInboundBufferSuspeneded = false;
try {
for (;;) {
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
if (byteBuf.writable()) {
continue;
}
final int available = available();
if (available <= 0) {
break;
}
final int capacity = byteBuf.capacity();
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
if (!byteBuf.writable()) {
throw new IllegalStateException(
"an inbound handler whose buffer is full must consume at " +
"least one byte.");
}
if (byteBuf.writable()) {
continue;
}
final int capacity = byteBuf.capacity();
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
if (!byteBuf.writable()) {
throw new IllegalStateException(
"an inbound handler whose buffer is full must consume at " +
"least one byte.");
}
}
} else {
final int writerIndex = byteBuf.writerIndex();
if (writerIndex + available > maxCapacity) {
byteBuf.capacity(maxCapacity);
} else {
final int writerIndex = byteBuf.writerIndex();
if (writerIndex + available > maxCapacity) {
byteBuf.capacity(maxCapacity);
} else {
byteBuf.ensureWritableBytes(available);
}
byteBuf.ensureWritableBytes(available);
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidFuture());
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
firedInboundBufferSuspeneded = true;
pipeline.fireInboundBufferSuspended();
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
unsafe().close(unsafe().voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (!firedInboundBufferSuspeneded) {
pipeline.fireInboundBufferSuspended();
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
unsafe().close(unsafe().voidFuture());
}
}
}

View File

@ -30,7 +30,15 @@ abstract class AbstractOioChannel extends AbstractChannel {
static final int SO_TIMEOUT = 1000;
protected volatile boolean readSuspended;
private boolean readInProgress;
private final Runnable readTask = new Runnable() {
@Override
public void run() {
readInProgress = false;
doRead();
}
};
/**
* @see AbstractChannel#AbstractChannel(Channel, Integer)
@ -50,15 +58,11 @@ abstract class AbstractOioChannel extends AbstractChannel {
}
@Override
public OioUnsafe unsafe() {
return (OioUnsafe) super.unsafe();
protected AbstractUnsafe newUnsafe() {
return new DefaultOioUnsafe();
}
public interface OioUnsafe extends Unsafe {
void read();
}
abstract class AbstractOioUnsafe extends AbstractUnsafe implements OioUnsafe {
private final class DefaultOioUnsafe extends AbstractUnsafe {
@Override
public void connect(
final SocketAddress remoteAddress,
@ -88,16 +92,6 @@ abstract class AbstractOioChannel extends AbstractChannel {
});
}
}
@Override
public void suspendRead() {
readSuspended = true;
}
@Override
public void resumeRead() {
readSuspended = false;
}
}
@Override
@ -115,4 +109,16 @@ abstract class AbstractOioChannel extends AbstractChannel {
*/
protected abstract void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
@Override
protected void doBeginRead() throws Exception {
if (readInProgress) {
return;
}
readInProgress = true;
eventLoop().execute(readTask);
}
protected abstract void doRead();
}

View File

@ -34,42 +34,39 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel {
}
@Override
protected AbstractOioUnsafe newUnsafe() {
return new OioMessageUnsafe();
}
private final class OioMessageUnsafe extends AbstractOioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final MessageBuf<Object> msgBuf = pipeline.inboundMessageBuffer();
boolean closed = false;
boolean read = false;
try {
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidFuture());
}
protected void doRead() {
final ChannelPipeline pipeline = pipeline();
final MessageBuf<Object> msgBuf = pipeline.inboundMessageBuffer();
boolean closed = false;
boolean read = false;
boolean firedInboundBufferSuspended = false;
try {
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
firedInboundBufferSuspended = true;
pipeline.fireInboundBufferSuspended();
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
unsafe().close(unsafe().voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (!firedInboundBufferSuspended) {
pipeline.fireInboundBufferSuspended();
}
if (closed && isOpen()) {
unsafe().close(unsafe().voidFuture());
}
}
}

View File

@ -181,15 +181,6 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
@Override
protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
if (readSuspended) {
try {
Thread.sleep(SO_TIMEOUT);
} catch (InterruptedException e) {
// ignore;
}
return 0;
}
int packetSize = config().getReceivePacketSize();
byte[] data = new byte[packetSize];
tmpPacket.setData(data);
@ -202,11 +193,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
buf.add(new DatagramPacket(Unpooled.wrappedBuffer(
data, tmpPacket.getOffset(), tmpPacket.getLength()), remoteAddr));
if (readSuspended) {
return 0;
} else {
return 1;
}
return 1;
} catch (SocketTimeoutException e) {
// Expected
return 0;

View File

@ -78,8 +78,6 @@ class OioEventLoop extends SingleThreadEventLoop {
}
}
ch.unsafe().read();
// Handle deregistration
if (!ch.isRegistered()) {
runAllTasks();

View File

@ -148,7 +148,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
@Override
protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
if (readSuspended || !readSelector.isOpen()) {
if (!readSelector.isOpen()) {
return 0;
}
@ -174,10 +174,6 @@ public class OioSctpChannel extends AbstractOioMessageChannel
buf.add(new SctpMessage(messageInfo, Unpooled.wrappedBuffer(data)));
readMessages ++;
if (readSuspended) {
return readMessages;
}
}
} finally {
reableKeys.clear();

View File

@ -185,10 +185,6 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
return -1;
}
if (readSuspended) {
return 0;
}
SctpChannel s = null;
try {
final int selectedKeys = selector.select(SO_TIMEOUT);

View File

@ -157,15 +157,6 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
return -1;
}
if (readSuspended) {
try {
Thread.sleep(SO_TIMEOUT);
} catch (InterruptedException e) {
// ignore
}
return 0;
}
Socket s = null;
try {
s = socket.accept();

View File

@ -222,15 +222,6 @@ public class OioSocketChannel extends AbstractOioByteChannel
return -1;
}
if (readSuspended) {
try {
Thread.sleep(SO_TIMEOUT);
} catch (InterruptedException e) {
// ignore
}
return 0;
}
try {
return buf.writeBytes(is, buf.writableBytes());
} catch (SocketTimeoutException e) {

View File

@ -366,6 +366,11 @@ public class LocalTransportThreadModelTest {
ctx.fireInboundBufferUpdated();
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void flush(ChannelHandlerContext ctx,
ChannelFuture future) throws Exception {
@ -446,6 +451,11 @@ public class LocalTransportThreadModelTest {
ctx.fireInboundBufferUpdated();
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void flush(ChannelHandlerContext ctx,
ChannelFuture future) throws Exception {
@ -537,6 +547,11 @@ public class LocalTransportThreadModelTest {
ctx.fireInboundBufferUpdated();
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void flush(final ChannelHandlerContext ctx,
ChannelFuture future) throws Exception {
@ -630,6 +645,11 @@ public class LocalTransportThreadModelTest {
ctx.fireInboundBufferUpdated();
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void flush(ChannelHandlerContext ctx,
ChannelFuture future) throws Exception {
@ -711,6 +731,11 @@ public class LocalTransportThreadModelTest {
}
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void flush(ChannelHandlerContext ctx,
ChannelFuture future) throws Exception {