Add support to suspend reads. See #71
This commit is contained in:
parent
137525d4c5
commit
57e7255566
6
transport/src/main/java/io/netty/channel/AbstractServerChannel.java
Normal file → Executable file
6
transport/src/main/java/io/netty/channel/AbstractServerChannel.java
Normal file → Executable file
@ -62,10 +62,6 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Unsafe newUnsafe() {
|
||||
return new DefaultServerUnsafe();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SocketAddress remoteAddress0() {
|
||||
@ -92,7 +88,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
|
||||
return false;
|
||||
}
|
||||
|
||||
protected class DefaultServerUnsafe extends AbstractUnsafe {
|
||||
protected abstract class DefaultServerUnsafe extends AbstractUnsafe {
|
||||
|
||||
@Override
|
||||
public void flush(final ChannelFuture future) {
|
||||
|
2
transport/src/main/java/io/netty/channel/Channel.java
Normal file → Executable file
2
transport/src/main/java/io/netty/channel/Channel.java
Normal file → Executable file
@ -194,5 +194,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
|
||||
|
||||
void flush(ChannelFuture future);
|
||||
void flushNow();
|
||||
void suspendRead();
|
||||
void resumeRead();
|
||||
}
|
||||
}
|
||||
|
3
transport/src/main/java/io/netty/channel/ChannelHandlerContext.java
Normal file → Executable file
3
transport/src/main/java/io/netty/channel/ChannelHandlerContext.java
Normal file → Executable file
@ -155,4 +155,7 @@ public interface ChannelHandlerContext
|
||||
boolean hasNextOutboundMessageBuffer();
|
||||
ByteBuf nextOutboundByteBuffer();
|
||||
MessageBuf<Object> nextOutboundMessageBuffer();
|
||||
|
||||
boolean isReadable();
|
||||
void readable(boolean readable);
|
||||
}
|
||||
|
1
transport/src/main/java/io/netty/channel/ChannelStateHandler.java
Normal file → Executable file
1
transport/src/main/java/io/netty/channel/ChannelStateHandler.java
Normal file → Executable file
@ -23,4 +23,5 @@ public interface ChannelStateHandler extends ChannelHandler {
|
||||
void channelInactive(ChannelHandlerContext ctx) throws Exception;
|
||||
|
||||
void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception;
|
||||
|
||||
}
|
||||
|
13
transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java
Normal file → Executable file
13
transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java
Normal file → Executable file
@ -28,6 +28,7 @@ import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
|
||||
@ -62,6 +63,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
final AtomicReference<ByteBridge> inByteBridge;
|
||||
final AtomicReference<ByteBridge> outByteBridge;
|
||||
|
||||
final AtomicBoolean suspendRead = new AtomicBoolean(false);
|
||||
|
||||
// Runnables that calls handlers
|
||||
final Runnable fireChannelRegisteredTask = new Runnable() {
|
||||
@Override
|
||||
@ -793,4 +796,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReadable() {
|
||||
return !suspendRead.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readable(boolean readable) {
|
||||
this.pipeline.readable(this, readable);
|
||||
}
|
||||
}
|
||||
|
30
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
Normal file → Executable file
30
transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
Normal file → Executable file
@ -35,6 +35,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* The default {@link ChannelPipeline} implementation. It is usually created
|
||||
@ -56,7 +57,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
final Map<EventExecutor, EventExecutor> childExecutors =
|
||||
new IdentityHashMap<EventExecutor, EventExecutor>();
|
||||
|
||||
private final AtomicInteger suspendRead = new AtomicInteger();
|
||||
|
||||
public DefaultChannelPipeline(Channel channel) {
|
||||
if (channel == null) {
|
||||
@ -465,6 +466,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
name2ctx.remove(ctx.name());
|
||||
|
||||
callAfterRemove(ctx);
|
||||
|
||||
// make sure we clear the readable flag
|
||||
ctx.readable(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -524,6 +528,12 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
name2ctx.remove(oldTail.name());
|
||||
|
||||
callBeforeRemove(oldTail);
|
||||
|
||||
// clear readable suspend if necessary
|
||||
oldTail.readable(true);
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -640,6 +650,10 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
boolean removed = false;
|
||||
try {
|
||||
callAfterRemove(ctx);
|
||||
|
||||
// clear readable suspend if necessary
|
||||
ctx.readable(true);
|
||||
|
||||
removed = true;
|
||||
} catch (ChannelHandlerLifeCycleException e) {
|
||||
removeException = e;
|
||||
@ -1438,6 +1452,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
}
|
||||
|
||||
void readable(DefaultChannelHandlerContext ctx, boolean readable) {
|
||||
if (ctx.suspendRead.compareAndSet(!readable, readable)) {
|
||||
if (!readable) {
|
||||
if (suspendRead.incrementAndGet() == 1) {
|
||||
unsafe.suspendRead();
|
||||
}
|
||||
} else {
|
||||
if (suspendRead.decrementAndGet() == 0) {
|
||||
unsafe.resumeRead();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class HeadHandler implements ChannelOutboundHandler {
|
||||
@Override
|
||||
public ChannelBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
|
10
transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java
Normal file → Executable file
10
transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java
Normal file → Executable file
@ -193,6 +193,16 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
|
||||
SocketAddress localAddress, ChannelFuture future) {
|
||||
future.setSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void suspendRead() {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeRead() {
|
||||
// NOOP
|
||||
}
|
||||
}
|
||||
|
||||
private final class LastInboundMessageHandler extends ChannelInboundHandlerAdapter {
|
||||
|
10
transport/src/main/java/io/netty/channel/local/LocalChannel.java
Normal file → Executable file
10
transport/src/main/java/io/netty/channel/local/LocalChannel.java
Normal file → Executable file
@ -295,5 +295,15 @@ public class LocalChannel extends AbstractChannel {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void suspendRead() {
|
||||
// TODO: Implement me
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeRead() {
|
||||
// TODO: Implement me
|
||||
}
|
||||
}
|
||||
}
|
||||
|
17
transport/src/main/java/io/netty/channel/local/LocalServerChannel.java
Normal file → Executable file
17
transport/src/main/java/io/netty/channel/local/LocalServerChannel.java
Normal file → Executable file
@ -138,4 +138,21 @@ public class LocalServerChannel extends AbstractServerChannel {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Unsafe newUnsafe() {
|
||||
return new AbstractServerChannel.DefaultServerUnsafe() {
|
||||
|
||||
@Override
|
||||
public void suspendRead() {
|
||||
// TODO: Implement me
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeRead() {
|
||||
// TODO: Implement me
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
7
transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java
Normal file → Executable file
7
transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java
Normal file → Executable file
@ -81,17 +81,12 @@ abstract class AbstractAioChannel extends AbstractChannel {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AsyncUnsafe newUnsafe() {
|
||||
return new AsyncUnsafe();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCompatible(EventLoop loop) {
|
||||
return loop instanceof AioChildEventLoop;
|
||||
}
|
||||
|
||||
protected class AsyncUnsafe extends AbstractUnsafe {
|
||||
protected abstract class AsyncUnsafe extends AbstractUnsafe {
|
||||
|
||||
@Override
|
||||
public void connect(final SocketAddress remoteAddress,
|
||||
|
37
transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java
Normal file → Executable file
37
transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java
Normal file → Executable file
@ -30,6 +30,7 @@ import java.nio.channels.AsynchronousChannelGroup;
|
||||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.nio.channels.AsynchronousServerSocketChannel;
|
||||
import java.nio.channels.AsynchronousSocketChannel;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class AioServerSocketChannel extends AbstractAioChannel implements ServerSocketChannel {
|
||||
|
||||
@ -41,6 +42,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
|
||||
|
||||
private final AioServerSocketChannelConfig config;
|
||||
private boolean closed;
|
||||
private AtomicBoolean readSuspend = new AtomicBoolean();
|
||||
|
||||
private static AsynchronousServerSocketChannel newSocket(AsynchronousChannelGroup group) {
|
||||
try {
|
||||
@ -88,7 +90,14 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
|
||||
protected void doBind(SocketAddress localAddress) throws Exception {
|
||||
AsynchronousServerSocketChannel ch = javaChannel();
|
||||
ch.bind(localAddress);
|
||||
ch.accept(this, ACCEPT_HANDLER);
|
||||
doAccept();
|
||||
}
|
||||
|
||||
private void doAccept() {
|
||||
if (readSuspend.get()) {
|
||||
return;
|
||||
}
|
||||
javaChannel().accept(this, ACCEPT_HANDLER);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -125,13 +134,15 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
|
||||
|
||||
@Override
|
||||
protected void completed0(AsynchronousSocketChannel ch, AioServerSocketChannel channel) {
|
||||
// register again this handler to accept new connections
|
||||
channel.javaChannel().accept(channel, this);
|
||||
// register again this handler to accept new connections
|
||||
channel.doAccept();
|
||||
|
||||
// create the socket add it to the buffer and fire the event
|
||||
channel.pipeline().inboundMessageBuffer().add(
|
||||
new AioSocketChannel(channel, null, channel.eventLoop, ch));
|
||||
channel.pipeline().fireInboundBufferUpdated();
|
||||
if (!channel.readSuspend.get()) {
|
||||
channel.pipeline().fireInboundBufferUpdated();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -153,4 +164,22 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
|
||||
public ServerSocketChannelConfig config() {
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Unsafe newUnsafe() {
|
||||
return new AsyncUnsafe() {
|
||||
|
||||
@Override
|
||||
public void suspendRead() {
|
||||
readSuspend.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeRead() {
|
||||
if (readSuspend.compareAndSet(true, false)) {
|
||||
doAccept();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
46
transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java
Normal file → Executable file
46
transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java
Normal file → Executable file
@ -31,6 +31,7 @@ import java.nio.channels.AsynchronousChannelGroup;
|
||||
import java.nio.channels.AsynchronousSocketChannel;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.CompletionHandler;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
||||
public class AioSocketChannel extends AbstractAioChannel implements SocketChannel {
|
||||
@ -52,6 +53,15 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
private final AioSocketChannelConfig config;
|
||||
private boolean flushing;
|
||||
|
||||
private final AtomicBoolean readSuspend = new AtomicBoolean(false);
|
||||
|
||||
private final Runnable readTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
AioSocketChannel.this.beginRead();
|
||||
}
|
||||
};
|
||||
|
||||
public AioSocketChannel(AioEventLoop eventLoop) {
|
||||
this(null, null, eventLoop, newSocket(eventLoop.group));
|
||||
}
|
||||
@ -177,6 +187,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
}
|
||||
|
||||
private void beginRead() {
|
||||
if (readSuspend.get()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ByteBuf byteBuf = pipeline().inboundByteBuffer();
|
||||
if (!byteBuf.readable()) {
|
||||
byteBuf.discardReadBytes();
|
||||
@ -270,7 +284,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
} catch (Throwable t) {
|
||||
if (read) {
|
||||
read = false;
|
||||
pipeline.fireInboundBufferUpdated();
|
||||
if (!channel.readSuspend.get()) {
|
||||
pipeline.fireInboundBufferUpdated();
|
||||
}
|
||||
}
|
||||
|
||||
if (!(t instanceof ClosedChannelException)) {
|
||||
@ -282,7 +298,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
}
|
||||
} finally {
|
||||
if (read) {
|
||||
pipeline.fireInboundBufferUpdated();
|
||||
if (!channel.readSuspend.get()) {
|
||||
pipeline.fireInboundBufferUpdated();
|
||||
}
|
||||
}
|
||||
if (closed && channel.isOpen()) {
|
||||
channel.unsafe().close(channel.unsafe().voidFuture());
|
||||
@ -329,4 +347,28 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
public AioSocketChannelConfig config() {
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Unsafe newUnsafe() {
|
||||
return new AioSocketChannelAsyncUnsafe();
|
||||
}
|
||||
|
||||
private final class AioSocketChannelAsyncUnsafe extends AsyncUnsafe {
|
||||
|
||||
@Override
|
||||
public void suspendRead() {
|
||||
readSuspend.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeRead() {
|
||||
if (readSuspend.compareAndSet(true, false)) {
|
||||
if (eventLoop().inEventLoop()) {
|
||||
beginRead();
|
||||
} else {
|
||||
eventLoop.execute(readTask);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
6
transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java
Normal file → Executable file
6
transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java
Normal file → Executable file
@ -31,11 +31,9 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Unsafe newUnsafe() {
|
||||
return new NioByteUnsafe();
|
||||
}
|
||||
protected abstract NioByteUnsafe newUnsafe();
|
||||
|
||||
private class NioByteUnsafe extends AbstractNioUnsafe {
|
||||
protected abstract class NioByteUnsafe extends AbstractNioUnsafe {
|
||||
@Override
|
||||
public void read() {
|
||||
assert eventLoop().inEventLoop();
|
||||
|
1
transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java
Normal file → Executable file
1
transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java
Normal file → Executable file
@ -187,6 +187,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
||||
connectFuture = null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
6
transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java
Normal file → Executable file
6
transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java
Normal file → Executable file
@ -30,11 +30,9 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Unsafe newUnsafe() {
|
||||
return new NioMessageUnsafe();
|
||||
}
|
||||
protected abstract NioMessageUnsafe newUnsafe();
|
||||
|
||||
private class NioMessageUnsafe extends AbstractNioUnsafe {
|
||||
abstract class NioMessageUnsafe extends AbstractNioUnsafe {
|
||||
@Override
|
||||
public void read() {
|
||||
assert eventLoop().inEventLoop();
|
||||
|
25
transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java
Normal file → Executable file
25
transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java
Normal file → Executable file
@ -55,6 +55,8 @@ public final class NioDatagramChannel
|
||||
private final Map<InetAddress, List<MembershipKey>> memberships =
|
||||
new HashMap<InetAddress, List<MembershipKey>>();
|
||||
|
||||
private volatile boolean connected;
|
||||
|
||||
private static DatagramChannel newSocket() {
|
||||
try {
|
||||
return DatagramChannel.open();
|
||||
@ -149,6 +151,7 @@ public final class NioDatagramChannel
|
||||
try {
|
||||
javaChannel().connect(remoteAddress);
|
||||
selectionKey().interestOps(selectionKey().interestOps() | SelectionKey.OP_READ);
|
||||
connected = true;
|
||||
success = true;
|
||||
return true;
|
||||
} finally {
|
||||
@ -457,4 +460,26 @@ public final class NioDatagramChannel
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NioMessageUnsafe newUnsafe() {
|
||||
return new NioDatagramChannelUnsafe();
|
||||
}
|
||||
|
||||
private final class NioDatagramChannelUnsafe extends NioMessageUnsafe {
|
||||
|
||||
@Override
|
||||
public void suspendRead() {
|
||||
if (!connected) {
|
||||
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeRead() {
|
||||
if (!connected) {
|
||||
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
17
transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java
Normal file → Executable file
17
transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java
Normal file → Executable file
@ -128,4 +128,21 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
|
||||
protected int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NioMessageUnsafe newUnsafe() {
|
||||
return new NioServerSocketUnsafe();
|
||||
}
|
||||
|
||||
private final class NioServerSocketUnsafe extends NioMessageUnsafe {
|
||||
@Override
|
||||
public void suspendRead() {
|
||||
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_ACCEPT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeRead() {
|
||||
selectionKey().interestOps(selectionKey().interestOps() | SelectionKey.OP_ACCEPT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
18
transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java
Normal file → Executable file
18
transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java
Normal file → Executable file
@ -191,4 +191,22 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
||||
|
||||
return writtenBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NioByteUnsafe newUnsafe() {
|
||||
return new NioSocketChannelUnsafe();
|
||||
}
|
||||
|
||||
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
|
||||
|
||||
@Override
|
||||
public void suspendRead() {
|
||||
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeRead() {
|
||||
selectionKey().interestOps(selectionKey().interestOps() | SelectionKey.OP_READ);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
6
transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java
Normal file → Executable file
6
transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java
Normal file → Executable file
@ -28,11 +28,9 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Unsafe newUnsafe() {
|
||||
return new OioByteUnsafe();
|
||||
}
|
||||
protected abstract OioByteUnsafe newUnsafe();
|
||||
|
||||
private class OioByteUnsafe extends AbstractOioUnsafe {
|
||||
abstract class OioByteUnsafe extends AbstractOioUnsafe {
|
||||
@Override
|
||||
public void read() {
|
||||
assert eventLoop().inEventLoop();
|
||||
|
6
transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java
Normal file → Executable file
6
transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java
Normal file → Executable file
@ -28,11 +28,9 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Unsafe newUnsafe() {
|
||||
return new OioMessageUnsafe();
|
||||
}
|
||||
protected abstract OioMessageUnsafe newUnsafe();
|
||||
|
||||
private class OioMessageUnsafe extends AbstractOioUnsafe {
|
||||
abstract class OioMessageUnsafe extends AbstractOioUnsafe {
|
||||
@Override
|
||||
public void read() {
|
||||
assert eventLoop().inEventLoop();
|
||||
|
36
transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java
Normal file → Executable file
36
transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java
Normal file → Executable file
@ -52,6 +52,8 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
|
||||
private final DatagramChannelConfig config;
|
||||
private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EMPTY_DATA, 0);
|
||||
|
||||
private volatile boolean readSuspend;
|
||||
|
||||
private static MulticastSocket newSocket() {
|
||||
try {
|
||||
return new MulticastSocket(null);
|
||||
@ -163,6 +165,15 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
|
||||
|
||||
@Override
|
||||
protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
|
||||
if (readSuspend) {
|
||||
try {
|
||||
Thread.sleep(SO_TIMEOUT);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int packetSize = config().getReceivePacketSize();
|
||||
byte[] data = new byte[packetSize];
|
||||
tmpPacket.setData(data);
|
||||
@ -174,7 +185,12 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
|
||||
}
|
||||
buf.add(new DatagramPacket(Unpooled.wrappedBuffer(
|
||||
data, tmpPacket.getOffset(), tmpPacket.getLength()), remoteAddr));
|
||||
return 1;
|
||||
|
||||
if (readSuspend) {
|
||||
return 0;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
} catch (SocketTimeoutException e) {
|
||||
// Expected
|
||||
return 0;
|
||||
@ -336,4 +352,22 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
|
||||
future.setFailure(new UnsupportedOperationException());
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected OioMessageUnsafe newUnsafe() {
|
||||
return new OioDatagramChannelUnsafe();
|
||||
}
|
||||
|
||||
private final class OioDatagramChannelUnsafe extends OioMessageUnsafe {
|
||||
|
||||
@Override
|
||||
public void suspendRead() {
|
||||
readSuspend = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeRead() {
|
||||
readSuspend = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
32
transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java
Normal file → Executable file
32
transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java
Normal file → Executable file
@ -54,6 +54,8 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
|
||||
final Lock shutdownLock = new ReentrantLock();
|
||||
private final ServerSocketChannelConfig config;
|
||||
|
||||
private volatile boolean readSuspend;
|
||||
|
||||
public OioServerSocketChannel() {
|
||||
this(newServerSocket());
|
||||
}
|
||||
@ -138,11 +140,23 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (readSuspend) {
|
||||
try {
|
||||
Thread.sleep(SO_TIMEOUT);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
Socket s = null;
|
||||
try {
|
||||
s = socket.accept();
|
||||
if (s != null) {
|
||||
buf.add(new OioSocketChannel(this, null, s));
|
||||
if (readSuspend) {
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
} catch (SocketTimeoutException e) {
|
||||
@ -181,4 +195,22 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
|
||||
protected void doWriteMessages(MessageBuf<Object> buf) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected OioMessageUnsafe newUnsafe() {
|
||||
return new OioServerSocketUnsafe();
|
||||
}
|
||||
|
||||
private final class OioServerSocketUnsafe extends OioMessageUnsafe {
|
||||
|
||||
@Override
|
||||
public void suspendRead() {
|
||||
readSuspend = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeRead() {
|
||||
readSuspend = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
38
transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java
Normal file → Executable file
38
transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java
Normal file → Executable file
@ -46,6 +46,7 @@ public class OioSocketChannel extends AbstractOioByteChannel
|
||||
private final SocketChannelConfig config;
|
||||
private InputStream is;
|
||||
private OutputStream os;
|
||||
private volatile boolean suspendRead;
|
||||
|
||||
public OioSocketChannel() {
|
||||
this(new Socket());
|
||||
@ -160,8 +161,24 @@ public class OioSocketChannel extends AbstractOioByteChannel
|
||||
if (socket.isClosed()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (suspendRead) {
|
||||
try {
|
||||
Thread.sleep(SO_TIMEOUT);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
try {
|
||||
return buf.writeBytes(is, buf.writableBytes());
|
||||
int read = buf.writeBytes(is, buf.writableBytes());
|
||||
if (read > 0 && !suspendRead) {
|
||||
return read;
|
||||
} else {
|
||||
// so the read bytes were 0 or the read was suspend
|
||||
return 0;
|
||||
}
|
||||
} catch (SocketTimeoutException e) {
|
||||
return 0;
|
||||
}
|
||||
@ -175,4 +192,23 @@ public class OioSocketChannel extends AbstractOioByteChannel
|
||||
}
|
||||
buf.readBytes(os, buf.readableBytes());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected OioByteUnsafe newUnsafe() {
|
||||
return new OioSocketChannelUnsafe();
|
||||
}
|
||||
|
||||
private final class OioSocketChannelUnsafe extends OioByteUnsafe {
|
||||
|
||||
@Override
|
||||
public void suspendRead() {
|
||||
suspendRead = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeRead() {
|
||||
suspendRead = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user